Sync: code local unifié
Synchronisation du code source local (fait foi). Signed-off-by: lions dev Team
This commit is contained in:
@@ -0,0 +1,155 @@
|
||||
package dev.lions.unionflow.server.messaging;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.smallrye.reactive.messaging.kafka.Record;
|
||||
import jakarta.enterprise.context.ApplicationScoped;
|
||||
import jakarta.inject.Inject;
|
||||
import org.eclipse.microprofile.reactive.messaging.Channel;
|
||||
import org.eclipse.microprofile.reactive.messaging.Emitter;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Producer Kafka pour publier des events UnionFlow.
|
||||
* <p>
|
||||
* Publie sur différents topics Kafka qui sont ensuite consommés
|
||||
* par le WebSocket server pour broadcast aux clients mobiles/web.
|
||||
*/
|
||||
@ApplicationScoped
|
||||
public class KafkaEventProducer {
|
||||
|
||||
private static final Logger LOG = Logger.getLogger(KafkaEventProducer.class);
|
||||
|
||||
@Inject
|
||||
ObjectMapper objectMapper;
|
||||
|
||||
@Channel("finance-approvals-out")
|
||||
Emitter<Record<String, String>> financeApprovalsEmitter;
|
||||
|
||||
@Channel("dashboard-stats-out")
|
||||
Emitter<Record<String, String>> dashboardStatsEmitter;
|
||||
|
||||
@Channel("notifications-out")
|
||||
Emitter<Record<String, String>> notificationsEmitter;
|
||||
|
||||
@Channel("members-events-out")
|
||||
Emitter<Record<String, String>> membersEventsEmitter;
|
||||
|
||||
@Channel("contributions-events-out")
|
||||
Emitter<Record<String, String>> contributionsEventsEmitter;
|
||||
|
||||
/**
|
||||
* Publie un event d'approbation en attente.
|
||||
*/
|
||||
public void publishApprovalPending(UUID approvalId, String organizationId, Map<String, Object> approvalData) {
|
||||
var event = buildEvent("APPROVAL_PENDING", organizationId, approvalData);
|
||||
publishToChannel(financeApprovalsEmitter, approvalId.toString(), event, "finance-approvals");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publie un event d'approbation approuvée.
|
||||
*/
|
||||
public void publishApprovalApproved(UUID approvalId, String organizationId, Map<String, Object> approvalData) {
|
||||
var event = buildEvent("APPROVAL_APPROVED", organizationId, approvalData);
|
||||
publishToChannel(financeApprovalsEmitter, approvalId.toString(), event, "finance-approvals");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publie un event d'approbation rejetée.
|
||||
*/
|
||||
public void publishApprovalRejected(UUID approvalId, String organizationId, Map<String, Object> approvalData) {
|
||||
var event = buildEvent("APPROVAL_REJECTED", organizationId, approvalData);
|
||||
publishToChannel(financeApprovalsEmitter, approvalId.toString(), event, "finance-approvals");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publie une mise à jour des stats dashboard.
|
||||
*/
|
||||
public void publishDashboardStatsUpdate(String organizationId, Map<String, Object> stats) {
|
||||
var event = buildEvent("DASHBOARD_STATS_UPDATED", organizationId, stats);
|
||||
publishToChannel(dashboardStatsEmitter, organizationId, event, "dashboard-stats");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publie un KPI temps réel.
|
||||
*/
|
||||
public void publishKpiUpdate(String organizationId, Map<String, Object> kpiData) {
|
||||
var event = buildEvent("KPI_UPDATED", organizationId, kpiData);
|
||||
publishToChannel(dashboardStatsEmitter, organizationId, event, "dashboard-stats");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publie une notification utilisateur.
|
||||
*/
|
||||
public void publishUserNotification(String userId, Map<String, Object> notificationData) {
|
||||
var event = buildEvent("USER_NOTIFICATION", null, notificationData);
|
||||
event.put("userId", userId);
|
||||
publishToChannel(notificationsEmitter, userId, event, "notifications");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publie une notification broadcast (toute une organisation).
|
||||
*/
|
||||
public void publishBroadcastNotification(String organizationId, Map<String, Object> notificationData) {
|
||||
var event = buildEvent("BROADCAST_NOTIFICATION", organizationId, notificationData);
|
||||
publishToChannel(notificationsEmitter, organizationId, event, "notifications");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publie un event de création de membre.
|
||||
*/
|
||||
public void publishMemberCreated(UUID memberId, String organizationId, Map<String, Object> memberData) {
|
||||
var event = buildEvent("MEMBER_CREATED", organizationId, memberData);
|
||||
publishToChannel(membersEventsEmitter, memberId.toString(), event, "members-events");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publie un event de modification de membre.
|
||||
*/
|
||||
public void publishMemberUpdated(UUID memberId, String organizationId, Map<String, Object> memberData) {
|
||||
var event = buildEvent("MEMBER_UPDATED", organizationId, memberData);
|
||||
publishToChannel(membersEventsEmitter, memberId.toString(), event, "members-events");
|
||||
}
|
||||
|
||||
/**
|
||||
* Publie un event de cotisation payée.
|
||||
*/
|
||||
public void publishContributionPaid(UUID contributionId, String organizationId, Map<String, Object> contributionData) {
|
||||
var event = buildEvent("CONTRIBUTION_PAID", organizationId, contributionData);
|
||||
publishToChannel(contributionsEventsEmitter, contributionId.toString(), event, "contributions-events");
|
||||
}
|
||||
|
||||
/**
|
||||
* Construit un event avec structure standardisée.
|
||||
*/
|
||||
private Map<String, Object> buildEvent(String eventType, String organizationId, Map<String, Object> data) {
|
||||
var event = new HashMap<String, Object>();
|
||||
event.put("eventType", eventType);
|
||||
event.put("timestamp", Instant.now().toString());
|
||||
if (organizationId != null) {
|
||||
event.put("organizationId", organizationId);
|
||||
}
|
||||
event.put("data", data);
|
||||
return event;
|
||||
}
|
||||
|
||||
/**
|
||||
* Publie un event sur un channel Kafka avec gestion d'erreur.
|
||||
*/
|
||||
private void publishToChannel(Emitter<Record<String, String>> emitter, String key, Map<String, Object> event, String topicName) {
|
||||
try {
|
||||
String eventJson = objectMapper.writeValueAsString(event);
|
||||
emitter.send(Record.of(key, eventJson));
|
||||
LOG.debugf("Published event to %s: %s", topicName, eventJson);
|
||||
} catch (JsonProcessingException e) {
|
||||
LOG.errorf(e, "Failed to serialize event for topic %s", topicName);
|
||||
} catch (Exception e) {
|
||||
LOG.errorf(e, "Failed to publish event to topic %s", topicName);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user