package com.lions.dev.websocket; import io.quarkus.logging.Log; import io.quarkus.websockets.next.OnClose; import io.quarkus.websockets.next.OnOpen; import io.quarkus.websockets.next.OnTextMessage; import io.quarkus.websockets.next.WebSocket; import io.quarkus.websockets.next.WebSocketConnection; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import com.lions.dev.service.PresenceService; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket endpoint pour les notifications en temps réel (WebSockets Next). * * Architecture v2.0: * Services métier → Kafka Topic → Kafka Bridge → WebSocket → Client * * Avantages: * - Scalabilité horizontale (plusieurs instances Quarkus) * - Durabilité (événements persistés dans Kafka) * - Découplage (services indépendants des WebSockets) * * URL: ws://localhost:8080/notifications/{userId} */ @WebSocket(path = "/notifications/{userId}") @ApplicationScoped public class NotificationWebSocketNext { @Inject PresenceService presenceService; // Stockage des connexions actives par utilisateur (multi-device support) private static final Map> userConnections = new ConcurrentHashMap<>(); @OnOpen public void onOpen(WebSocketConnection connection) { String userId = connection.pathParam("userId"); try { UUID userUUID = UUID.fromString(userId); // Ajouter la connexion à l'ensemble des connexions de l'utilisateur userConnections.computeIfAbsent(userUUID, k -> ConcurrentHashMap.newKeySet()) .add(connection); Log.info("[WS-NEXT] Connexion ouverte pour l'utilisateur: " + userId + " (Total: " + userConnections.get(userUUID).size() + ")"); // Envoyer confirmation String confirmation = buildJsonMessage("connected", Map.of("message", "Connecté au service de notifications en temps réel")); connection.sendText(confirmation); // Marquer l'utilisateur comme en ligne presenceService.setUserOnline(userUUID); } catch (IllegalArgumentException e) { Log.error("[WS-NEXT] UUID invalide: " + userId, e); connection.close(); } catch (Exception e) { Log.error("[WS-NEXT] Erreur lors de la connexion", e); connection.close(); } } @OnClose public void onClose(WebSocketConnection connection) { try { String userId = connection.pathParam("userId"); UUID userUUID = UUID.fromString(userId); Set connections = userConnections.get(userUUID); if (connections != null) { connections.removeIf(conn -> !conn.isOpen()); if (connections.isEmpty()) { userConnections.remove(userUUID); presenceService.setUserOffline(userUUID); Log.info("[WS-NEXT] Toutes les connexions fermées pour: " + userId); } else { Log.info("[WS-NEXT] Connexion fermée pour: " + userId + " (Restantes: " + connections.size() + ")"); } } } catch (Exception e) { Log.error("[WS-NEXT] Erreur lors de la fermeture", e); } } @OnTextMessage public void onMessage(String message, WebSocketConnection connection) { try { String userId = connection.pathParam("userId"); Log.debug("[WS-NEXT] Message reçu de " + userId + ": " + message); // Parser le message JSON com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); Map messageData = mapper.readValue(message, Map.class); String type = (String) messageData.get("type"); switch (type) { case "ping": handlePing(userId); break; case "ack": handleAck(messageData, userId); break; default: Log.warn("[WS-NEXT] Type de message inconnu: " + type); } } catch (Exception e) { Log.error("[WS-NEXT] Erreur traitement message", e); } } private void handlePing(String userId) { try { UUID userUUID = UUID.fromString(userId); // Mettre à jour le heartbeat de présence presenceService.heartbeat(userUUID); // Envoyer pong String pong = buildJsonMessage("pong", Map.of("timestamp", System.currentTimeMillis())); sendToUser(userUUID, pong); Log.debug("[WS-NEXT] Pong envoyé à: " + userId); } catch (Exception e) { Log.error("[WS-NEXT] Erreur lors de l'envoi du pong", e); } } private void handleAck(Map messageData, String userId) { try { String notificationId = (String) messageData.get("notificationId"); Log.debug("[WS-NEXT] ACK reçu pour notification " + notificationId + " de " + userId); // Optionnel: marquer la notification comme délivrée en base } catch (Exception e) { Log.error("[WS-NEXT] Erreur lors du traitement de l'ACK", e); } } /** * Envoie une notification à un utilisateur spécifique. * Appelé par le bridge Kafka → WebSocket. * * @param userId ID de l'utilisateur destinataire * @param message Message JSON à envoyer */ public static void sendToUser(UUID userId, String message) { Set connections = userConnections.get(userId); if (connections == null || connections.isEmpty()) { Log.debug("[WS-NEXT] Utilisateur " + userId + " non connecté"); return; } int success = 0; int failed = 0; for (WebSocketConnection conn : connections) { if (conn.isOpen()) { try { conn.sendText(message); success++; } catch (Exception e) { failed++; Log.error("[WS-NEXT] Erreur envoi à " + userId, e); } } else { failed++; } } Log.info("[WS-NEXT] Notification envoyée à " + userId + " (Succès: " + success + ", Échec: " + failed + ")"); } /** * Diffuse une notification à tous les utilisateurs connectés. * * @param notificationType Type de notification * @param data Données de la notification */ public static void broadcastNotification(String notificationType, Map data) { String json = buildJsonMessage(notificationType, data); int totalSessions = 0; int successCount = 0; for (Set sessions : userConnections.values()) { for (WebSocketConnection session : sessions) { totalSessions++; if (session.isOpen()) { try { session.sendText(json); successCount++; } catch (Exception e) { Log.error("[WS-NEXT] Erreur lors de la diffusion", e); } } } } Log.info("[WS-NEXT] Notification diffusée à " + successCount + " sessions sur " + totalSessions); } /** * Broadcast une mise à jour de présence à tous les utilisateurs connectés. * * @param presenceData Données de présence (userId, isOnline, lastSeen) */ public static void broadcastPresenceUpdate(Map presenceData) { try { String json = buildJsonMessage("presence", presenceData); for (Set sessions : userConnections.values()) { for (WebSocketConnection session : sessions) { if (session.isOpen()) { try { session.sendText(json); } catch (Exception e) { Log.error("[WS-NEXT] Erreur broadcast présence", e); } } } } Log.debug("[WS-NEXT] Présence broadcastée: " + presenceData.get("userId")); } catch (Exception e) { Log.error("[WS-NEXT] Erreur lors du broadcast de présence", e); } } /** * Construit un message JSON pour les notifications. * * Format: {"type": "notification_type", "data": {...}, "timestamp": ...} */ private static String buildJsonMessage(String type, Map data) { try { com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); Map message = Map.of( "type", type, "data", data, "timestamp", System.currentTimeMillis() ); return mapper.writeValueAsString(message); } catch (Exception e) { Log.error("[WS-NEXT] Erreur construction JSON", e); return "{\"type\":\"error\",\"data\":{\"message\":\"Erreur de construction du message\"}}"; } } /** * Récupère le nombre total d'utilisateurs connectés. */ public static int getConnectedUsersCount() { return userConnections.size(); } /** * Récupère le nombre total de sessions actives. */ public static int getTotalSessionsCount() { return userConnections.values().stream() .mapToInt(Set::size) .sum(); } }