package com.lions.dev.websocket; import com.lions.dev.service.NotificationService; import com.lions.dev.service.FriendshipService; import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.websocket.*; import jakarta.websocket.server.PathParam; import jakarta.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket endpoint pour les notifications en temps réel. * * Ce endpoint gère: * - Les notifications de demandes d'amitié (envoi, acceptation, rejet) * - Les notifications système (événements, rappels) * - Les alertes de messages * * URL: ws://localhost:8080/notifications/ws/{userId} */ @ServerEndpoint("/notifications/ws/{userId}") @ApplicationScoped public class NotificationWebSocket { @Inject NotificationService notificationService; @Inject FriendshipService friendshipService; @Inject com.lions.dev.service.PresenceService presenceService; // Map pour stocker les sessions WebSocket par utilisateur // Support de plusieurs sessions par utilisateur (multi-device) private static final Map> userSessions = new ConcurrentHashMap<>(); /** * Appelé lorsqu'un utilisateur se connecte. * * @param session La session WebSocket * @param userId L'ID de l'utilisateur */ @OnOpen public void onOpen(Session session, @PathParam("userId") String userId) { try { UUID userUUID = UUID.fromString(userId); // Ajouter la session à l'ensemble des sessions de l'utilisateur userSessions.computeIfAbsent(userUUID, k -> ConcurrentHashMap.newKeySet()).add(session); Log.info("[NOTIFICATION-WS] Connexion ouverte pour l'utilisateur ID : " + userId + " (Total sessions: " + userSessions.get(userUUID).size() + ")"); // Envoyer un message de confirmation String confirmationMessage = buildNotificationJson("connected", Map.of("message", "Connecté au service de notifications en temps réel")); session.getAsyncRemote().sendText(confirmationMessage); // Marquer l'utilisateur comme en ligne presenceService.setUserOnline(userUUID); } catch (IllegalArgumentException e) { Log.error("[NOTIFICATION-WS] UUID invalide : " + userId, e); try { session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, "UUID invalide")); } catch (IOException ioException) { Log.error("[NOTIFICATION-WS] Erreur lors de la fermeture de session", ioException); } } catch (Exception e) { Log.error("[NOTIFICATION-WS] Erreur lors de la connexion : " + e.getMessage(), e); } } /** * Appelé lorsqu'un message est reçu. * * Gère les messages de type ping, ack, etc. * * @param message Le message reçu (au format JSON) * @param userId L'ID de l'utilisateur qui envoie le message */ @OnMessage public void onMessage(String message, @PathParam("userId") String userId) { try { Log.info("[NOTIFICATION-WS] Message reçu de l'utilisateur " + userId + " : " + message); // Parser le message JSON com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); Map messageData = mapper.readValue(message, Map.class); String type = (String) messageData.get("type"); switch (type) { case "ping": handlePing(userId); break; case "ack": handleAcknowledgement(messageData, userId); break; default: Log.warn("[NOTIFICATION-WS] Type de message inconnu : " + type); } } catch (Exception e) { Log.error("[NOTIFICATION-WS] Erreur lors du traitement du message : " + e.getMessage(), e); } } /** * Appelé lorsqu'une erreur se produit. * * @param session La session WebSocket * @param error L'erreur */ @OnError public void onError(Session session, Throwable error) { Log.error("[NOTIFICATION-WS] Erreur WebSocket : " + error.getMessage(), error); } /** * Appelé lorsqu'un utilisateur se déconnecte. * * @param session La session WebSocket * @param userId L'ID de l'utilisateur */ @OnClose public void onClose(Session session, @PathParam("userId") String userId) { try { UUID userUUID = UUID.fromString(userId); // Supprimer la session de l'ensemble Set sessions = userSessions.get(userUUID); if (sessions != null) { sessions.remove(session); // Si l'utilisateur n'a plus de sessions, supprimer l'entrée et marquer hors ligne if (sessions.isEmpty()) { userSessions.remove(userUUID); presenceService.setUserOffline(userUUID); Log.info("[NOTIFICATION-WS] Toutes les sessions fermées pour l'utilisateur ID : " + userId); } else { Log.info("[NOTIFICATION-WS] Session fermée pour l'utilisateur ID : " + userId + " (Sessions restantes: " + sessions.size() + ")"); } } } catch (Exception e) { Log.error("[NOTIFICATION-WS] Erreur lors de la fermeture : " + e.getMessage(), e); } } /** * Gère les messages de type ping (keep-alive). */ private void handlePing(String userId) { try { UUID userUUID = UUID.fromString(userId); // Mettre à jour le heartbeat de présence presenceService.heartbeat(userUUID); String pongMessage = buildNotificationJson("pong", Map.of("timestamp", System.currentTimeMillis())); sendToUser(userUUID, pongMessage); Log.debug("[NOTIFICATION-WS] Pong envoyé à l'utilisateur : " + userId); } catch (Exception e) { Log.error("[NOTIFICATION-WS] Erreur lors de l'envoi du pong : " + e.getMessage(), e); } } /** * Gère les accusés de réception des notifications. */ private void handleAcknowledgement(Map messageData, String userId) { try { String notificationId = (String) messageData.get("notificationId"); Log.info("[NOTIFICATION-WS] ACK reçu pour la notification " + notificationId + " de l'utilisateur " + userId); // Optionnel: marquer la notification comme délivrée en base de données } catch (Exception e) { Log.error("[NOTIFICATION-WS] Erreur lors du traitement de l'ACK : " + e.getMessage(), e); } } /** * Envoie une notification à toutes les sessions d'un utilisateur spécifique. * * Cette méthode est statique pour permettre son appel depuis les services * sans nécessiter une instance de NotificationWebSocket. * * @param userId L'ID de l'utilisateur * @param notificationType Le type de notification * @param data Les données de la notification */ public static void sendNotificationToUser(UUID userId, String notificationType, Map data) { Set sessions = userSessions.get(userId); if (sessions == null || sessions.isEmpty()) { Log.warn("[NOTIFICATION-WS] Utilisateur " + userId + " non connecté ou aucune session active"); return; } String json = buildNotificationJson(notificationType, data); int successCount = 0; int failCount = 0; for (Session session : sessions) { if (session.isOpen()) { try { session.getAsyncRemote().sendText(json); successCount++; } catch (Exception e) { failCount++; Log.error("[NOTIFICATION-WS] Erreur lors de l'envoi à une session de l'utilisateur " + userId + " : " + e.getMessage(), e); } } else { failCount++; } } Log.info("[NOTIFICATION-WS] Notification " + notificationType + " envoyée à l'utilisateur " + userId + " (Succès: " + successCount + ", Échec: " + failCount + ")"); } /** * Envoie un message à toutes les sessions d'un utilisateur. * * Version privée pour usage interne (ping/pong, etc.) * * @param userId L'ID de l'utilisateur * @param message Le message à envoyer */ private void sendToUser(UUID userId, String message) { Set sessions = userSessions.get(userId); if (sessions != null) { for (Session session : sessions) { if (session.isOpen()) { try { session.getAsyncRemote().sendText(message); } catch (Exception e) { Log.error("[NOTIFICATION-WS] Erreur lors de l'envoi à l'utilisateur " + userId + " : " + e.getMessage(), e); } } } } } /** * Diffuse une notification à tous les utilisateurs connectés. * * @param notificationType Le type de notification * @param data Les données de la notification */ public static void broadcastNotification(String notificationType, Map data) { String json = buildNotificationJson(notificationType, data); int totalSessions = 0; int successCount = 0; for (Set sessions : userSessions.values()) { for (Session session : sessions) { totalSessions++; if (session.isOpen()) { try { session.getAsyncRemote().sendText(json); successCount++; } catch (Exception e) { Log.error("[NOTIFICATION-WS] Erreur lors de la diffusion : " + e.getMessage(), e); } } } } Log.info("[NOTIFICATION-WS] Notification diffusée à " + successCount + " sessions sur " + totalSessions); } /** * Construit un message JSON pour les notifications. * * Format: {"type": "notification_type", "data": {...}} * * @param type Le type de notification * @param data Les données de la notification * @return Le JSON sous forme de String */ private static String buildNotificationJson(String type, Map data) { try { com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); Map message = Map.of( "type", type, "data", data, "timestamp", System.currentTimeMillis() ); return mapper.writeValueAsString(message); } catch (Exception e) { Log.error("[NOTIFICATION-WS] Erreur lors de la construction du JSON : " + e.getMessage(), e); return "{\"type\":\"error\",\"data\":{\"message\":\"Erreur de construction du message\"}}"; } } /** * Récupère le nombre total d'utilisateurs connectés. * * @return Le nombre d'utilisateurs connectés */ public static int getConnectedUsersCount() { return userSessions.size(); } /** * Récupère le nombre total de sessions actives. * * @return Le nombre total de sessions */ public static int getTotalSessionsCount() { return userSessions.values().stream() .mapToInt(Set::size) .sum(); } /** * Broadcast une mise à jour de présence à tous les utilisateurs connectés. * * @param presenceData Les données de présence (userId, isOnline, lastSeen) */ public static void broadcastPresenceUpdate(Map presenceData) { try { com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); Map envelope = Map.of( "type", "presence", "data", presenceData ); String json = mapper.writeValueAsString(envelope); // Envoyer à tous les utilisateurs connectés for (Set sessions : userSessions.values()) { for (Session session : sessions) { if (session.isOpen()) { try { session.getAsyncRemote().sendText(json); } catch (Exception e) { Log.error("[NOTIFICATION-WS] Erreur broadcast présence : " + e.getMessage()); } } } } Log.debug("[NOTIFICATION-WS] Présence broadcastée : " + presenceData.get("userId")); } catch (Exception e) { Log.error("[NOTIFICATION-WS] Erreur lors du broadcast de présence : " + e.getMessage(), e); } } }