package dev.lions.unionflow.server.messaging; import dev.lions.unionflow.server.service.WebSocketBroadcastService; import io.smallrye.reactive.messaging.kafka.Record; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.jboss.logging.Logger; /** * Consumer Kafka pour consommer les events et les broadcaster via WebSocket. *

* Ce consumer écoute tous les topics Kafka et transmet les events * en temps réel aux clients mobiles/web connectés via WebSocket. */ @ApplicationScoped public class KafkaEventConsumer { private static final Logger LOG = Logger.getLogger(KafkaEventConsumer.class); @Inject WebSocketBroadcastService webSocketBroadcastService; /** * Consomme les events d'approbations financières. */ @Incoming("finance-approvals-in") public void consumeFinanceApprovals(Record record) { LOG.debugf("Received finance approval event: key=%s, value=%s", record.key(), record.value()); try { // Broadcast aux clients WebSocket webSocketBroadcastService.broadcast(record.value()); } catch (Exception e) { LOG.errorf(e, "Failed to broadcast finance approval event"); } } /** * Consomme les mises à jour de stats dashboard. */ @Incoming("dashboard-stats-in") public void consumeDashboardStats(Record record) { LOG.debugf("Received dashboard stats event: key=%s", record.key()); try { webSocketBroadcastService.broadcast(record.value()); } catch (Exception e) { LOG.errorf(e, "Failed to broadcast dashboard stats event"); } } /** * Consomme les notifications. */ @Incoming("notifications-in") public void consumeNotifications(Record record) { LOG.debugf("Received notification event: key=%s", record.key()); try { webSocketBroadcastService.broadcast(record.value()); } catch (Exception e) { LOG.errorf(e, "Failed to broadcast notification event"); } } /** * Consomme les events membres. */ @Incoming("members-events-in") public void consumeMembersEvents(Record record) { LOG.debugf("Received member event: key=%s", record.key()); try { webSocketBroadcastService.broadcast(record.value()); } catch (Exception e) { LOG.errorf(e, "Failed to broadcast member event"); } } /** * Consomme les events cotisations. */ @Incoming("contributions-events-in") public void consumeContributionsEvents(Record record) { LOG.debugf("Received contribution event: key=%s", record.key()); try { webSocketBroadcastService.broadcast(record.value()); } catch (Exception e) { LOG.errorf(e, "Failed to broadcast contribution event"); } } /** * Consomme les messages de chat (nouveaux messages envoyés dans une conversation). * Broadcaste l'event en temps réel aux clients WebSocket pour mise à jour instantanée. */ @Incoming("chat-messages-in") public void consumeChatMessages(Record record) { LOG.debugf("Received chat message event: key=%s", record.key()); try { webSocketBroadcastService.broadcast(record.value()); } catch (Exception e) { LOG.errorf(e, "Failed to broadcast chat message event"); } } }