# WebSocket + Kafka - Complete Implementation

**Date**: 2026-03-14

**Status**: ✅ **Implemented**

---

## 📊 End-to-End Architecture

```
Backend Services
    ↓
KafkaEventProducer (publishes events)
    ↓
Kafka Topics (unionflow.*)
    ↓
KafkaEventConsumer (consumes events)
    ↓
WebSocketBroadcastService (broadcast)
    ↓
DashboardWebSocketEndpoint (/ws/dashboard)
    ↓
Mobile WebSocketService (Flutter)
    ↓
DashboardBloc (listens for events)
    ↓
UI Auto-refresh
```

---

## 🔧 Backend - Implemented Components

### 1. Maven Dependencies

**File**: `pom.xml`

```xml
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
<!-- Older artifact name for the same extension; likely redundant next to quarkus-messaging-kafka -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
```

### 2. KafkaEventProducer

**File**: `src/main/java/dev/lions/unionflow/server/messaging/KafkaEventProducer.java`

**Methods**:
- `publishApprovalPending(UUID approvalId, String organizationId, Map<String, Object> approvalData)`
- `publishApprovalApproved(...)`
- `publishApprovalRejected(...)`
- `publishDashboardStatsUpdate(...)`
- `publishKpiUpdate(...)`
- `publishUserNotification(...)`
- `publishBroadcastNotification(...)`
- `publishMemberCreated(...)`
- `publishMemberUpdated(...)`
- `publishContributionPaid(...)`

**Usage in a business service**:

```java
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.UUID;

@ApplicationScoped
public class FinanceWorkflowService {

    @Inject
    KafkaEventProducer kafkaProducer;

    public void approveTransaction(UUID approvalId) {
        // ... business logic (loads and approves `approval`) ...

        // Publish the Kafka event. The explicit type keeps the mixed-value
        // map assignable to a Map<String, Object> parameter.
        Map<String, Object> approvalData = Map.of(
            "id", approval.getId().toString(),
            "transactionType", approval.getTransactionType().name(),
            "amount", approval.getAmount(),
            "currency", approval.getCurrency(),
            "approvedBy", approval.getApprovedBy(),
            "approvedAt", approval.getApprovedAt().toString()
        );

        kafkaProducer.publishApprovalApproved(
            approvalId,
            approval.getOrganizationId(),
            approvalData
        );
    }
}
```

### 3. KafkaEventConsumer

**File**: `src/main/java/dev/lions/unionflow/server/messaging/KafkaEventConsumer.java`

**Methods**:
- `consumeFinanceApprovals(Record record)` - Topic: `unionflow.finance.approvals`
- `consumeDashboardStats(...)` - Topic: `unionflow.dashboard.stats`
- `consumeNotifications(...)` - Topic: `unionflow.notifications.user`
- `consumeMembersEvents(...)` - Topic: `unionflow.members.events`
- `consumeContributionsEvents(...)` - Topic: `unionflow.contributions.events`

**Each consumer**:
1. Receives the event from Kafka
2. Logs the event
3. Broadcasts it via `WebSocketBroadcastService`

### 4. Kafka Configuration

**File**: `src/main/resources/application.properties`

**Configured channels**:
- 5 producer (outgoing) channels: `*-out`
- 5 consumer (incoming) channels: `*-in`
- Group ID: `unionflow-websocket-server`
- Bootstrap servers: `localhost:9092` (dev) / `KAFKA_BOOTSTRAP_SERVERS` env var (prod)

### 5. WebSocket Endpoint (already existed)

**File**: `src/main/java/dev/lions/unionflow/server/resource/DashboardWebSocketEndpoint.java`

**Endpoint**: `ws://localhost:8085/ws/dashboard`

**Features**:
- `@OnOpen`: client connection
- `@OnTextMessage`: heartbeat (ping/pong)
- `@OnClose`: disconnection

### 6. WebSocketBroadcastService (already existed)

**File**: `src/main/java/dev/lions/unionflow/server/service/WebSocketBroadcastService.java`

**Methods**:
- `broadcast(String message)` - broadcasts to all connected clients
- `broadcastStatsUpdate(String jsonData)`
- `broadcastNewActivity(String jsonData)`
- `broadcastEventUpdate(String jsonData)`
- `broadcastNotification(String jsonData)`

---

## 📱 Mobile - Implemented Components

### 1. WebSocketService

**File**: `lib/core/websocket/websocket_service.dart`

**Features**:
- ✅ WebSocket connection with the URL derived automatically from `AppConfig.backendBaseUrl`
- ✅ Automatic reconnection with exponential backoff (2^n seconds, max 60s)
- ✅ Heartbeat (ping every 30s)
- ✅ Stream of typed events (`Stream`)
- ✅ Stream of connection status (`Stream<bool>`)
- ✅ Event parsing with a factory pattern

**Event types**:
- `FinanceApprovalEvent` - approval workflow
- `DashboardStatsEvent` - dashboard stats
- `NotificationEvent` - notifications
- `MemberEvent` - member events
- `ContributionEvent` - contributions
- `GenericEvent` - generic events

**Usage**:

```dart
// Injection
@injectable
class DashboardBloc extends Bloc<DashboardEvent, DashboardState> {
  final WebSocketService webSocketService;

  DashboardBloc({required this.webSocketService}) : super(DashboardInitial()) {
    // Listen to WebSocket events
    webSocketService.eventStream.listen((event) {
      if (event is DashboardStatsEvent) {
        add(RefreshDashboardFromWebSocket(event.data));
      }
    });

    // Listen to the connection status
    webSocketService.connectionStatusStream.listen((isConnected) {
      if (isConnected) {
        print('✅ WebSocket connected');
      } else {
        print('❌ WebSocket disconnected');
      }
    });

    // Connect to the WebSocket
    webSocketService.connect();
  }

  @override
  Future<void> close() {
    webSocketService.disconnect();
    return super.close();
  }
}
```

### 2. DI Registration

`WebSocketService` is annotated `@singleton`, so injectable registers it automatically.

**Code generation**:

```bash
flutter pub run build_runner build --delete-conflicting-outputs
```

### 3. DashboardBloc Integration ✅

**File**: `lib/features/dashboard/presentation/bloc/dashboard_bloc.dart`

**New events**:

```dart
// dashboard_event.dart
class RefreshDashboardFromWebSocket extends DashboardEvent {
  final Map<String, dynamic> data;
  const RefreshDashboardFromWebSocket(this.data);

  @override
  List<Object?> get props => [data];
}

class WebSocketConnectionChanged extends DashboardEvent {
  final bool isConnected;
  const WebSocketConnectionChanged(this.isConnected);

  @override
  List<Object?> get props => [isConnected];
}
```

**DashboardBloc implementation**:

```dart
@injectable
class DashboardBloc extends Bloc<DashboardEvent, DashboardState> {
  // Use-case fields (getDashboardData, getDashboardStats, ...) are not shown.
  final WebSocketService webSocketService;
  StreamSubscription? _webSocketEventSubscription;
  StreamSubscription? _webSocketConnectionSubscription;

  DashboardBloc({
    required this.getDashboardData,
    required this.getDashboardStats,
    required this.getRecentActivities,
    required this.getUpcomingEvents,
    required this.webSocketService,
  }) : super(DashboardInitial()) {
    on<RefreshDashboardFromWebSocket>(_onRefreshDashboardFromWebSocket);
    // _onWebSocketConnectionChanged is not shown in this excerpt.
    on<WebSocketConnectionChanged>(_onWebSocketConnectionChanged);

    // Initialize the WebSocket
    _initializeWebSocket();
  }

  void _initializeWebSocket() {
    // Connect to the WebSocket
    webSocketService.connect();

    // Listen to WebSocket events
    _webSocketEventSubscription = webSocketService.eventStream.listen(
      (event) {
        // Dispatch only the relevant events
        if (event is DashboardStatsEvent ||
            event is FinanceApprovalEvent ||
            event is MemberEvent ||
            event is ContributionEvent) {
          add(RefreshDashboardFromWebSocket(event.data));
        }
      },
    );

    // Listen to the connection status
    _webSocketConnectionSubscription =
        webSocketService.connectionStatusStream.listen(
      (isConnected) {
        add(WebSocketConnectionChanged(isConnected));
      },
    );
  }

  Future<void> _onRefreshDashboardFromWebSocket(
    RefreshDashboardFromWebSocket event,
    Emitter<DashboardState> emit,
  ) async {
    // Refresh only the stats (optimization)
    final current = state;
    if (current is DashboardLoaded) {
      // Assumption: DashboardLoaded exposes its payload as `data`.
      final currentData = current.data;
      final result = await getDashboardStats(...);
      result.fold(
        (failure) {}, // Keep the current data
        (stats) {
          final updatedData = currentData.copyWith(stats: stats);
          emit(DashboardLoaded(updatedData));
        },
      );
    }
  }

  @override
  Future<void> close() {
    _webSocketEventSubscription?.cancel();
    _webSocketConnectionSubscription?.cancel();
    webSocketService.disconnect();
    return super.close();
  }
}
```

**Result**: The dashboard refreshes automatically in real time whenever a Kafka event arrives over the WebSocket.

---

## 🚀 End-to-End Usage

### Scenario 1: Finance Approval

#### Backend

```java
// FinanceWorkflowResource.java
@POST
@Path("/approvals/{id}/approve")
public Response approveTransaction(@PathParam("id") UUID id) {
    // 1. Business logic
    transactionApprovalService.approve(id);

    // 2. Publish the Kafka event
    var approval = repository.findById(id);
    kafkaProducer.publishApprovalApproved(
        id,
        approval.getOrganizationId(),
        Map.of(
            "id", approval.getId().toString(),
            "transactionType", approval.getTransactionType().name(),
            "amount", approval.getAmount(),
            "approvedBy", approval.getApprovedBy()
        )
    );

    return Response.ok().build();
}
```

#### Flow

```
1. POST /api/v1/finance/approvals/{id}/approve
2. Backend → Kafka topic "unionflow.finance.approvals"
3. KafkaEventConsumer consumes the event
4. WebSocketBroadcastService → broadcast to all WS clients
5. Mobile WebSocketService receives the event
6. DashboardBloc receives FinanceApprovalEvent
7. UI auto-refresh
```

### Scenario 2: Dashboard Stats Update

#### Backend

```java
// DashboardService.java
@Scheduled(every = "10s")
public void updateDashboardStats() {
    organizations.forEach(org -> {
        var stats = calculateStats(org.getId());

        // Publish the stats via Kafka
        kafkaProducer.publishDashboardStatsUpdate(
            org.getId(),
            Map.of(
                "totalMembers", stats.getTotalMembers(),
                "totalContributions", stats.getTotalContributions(),
                "pendingApprovals", stats.getPendingApprovals()
            )
        );
    });
}
```

#### Mobile

```dart
// DashboardBloc
on<RefreshDashboardFromWebSocket>((event, emit) {
  final stats = DashboardStats.fromJson(event.data);
  emit(DashboardLoaded(stats: stats));
});

// Automatic listening, set up in the constructor
webSocketService.eventStream.listen((event) {
  if (event is DashboardStatsEvent) {
    add(RefreshDashboardFromWebSocket(event.data));
  }
});
```

---

## 🧪 Tests

### Backend - Kafka Producer Test

```java
import io.quarkus.test.junit.QuarkusTest;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.Test;

@QuarkusTest
class KafkaEventProducerTest {

    @Inject
    KafkaEventProducer producer;

    @Test
    void shouldPublishApprovalEvent() {
        Map<String, Object> approvalData = Map.of("id", UUID.randomUUID().toString());

        producer.publishApprovalPending(UUID.randomUUID(), "org-123", approvalData);

        // Verify that the event was published to Kafka
        // (requires a test consumer or a Kafka testcontainer)
    }
}
```

### Mobile - WebSocketService Test

```dart
import 'package:flutter_test/flutter_test.dart';

void main() {
  group('WebSocketService', () {
    late WebSocketService service;

    setUp(() {
      service = WebSocketService();
    });

    tearDown(() {
      service.dispose();
    });

    test('should connect to WebSocket', () async {
      service.connect();
      await Future.delayed(const Duration(milliseconds: 500));
      expect(service.isConnected, true);
    });

    test('should receive events', () async {
      service.connect();
      final events = [];
      service.eventStream.listen((event) {
        events.add(event);
      });

      // Simulate an event from the backend
      // ...

      await Future.delayed(const Duration(seconds: 2));
      expect(events.isNotEmpty, true);
    });

    test('should reconnect on disconnection', () async {
      service.connect();
      await Future.delayed(const Duration(milliseconds: 500));

      // Force a disconnection
      service.disconnect();
      expect(service.isConnected, false);

      // Check automatic reconnection
      await Future.delayed(const Duration(seconds: 3));
      // Reconnection should have happened by now (no assertion yet)
    });
  });
}
```

---

## 🔧 Production Configuration

### Backend

**Kubernetes ConfigMap**:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: unionflow-backend-config
data:
  KAFKA_BOOTSTRAP_SERVERS: "kafka-service.kafka.svc.cluster.local:9092"
```

**Deployment**:

```yaml
env:
  - name: KAFKA_BOOTSTRAP_SERVERS
    valueFrom:
      configMapKeyRef:
        name: unionflow-backend-config
        key: KAFKA_BOOTSTRAP_SERVERS
```

### Mobile

**Automatic AppConfig**:

```dart
static String get backendBaseUrl {
  switch (environment) {
    case 'prod':
      return 'https://api.lions.dev/unionflow';
    // ...
  }
}

// WebSocket URL derived automatically:
// https://api.lions.dev/unionflow → wss://api.lions.dev/unionflow/ws/dashboard
```

---

## 📋 Deployment Checklist

### Backend
- [x] Kafka dependencies added to pom.xml
- [x] KafkaEventProducer created
- [x] KafkaEventConsumer created
- [x] Kafka configuration in application.properties
- [x] WebSocketEndpoint exists (already done)
- [x] WebSocketBroadcastService exists (already done)
- [ ] Docker Compose with Kafka (to be tested locally)
- [ ] Kafka Producer/Consumer tests
- [ ] Integration into business services (FinanceWorkflowService, etc.)

### Mobile
- [x] web_socket_channel package in pubspec.yaml
- [x] WebSocketService created
- [x] Typed events (FinanceApprovalEvent, DashboardStatsEvent, etc.)
- [x] Automatic reconnection
- [x] Heartbeat
- [x] **DashboardBloc integration** ✅
- [ ] WebSocketService tests
- [ ] E2E integration tests

---

## 🚀 Next Steps

1. **Start Kafka locally**:
   ```bash
   cd unionflow
   docker-compose up -d kafka zookeeper
   ```

2. **Test the backend**:
   ```bash
   cd unionflow-server-impl-quarkus
   ./mvnw quarkus:dev
   ```

3. **Test the mobile app**:
   ```bash
   cd unionflow-mobile-apps
   flutter pub run build_runner build --delete-conflicting-outputs
   flutter run --dart-define=ENV=dev
   ```

4. **Publish a test event** (via Swagger UI):
   - POST `/api/v1/finance/approvals/{id}/approve`
   - Check that the event reaches the mobile app

5. **Integrate into DashboardBloc** (already done, see Mobile section 3):
   - Listen to `webSocketService.eventStream`
   - Dispatch events to the BLoC

---

## ✅ Expected Result

- ✅ Backend publishes events to Kafka
- ✅ Kafka consumer broadcasts via WebSocket
- ✅ Mobile receives events in real time
- ✅ Dashboard auto-refreshes without a manual pull
- ✅ Instant push notifications
- ✅ Automatic reconnection after a disconnect

---

**Implemented by**: Claude Sonnet 4.5

**Date**: 2026-03-14

**Status**: ✅ **Backend + Mobile + DashboardBloc integration COMPLETE - Ready for end-to-end tests**
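
---

## 🧮 Appendix - Reconnect Delay and WebSocket URL

The mobile section states two small client-side rules: the reconnect delay is 2^n seconds capped at 60 s, and the WebSocket URL is derived from `AppConfig.backendBaseUrl` (for example `https://api.lions.dev/unionflow` becomes `wss://api.lions.dev/unionflow/ws/dashboard`). The real client is written in Dart; the sketch below only restates that arithmetic in Java, the backend's language, as an illustration. `DashboardClientMath`, `backoffSeconds`, and `dashboardWebSocketUrl` are hypothetical names that do not exist in the project, and counting the first retry as attempt 0 is an assumption.

```java
// Hypothetical helper for illustration only; not part of the UnionFlow code base.
final class DashboardClientMath {

    static final long MAX_DELAY_SECONDS = 60;

    /** Delay before reconnect attempt n (assumed 0-based): 2^n seconds, capped at 60 s. */
    static long backoffSeconds(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        // 2^6 = 64 already exceeds the cap, so clamp the exponent to avoid overflow.
        int exponent = Math.min(attempt, 6);
        return Math.min(1L << exponent, MAX_DELAY_SECONDS);
    }

    /** Maps an http(s) backend base URL to the ws(s) dashboard endpoint. */
    static String dashboardWebSocketUrl(String backendBaseUrl) {
        // Drop a single trailing slash so the path is not doubled.
        String base = backendBaseUrl.endsWith("/")
                ? backendBaseUrl.substring(0, backendBaseUrl.length() - 1)
                : backendBaseUrl;
        String wsBase;
        if (base.startsWith("https://")) {
            wsBase = "wss://" + base.substring("https://".length());
        } else if (base.startsWith("http://")) {
            wsBase = "ws://" + base.substring("http://".length());
        } else {
            throw new IllegalArgumentException("expected an http(s) URL: " + backendBaseUrl);
        }
        return wsBase + "/ws/dashboard";
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt <= 7; attempt++) {
            System.out.println("attempt " + attempt + " -> " + backoffSeconds(attempt) + " s");
        }
        System.out.println(dashboardWebSocketUrl("https://api.lions.dev/unionflow"));
    }
}
```

Under these assumptions the delays run 1, 2, 4, 8, 16, 32 and then stay at 60 seconds, so a client that has been offline for a long time retries about once a minute instead of backing off indefinitely.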