mic-after-work-server-impl-…/REALTIME_IMPLEMENTATION_EXAMPLES.md
dahoud 93c63fd600 feat: complete migration to WebSockets Next + Kafka for real-time
- Migrated from Jakarta WebSocket to Quarkus WebSockets Next
- Implemented the Kafka architecture for real-time events
- Added event DTOs (NotificationEvent, ChatMessageEvent, ReactionEvent, PresenceEvent)
- Created the Kafka → WebSocket bridges (NotificationKafkaBridge, ChatKafkaBridge, ReactionKafkaBridge)
- Updated the services to publish to Kafka instead of calling WebSocket directly
- Removed obsolete classes (ChatWebSocket, NotificationWebSocket)
- Fixed path parameter injection in WebSockets Next (using connection.pathParam)
- Added DB migrations for bookings, promotions, business hours, amenities, reviews
- Updated the application.properties configuration for Kafka and WebSockets Next
- Updated .gitignore to ignore log files
2026-01-21 13:46:16 +00:00

💻 Implementation Examples - Real-Time with Kafka

📦 Step 1: Add the Dependencies

pom.xml

<!-- WebSockets Next (replaces quarkus-websockets) -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-websockets-next</artifactId>
</dependency>

<!-- Kafka Reactive Messaging -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>

<!-- Reactive Messaging HTTP (Bridge Kafka ↔ WebSocket) -->
<dependency>
    <groupId>io.quarkiverse.reactivemessaginghttp</groupId>
    <artifactId>quarkus-reactive-messaging-http</artifactId>
    <version>1.0.0</version>
</dependency>

<!-- JSON serialization for Kafka -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jsonb</artifactId>
</dependency>

🔧 Step 2: application.properties Configuration

# ============================================
# Kafka Configuration
# ============================================
kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}

# Topic: Notifications
mp.messaging.outgoing.notifications.connector=smallrye-kafka
mp.messaging.outgoing.notifications.topic=notifications
mp.messaging.outgoing.notifications.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.notifications.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

# Topic: Chat Messages
mp.messaging.outgoing.chat-messages.connector=smallrye-kafka
mp.messaging.outgoing.chat-messages.topic=chat.messages
mp.messaging.outgoing.chat-messages.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.chat-messages.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

# Topic: Reactions (likes, comments)
mp.messaging.outgoing.reactions.connector=smallrye-kafka
mp.messaging.outgoing.reactions.topic=reactions
mp.messaging.outgoing.reactions.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.reactions.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

# Topic: Presence Updates
mp.messaging.outgoing.presence.connector=smallrye-kafka
mp.messaging.outgoing.presence.topic=presence.updates
mp.messaging.outgoing.presence.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.presence.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer

# ============================================
# Kafka → WebSocket Bridge (Incoming)
# ============================================
# Consume from Kafka and route to WebSocket
# Note: JsonbDeserializer has no no-arg constructor, so Kafka cannot instantiate it directly.
# Each channel points at a small subclass bound to its event type
# (the two *Deserializer class names below are hypothetical and must be created).
mp.messaging.incoming.kafka-notifications.connector=smallrye-kafka
mp.messaging.incoming.kafka-notifications.topic=notifications
mp.messaging.incoming.kafka-notifications.group.id=websocket-notifications-bridge
mp.messaging.incoming.kafka-notifications.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.kafka-notifications.value.deserializer=com.lions.dev.dto.events.NotificationEventDeserializer
mp.messaging.incoming.kafka-notifications.enable.auto.commit=true

mp.messaging.incoming.kafka-chat.connector=smallrye-kafka
mp.messaging.incoming.kafka-chat.topic=chat.messages
mp.messaging.incoming.kafka-chat.group.id=websocket-chat-bridge
mp.messaging.incoming.kafka-chat.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.kafka-chat.value.deserializer=com.lions.dev.dto.events.ChatMessageEventDeserializer
mp.messaging.incoming.kafka-chat.enable.auto.commit=true

# ============================================
# WebSocket Configuration
# ============================================
quarkus.websockets-next.server.enabled=true

📝 Step 3: DTOs for Kafka Events

NotificationEvent.java

package com.lions.dev.dto.events;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.Map;
import java.util.UUID;

/**
 * Notification event published to Kafka.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class NotificationEvent {
    private String userId; // Kafka key (for routing)
    private String type; // friend_request, event_reminder, message_alert, etc.
    private Map<String, Object> data;
    private Long timestamp;
    
    public NotificationEvent(String userId, String type, Map<String, Object> data) {
        this.userId = userId;
        this.type = type;
        this.data = data;
        this.timestamp = System.currentTimeMillis();
    }
}

ChatMessageEvent.java

package com.lions.dev.dto.events;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.UUID;

/**
 * Chat message event published to Kafka.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ChatMessageEvent {
    private String conversationId; // Kafka key
    private String senderId;
    private String recipientId;
    private String content;
    private String messageId;
    private Long timestamp;
}

ReactionEvent.java

package com.lions.dev.dto.events;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.Map; // required by the data field

/**
 * Reaction event (like, comment) published to Kafka.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ReactionEvent {
    private String postId; // Kafka key
    private String userId;
    private String reactionType; // like, comment, share
    private Map<String, Object> data;
    private Long timestamp;
}

🔌 Step 4: WebSocket with WebSockets Next

NotificationWebSocketNext.java

package com.lions.dev.websocket;

import io.quarkus.logging.Log;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;

import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket endpoint for real-time notifications (WebSockets Next).
 * 
 * Architecture:
 * Services → Kafka → Bridge → WebSocket → Client
 */
@WebSocket(path = "/notifications/{userId}")
@ApplicationScoped
public class NotificationWebSocketNext {

    // Active connections per user (multi-device support)
    private static final Map<UUID, Set<WebSocketConnection>> userConnections = new ConcurrentHashMap<>();

    @OnOpen
    public void onOpen(WebSocketConnection connection) {
        // Path parameters are read from the connection, not injected as plain String arguments
        String userId = connection.pathParam("userId");
        try {
            UUID userUUID = UUID.fromString(userId);
            
            // Add the connection to the user's connection set
            userConnections.computeIfAbsent(userUUID, k -> ConcurrentHashMap.newKeySet())
                          .add(connection);
            
            Log.info("[WS-NEXT] Connection opened for user: " + userId + 
                    " (Total: " + userConnections.get(userUUID).size() + ")");
            
            // Send confirmation; sendText() only returns a lazy Uni, so use the blocking variant
            connection.sendTextAndAwait("{\"type\":\"connected\",\"timestamp\":" + 
                              System.currentTimeMillis() + "}");
            
        } catch (IllegalArgumentException e) {
            Log.error("[WS-NEXT] Invalid UUID: " + userId, e);
            connection.closeAndAwait();
        }
    }

    @OnClose
    public void onClose(WebSocketConnection connection) {
        String userId = connection.pathParam("userId");
        try {
            UUID userUUID = UUID.fromString(userId);
            Set<WebSocketConnection> connections = userConnections.get(userUUID);
            
            if (connections != null) {
                // Drop the connection being closed, plus any other stale ones
                connections.removeIf(conn -> conn.id().equals(connection.id()) || !conn.isOpen());
                
                if (connections.isEmpty()) {
                    userConnections.remove(userUUID);
                    Log.info("[WS-NEXT] All connections closed for: " + userId);
                } else {
                    Log.info("[WS-NEXT] Connection closed for: " + userId + 
                            " (Remaining: " + connections.size() + ")");
                }
            }
        } catch (Exception e) {
            Log.error("[WS-NEXT] Error while closing", e);
        }
    }

    @OnTextMessage
    public void onMessage(String message, WebSocketConnection connection) {
        String userId = connection.pathParam("userId");
        try {
            Log.debug("[WS-NEXT] Message received from " + userId + ": " + message);
            
            // Parse the JSON message
            com.fasterxml.jackson.databind.ObjectMapper mapper = 
                new com.fasterxml.jackson.databind.ObjectMapper();
            Map<String, Object> messageData = mapper.readValue(message, Map.class);
            
            // String.valueOf avoids a NullPointerException in the switch when "type" is missing
            String type = String.valueOf(messageData.get("type"));
            
            switch (type) {
                case "ping":
                    handlePing(userId);
                    break;
                case "ack":
                    handleAck(messageData, userId);
                    break;
                default:
                    Log.warn("[WS-NEXT] Unknown message type: " + type);
            }
        } catch (Exception e) {
            Log.error("[WS-NEXT] Error while processing message", e);
        }
    }

    private void handlePing(String userId) {
        UUID userUUID = UUID.fromString(userId);
        Set<WebSocketConnection> connections = userConnections.get(userUUID);
        
        if (connections != null) {
            String pong = "{\"type\":\"pong\",\"timestamp\":" + 
                         System.currentTimeMillis() + "}";
            connections.forEach(conn -> {
                if (conn.isOpen()) {
                    // Blocking send: a bare sendText() call would never be subscribed to
                    conn.sendTextAndAwait(pong);
                }
            });
        }
    }

    private void handleAck(Map<String, Object> messageData, String userId) {
        String notificationId = (String) messageData.get("notificationId");
        Log.debug("[WS-NEXT] ACK received for notification " + notificationId + 
                 " from " + userId);
    }

    /**
     * Sends a notification to a specific user.
     * Called by the Kafka → WebSocket bridge.
     */
    public static void sendToUser(UUID userId, String message) {
        Set<WebSocketConnection> connections = userConnections.get(userId);
        
        if (connections == null || connections.isEmpty()) {
            Log.debug("[WS-NEXT] User " + userId + " is not connected");
            return;
        }
        
        int dispatched = 0;
        int skipped = 0;
        
        for (WebSocketConnection conn : connections) {
            if (conn.isOpen()) {
                // sendText() returns a lazy Uni: nothing is sent until it is subscribed to.
                // Subscribing (rather than blocking) keeps this safe on the Kafka consumer thread.
                conn.sendText(message).subscribe().with(
                    ignored -> { },
                    e -> Log.error("[WS-NEXT] Failed to send to " + userId, e));
                dispatched++;
            } else {
                skipped++;
            }
        }
        
        Log.info("[WS-NEXT] Notification dispatched to " + userId + 
                " (Dispatched: " + dispatched + ", Skipped: " + skipped + ")");
    }

    /**
     * Total number of tracked connections, as read by the health endpoint.
     */
    public static int getConnectionCount() {
        return userConnections.values().stream().mapToInt(Set::size).sum();
    }
}
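
The per-user connection map above is the core data structure of the endpoint. As a minimal sketch of the same idea, independent of Quarkus, the registry below tracks several connections per user and drops the user entry once the last device disconnects. The class name ConnectionRegistry and the type parameter C (standing in for WebSocketConnection) are assumptions made for illustration, not part of the project.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of a per-user connection registry; C stands in for WebSocketConnection. */
final class ConnectionRegistry<C> {
    private final Map<UUID, Set<C>> byUser = new ConcurrentHashMap<>();

    /** Registers a connection and returns how many connections the user now has. */
    int add(UUID userId, C connection) {
        // compute() makes "create the set if needed, then add" one atomic step per key
        return byUser.compute(userId, (key, set) -> {
            if (set == null) {
                set = ConcurrentHashMap.newKeySet();
            }
            set.add(connection);
            return set;
        }).size();
    }

    /** Removes one connection and returns how many the user has left. */
    int remove(UUID userId, C connection) {
        // Returning null from the remapping function removes the user's entry
        Set<C> remaining = byUser.computeIfPresent(userId, (key, set) -> {
            set.remove(connection);
            return set.isEmpty() ? null : set;
        });
        return remaining == null ? 0 : remaining.size();
    }

    /** Connections currently registered for a user (empty if none). */
    Set<C> connectionsOf(UUID userId) {
        return byUser.getOrDefault(userId, Set.of());
    }

    /** Total number of connections across all users. */
    int totalConnections() {
        return byUser.values().stream().mapToInt(Set::size).sum();
    }

    public static void main(String[] args) {
        ConnectionRegistry<String> registry = new ConnectionRegistry<>();
        UUID user = UUID.randomUUID();
        registry.add(user, "phone");
        registry.add(user, "laptop");
        registry.remove(user, "phone");
        System.out.println(registry.totalConnections()); // prints 1
    }
}
```

Doing the add and the remove inside compute and computeIfPresent avoids a window in which one thread deletes an empty set while another is adding a connection to it.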

🌉 Step 5: Kafka → WebSocket Bridge

NotificationKafkaBridge.java

package com.lions.dev.websocket;

import com.lions.dev.dto.events.NotificationEvent;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;

import java.util.UUID;
import java.util.concurrent.CompletionStage;

/**
 * Bridge that consumes from Kafka and sends over WebSocket.
 * 
 * Architecture:
 * Kafka Topic (notifications) → Bridge → WebSocket (NotificationWebSocketNext)
 */
@ApplicationScoped
public class NotificationKafkaBridge {

    /**
     * Consumes events from Kafka and routes them to WebSocket.
     * A method that receives a Message returns the acknowledgment
     * as a CompletionStage (or Uni) rather than void.
     */
    @Incoming("kafka-notifications")
    public CompletionStage<Void> processNotification(Message<NotificationEvent> message) {
        try {
            NotificationEvent event = message.getPayload();
            
            Log.debug("[KAFKA-BRIDGE] Event received: " + event.getType() + 
                     " for user: " + event.getUserId());
            
            UUID userId = UUID.fromString(event.getUserId());
            
            // Build the JSON message for WebSocket
            String wsMessage = buildWebSocketMessage(event);
            
            // Send over WebSocket
            NotificationWebSocketNext.sendToUser(userId, wsMessage);
            
            // Acknowledge the Kafka message
            return message.ack();
            
        } catch (Exception e) {
            Log.error("[KAFKA-BRIDGE] Error while processing event", e);
            return message.nack(e);
        }
    }

    private String buildWebSocketMessage(NotificationEvent event) {
        try {
            com.fasterxml.jackson.databind.ObjectMapper mapper = 
                new com.fasterxml.jackson.databind.ObjectMapper();
            
            java.util.Map<String, Object> wsMessage = java.util.Map.of(
                "type", event.getType(),
                "data", event.getData(),
                "timestamp", event.getTimestamp()
            );
            
            return mapper.writeValueAsString(wsMessage);
        } catch (Exception e) {
            Log.error("[KAFKA-BRIDGE] Error while building message", e);
            return "{\"type\":\"error\",\"message\":\"Processing error\"}";
        }
    }
}
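
The bridge acknowledges a record only after it has been handed to the WebSocket layer, and negatively acknowledges it when processing throws. A minimal sketch of that contract, using only the JDK, could look like this. The Envelope interface is a hypothetical stand-in for the reactive-messaging Message type, and AckFlowSketch is an illustrative name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

final class AckFlowSketch {

    /** Hypothetical stand-in for a reactive-messaging Message: a payload plus ack/nack hooks. */
    interface Envelope<T> {
        T payload();
        CompletionStage<Void> ack();
        CompletionStage<Void> nack(Throwable reason);
    }

    /** Delivers the payload, then acks on success or nacks if delivery throws. */
    static <T> CompletionStage<Void> process(Envelope<T> message, Consumer<T> deliver) {
        try {
            deliver.accept(message.payload());
            return message.ack();
        } catch (RuntimeException e) {
            return message.nack(e);
        }
    }

    public static void main(String[] args) {
        Envelope<String> envelope = new Envelope<String>() {
            public String payload() { return "friend_request"; }
            public CompletionStage<Void> ack() {
                System.out.println("acked");
                return CompletableFuture.completedFuture(null);
            }
            public CompletionStage<Void> nack(Throwable reason) {
                System.out.println("nacked: " + reason.getMessage());
                return CompletableFuture.completedFuture(null);
            }
        };
        process(envelope, payload -> { });                                           // prints "acked"
        process(envelope, payload -> { throw new IllegalStateException("no route"); }); // prints "nacked: no route"
    }
}
```

Returning the acknowledgment stage to the caller, instead of calling ack() and discarding the result, lets the messaging layer observe when and whether the acknowledgment completed.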

📤 Step 6: Services Publish to Kafka

FriendshipService (Modified)

package com.lions.dev.service;

import com.lions.dev.dto.events.NotificationEvent;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import jakarta.inject.Inject;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.transaction.Transactional;

import java.util.UUID;

@ApplicationScoped
public class FriendshipService {
    
    @Inject
    @Channel("notifications")
    Emitter<NotificationEvent> notificationEmitter;

    // ... other dependencies ...

    /**
     * Sends a friend request (publishes to Kafka).
     */
    @Transactional
    public FriendshipCreateOneResponseDTO sendFriendRequest(
            FriendshipCreateOneRequestDTO request) {
        
        // ... existing business logic ...
        
        // ✅ NEW: Publish to Kafka instead of calling WebSocket directly
        try {
            NotificationEvent event = new NotificationEvent(
                request.getFriendId().toString(), // recipient userId
                "friend_request",
                java.util.Map.of(
                    "fromUserId", request.getUserId().toString(),
                    "fromFirstName", user.getFirstName(),
                    "fromLastName", user.getLastName(),
                    "requestId", response.getFriendshipId().toString()
                )
            );
            
            notificationEmitter.send(event);
            logger.info("[LOG] friend_request event published to Kafka for: " + 
                       request.getFriendId());
                       
        } catch (Exception e) {
            logger.error("[ERROR] Kafka publication error", e);
            // Do not block the friend request if Kafka fails
        }
        
        return response;
    }

    /**
     * Accepts a friend request (publishes to Kafka).
     */
    @Transactional
    public FriendshipCreateOneResponseDTO acceptFriendRequest(UUID friendshipId) {
        // ... existing business logic ...
        
        // ✅ NEW: Publish to Kafka
        try {
            NotificationEvent event = new NotificationEvent(
                originalRequest.getUserId().toString(), // sender userId
                "friend_request_accepted",
                java.util.Map.of(
                    "friendId", friend.getId().toString(),
                    "friendFirstName", friend.getFirstName(),
                    "friendLastName", friend.getLastName(),
                    "friendshipId", response.getFriendshipId().toString()
                )
            );
            
            notificationEmitter.send(event);
            logger.info("[LOG] friend_request_accepted event published to Kafka");
            
        } catch (Exception e) {
            logger.error("[ERROR] Kafka publication error", e);
        }
        
        return response;
    }
}

MessageService (Modified)

package com.lions.dev.service;

import com.lions.dev.dto.events.ChatMessageEvent;
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Message;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import java.util.concurrent.CompletableFuture;

@ApplicationScoped
public class MessageService {
    
    @Inject
    @Channel("chat-messages")
    Emitter<ChatMessageEvent> chatMessageEmitter;

    /**
     * Sends a message (publishes to Kafka).
     */
    @Transactional
    public MessageResponseDTO sendMessage(SendMessageRequestDTO request) {
        // ... existing business logic ...
        
        // ✅ NEW: Publish to Kafka
        try {
            ChatMessageEvent event = new ChatMessageEvent();
            event.setConversationId(conversation.getId().toString());
            event.setSenderId(senderId.toString());
            event.setRecipientId(recipientId.toString());
            event.setContent(request.getContent());
            event.setMessageId(message.getId().toString());
            event.setTimestamp(System.currentTimeMillis());
            
            // Use conversationId as the Kafka key to guarantee per-conversation ordering.
            // The record key is set through the Kafka connector's OutgoingKafkaRecordMetadata.
            chatMessageEmitter.send(Message.of(
                event,
                () -> CompletableFuture.completedFuture(null), // ack
                throwable -> {
                    logger.error("[ERROR] Kafka send error", throwable);
                    return CompletableFuture.completedFuture(null); // nack
                }
            ).addMetadata(OutgoingKafkaRecordMetadata.<String>builder()
                .withKey(conversation.getId().toString())
                .build()));
            
            logger.info("[LOG] Message published to Kafka: " + message.getId());
            
        } catch (Exception e) {
            logger.error("[ERROR] Kafka publication error", e);
            // Do not block message sending if Kafka fails
        }
        
        return response;
    }
}
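
Using the conversation ID as the record key preserves ordering because records with the same key land on the same partition, and Kafka orders records within a partition. The sketch below illustrates that mapping under a loud simplification: Kafka's default partitioner hashes the serialized key with murmur2, whereas this example uses Arrays.hashCode purely for illustration. The class name, the partition count, and the sample ID are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class KeyPartitioningSketch {

    /**
     * Simplified stand-in for key-based partitioning: hash the key bytes, then
     * take the result modulo the partition count. Kafka itself uses murmur2;
     * Arrays.hashCode is used here only to keep the sketch dependency-free.
     */
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions); // floorMod keeps the result non-negative
    }

    public static void main(String[] args) {
        String conversationId = "3f2b7c9e-0d41-4c6a-9b1e-5a7d8e2f1c34"; // hypothetical id
        int first = partitionFor(conversationId, 6);
        int second = partitionFor(conversationId, 6);
        // Same key and same partition count always yield the same partition
        System.out.println(first == second); // prints true
    }
}
```

The guarantee only holds while the partition count stays fixed: adding partitions to the topic changes the modulo and can move a conversation to a different partition.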

SocialPostService (Modified for Reactions)

package com.lions.dev.service;

import com.lions.dev.dto.events.ReactionEvent;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;

import java.util.UUID;

@ApplicationScoped
public class SocialPostService {
    
    @Inject
    @Channel("reactions")
    Emitter<ReactionEvent> reactionEmitter;

    /**
     * Likes a post (publishes to Kafka).
     */
    @Transactional
    public SocialPost likePost(UUID postId, UUID userId) {
        // ... existing business logic ...
        
        // ✅ NEW: Publish to Kafka for real-time notification
        try {
            ReactionEvent event = new ReactionEvent();
            event.setPostId(postId.toString());
            event.setUserId(userId.toString());
            event.setReactionType("like");
            event.setData(java.util.Map.of(
                "postId", postId.toString(),
                "userId", userId.toString(),
                "likesCount", post.getLikesCount()
            ));
            event.setTimestamp(System.currentTimeMillis());
            
            reactionEmitter.send(event);
            logger.info("[LOG] Like reaction published to Kafka for post: " + postId);
            
        } catch (Exception e) {
            logger.error("[ERROR] Kafka publication error", e);
        }
        
        return post;
    }
}

🎨 Frontend: Improving the WebSocket Service

realtime_notification_service_v2.dart

import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/io.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;

class RealtimeNotificationServiceV2 extends ChangeNotifier {
  RealtimeNotificationServiceV2(this.userId, this.authToken);

  final String userId;
  final String authToken;
  WebSocketChannel? _channel;
  StreamSubscription? _subscription;
  Timer? _heartbeatTimer;
  Timer? _reconnectTimer;
  
  bool _isConnected = false;
  bool get isConnected => _isConnected;
  
  int _reconnectAttempts = 0;
  static const int _maxReconnectAttempts = 5;
  static const Duration _heartbeatInterval = Duration(seconds: 30);
  static const Duration _reconnectDelay = Duration(seconds: 5);

  // Streams for the different event types
  final _friendRequestController = StreamController<Map<String, dynamic>>.broadcast();
  final _systemNotificationController = StreamController<Map<String, dynamic>>.broadcast();
  final _reactionController = StreamController<Map<String, dynamic>>.broadcast();

  Stream<Map<String, dynamic>> get friendRequestStream => _friendRequestController.stream;
  Stream<Map<String, dynamic>> get systemNotificationStream => _systemNotificationController.stream;
  Stream<Map<String, dynamic>> get reactionStream => _reactionController.stream;

  String get _wsUrl {
    final baseUrl = 'wss://api.afterwork.lions.dev'; // Production
    return '$baseUrl/notifications/$userId';
  }

  Future<void> connect() async {
    if (_isConnected) return;

    try {
      // Custom headers are only supported by IOWebSocketChannel (dart:io platforms),
      // not by the platform-independent WebSocketChannel.connect
      _channel = IOWebSocketChannel.connect(
        Uri.parse(_wsUrl),
        protocols: ['notifications-v2'],
        headers: {
          'Authorization': 'Bearer $authToken',
        },
      );

      // Heartbeat to keep the connection alive; cancel any timer left over
      // from a previous connection so reconnects do not stack timers
      _heartbeatTimer?.cancel();
      _heartbeatTimer = Timer.periodic(_heartbeatInterval, (_) {
        _channel?.sink.add(jsonEncode({'type': 'ping'}));
      });

      // Listen for messages
      _subscription = _channel!.stream.listen(
        _handleMessage,
        onError: _handleError,
        onDone: _handleDisconnection,
        cancelOnError: false,
      );

      _isConnected = true;
      notifyListeners();
      
    } catch (e) {
      _isConnected = false;
      notifyListeners();
      _scheduleReconnect();
    }
  }

  void _handleMessage(dynamic message) {
    try {
      final data = jsonDecode(message as String) as Map<String, dynamic>;
      final type = data['type'] as String;
      
      switch (type) {
        case 'connected':
          _reconnectAttempts = 0; // Reset after a successful reconnection
          break;
        case 'pong':
          // Heartbeat response
          break;
        case 'friend_request':
        case 'friend_request_accepted':
          _friendRequestController.add(data);
          break;
        case 'event_reminder':
        case 'system_notification':
          _systemNotificationController.add(data);
          break;
        case 'reaction':
          _reactionController.add(data);
          break;
        default:
          // Unknown type: ignore or log
          break;
      }
    } catch (e) {
      // Parsing error: ignore
    }
  }

  void _handleError(dynamic error) {
    _isConnected = false;
    notifyListeners();
    _scheduleReconnect();
  }

  void _handleDisconnection() {
    _isConnected = false;
    notifyListeners();
    _scheduleReconnect();
  }

  void _scheduleReconnect() {
    if (_reconnectAttempts >= _maxReconnectAttempts) {
      // Stop retrying once the maximum is reached
      return;
    }

    _reconnectTimer?.cancel();
    _reconnectTimer = Timer(_reconnectDelay * (_reconnectAttempts + 1), () {
      _reconnectAttempts++;
      connect();
    });
  }

  Future<void> disconnect() async {
    _heartbeatTimer?.cancel();
    _reconnectTimer?.cancel();
    await _subscription?.cancel();
    await _channel?.sink.close(status.normalClosure);
    _isConnected = false;
    notifyListeners();
  }

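  @override
  void dispose() {
    // Release resources synchronously. Calling the async disconnect() here would
    // invoke notifyListeners() after super.dispose() once its awaits complete.
    _heartbeatTimer?.cancel();
    _reconnectTimer?.cancel();
    _subscription?.cancel();
    _channel?.sink.close(status.normalClosure);
    _friendRequestController.close();
    _systemNotificationController.close();
    _reactionController.close();
    super.dispose();
  }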
}
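
The reconnection logic in _scheduleReconnect is a linear backoff: the wait before attempt n is the base delay multiplied by n, and retries stop after the maximum number of attempts. As a worked example of that arithmetic (written in Java like the backend examples, with an illustrative class name), the schedule for a 5-second base delay and 5 attempts can be computed as follows.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

final class ReconnectSchedule {

    /** Delay before each reconnection attempt: base * 1, base * 2, ..., base * maxAttempts. */
    static List<Duration> delays(Duration base, int maxAttempts) {
        List<Duration> schedule = new ArrayList<>();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            schedule.add(base.multipliedBy(attempt + 1L));
        }
        return schedule;
    }

    /** Total time spent waiting if every attempt fails. */
    static Duration totalWait(Duration base, int maxAttempts) {
        Duration total = Duration.ZERO;
        for (Duration delay : delays(base, maxAttempts)) {
            total = total.plus(delay);
        }
        return total;
    }

    public static void main(String[] args) {
        Duration base = Duration.ofSeconds(5);
        System.out.println(delays(base, 5));                 // prints [PT5S, PT10S, PT15S, PT20S, PT25S]
        System.out.println(totalWait(base, 5).getSeconds()); // prints 75
    }
}
```

With these values the client gives up after 75 seconds of cumulative waiting, so a longer outage requires an explicit call to connect() to resume.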

🧪 Tests

Kafka → WebSocket Bridge Test

package com.lions.dev.websocket;

import com.lions.dev.dto.events.NotificationEvent;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.junit.jupiter.api.Test;

import java.util.Map;

@QuarkusTest
public class NotificationKafkaBridgeTest {
    
    @Inject
    @Channel("notifications")
    Emitter<NotificationEvent> notificationEmitter;

    @Test
    public void testNotificationFlow() {
        // Publish an event to Kafka.
        // The IDs are arbitrary but must be valid UUIDs, because the bridge parses them.
        NotificationEvent event = new NotificationEvent(
            "11111111-1111-1111-1111-111111111111",
            "friend_request",
            Map.of("fromUserId", "22222222-2222-2222-2222-222222222222")
        );
        
        notificationEmitter.send(event);
        
        // Verify that the message arrives over WebSocket
        // (requires a test WebSocket client)
    }
}

📊 Monitoring

Kafka Metrics to Watch

  1. Consumer lag: delay between production and consumption
  2. Throughput: messages per second
  3. Error rate: proportion of failed messages
  4. Connection count: number of active WebSocket connections

Health Endpoint
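
Consumer lag is commonly measured in offsets: for each partition, the log end offset minus the offset the consumer group has committed. The sketch below shows that arithmetic on plain maps. The class and method names are illustrative, and the offsets would in practice come from Kafka's admin or metrics APIs, which are not shown here.

```java
import java.util.HashMap;
import java.util.Map;

final class ConsumerLagSketch {

    /** Lag per partition: end offset minus committed offset, never below zero. */
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            // A partition with no committed offset is treated as fully unread (assumption)
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - committed));
        }
        return lag;
    }

    /** Sum of the lag over all partitions. */
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        return lagPerPartition(endOffsets, committedOffsets).values().stream()
                .mapToLong(Long::longValue)
                .sum();
    }

    public static void main(String[] args) {
        Map<Integer, Long> end = Map.of(0, 120L, 1, 80L);       // sample end offsets
        Map<Integer, Long> committed = Map.of(0, 100L, 1, 80L); // sample committed offsets
        System.out.println(totalLag(end, committed)); // prints 20
    }
}
```

A lag that keeps growing on the bridge consumer groups would mean notifications are reaching clients later and later, which makes it a natural candidate for an alert.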

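import com.lions.dev.websocket.NotificationWebSocketNext;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.Response;

import java.util.Map;

@Path("/health/realtime")
public class RealtimeHealthResource {
    
    @GET
    public Response health() {
        // getKafkaConsumerCount() is a helper that is not defined in this document
        return Response.ok(Map.of(
            "websocket_connections", NotificationWebSocketNext.getConnectionCount(),
            "kafka_consumers", getKafkaConsumerCount(),
            "status", "healthy"
        )).build();
    }
}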

🚀 Deployment

Docker Compose (Local Kafka)

version: '3.8'
services:
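  zookeeper:
    # Pinned to a 7.x tag: ZooKeeper mode was removed in Confluent Platform 8.x
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    # Same 7.x tag as ZooKeeper, since this setup uses KAFKA_ZOOKEEPER_CONNECT
    image: confluentinc/cp-kafka:7.5.0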
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

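Production (Kubernetes)

apiVersion: apps/v1
kind: Deployment
metadata:
  name: afterwork-backend
spec:
  replicas: 3  # ✅ Horizontal scalability
  selector:    # required by apps/v1; must match the pod template labels
    matchLabels:
      app: afterwork-backend  # label value assumed from the Deployment name
  template:
    metadata:
      labels:
        app: afterwork-backend
    spec:
      containers:
      - name: quarkus
        # image: not specified in the original excerpt
        env:
        - name: KAFKA_BOOTSTRAP_SERVERS
          value: "kafka-service:9092"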

Implementation Checklist

Phase 1: Setup

  • Add the dependencies to pom.xml
  • Configure application.properties
  • Test Kafka with Quarkus Dev Services
  • Create the event DTOs

Phase 2: WebSocket Migration

  • Create NotificationWebSocketNext
  • Create ChatWebSocketNext
  • Test with the existing frontend
  • Compare performance (before/after)

Phase 3: Kafka Integration

  • Create NotificationKafkaBridge
  • Create ChatKafkaBridge
  • Modify FriendshipService to publish to Kafka
  • Modify MessageService to publish to Kafka
  • Modify SocialPostService for reactions

Phase 4: Frontend

  • Improve RealtimeNotificationService with a heartbeat
  • Improve ChatWebSocketService with reconnection
  • Test automatic reconnection
  • Test multi-device behavior

Phase 5: Tests & Monitoring

  • Unit tests for the bridges
  • End-to-end integration tests
  • Configure Kafka monitoring
  • Configure alerts

📚 Additional Resources