# 💻 Exemples d'Implémentation - Temps Réel avec Kafka ## 📦 Étape 1 : Ajouter les Dépendances ### pom.xml ```xml io.quarkus quarkus-websockets-next io.quarkus quarkus-messaging-kafka io.quarkiverse.reactivemessaginghttp quarkus-reactive-messaging-http 1.0.0 io.quarkus quarkus-jsonb ``` --- ## 🔧 Étape 2 : Configuration application.properties ```properties # ============================================ # Kafka Configuration # ============================================ kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092} # Topic: Notifications mp.messaging.outgoing.notifications.connector=smallrye-kafka mp.messaging.outgoing.notifications.topic=notifications mp.messaging.outgoing.notifications.key.serializer=org.apache.kafka.common.serialization.StringSerializer mp.messaging.outgoing.notifications.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer # Topic: Chat Messages mp.messaging.outgoing.chat-messages.connector=smallrye-kafka mp.messaging.outgoing.chat-messages.topic=chat.messages mp.messaging.outgoing.chat-messages.key.serializer=org.apache.kafka.common.serialization.StringSerializer mp.messaging.outgoing.chat-messages.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer # Topic: Reactions (likes, comments) mp.messaging.outgoing.reactions.connector=smallrye-kafka mp.messaging.outgoing.reactions.topic=reactions mp.messaging.outgoing.reactions.key.serializer=org.apache.kafka.common.serialization.StringSerializer mp.messaging.outgoing.reactions.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer # Topic: Presence Updates mp.messaging.outgoing.presence.connector=smallrye-kafka mp.messaging.outgoing.presence.topic=presence.updates mp.messaging.outgoing.presence.key.serializer=org.apache.kafka.common.serialization.StringSerializer mp.messaging.outgoing.presence.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer # ============================================ # Kafka → WebSocket Bridge (Incoming) # ============================================ # Consommer depuis Kafka et router vers WebSocket mp.messaging.incoming.kafka-notifications.connector=smallrye-kafka mp.messaging.incoming.kafka-notifications.topic=notifications mp.messaging.incoming.kafka-notifications.group.id=websocket-notifications-bridge mp.messaging.incoming.kafka-notifications.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer mp.messaging.incoming.kafka-notifications.value.deserializer=io.quarkus.kafka.client.serialization.JsonbDeserializer mp.messaging.incoming.kafka-notifications.enable.auto.commit=true mp.messaging.incoming.kafka-chat.connector=smallrye-kafka mp.messaging.incoming.kafka-chat.topic=chat.messages mp.messaging.incoming.kafka-chat.group.id=websocket-chat-bridge mp.messaging.incoming.kafka-chat.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer mp.messaging.incoming.kafka-chat.value.deserializer=io.quarkus.kafka.client.serialization.JsonbDeserializer mp.messaging.incoming.kafka-chat.enable.auto.commit=true # ============================================ # WebSocket Configuration # ============================================ quarkus.websockets-next.server.enabled=true ``` --- ## 📝 Étape 3 : DTOs pour les Événements Kafka ### NotificationEvent.java ```java package com.lions.dev.dto.events; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import java.util.Map; import java.util.UUID; /** * Événement de notification publié dans Kafka. */ @Getter @Setter @NoArgsConstructor @AllArgsConstructor public class NotificationEvent { private String userId; // Clé Kafka (pour routing) private String type; // friend_request, event_reminder, message_alert, etc. private Map data; private Long timestamp; public NotificationEvent(String userId, String type, Map data) { this.userId = userId; this.type = type; this.data = data; this.timestamp = System.currentTimeMillis(); } } ``` ### ChatMessageEvent.java ```java package com.lions.dev.dto.events; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import java.util.UUID; /** * Événement de message chat publié dans Kafka. */ @Getter @Setter @NoArgsConstructor @AllArgsConstructor public class ChatMessageEvent { private String conversationId; // Clé Kafka private String senderId; private String recipientId; private String content; private String messageId; private Long timestamp; } ``` ### ReactionEvent.java ```java package com.lions.dev.dto.events; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; /** * Événement de réaction (like, comment) publié dans Kafka. */ @Getter @Setter @NoArgsConstructor @AllArgsConstructor public class ReactionEvent { private String postId; // Clé Kafka private String userId; private String reactionType; // like, comment, share private Map data; private Long timestamp; } ``` --- ## 🔌 Étape 4 : WebSocket avec WebSockets Next ### NotificationWebSocketNext.java ```java package com.lions.dev.websocket; import io.quarkus.logging.Log; import io.quarkus.websockets.next.OnClose; import io.quarkus.websockets.next.OnOpen; import io.quarkus.websockets.next.OnTextMessage; import io.quarkus.websockets.next.WebSocket; import io.quarkus.websockets.next.WebSocketConnection; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket endpoint pour les notifications en temps réel (WebSockets Next). * * Architecture: * Services → Kafka → Bridge → WebSocket → Client */ @WebSocket(path = "/notifications/{userId}") @ApplicationScoped public class NotificationWebSocketNext { // Stockage des connexions actives par utilisateur (multi-device support) private static final Map> userConnections = new ConcurrentHashMap<>(); @OnOpen public void onOpen(WebSocketConnection connection, String userId) { try { UUID userUUID = UUID.fromString(userId); // Ajouter la connexion à l'ensemble des connexions de l'utilisateur userConnections.computeIfAbsent(userUUID, k -> ConcurrentHashMap.newKeySet()) .add(connection); Log.info("[WS-NEXT] Connexion ouverte pour l'utilisateur: " + userId + " (Total: " + userConnections.get(userUUID).size() + ")"); // Envoyer confirmation connection.sendText("{\"type\":\"connected\",\"timestamp\":" + System.currentTimeMillis() + "}"); } catch (IllegalArgumentException e) { Log.error("[WS-NEXT] UUID invalide: " + userId, e); connection.close(); } } @OnClose public void onClose(String userId) { try { UUID userUUID = UUID.fromString(userId); Set connections = userConnections.get(userUUID); if (connections != null) { connections.removeIf(conn -> !conn.isOpen()); if (connections.isEmpty()) { userConnections.remove(userUUID); Log.info("[WS-NEXT] Toutes les connexions fermées pour: " + userId); } else { Log.info("[WS-NEXT] Connexion fermée pour: " + userId + " (Restantes: " + connections.size() + ")"); } } } catch (Exception e) { Log.error("[WS-NEXT] Erreur lors de la fermeture", e); } } @OnTextMessage public void onMessage(String message, String userId) { try { Log.debug("[WS-NEXT] Message reçu de " + userId + ": " + message); // Parser le message JSON com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); Map messageData = mapper.readValue(message, Map.class); String type = (String) messageData.get("type"); switch (type) { case "ping": handlePing(userId); break; case "ack": handleAck(messageData, userId); break; default: Log.warn("[WS-NEXT] Type de message inconnu: " + type); } } catch (Exception e) { Log.error("[WS-NEXT] Erreur traitement message", e); } } private void handlePing(String userId) { UUID userUUID = UUID.fromString(userId); Set connections = userConnections.get(userUUID); if (connections != null) { String pong = "{\"type\":\"pong\",\"timestamp\":" + System.currentTimeMillis() + "}"; connections.forEach(conn -> { if (conn.isOpen()) { conn.sendText(pong); } }); } } private void handleAck(Map messageData, String userId) { String notificationId = (String) messageData.get("notificationId"); Log.debug("[WS-NEXT] ACK reçu pour notification " + notificationId + " de " + userId); } /** * Envoie une notification à un utilisateur spécifique. * Appelé par le bridge Kafka → WebSocket. */ public static void sendToUser(UUID userId, String message) { Set connections = userConnections.get(userId); if (connections == null || connections.isEmpty()) { Log.debug("[WS-NEXT] Utilisateur " + userId + " non connecté"); return; } int success = 0; int failed = 0; for (WebSocketConnection conn : connections) { if (conn.isOpen()) { try { conn.sendText(message); success++; } catch (Exception e) { failed++; Log.error("[WS-NEXT] Erreur envoi à " + userId, e); } } else { failed++; } } Log.info("[WS-NEXT] Notification envoyée à " + userId + " (Succès: " + success + ", Échec: " + failed + ")"); } } ``` --- ## 🌉 Étape 5 : Bridge Kafka → WebSocket ### NotificationKafkaBridge.java ```java package com.lions.dev.websocket; import com.lions.dev.dto.events.NotificationEvent; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.eclipse.microprofile.reactive.messaging.Message; import java.util.UUID; /** * Bridge qui consomme depuis Kafka et envoie via WebSocket. * * Architecture: * Kafka Topic (notifications) → Bridge → WebSocket (NotificationWebSocketNext) */ @ApplicationScoped public class NotificationKafkaBridge { /** * Consomme les événements depuis Kafka et les route vers WebSocket. */ @Incoming("kafka-notifications") public void processNotification(Message message) { try { NotificationEvent event = message.getPayload(); Log.debug("[KAFKA-BRIDGE] Événement reçu: " + event.getType() + " pour utilisateur: " + event.getUserId()); UUID userId = UUID.fromString(event.getUserId()); // Construire le message JSON pour WebSocket String wsMessage = buildWebSocketMessage(event); // Envoyer via WebSocket NotificationWebSocketNext.sendToUser(userId, wsMessage); // Acknowledger le message Kafka message.ack(); } catch (Exception e) { Log.error("[KAFKA-BRIDGE] Erreur traitement événement", e); message.nack(e); } } private String buildWebSocketMessage(NotificationEvent event) { try { com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); java.util.Map wsMessage = java.util.Map.of( "type", event.getType(), "data", event.getData(), "timestamp", event.getTimestamp() ); return mapper.writeValueAsString(wsMessage); } catch (Exception e) { Log.error("[KAFKA-BRIDGE] Erreur construction message", e); return "{\"type\":\"error\",\"message\":\"Erreur de traitement\"}"; } } } ``` --- ## 📤 Étape 6 : Services Publient dans Kafka ### FriendshipService (Modifié) ```java package com.lions.dev.service; import com.lions.dev.dto.events.NotificationEvent; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import jakarta.inject.Inject; import jakarta.enterprise.context.ApplicationScoped; @ApplicationScoped public class FriendshipService { @Inject @Channel("notifications") Emitter notificationEmitter; // ... autres dépendances ... /** * Envoie une demande d'amitié (publie dans Kafka). */ @Transactional public FriendshipCreateOneResponseDTO sendFriendRequest( FriendshipCreateOneRequestDTO request) { // ... logique métier existante ... // ✅ NOUVEAU: Publier dans Kafka au lieu d'appeler directement WebSocket try { NotificationEvent event = new NotificationEvent( request.getFriendId().toString(), // userId destinataire "friend_request", java.util.Map.of( "fromUserId", request.getUserId().toString(), "fromFirstName", user.getFirstName(), "fromLastName", user.getLastName(), "requestId", response.getFriendshipId().toString() ) ); notificationEmitter.send(event); logger.info("[LOG] Événement friend_request publié dans Kafka pour: " + request.getFriendId()); } catch (Exception e) { logger.error("[ERROR] Erreur publication Kafka", e); // Ne pas bloquer la demande d'amitié si Kafka échoue } return response; } /** * Accepte une demande d'amitié (publie dans Kafka). */ @Transactional public FriendshipCreateOneResponseDTO acceptFriendRequest(UUID friendshipId) { // ... logique métier existante ... // ✅ NOUVEAU: Publier dans Kafka try { NotificationEvent event = new NotificationEvent( originalRequest.getUserId().toString(), // userId émetteur "friend_request_accepted", java.util.Map.of( "friendId", friend.getId().toString(), "friendFirstName", friend.getFirstName(), "friendLastName", friend.getLastName(), "friendshipId", response.getFriendshipId().toString() ) ); notificationEmitter.send(event); logger.info("[LOG] Événement friend_request_accepted publié dans Kafka"); } catch (Exception e) { logger.error("[ERROR] Erreur publication Kafka", e); } return response; } } ``` ### MessageService (Modifié) ```java package com.lions.dev.service; import com.lions.dev.dto.events.ChatMessageEvent; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import jakarta.inject.Inject; @ApplicationScoped public class MessageService { @Inject @Channel("chat-messages") Emitter chatMessageEmitter; /** * Envoie un message (publie dans Kafka). */ @Transactional public MessageResponseDTO sendMessage(SendMessageRequestDTO request) { // ... logique métier existante ... // ✅ NOUVEAU: Publier dans Kafka try { ChatMessageEvent event = new ChatMessageEvent(); event.setConversationId(conversation.getId().toString()); event.setSenderId(senderId.toString()); event.setRecipientId(recipientId.toString()); event.setContent(request.getContent()); event.setMessageId(message.getId().toString()); event.setTimestamp(System.currentTimeMillis()); // Utiliser conversationId comme clé Kafka pour garantir l'ordre chatMessageEmitter.send(org.eclipse.microprofile.reactive.messaging.Message.of( event, () -> CompletableFuture.completedFuture(null), // ack throwable -> { logger.error("[ERROR] Erreur envoi Kafka", throwable); return CompletableFuture.completedFuture(null); // nack } ).addMetadata(org.eclipse.microprofile.reactive.messaging.OutgoingMessageMetadata.builder() .withKey(conversation.getId().toString()) .build())); logger.info("[LOG] Message publié dans Kafka: " + message.getId()); } catch (Exception e) { logger.error("[ERROR] Erreur publication Kafka", e); // Ne pas bloquer l'envoi du message si Kafka échoue } return response; } } ``` ### SocialPostService (Modifié pour les Réactions) ```java package com.lions.dev.service; import com.lions.dev.dto.events.ReactionEvent; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import jakarta.inject.Inject; @ApplicationScoped public class SocialPostService { @Inject @Channel("reactions") Emitter reactionEmitter; /** * Like un post (publie dans Kafka). */ @Transactional public SocialPost likePost(UUID postId, UUID userId) { // ... logique métier existante ... // ✅ NOUVEAU: Publier dans Kafka pour notifier en temps réel try { ReactionEvent event = new ReactionEvent(); event.setPostId(postId.toString()); event.setUserId(userId.toString()); event.setReactionType("like"); event.setData(java.util.Map.of( "postId", postId.toString(), "userId", userId.toString(), "likesCount", post.getLikesCount() )); event.setTimestamp(System.currentTimeMillis()); reactionEmitter.send(event); logger.info("[LOG] Réaction like publiée dans Kafka pour post: " + postId); } catch (Exception e) { logger.error("[ERROR] Erreur publication Kafka", e); } return post; } } ``` --- ## 🎨 Frontend : Amélioration du Service WebSocket ### realtime_notification_service_v2.dart ```dart import 'dart:async'; import 'dart:convert'; import 'package:flutter/foundation.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/status.dart' as status; class RealtimeNotificationServiceV2 extends ChangeNotifier { RealtimeNotificationServiceV2(this.userId, this.authToken); final String userId; final String authToken; WebSocketChannel? _channel; StreamSubscription? _subscription; Timer? _heartbeatTimer; Timer? _reconnectTimer; bool _isConnected = false; bool get isConnected => _isConnected; int _reconnectAttempts = 0; static const int _maxReconnectAttempts = 5; static const Duration _heartbeatInterval = Duration(seconds: 30); static const Duration _reconnectDelay = Duration(seconds: 5); // Streams pour différents types d'événements final _friendRequestController = StreamController>.broadcast(); final _systemNotificationController = StreamController>.broadcast(); final _reactionController = StreamController>.broadcast(); Stream> get friendRequestStream => _friendRequestController.stream; Stream> get systemNotificationStream => _systemNotificationController.stream; Stream> get reactionStream => _reactionController.stream; String get _wsUrl { final baseUrl = 'wss://api.afterwork.lions.dev'; // Production return '$baseUrl/notifications/$userId'; } Future connect() async { if (_isConnected) return; try { _channel = WebSocketChannel.connect( Uri.parse(_wsUrl), protocols: ['notifications-v2'], headers: { 'Authorization': 'Bearer $authToken', }, ); // Heartbeat pour maintenir la connexion _heartbeatTimer = Timer.periodic(_heartbeatInterval, (_) { _channel?.sink.add(jsonEncode({'type': 'ping'})); }); // Écouter les messages _subscription = _channel!.stream.listen( _handleMessage, onError: _handleError, onDone: _handleDisconnection, cancelOnError: false, ); _isConnected = true; notifyListeners(); } catch (e) { _isConnected = false; notifyListeners(); _scheduleReconnect(); } } void _handleMessage(dynamic message) { try { final data = jsonDecode(message as String) as Map; final type = data['type'] as String; switch (type) { case 'connected': _reconnectAttempts = 0; // Reset sur reconnexion réussie break; case 'pong': // Heartbeat réponse break; case 'friend_request': case 'friend_request_accepted': _friendRequestController.add(data); break; case 'event_reminder': case 'system_notification': _systemNotificationController.add(data); break; case 'reaction': _reactionController.add(data); break; default: // Type inconnu, ignorer ou logger break; } } catch (e) { // Erreur de parsing, ignorer } } void _handleError(dynamic error) { _isConnected = false; notifyListeners(); _scheduleReconnect(); } void _handleDisconnection() { _isConnected = false; notifyListeners(); _scheduleReconnect(); } void _scheduleReconnect() { if (_reconnectAttempts >= _maxReconnectAttempts) { // Arrêter les tentatives après max return; } _reconnectTimer?.cancel(); _reconnectTimer = Timer(_reconnectDelay * (_reconnectAttempts + 1), () { _reconnectAttempts++; connect(); }); } Future disconnect() async { _heartbeatTimer?.cancel(); _reconnectTimer?.cancel(); await _subscription?.cancel(); await _channel?.sink.close(status.normalClosure); _isConnected = false; notifyListeners(); } @override void dispose() { disconnect(); _friendRequestController.close(); _systemNotificationController.close(); _reactionController.close(); super.dispose(); } } ``` --- ## 🧪 Tests ### Test du Bridge Kafka → WebSocket ```java package com.lions.dev.websocket; import io.quarkus.test.junit.QuarkusTest; import jakarta.inject.Inject; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import org.junit.jupiter.api.Test; @QuarkusTest public class NotificationKafkaBridgeTest { @Inject @Channel("notifications") Emitter notificationEmitter; @Test public void testNotificationFlow() { // Publier un événement dans Kafka NotificationEvent event = new NotificationEvent( "user-123", "friend_request", Map.of("fromUserId", "user-456") ); notificationEmitter.send(event); // Vérifier que le message arrive bien via WebSocket // (nécessite un client WebSocket de test) } } ``` --- ## 📊 Monitoring ### Métriques Kafka à Surveiller 1. **Lag Consumer** : Délai entre production et consommation 2. **Throughput** : Messages/seconde 3. **Error Rate** : Taux d'erreur 4. **Connection Count** : Nombre de connexions WebSocket actives ### Endpoint de Santé ```java @Path("/health/realtime") public class RealtimeHealthResource { @GET public Response health() { return Response.ok(Map.of( "websocket_connections", NotificationWebSocketNext.getConnectionCount(), "kafka_consumers", getKafkaConsumerCount(), "status", "healthy" )).build(); } } ``` --- ## 🚀 Déploiement ### Docker Compose (Kafka Local) ```yaml version: '3.8' services: zookeeper: image: confluentinc/cp-zookeeper:latest environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 kafka: image: confluentinc/cp-kafka:latest depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 ``` ### Production (Kubernetes) ```yaml apiVersion: apps/v1 kind: Deployment metadata: name: afterwork-backend spec: replicas: 3 # ✅ Scalabilité horizontale template: spec: containers: - name: quarkus env: - name: KAFKA_BOOTSTRAP_SERVERS value: "kafka-service:9092" ``` --- ## ✅ Checklist d'Implémentation ### Phase 1 : Setup - [ ] Ajouter dépendances dans `pom.xml` - [ ] Configurer `application.properties` - [ ] Tester Kafka avec Quarkus Dev Services - [ ] Créer les DTOs d'événements ### Phase 2 : Migration WebSocket - [ ] Créer `NotificationWebSocketNext` - [ ] Créer `ChatWebSocketNext` - [ ] Tester avec le frontend existant - [ ] Comparer performances (avant/après) ### Phase 3 : Intégration Kafka - [ ] Créer `NotificationKafkaBridge` - [ ] Créer `ChatKafkaBridge` - [ ] Modifier `FriendshipService` pour publier dans Kafka - [ ] Modifier `MessageService` pour publier dans Kafka - [ ] Modifier `SocialPostService` pour les réactions ### Phase 4 : Frontend - [ ] Améliorer `RealtimeNotificationService` avec heartbeat - [ ] Améliorer `ChatWebSocketService` avec reconnect - [ ] Tester la reconnexion automatique - [ ] Tester multi-device ### Phase 5 : Tests & Monitoring - [ ] Tests unitaires des bridges - [ ] Tests d'intégration end-to-end - [ ] Configurer monitoring Kafka - [ ] Configurer alertes --- ## 📚 Ressources Complémentaires - [Quarkus WebSockets Next Tutorial](https://quarkus.io/guides/websockets-next-tutorial) - [Quarkus Kafka Guide](https://quarkus.io/guides/kafka) - [Reactive Messaging HTTP Extension](https://docs.quarkiverse.io/quarkus-reactive-messaging-http/dev/reactive-messaging-websocket.html) - [Kafka Best Practices](https://kafka.apache.org/documentation/#bestPractices)