package dev.lions.unionflow.server.messaging; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.smallrye.reactive.messaging.kafka.Record; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.eclipse.microprofile.reactive.messaging.Channel; import org.eclipse.microprofile.reactive.messaging.Emitter; import org.jboss.logging.Logger; import java.time.Instant; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * Producer Kafka pour publier des events UnionFlow. *

* Publie sur différents topics Kafka qui sont ensuite consommés * par le WebSocket server pour broadcast aux clients mobiles/web. */ @ApplicationScoped public class KafkaEventProducer { private static final Logger LOG = Logger.getLogger(KafkaEventProducer.class); @Inject ObjectMapper objectMapper; @Channel("finance-approvals-out") Emitter> financeApprovalsEmitter; @Channel("dashboard-stats-out") Emitter> dashboardStatsEmitter; @Channel("notifications-out") Emitter> notificationsEmitter; @Channel("members-events-out") Emitter> membersEventsEmitter; @Channel("contributions-events-out") Emitter> contributionsEventsEmitter; @Channel("chat-messages-out") Emitter> chatMessagesEmitter; /** * Publie un event d'approbation en attente. */ public void publishApprovalPending(UUID approvalId, String organizationId, Map approvalData) { var event = buildEvent("APPROVAL_PENDING", organizationId, approvalData); publishToChannel(financeApprovalsEmitter, approvalId.toString(), event, "finance-approvals"); } /** * Publie un event d'approbation approuvée. */ public void publishApprovalApproved(UUID approvalId, String organizationId, Map approvalData) { var event = buildEvent("APPROVAL_APPROVED", organizationId, approvalData); publishToChannel(financeApprovalsEmitter, approvalId.toString(), event, "finance-approvals"); } /** * Publie un event d'approbation rejetée. */ public void publishApprovalRejected(UUID approvalId, String organizationId, Map approvalData) { var event = buildEvent("APPROVAL_REJECTED", organizationId, approvalData); publishToChannel(financeApprovalsEmitter, approvalId.toString(), event, "finance-approvals"); } /** * Publie une mise à jour des stats dashboard. */ public void publishDashboardStatsUpdate(String organizationId, Map stats) { var event = buildEvent("DASHBOARD_STATS_UPDATED", organizationId, stats); publishToChannel(dashboardStatsEmitter, organizationId, event, "dashboard-stats"); } /** * Publie un KPI temps réel. */ public void publishKpiUpdate(String organizationId, Map kpiData) { var event = buildEvent("KPI_UPDATED", organizationId, kpiData); publishToChannel(dashboardStatsEmitter, organizationId, event, "dashboard-stats"); } /** * Publie une notification utilisateur. */ public void publishUserNotification(String userId, Map notificationData) { var event = buildEvent("USER_NOTIFICATION", null, notificationData); event.put("userId", userId); publishToChannel(notificationsEmitter, userId, event, "notifications"); } /** * Publie une notification broadcast (toute une organisation). */ public void publishBroadcastNotification(String organizationId, Map notificationData) { var event = buildEvent("BROADCAST_NOTIFICATION", organizationId, notificationData); publishToChannel(notificationsEmitter, organizationId, event, "notifications"); } /** * Publie un event de création de membre. */ public void publishMemberCreated(UUID memberId, String organizationId, Map memberData) { var event = buildEvent("MEMBER_CREATED", organizationId, memberData); publishToChannel(membersEventsEmitter, memberId.toString(), event, "members-events"); } /** * Publie un event de modification de membre. */ public void publishMemberUpdated(UUID memberId, String organizationId, Map memberData) { var event = buildEvent("MEMBER_UPDATED", organizationId, memberData); publishToChannel(membersEventsEmitter, memberId.toString(), event, "members-events"); } /** * Publie un event de désactivation de membre (soft delete). * Les consommateurs peuvent réagir : bloquer comptes épargne, annuler inscriptions, * reassigner approvals pending, nettoyer notifications, etc. */ public void publishMemberDeactivated(dev.lions.unionflow.server.entity.Membre membre) { if (membre == null || membre.getId() == null) return; Map data = new java.util.HashMap<>(); data.put("membreId", membre.getId().toString()); data.put("email", membre.getEmail()); data.put("nomComplet", membre.getNomComplet()); data.put("numeroMembre", membre.getNumeroMembre()); // organisationId principal (si présent) pour routage par org String orgId = membre.getMembresOrganisations() != null && !membre.getMembresOrganisations().isEmpty() && membre.getMembresOrganisations().get(0).getOrganisation() != null ? membre.getMembresOrganisations().get(0).getOrganisation().getId().toString() : ""; var event = buildEvent("MEMBER_DEACTIVATED", orgId, data); publishToChannel(membersEventsEmitter, membre.getId().toString(), event, "members-events"); } /** * Publie un event de cotisation payée. */ public void publishContributionPaid(UUID contributionId, String organizationId, Map contributionData) { var event = buildEvent("CONTRIBUTION_PAID", organizationId, contributionData); publishToChannel(contributionsEventsEmitter, contributionId.toString(), event, "contributions-events"); } /** * Publie un event de nouveau message de chat. * Les clients WebSocket de l'organisation sont notifiés pour rafraîchir leurs messages. */ public void publishNouveauMessage(UUID conversationId, String organizationId, Map messageData) { var event = buildEvent("NOUVEAU_MESSAGE", organizationId, messageData); publishToChannel(chatMessagesEmitter, conversationId.toString(), event, "chat-messages"); } /** * Construit un event avec structure standardisée. */ private Map buildEvent(String eventType, String organizationId, Map data) { var event = new HashMap(); event.put("eventType", eventType); event.put("timestamp", Instant.now().toString()); if (organizationId != null) { event.put("organizationId", organizationId); } event.put("data", data); return event; } /** * Publie un event sur un channel Kafka avec gestion d'erreur. */ private void publishToChannel(Emitter> emitter, String key, Map event, String topicName) { try { String eventJson = objectMapper.writeValueAsString(event); emitter.send(Record.of(key, eventJson)); LOG.debugf("Published event to %s: %s", topicName, eventJson); } catch (JsonProcessingException e) { LOG.errorf(e, "Failed to serialize event for topic %s", topicName); } catch (Exception e) { LOG.errorf(e, "Failed to publish event to topic %s", topicName); } } }