Refactoring
This commit is contained in:
@@ -1,14 +1,18 @@
|
|||||||
package com.lions.dev.service;
|
package com.lions.dev.service;
|
||||||
|
|
||||||
|
import com.lions.dev.dto.events.PresenceEvent;
|
||||||
import com.lions.dev.entity.users.Users;
|
import com.lions.dev.entity.users.Users;
|
||||||
import com.lions.dev.repository.UsersRepository;
|
import com.lions.dev.repository.UsersRepository;
|
||||||
import com.lions.dev.websocket.NotificationWebSocketNext;
|
import io.quarkus.logging.Log;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
import jakarta.enterprise.context.control.ActivateRequestContext;
|
import jakarta.enterprise.context.control.ActivateRequestContext;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import jakarta.transaction.Transactional;
|
import jakarta.transaction.Transactional;
|
||||||
|
import org.eclipse.microprofile.reactive.messaging.Channel;
|
||||||
|
import org.eclipse.microprofile.reactive.messaging.Emitter;
|
||||||
|
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
|
import java.time.ZoneOffset;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -17,7 +21,10 @@ import java.util.*;
|
|||||||
* Ce service gère:
|
* Ce service gère:
|
||||||
* - Le marquage des utilisateurs comme en ligne/hors ligne
|
* - Le marquage des utilisateurs comme en ligne/hors ligne
|
||||||
* - Le heartbeat pour maintenir le statut online
|
* - Le heartbeat pour maintenir le statut online
|
||||||
* - La diffusion de la présence aux amis via WebSocket
|
* - La diffusion de la présence aux amis via Kafka → WebSocket
|
||||||
|
*
|
||||||
|
* Architecture v2.0:
|
||||||
|
* PresenceService → Kafka Topic (presence.updates) → PresenceKafkaBridge → WebSocket → Client
|
||||||
*/
|
*/
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class PresenceService {
|
public class PresenceService {
|
||||||
@@ -25,6 +32,10 @@ public class PresenceService {
|
|||||||
@Inject
|
@Inject
|
||||||
UsersRepository usersRepository;
|
UsersRepository usersRepository;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
@Channel("presence")
|
||||||
|
Emitter<PresenceEvent> presenceEmitter; // v2.0 - Publie dans Kafka
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marque un utilisateur comme en ligne et broadcast sa présence.
|
* Marque un utilisateur comme en ligne et broadcast sa présence.
|
||||||
*
|
*
|
||||||
@@ -81,7 +92,8 @@ public class PresenceService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Broadcast la présence d'un utilisateur à tous les utilisateurs connectés via WebSocket.
|
* Broadcast la présence d'un utilisateur via Kafka (v2.0).
|
||||||
|
* Le PresenceKafkaBridge consommera depuis Kafka et enverra via WebSocket.
|
||||||
*
|
*
|
||||||
* @param userId L'ID de l'utilisateur
|
* @param userId L'ID de l'utilisateur
|
||||||
* @param isOnline Le statut online
|
* @param isOnline Le statut online
|
||||||
@@ -89,18 +101,25 @@ public class PresenceService {
|
|||||||
*/
|
*/
|
||||||
private void broadcastPresenceToAll(UUID userId, boolean isOnline, LocalDateTime lastSeen) {
|
private void broadcastPresenceToAll(UUID userId, boolean isOnline, LocalDateTime lastSeen) {
|
||||||
try {
|
try {
|
||||||
Map<String, Object> presenceData = new HashMap<>();
|
// Convertir LocalDateTime en timestamp (milliseconds)
|
||||||
presenceData.put("userId", userId.toString());
|
Long lastSeenTimestamp = lastSeen != null
|
||||||
presenceData.put("isOnline", isOnline);
|
? lastSeen.toInstant(ZoneOffset.UTC).toEpochMilli()
|
||||||
presenceData.put("lastSeen", lastSeen != null ? lastSeen.toString() : null);
|
: null;
|
||||||
presenceData.put("timestamp", System.currentTimeMillis());
|
|
||||||
|
// Créer l'événement de présence
|
||||||
// Envoyer via NotificationWebSocketNext (v2.0)
|
PresenceEvent presenceEvent = new PresenceEvent(
|
||||||
NotificationWebSocketNext.broadcastPresenceUpdate(presenceData);
|
userId.toString(),
|
||||||
|
isOnline ? "online" : "offline",
|
||||||
System.out.println("[PRESENCE] Broadcast de la présence de " + userId + " : " + isOnline);
|
lastSeenTimestamp
|
||||||
|
);
|
||||||
|
|
||||||
|
// Publier dans Kafka (le bridge s'occupera de l'envoi WebSocket)
|
||||||
|
presenceEmitter.send(presenceEvent);
|
||||||
|
|
||||||
|
Log.debug("[PRESENCE] Événement de présence publié dans Kafka: " + userId + " -> " +
|
||||||
|
(isOnline ? "online" : "offline"));
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
System.out.println("[ERROR] Erreur lors du broadcast de présence : " + e.getMessage());
|
Log.error("[PRESENCE] Erreur lors de la publication de présence dans Kafka", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,72 @@
|
|||||||
|
package com.lions.dev.websocket.bridge;
|
||||||
|
|
||||||
|
import com.lions.dev.dto.events.PresenceEvent;
|
||||||
|
import com.lions.dev.websocket.NotificationWebSocketNext;
|
||||||
|
import io.quarkus.logging.Log;
|
||||||
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
|
import org.eclipse.microprofile.reactive.messaging.Incoming;
|
||||||
|
import org.eclipse.microprofile.reactive.messaging.Message;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CompletionStage;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bridge qui consomme depuis Kafka et envoie via WebSocket pour les événements de présence.
|
||||||
|
*
|
||||||
|
* Architecture:
|
||||||
|
* PresenceService → Kafka Topic (presence.updates) → Bridge → WebSocket → Client
|
||||||
|
*
|
||||||
|
* Les événements de présence (online/offline) sont diffusés à tous les utilisateurs connectés
|
||||||
|
* pour mettre à jour leur liste d'amis en temps réel.
|
||||||
|
*/
|
||||||
|
@ApplicationScoped
|
||||||
|
public class PresenceKafkaBridge {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Consomme les événements de présence depuis Kafka et les route vers WebSocket.
|
||||||
|
*
|
||||||
|
* @param message Message Kafka contenant un PresenceEvent
|
||||||
|
* @return CompletionStage pour gérer l'ack/nack asynchrone
|
||||||
|
*/
|
||||||
|
@Incoming("kafka-presence")
|
||||||
|
public CompletionStage<Void> processPresence(Message<PresenceEvent> message) {
|
||||||
|
try {
|
||||||
|
PresenceEvent event = message.getPayload();
|
||||||
|
|
||||||
|
Log.debug("[PRESENCE-BRIDGE] Événement de présence reçu: " + event.getStatus() +
|
||||||
|
" pour utilisateur: " + event.getUserId());
|
||||||
|
|
||||||
|
// Broadcast à tous les utilisateurs connectés (pas seulement le propriétaire)
|
||||||
|
// Car la présence doit être visible par tous les amis
|
||||||
|
NotificationWebSocketNext.broadcastPresenceUpdate(buildPresenceData(event));
|
||||||
|
|
||||||
|
Log.debug("[PRESENCE-BRIDGE] Présence routée vers WebSocket pour: " + event.getUserId());
|
||||||
|
|
||||||
|
// Acknowledger le message Kafka
|
||||||
|
return message.ack();
|
||||||
|
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
Log.error("[PRESENCE-BRIDGE] UUID invalide dans l'événement", e);
|
||||||
|
return message.nack(e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
Log.error("[PRESENCE-BRIDGE] Erreur traitement événement", e);
|
||||||
|
return message.nack(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Construit les données de présence au format attendu par NotificationWebSocketNext.
|
||||||
|
*/
|
||||||
|
private Map<String, Object> buildPresenceData(PresenceEvent event) {
|
||||||
|
Map<String, Object> presenceData = new HashMap<>();
|
||||||
|
presenceData.put("userId", event.getUserId());
|
||||||
|
presenceData.put("isOnline", "online".equals(event.getStatus()));
|
||||||
|
if (event.getLastSeen() != null) {
|
||||||
|
// Convertir timestamp en ISO string si nécessaire
|
||||||
|
presenceData.put("lastSeen", new java.util.Date(event.getLastSeen()).toString());
|
||||||
|
}
|
||||||
|
presenceData.put("timestamp", event.getTimestamp() != null ? event.getTimestamp() : System.currentTimeMillis());
|
||||||
|
return presenceData;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -80,13 +80,10 @@ mp.messaging.outgoing.reactions.key.serializer=org.apache.kafka.common.serializa
|
|||||||
# value.serializer omis - Quarkus génère automatiquement depuis Emitter<ReactionEvent>
|
# value.serializer omis - Quarkus génère automatiquement depuis Emitter<ReactionEvent>
|
||||||
|
|
||||||
# Topic: Presence Updates
|
# Topic: Presence Updates
|
||||||
# NOTE: Configuration désactivée car non utilisée actuellement
|
mp.messaging.outgoing.presence.connector=smallrye-kafka
|
||||||
# PresenceService envoie directement via NotificationWebSocketNext.broadcastPresenceUpdate()
|
mp.messaging.outgoing.presence.topic=presence.updates
|
||||||
# Pour activer Kafka pour presence, décommentez ci-dessous et ajoutez un Emitter<PresenceEvent> dans PresenceService
|
mp.messaging.outgoing.presence.key.serializer=org.apache.kafka.common.serialization.StringSerializer
|
||||||
# mp.messaging.outgoing.presence.connector=smallrye-kafka
|
# value.serializer omis - Quarkus génère automatiquement depuis Emitter<PresenceEvent>
|
||||||
# mp.messaging.outgoing.presence.topic=presence.updates
|
|
||||||
# mp.messaging.outgoing.presence.key.serializer=org.apache.kafka.common.serialization.StringSerializer
|
|
||||||
# mp.messaging.outgoing.presence.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
|
|
||||||
|
|
||||||
# ====================================================================
|
# ====================================================================
|
||||||
# Kafka Topics - Incoming (Kafka → WebSocket Bridge)
|
# Kafka Topics - Incoming (Kafka → WebSocket Bridge)
|
||||||
@@ -117,3 +114,12 @@ mp.messaging.incoming.kafka-reactions.group.id=websocket-reactions-bridge
|
|||||||
mp.messaging.incoming.kafka-reactions.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
|
mp.messaging.incoming.kafka-reactions.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
|
||||||
# value.deserializer omis - Quarkus génère automatiquement depuis Message<ReactionEvent>
|
# value.deserializer omis - Quarkus génère automatiquement depuis Message<ReactionEvent>
|
||||||
mp.messaging.incoming.kafka-reactions.enable.auto.commit=true
|
mp.messaging.incoming.kafka-reactions.enable.auto.commit=true
|
||||||
|
|
||||||
|
# Consommer depuis Kafka et router vers WebSocket pour présence
|
||||||
|
# Note: Quarkus génère automatiquement les deserializers Jackson basés sur le type générique Message<PresenceEvent>
|
||||||
|
mp.messaging.incoming.kafka-presence.connector=smallrye-kafka
|
||||||
|
mp.messaging.incoming.kafka-presence.topic=presence.updates
|
||||||
|
mp.messaging.incoming.kafka-presence.group.id=websocket-presence-bridge
|
||||||
|
mp.messaging.incoming.kafka-presence.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
|
||||||
|
# value.deserializer omis - Quarkus génère automatiquement depuis Message<PresenceEvent>
|
||||||
|
mp.messaging.incoming.kafka-presence.enable.auto.commit=true
|
||||||
|
|||||||
Reference in New Issue
Block a user