package com.lions.dev.service; import com.lions.dev.entity.chat.Conversation; import com.lions.dev.entity.chat.Message; import com.lions.dev.entity.users.Users; import com.lions.dev.exception.UserNotFoundException; import com.lions.dev.repository.ConversationRepository; import com.lions.dev.repository.MessageRepository; import com.lions.dev.repository.UsersRepository; import com.lions.dev.dto.events.ChatMessageEvent; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import io.smallrye.reactive.messaging.kafka.api.OutgoingKafkaRecordMetadata; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import jakarta.transaction.Transactional; import org.jboss.logging.Logger; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; /** * Service de gestion des messages et conversations. * * Ce service contient la logique métier pour l'envoi de messages, * la récupération de conversations, et la gestion des messages non lus. */ @ApplicationScoped public class MessageService { private static final Logger logger = Logger.getLogger(MessageService.class); @Inject MessageRepository messageRepository; @Inject ConversationRepository conversationRepository; @Inject UsersRepository usersRepository; @Inject NotificationService notificationService; @Inject @Channel("chat-messages") Emitter chatMessageEmitter; // v2.0 - Publie dans Kafka /** * Envoie un message d'un utilisateur à un autre. * * @param senderId L'ID de l'expéditeur * @param recipientId L'ID du destinataire * @param content Le contenu du message * @param messageType Le type de message (text, image, etc.) * @param mediaUrl L'URL du média (optionnel) * @return Le message créé * @throws UserNotFoundException Si l'un des utilisateurs n'existe pas */ @Transactional public Message sendMessage( UUID senderId, UUID recipientId, String content, String messageType, String mediaUrl) { logger.info("[MessageService] Envoi de message de " + senderId + " à " + recipientId); // Récupérer les utilisateurs Users sender = usersRepository.findById(senderId); Users recipient = usersRepository.findById(recipientId); if (sender == null) { throw new UserNotFoundException("Expéditeur non trouvé avec l'ID : " + senderId); } if (recipient == null) { throw new UserNotFoundException("Destinataire non trouvé avec l'ID : " + recipientId); } // Trouver ou créer la conversation Conversation conversation = conversationRepository.findOrCreate(sender, recipient); // Créer le message Message message = new Message(conversation, sender, content); message.setMessageType(messageType != null ? messageType : "text"); if (mediaUrl != null && !mediaUrl.isEmpty()) { message.setMediaUrl(mediaUrl); } message.markAsDelivered(); // Persister le message messageRepository.persist(message); logger.info("[MessageService] Message créé avec l'ID : " + message.getId()); // Mettre à jour la conversation conversation.updateLastMessage(message); conversationRepository.persist(conversation); // Créer une notification pour le destinataire try { // v2.0 - Utiliser les nouveaux noms de champs String senderName = sender.getFirstName() + " " + sender.getLastName(); String notificationMessage = content.length() > 50 ? content.substring(0, 50) + "..." : content; notificationService.createNotification( "Nouveau message de " + senderName, notificationMessage, "message", recipientId, null ); logger.info("[MessageService] Notification créée pour le destinataire"); } catch (Exception e) { logger.error("[MessageService] Erreur lors de la création de la notification : " + e.getMessage()); } // TEMPS RÉEL : Publier dans Kafka (v2.0) try { // Créer l'événement pour Kafka ChatMessageEvent event = new ChatMessageEvent(); event.setConversationId(conversation.getId().toString()); event.setSenderId(senderId.toString()); event.setRecipientId(recipientId.toString()); event.setContent(content); event.setMessageId(message.getId().toString()); event.setEventType("message"); event.setTimestamp(System.currentTimeMillis()); // Métadonnées additionnelles Map eventMetadata = new HashMap<>(); eventMetadata.put("senderFirstName", sender.getFirstName()); eventMetadata.put("senderLastName", sender.getLastName()); eventMetadata.put("senderProfileImageUrl", sender.getProfileImageUrl() != null ? sender.getProfileImageUrl() : ""); eventMetadata.put("isRead", message.isRead()); eventMetadata.put("attachmentUrl", mediaUrl != null ? mediaUrl : ""); eventMetadata.put("attachmentType", messageType != null ? messageType : "text"); event.setMetadata(eventMetadata); // Publier dans Kafka (utiliser conversationId comme clé pour garantir l'ordre) OutgoingKafkaRecordMetadata kafkaMetadata = OutgoingKafkaRecordMetadata.builder() .withKey(conversation.getId().toString()) .build(); chatMessageEmitter.send(org.eclipse.microprofile.reactive.messaging.Message.of( event, () -> java.util.concurrent.CompletableFuture.completedFuture(null), // ack throwable -> { logger.error("[MessageService] Erreur envoi Kafka: " + throwable.getMessage()); return java.util.concurrent.CompletableFuture.completedFuture(null); // nack } ).addMetadata(kafkaMetadata)); logger.info("[MessageService] Message publié dans Kafka: " + message.getId()); // Envoyer confirmation de délivrance à l'expéditeur (via Kafka aussi) try { ChatMessageEvent deliveryEvent = new ChatMessageEvent(); deliveryEvent.setConversationId(conversation.getId().toString()); deliveryEvent.setSenderId(recipientId.toString()); // Le destinataire confirme deliveryEvent.setRecipientId(senderId.toString()); // À l'expéditeur deliveryEvent.setMessageId(message.getId().toString()); deliveryEvent.setEventType("delivery_confirmation"); deliveryEvent.setTimestamp(System.currentTimeMillis()); Map deliveryEventMetadata = new HashMap<>(); deliveryEventMetadata.put("isDelivered", true); deliveryEvent.setMetadata(deliveryEventMetadata); OutgoingKafkaRecordMetadata deliveryKafkaMetadata = OutgoingKafkaRecordMetadata.builder() .withKey(conversation.getId().toString()) .build(); chatMessageEmitter.send(org.eclipse.microprofile.reactive.messaging.Message.of( deliveryEvent, () -> java.util.concurrent.CompletableFuture.completedFuture(null), throwable -> java.util.concurrent.CompletableFuture.completedFuture(null) ).addMetadata(deliveryKafkaMetadata)); logger.info("[MessageService] Confirmation de délivrance publiée dans Kafka pour : " + senderId); } catch (Exception deliveryEx) { logger.error("[MessageService] Erreur publication confirmation délivrance : " + deliveryEx.getMessage()); // Ne pas bloquer si la confirmation échoue } } catch (Exception e) { io.quarkus.logging.Log.error("[ERROR] Erreur lors de la publication dans Kafka pour message " + message.getId(), e); // Ne pas bloquer l'envoi du message si Kafka échoue } return message; } /** * Récupère toutes les conversations d'un utilisateur. * * @param userId L'ID de l'utilisateur * @return Liste des conversations * @throws UserNotFoundException Si l'utilisateur n'existe pas */ public List getUserConversations(UUID userId) { logger.info("[MessageService] Récupération des conversations pour l'utilisateur ID : " + userId); Users user = usersRepository.findById(userId); if (user == null) { throw new UserNotFoundException("Utilisateur non trouvé avec l'ID : " + userId); } return conversationRepository.findByUser(user); } /** * Récupère tous les messages d'une conversation avec pagination. * * @param conversationId L'ID de la conversation * @param page Le numéro de la page * @param size La taille de la page * @return Liste paginée des messages */ public List getConversationMessages(UUID conversationId, int page, int size) { logger.info("[MessageService] Récupération des messages pour la conversation ID : " + conversationId); return messageRepository.findByConversationId(conversationId, page, size); } /** * Récupère une conversation spécifique. * * @param conversationId L'ID de la conversation * @return La conversation */ public Conversation getConversation(UUID conversationId) { logger.info("[MessageService] Récupération de la conversation ID : " + conversationId); return conversationRepository.findById(conversationId); } /** * Récupère une conversation entre deux utilisateurs. * * @param user1Id L'ID du premier utilisateur * @param user2Id L'ID du deuxième utilisateur * @return La conversation ou null si elle n'existe pas * @throws UserNotFoundException Si l'un des utilisateurs n'existe pas */ public Conversation getConversationBetweenUsers(UUID user1Id, UUID user2Id) { logger.info("[MessageService] Recherche de conversation entre " + user1Id + " et " + user2Id); Users user1 = usersRepository.findById(user1Id); Users user2 = usersRepository.findById(user2Id); if (user1 == null || user2 == null) { throw new UserNotFoundException("Un ou plusieurs utilisateurs non trouvés"); } return conversationRepository.findBetweenUsers(user1, user2); } /** * Marque un message comme lu. * * @param messageId L'ID du message * @return Le message mis à jour */ @Transactional public Message markMessageAsRead(UUID messageId) { logger.info("[MessageService] Marquage du message comme lu : " + messageId); Message message = messageRepository.findById(messageId); if (message == null) { throw new IllegalArgumentException("Message non trouvé avec l'ID : " + messageId); } message.markAsRead(); messageRepository.persist(message); // Envoyer confirmation de lecture à l'expéditeur via WebSocket try { // Récupérer le destinataire (l'autre utilisateur de la conversation) Conversation conversation = message.getConversation(); UUID recipientId = conversation.getUser1().getId().equals(message.getSender().getId()) ? conversation.getUser2().getId() : conversation.getUser1().getId(); // Publier confirmation de lecture dans Kafka (v2.0) try { ChatMessageEvent readEvent = new ChatMessageEvent(); readEvent.setConversationId(conversation.getId().toString()); readEvent.setSenderId(recipientId.toString()); // Celui qui a lu readEvent.setRecipientId(message.getSender().getId().toString()); // L'expéditeur readEvent.setMessageId(message.getId().toString()); readEvent.setEventType("read_confirmation"); readEvent.setTimestamp(System.currentTimeMillis()); Map readEventMetadata = new HashMap<>(); readEventMetadata.put("readBy", recipientId.toString()); readEventMetadata.put("readAt", System.currentTimeMillis()); readEvent.setMetadata(readEventMetadata); OutgoingKafkaRecordMetadata readKafkaMetadata = OutgoingKafkaRecordMetadata.builder() .withKey(conversation.getId().toString()) .build(); chatMessageEmitter.send(org.eclipse.microprofile.reactive.messaging.Message.of( readEvent, () -> java.util.concurrent.CompletableFuture.completedFuture(null), throwable -> java.util.concurrent.CompletableFuture.completedFuture(null) ).addMetadata(readKafkaMetadata)); logger.info("[MessageService] Confirmation de lecture publiée dans Kafka pour : " + message.getSender().getId()); } catch (Exception e) { logger.error("[MessageService] Erreur publication confirmation lecture : " + e.getMessage()); } } catch (Exception e) { logger.error("[MessageService] Erreur envoi confirmation lecture : " + e.getMessage()); } return message; } /** * Marque tous les messages d'une conversation comme lus pour un utilisateur. * * @param conversationId L'ID de la conversation * @param userId L'ID de l'utilisateur * @return Le nombre de messages marqués comme lus */ @Transactional public int markAllMessagesAsRead(UUID conversationId, UUID userId) { logger.info("[MessageService] Marquage de tous les messages comme lus pour la conversation " + conversationId); Conversation conversation = conversationRepository.findById(conversationId); if (conversation == null) { throw new IllegalArgumentException("Conversation non trouvée"); } Users user = usersRepository.findById(userId); if (user == null) { throw new UserNotFoundException("Utilisateur non trouvé"); } // Marquer les messages comme lus int count = messageRepository.markAllAsRead(conversationId, userId); // Mettre à jour le compteur de la conversation conversation.markAllAsReadForUser(user); conversationRepository.persist(conversation); return count; } /** * Compte le nombre total de messages non lus pour un utilisateur. * * @param userId L'ID de l'utilisateur * @return Le nombre de messages non lus */ public long getTotalUnreadCount(UUID userId) { logger.info("[MessageService] Récupération du nombre total de messages non lus pour l'utilisateur " + userId); return conversationRepository.countTotalUnreadMessages(userId); } /** * Supprime un message. * * @param messageId L'ID du message * @return true si le message a été supprimé */ @Transactional public boolean deleteMessage(UUID messageId) { logger.info("[MessageService] Suppression du message ID : " + messageId); Message message = messageRepository.findById(messageId); if (message == null) { return false; } messageRepository.delete(message); return true; } /** * Supprime une conversation et tous ses messages. * * @param conversationId L'ID de la conversation * @return true si la conversation a été supprimée */ @Transactional public boolean deleteConversation(UUID conversationId) { logger.info("[MessageService] Suppression de la conversation ID : " + conversationId); // Supprimer d'abord tous les messages messageRepository.deleteByConversationId(conversationId); // Puis supprimer la conversation return conversationRepository.deleteConversation(conversationId); } }