feat: migration complète vers WebSockets Next + Kafka pour temps réel
- Migration de Jakarta WebSocket vers Quarkus WebSockets Next - Implémentation de l'architecture Kafka pour événements temps réel - Ajout des DTOs d'événements (NotificationEvent, ChatMessageEvent, ReactionEvent, PresenceEvent) - Création des bridges Kafka → WebSocket (NotificationKafkaBridge, ChatKafkaBridge, ReactionKafkaBridge) - Mise à jour des services pour publier dans Kafka au lieu d'appeler directement WebSocket - Suppression des classes obsolètes (ChatWebSocket, NotificationWebSocket) - Correction de l'injection des paramètres path dans WebSockets Next (utilisation de connection.pathParam) - Ajout des migrations DB pour bookings, promotions, business hours, amenities, reviews - Mise à jour de la configuration application.properties pour Kafka et WebSockets Next - Mise à jour .gitignore pour ignorer les fichiers de logs
This commit is contained in:
@@ -7,7 +7,10 @@ import com.lions.dev.exception.UserNotFoundException;
|
||||
import com.lions.dev.repository.ConversationRepository;
|
||||
import com.lions.dev.repository.MessageRepository;
|
||||
import com.lions.dev.repository.UsersRepository;
|
||||
import com.lions.dev.websocket.ChatWebSocket;
|
||||
import com.lions.dev.dto.events.ChatMessageEvent;
|
||||
import org.eclipse.microprofile.reactive.messaging.Channel;
|
||||
import org.eclipse.microprofile.reactive.messaging.Emitter;
|
||||
import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.transaction.Transactional;
|
||||
@@ -40,6 +43,10 @@ public class MessageService {
|
||||
@Inject
|
||||
NotificationService notificationService;
|
||||
|
||||
@Inject
|
||||
@Channel("chat-messages")
|
||||
Emitter<ChatMessageEvent> chatMessageEmitter; // v2.0 - Publie dans Kafka
|
||||
|
||||
/**
|
||||
* Envoie un message d'un utilisateur à un autre.
|
||||
*
|
||||
@@ -92,7 +99,8 @@ public class MessageService {
|
||||
|
||||
// Créer une notification pour le destinataire
|
||||
try {
|
||||
String senderName = sender.getPrenoms() + " " + sender.getNom();
|
||||
// v2.0 - Utiliser les nouveaux noms de champs
|
||||
String senderName = sender.getFirstName() + " " + sender.getLastName();
|
||||
String notificationMessage = content.length() > 50
|
||||
? content.substring(0, 50) + "..."
|
||||
: content;
|
||||
@@ -109,43 +117,78 @@ public class MessageService {
|
||||
System.out.println("[ERROR] Erreur lors de la création de la notification : " + e.getMessage());
|
||||
}
|
||||
|
||||
// TEMPS RÉEL : Envoyer le message via WebSocket au destinataire
|
||||
// TEMPS RÉEL : Publier dans Kafka (v2.0)
|
||||
try {
|
||||
Map<String, Object> messageData = new HashMap<>();
|
||||
messageData.put("id", message.getId().toString());
|
||||
messageData.put("conversationId", conversation.getId().toString());
|
||||
messageData.put("senderId", senderId.toString());
|
||||
messageData.put("senderFirstName", sender.getPrenoms());
|
||||
messageData.put("senderLastName", sender.getNom());
|
||||
messageData.put("senderProfileImageUrl", sender.getProfileImageUrl() != null ? sender.getProfileImageUrl() : "");
|
||||
messageData.put("content", content);
|
||||
messageData.put("timestamp", message.getCreatedAt().toString());
|
||||
messageData.put("isRead", message.isRead());
|
||||
messageData.put("attachmentUrl", mediaUrl != null ? mediaUrl : "");
|
||||
messageData.put("attachmentType", messageType != null ? messageType : "text");
|
||||
// Créer l'événement pour Kafka
|
||||
ChatMessageEvent event = new ChatMessageEvent();
|
||||
event.setConversationId(conversation.getId().toString());
|
||||
event.setSenderId(senderId.toString());
|
||||
event.setRecipientId(recipientId.toString());
|
||||
event.setContent(content);
|
||||
event.setMessageId(message.getId().toString());
|
||||
event.setEventType("message");
|
||||
event.setTimestamp(System.currentTimeMillis());
|
||||
|
||||
// Métadonnées additionnelles
|
||||
Map<String, Object> eventMetadata = new HashMap<>();
|
||||
eventMetadata.put("senderFirstName", sender.getFirstName());
|
||||
eventMetadata.put("senderLastName", sender.getLastName());
|
||||
eventMetadata.put("senderProfileImageUrl", sender.getProfileImageUrl() != null ? sender.getProfileImageUrl() : "");
|
||||
eventMetadata.put("isRead", message.isRead());
|
||||
eventMetadata.put("attachmentUrl", mediaUrl != null ? mediaUrl : "");
|
||||
eventMetadata.put("attachmentType", messageType != null ? messageType : "text");
|
||||
event.setMetadata(eventMetadata);
|
||||
|
||||
// Envoyer au destinataire via ChatWebSocket
|
||||
ChatWebSocket.sendMessageToUser(recipientId, messageData);
|
||||
// Publier dans Kafka (utiliser conversationId comme clé pour garantir l'ordre)
|
||||
OutgoingKafkaRecordMetadata kafkaMetadata =
|
||||
OutgoingKafkaRecordMetadata.builder()
|
||||
.withKey(conversation.getId().toString())
|
||||
.build();
|
||||
|
||||
chatMessageEmitter.send(org.eclipse.microprofile.reactive.messaging.Message.of(
|
||||
event,
|
||||
() -> java.util.concurrent.CompletableFuture.completedFuture(null), // ack
|
||||
throwable -> {
|
||||
System.out.println("[ERROR] Erreur envoi Kafka: " + throwable.getMessage());
|
||||
return java.util.concurrent.CompletableFuture.completedFuture(null); // nack
|
||||
}
|
||||
).addMetadata(kafkaMetadata));
|
||||
|
||||
System.out.println("[LOG] Message envoyé via WebSocket au destinataire : " + recipientId);
|
||||
System.out.println("[LOG] Message publié dans Kafka: " + message.getId());
|
||||
|
||||
// Envoyer confirmation de délivrance à l'expéditeur
|
||||
// Envoyer confirmation de délivrance à l'expéditeur (via Kafka aussi)
|
||||
try {
|
||||
Map<String, Object> deliveryConfirmation = new HashMap<>();
|
||||
deliveryConfirmation.put("messageId", message.getId().toString());
|
||||
deliveryConfirmation.put("isDelivered", true);
|
||||
deliveryConfirmation.put("timestamp", System.currentTimeMillis());
|
||||
ChatMessageEvent deliveryEvent = new ChatMessageEvent();
|
||||
deliveryEvent.setConversationId(conversation.getId().toString());
|
||||
deliveryEvent.setSenderId(recipientId.toString()); // Le destinataire confirme
|
||||
deliveryEvent.setRecipientId(senderId.toString()); // À l'expéditeur
|
||||
deliveryEvent.setMessageId(message.getId().toString());
|
||||
deliveryEvent.setEventType("delivery_confirmation");
|
||||
deliveryEvent.setTimestamp(System.currentTimeMillis());
|
||||
|
||||
Map<String, Object> deliveryEventMetadata = new HashMap<>();
|
||||
deliveryEventMetadata.put("isDelivered", true);
|
||||
deliveryEvent.setMetadata(deliveryEventMetadata);
|
||||
|
||||
ChatWebSocket.sendDeliveryConfirmation(senderId, deliveryConfirmation);
|
||||
OutgoingKafkaRecordMetadata deliveryKafkaMetadata =
|
||||
OutgoingKafkaRecordMetadata.builder()
|
||||
.withKey(conversation.getId().toString())
|
||||
.build();
|
||||
|
||||
chatMessageEmitter.send(org.eclipse.microprofile.reactive.messaging.Message.of(
|
||||
deliveryEvent,
|
||||
() -> java.util.concurrent.CompletableFuture.completedFuture(null),
|
||||
throwable -> java.util.concurrent.CompletableFuture.completedFuture(null)
|
||||
).addMetadata(deliveryKafkaMetadata));
|
||||
|
||||
System.out.println("[LOG] Confirmation de délivrance envoyée à l'expéditeur : " + senderId);
|
||||
System.out.println("[LOG] Confirmation de délivrance publiée dans Kafka pour : " + senderId);
|
||||
} catch (Exception deliveryEx) {
|
||||
System.out.println("[ERROR] Erreur envoi confirmation délivrance : " + deliveryEx.getMessage());
|
||||
System.out.println("[ERROR] Erreur publication confirmation délivrance : " + deliveryEx.getMessage());
|
||||
// Ne pas bloquer si la confirmation échoue
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.out.println("[ERROR] Erreur lors de l'envoi du message via WebSocket : " + e.getMessage());
|
||||
// Ne pas bloquer l'envoi du message si WebSocket échoue
|
||||
System.out.println("[ERROR] Erreur lors de la publication dans Kafka : " + e.getMessage());
|
||||
// Ne pas bloquer l'envoi du message si Kafka échoue
|
||||
}
|
||||
|
||||
return message;
|
||||
@@ -240,18 +283,36 @@ public class MessageService {
|
||||
? conversation.getUser2().getId()
|
||||
: conversation.getUser1().getId();
|
||||
|
||||
Map<String, Object> readConfirmation = new HashMap<>();
|
||||
readConfirmation.put("messageId", message.getId().toString());
|
||||
readConfirmation.put("userId", recipientId.toString());
|
||||
readConfirmation.put("timestamp", java.time.LocalDateTime.now().toString());
|
||||
// Publier confirmation de lecture dans Kafka (v2.0)
|
||||
try {
|
||||
ChatMessageEvent readEvent = new ChatMessageEvent();
|
||||
readEvent.setConversationId(conversation.getId().toString());
|
||||
readEvent.setSenderId(recipientId.toString()); // Celui qui a lu
|
||||
readEvent.setRecipientId(message.getSender().getId().toString()); // L'expéditeur
|
||||
readEvent.setMessageId(message.getId().toString());
|
||||
readEvent.setEventType("read_confirmation");
|
||||
readEvent.setTimestamp(System.currentTimeMillis());
|
||||
|
||||
Map<String, Object> readEventMetadata = new HashMap<>();
|
||||
readEventMetadata.put("readBy", recipientId.toString());
|
||||
readEventMetadata.put("readAt", System.currentTimeMillis());
|
||||
readEvent.setMetadata(readEventMetadata);
|
||||
|
||||
// Envoyer via ChatWebSocket avec type "read"
|
||||
com.lions.dev.websocket.ChatWebSocket.sendReadConfirmation(
|
||||
message.getSender().getId(),
|
||||
readConfirmation
|
||||
);
|
||||
OutgoingKafkaRecordMetadata readKafkaMetadata =
|
||||
OutgoingKafkaRecordMetadata.builder()
|
||||
.withKey(conversation.getId().toString())
|
||||
.build();
|
||||
|
||||
chatMessageEmitter.send(org.eclipse.microprofile.reactive.messaging.Message.of(
|
||||
readEvent,
|
||||
() -> java.util.concurrent.CompletableFuture.completedFuture(null),
|
||||
throwable -> java.util.concurrent.CompletableFuture.completedFuture(null)
|
||||
).addMetadata(readKafkaMetadata));
|
||||
|
||||
System.out.println("[LOG] Confirmation de lecture envoyée à l'expéditeur : " + message.getSender().getId());
|
||||
System.out.println("[LOG] Confirmation de lecture publiée dans Kafka pour : " + message.getSender().getId());
|
||||
} catch (Exception e) {
|
||||
System.out.println("[ERROR] Erreur publication confirmation lecture : " + e.getMessage());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.out.println("[ERROR] Erreur envoi confirmation lecture : " + e.getMessage());
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user