package com.lions.dev.websocket; import com.lions.dev.dto.response.chat.MessageResponseDTO; import com.lions.dev.entity.chat.Message; import com.lions.dev.service.MessageService; import io.quarkus.logging.Log; import io.quarkus.websockets.next.OnClose; import io.quarkus.websockets.next.OnOpen; import io.quarkus.websockets.next.OnTextMessage; import io.quarkus.websockets.next.WebSocket; import io.quarkus.websockets.next.WebSocketConnection; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket endpoint pour le chat en temps réel (WebSockets Next). * * Architecture v2.0: * Client → WebSocket → MessageService → Kafka → Bridge → WebSocket → Destinataire * * Gère: * - La connexion/déconnexion des utilisateurs * - L'envoi et la réception de messages en temps réel * - Les indicateurs de frappe (typing indicators) * - Les confirmations de lecture (read receipts) * - Les confirmations de délivrance * * URL: ws://localhost:8080/chat/{userId} */ @WebSocket(path = "/afterwork/chat/{userId}") @ApplicationScoped public class ChatWebSocketNext { @Inject MessageService messageService; // Map pour stocker les sessions WebSocket des utilisateurs connectés private static final Map sessions = new ConcurrentHashMap<>(); @OnOpen public void onOpen(WebSocketConnection connection) { String userId = connection.pathParam("userId"); try { UUID userUUID = UUID.fromString(userId); sessions.put(userUUID, connection); Log.info("[CHAT-WS-NEXT] WebSocket ouvert pour l'utilisateur ID : " + userId + " (sessions actives: " + sessions.size() + ")"); // Envoyer un message de confirmation String confirmation = buildJsonMessage("connected", Map.of("message", "Connecté au chat")); connection.sendText(confirmation); } catch (IllegalArgumentException e) { Log.error("[CHAT-WS-NEXT] UUID invalide: " + userId, e); connection.close(); } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur lors de la connexion", e); connection.close(); } } @OnClose public void onClose(WebSocketConnection connection) { try { String userId = connection.pathParam("userId"); UUID userUUID = UUID.fromString(userId); sessions.remove(userUUID); Log.info("[CHAT-WS-NEXT] WebSocket fermé pour l'utilisateur ID : " + userId); } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur lors de la fermeture", e); } } @OnTextMessage public void onMessage(String message, WebSocketConnection connection) { try { String userId = connection.pathParam("userId"); Log.debug("[CHAT-WS-NEXT] Message reçu de " + userId + ": " + message); com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); Map raw = mapper.readValue(message, Map.class); String type = (String) raw.get("type"); @SuppressWarnings("unchecked") Map data = (Map) raw.get("data"); switch (type) { case "message": if (data != null) handleChatMessage(data, userId); else Log.warn("[CHAT-WS-NEXT] Message sans 'data'"); break; case "typing": if (data != null) handleTypingIndicator(data, userId); break; case "read": if (data != null) handleReadReceipt(data, userId); else Log.warn("[CHAT-WS-NEXT] Read receipt sans 'data'"); break; case "ping": // Heartbeat - ignorer break; default: Log.warn("[CHAT-WS-NEXT] Type inconnu: " + type); } } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur lors du traitement du message", e); } } /** * Gère l'envoi d'un message de chat via WebSocket. * Note: L'envoi principal passe par REST (POST /messages). Cette méthode * est pour compatibilité si le client envoie via WebSocket. */ private void handleChatMessage(Map data, String senderId) { try { UUID senderUUID = UUID.fromString(senderId); UUID recipientUUID = UUID.fromString((String) data.get("recipientId")); String content = (String) data.get("content"); String messageType = data.getOrDefault("messageType", "text").toString(); String mediaUrl = (String) data.get("mediaUrl"); // Enregistrer le message dans la base de données // MessageService publiera automatiquement dans Kafka Message message = messageService.sendMessage( senderUUID, recipientUUID, content, messageType, mediaUrl ); // Créer le DTO de réponse MessageResponseDTO response = new MessageResponseDTO(message); String responseJson = buildJsonMessage("message", Map.of("message", response)); // Envoyer confirmation à l'expéditeur sendToUser(senderUUID, responseJson); Log.info("[CHAT-WS-NEXT] Message traité de " + senderId + " à " + recipientUUID); } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur lors de l'envoi du message", e); } } /** * Gère les indicateurs de frappe. * data doit contenir recipientId (ID du destinataire) et isTyping. */ private void handleTypingIndicator(Map data, String userId) { try { Object recipientIdObj = data.get("recipientId"); if (recipientIdObj == null) { Log.warn("[CHAT-WS-NEXT] Typing sans recipientId - ignoré"); return; } UUID recipientUUID = UUID.fromString(recipientIdObj.toString()); Object isTypingObj = data.get("isTyping"); boolean isTyping = isTypingObj instanceof Boolean ? (Boolean) isTypingObj : Boolean.parseBoolean(String.valueOf(isTypingObj)); String response = buildJsonMessage("typing", Map.of( "conversationId", data.getOrDefault("conversationId", ""), "userId", userId, "isTyping", isTyping )); sendToUser(recipientUUID, response); Log.debug("[CHAT-WS-NEXT] Indicateur de frappe envoyé de " + userId + " à " + recipientUUID); } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur lors de l'envoi de l'indicateur de frappe", e); } } /** * Gère les confirmations de lecture. * Envoie type "read" (format attendu par le client Flutter). */ private void handleReadReceipt(Map data, String userId) { try { UUID messageUUID = UUID.fromString((String) data.get("messageId")); Message message = messageService.markMessageAsRead(messageUUID); if (message != null) { UUID senderUUID = message.getSender().getId(); long now = System.currentTimeMillis(); String timestampIso = java.time.Instant.ofEpochMilli(now).toString(); String response = buildJsonMessage("read", Map.of( "messageId", messageUUID.toString(), "userId", userId, "timestamp", timestampIso )); sendToUser(senderUUID, response); Log.info("[CHAT-WS-NEXT] Confirmation de lecture envoyée pour message " + messageUUID); } } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur lors du traitement de la confirmation de lecture", e); } } /** * Envoie un message chat à un utilisateur spécifique via WebSocket. * Appelé par le bridge Kafka → WebSocket. * * @param userId ID de l'utilisateur destinataire * @param message Message JSON à envoyer */ public static void sendMessageToUser(UUID userId, String message) { WebSocketConnection connection = sessions.get(userId); if (connection == null || !connection.isOpen()) { if (connection != null) { sessions.remove(userId); Log.debug("[CHAT-WS-NEXT] Connexion périmée supprimée pour " + userId); } Log.info("[CHAT-WS-NEXT] Destinataire " + userId + " non connecté, message non délivré (sessions: " + sessions.size() + ")"); return; } try { String preview = message.length() > 300 ? message.substring(0, 300) + "..." : message; Log.info("[CHAT-WS-NEXT] Envoi vers " + userId + " (" + message.length() + " car): " + preview); connection.sendText(message); Log.info("[CHAT-WS-NEXT] Message délivré à l'utilisateur: " + userId); } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur lors de l'envoi à " + userId + ", connexion supprimée", e); sessions.remove(userId); } } /** * Envoie une confirmation de délivrance à l'expéditeur via WebSocket. */ public static void sendDeliveryConfirmation(UUID senderId, Map confirmationData) { WebSocketConnection connection = sessions.get(senderId); if (connection == null || !connection.isOpen()) { if (connection != null) sessions.remove(senderId); Log.debug("[CHAT-WS-NEXT] Expéditeur " + senderId + " non connecté pour confirmation"); return; } try { String response = buildJsonMessage("delivery_confirmation", confirmationData); connection.sendText(response); Log.debug("[CHAT-WS-NEXT] Confirmation de délivrance envoyée à: " + senderId); } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur envoi confirmation à " + senderId + ", connexion supprimée", e); sessions.remove(senderId); } } /** * Envoie une confirmation de lecture à l'expéditeur via WebSocket. */ public static void sendReadConfirmation(UUID senderId, Map readData) { WebSocketConnection connection = sessions.get(senderId); if (connection == null || !connection.isOpen()) { if (connection != null) sessions.remove(senderId); Log.debug("[CHAT-WS-NEXT] Expéditeur " + senderId + " non connecté pour confirmation de lecture"); return; } try { String response = buildJsonMessage("read_confirmation", readData); connection.sendText(response); Log.debug("[CHAT-WS-NEXT] Confirmation de lecture envoyée à: " + senderId); } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur envoi confirmation lecture à " + senderId + ", connexion supprimée", e); sessions.remove(senderId); } } /** * Envoie un message à un utilisateur (méthode privée pour usage interne). */ private void sendToUser(UUID userId, String message) { WebSocketConnection connection = sessions.get(userId); if (connection != null && connection.isOpen()) { try { connection.sendText(message); } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur lors de l'envoi à " + userId, e); } } } /** * Construit un message JSON. */ private static String buildJsonMessage(String type, Map data) { try { com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); Map message = Map.of( "type", type, "data", data, "timestamp", System.currentTimeMillis() ); return mapper.writeValueAsString(message); } catch (Exception e) { Log.error("[CHAT-WS-NEXT] Erreur construction JSON", e); return "{\"type\":\"error\",\"data\":{\"message\":\"Erreur de construction\"}}"; } } /** * Récupère le nombre d'utilisateurs connectés au chat. */ public static int getConnectedUsersCount() { return sessions.size(); } }