diff --git a/src/main/java/com/lions/dev/service/PresenceService.java b/src/main/java/com/lions/dev/service/PresenceService.java index 53969e9..1823e44 100644 --- a/src/main/java/com/lions/dev/service/PresenceService.java +++ b/src/main/java/com/lions/dev/service/PresenceService.java @@ -1,14 +1,18 @@ package com.lions.dev.service; +import com.lions.dev.dto.events.PresenceEvent; import com.lions.dev.entity.users.Users; import com.lions.dev.repository.UsersRepository; -import com.lions.dev.websocket.NotificationWebSocketNext; +import io.quarkus.logging.Log; import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.context.control.ActivateRequestContext; import jakarta.inject.Inject; import jakarta.transaction.Transactional; +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.*; /** @@ -17,7 +21,10 @@ import java.util.*; * Ce service gère: * - Le marquage des utilisateurs comme en ligne/hors ligne * - Le heartbeat pour maintenir le statut online - * - La diffusion de la présence aux amis via WebSocket + * - La diffusion de la présence aux amis via Kafka → WebSocket + * + * Architecture v2.0: + * PresenceService → Kafka Topic (presence.updates) → PresenceKafkaBridge → WebSocket → Client */ @ApplicationScoped public class PresenceService { @@ -25,6 +32,10 @@ public class PresenceService { @Inject UsersRepository usersRepository; + @Inject + @Channel("presence") + Emitter presenceEmitter; // v2.0 - Publie dans Kafka + /** * Marque un utilisateur comme en ligne et broadcast sa présence. * @@ -81,7 +92,8 @@ public class PresenceService { } /** - * Broadcast la présence d'un utilisateur à tous les utilisateurs connectés via WebSocket. + * Broadcast la présence d'un utilisateur via Kafka (v2.0). + * Le PresenceKafkaBridge consommera depuis Kafka et enverra via WebSocket. * * @param userId L'ID de l'utilisateur * @param isOnline Le statut online @@ -89,18 +101,25 @@ public class PresenceService { */ private void broadcastPresenceToAll(UUID userId, boolean isOnline, LocalDateTime lastSeen) { try { - Map presenceData = new HashMap<>(); - presenceData.put("userId", userId.toString()); - presenceData.put("isOnline", isOnline); - presenceData.put("lastSeen", lastSeen != null ? lastSeen.toString() : null); - presenceData.put("timestamp", System.currentTimeMillis()); - - // Envoyer via NotificationWebSocketNext (v2.0) - NotificationWebSocketNext.broadcastPresenceUpdate(presenceData); - - System.out.println("[PRESENCE] Broadcast de la présence de " + userId + " : " + isOnline); + // Convertir LocalDateTime en timestamp (milliseconds) + Long lastSeenTimestamp = lastSeen != null + ? lastSeen.toInstant(ZoneOffset.UTC).toEpochMilli() + : null; + + // Créer l'événement de présence + PresenceEvent presenceEvent = new PresenceEvent( + userId.toString(), + isOnline ? "online" : "offline", + lastSeenTimestamp + ); + + // Publier dans Kafka (le bridge s'occupera de l'envoi WebSocket) + presenceEmitter.send(presenceEvent); + + Log.debug("[PRESENCE] Événement de présence publié dans Kafka: " + userId + " -> " + + (isOnline ? "online" : "offline")); } catch (Exception e) { - System.out.println("[ERROR] Erreur lors du broadcast de présence : " + e.getMessage()); + Log.error("[PRESENCE] Erreur lors de la publication de présence dans Kafka", e); } } diff --git a/src/main/java/com/lions/dev/websocket/bridge/PresenceKafkaBridge.java b/src/main/java/com/lions/dev/websocket/bridge/PresenceKafkaBridge.java new file mode 100644 index 0000000..5ba741a --- /dev/null +++ b/src/main/java/com/lions/dev/websocket/bridge/PresenceKafkaBridge.java @@ -0,0 +1,72 @@ +package com.lions.dev.websocket.bridge; + +import com.lions.dev.dto.events.PresenceEvent; +import com.lions.dev.websocket.NotificationWebSocketNext; +import io.quarkus.logging.Log; +import jakarta.enterprise.context.ApplicationScoped; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +/** + * Bridge qui consomme depuis Kafka et envoie via WebSocket pour les événements de présence. + * + * Architecture: + * PresenceService → Kafka Topic (presence.updates) → Bridge → WebSocket → Client + * + * Les événements de présence (online/offline) sont diffusés à tous les utilisateurs connectés + * pour mettre à jour leur liste d'amis en temps réel. + */ +@ApplicationScoped +public class PresenceKafkaBridge { + + /** + * Consomme les événements de présence depuis Kafka et les route vers WebSocket. + * + * @param message Message Kafka contenant un PresenceEvent + * @return CompletionStage pour gérer l'ack/nack asynchrone + */ + @Incoming("kafka-presence") + public CompletionStage processPresence(Message message) { + try { + PresenceEvent event = message.getPayload(); + + Log.debug("[PRESENCE-BRIDGE] Événement de présence reçu: " + event.getStatus() + + " pour utilisateur: " + event.getUserId()); + + // Broadcast à tous les utilisateurs connectés (pas seulement le propriétaire) + // Car la présence doit être visible par tous les amis + NotificationWebSocketNext.broadcastPresenceUpdate(buildPresenceData(event)); + + Log.debug("[PRESENCE-BRIDGE] Présence routée vers WebSocket pour: " + event.getUserId()); + + // Acknowledger le message Kafka + return message.ack(); + + } catch (IllegalArgumentException e) { + Log.error("[PRESENCE-BRIDGE] UUID invalide dans l'événement", e); + return message.nack(e); + } catch (Exception e) { + Log.error("[PRESENCE-BRIDGE] Erreur traitement événement", e); + return message.nack(e); + } + } + + /** + * Construit les données de présence au format attendu par NotificationWebSocketNext. + */ + private Map buildPresenceData(PresenceEvent event) { + Map presenceData = new HashMap<>(); + presenceData.put("userId", event.getUserId()); + presenceData.put("isOnline", "online".equals(event.getStatus())); + if (event.getLastSeen() != null) { + // Convertir timestamp en ISO string si nécessaire + presenceData.put("lastSeen", new java.util.Date(event.getLastSeen()).toString()); + } + presenceData.put("timestamp", event.getTimestamp() != null ? event.getTimestamp() : System.currentTimeMillis()); + return presenceData; + } +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index ecf1c83..2004b19 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -80,13 +80,10 @@ mp.messaging.outgoing.reactions.key.serializer=org.apache.kafka.common.serializa # value.serializer omis - Quarkus génère automatiquement depuis Emitter # Topic: Presence Updates -# NOTE: Configuration désactivée car non utilisée actuellement -# PresenceService envoie directement via NotificationWebSocketNext.broadcastPresenceUpdate() -# Pour activer Kafka pour presence, décommentez ci-dessous et ajoutez un Emitter dans PresenceService -# mp.messaging.outgoing.presence.connector=smallrye-kafka -# mp.messaging.outgoing.presence.topic=presence.updates -# mp.messaging.outgoing.presence.key.serializer=org.apache.kafka.common.serialization.StringSerializer -# mp.messaging.outgoing.presence.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer +mp.messaging.outgoing.presence.connector=smallrye-kafka +mp.messaging.outgoing.presence.topic=presence.updates +mp.messaging.outgoing.presence.key.serializer=org.apache.kafka.common.serialization.StringSerializer +# value.serializer omis - Quarkus génère automatiquement depuis Emitter # ==================================================================== # Kafka Topics - Incoming (Kafka → WebSocket Bridge) @@ -117,3 +114,12 @@ mp.messaging.incoming.kafka-reactions.group.id=websocket-reactions-bridge mp.messaging.incoming.kafka-reactions.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer # value.deserializer omis - Quarkus génère automatiquement depuis Message mp.messaging.incoming.kafka-reactions.enable.auto.commit=true + +# Consommer depuis Kafka et router vers WebSocket pour présence +# Note: Quarkus génère automatiquement les deserializers Jackson basés sur le type générique Message +mp.messaging.incoming.kafka-presence.connector=smallrye-kafka +mp.messaging.incoming.kafka-presence.topic=presence.updates +mp.messaging.incoming.kafka-presence.group.id=websocket-presence-bridge +mp.messaging.incoming.kafka-presence.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer +# value.deserializer omis - Quarkus génère automatiquement depuis Message +mp.messaging.incoming.kafka-presence.enable.auto.commit=true