# WebSocket + Kafka - Implémentation Complète
**Date** : 2026-03-14
**Statut** : ✅ **Implémenté**
---
## 📊 Architecture End-to-End
```
Backend Services
↓
KafkaEventProducer (publier events)
↓
Kafka Topics (unionflow.*)
↓
KafkaEventConsumer (consumer)
↓
WebSocketBroadcastService (broadcast)
↓
DashboardWebSocketEndpoint (/ws/dashboard)
↓
Mobile WebSocketService (Flutter)
↓
DashboardBloc (écouter events)
↓
UI Auto-refresh
```
---
## 🔧 Backend - Composants implémentés
### 1. Dépendances Maven
**Fichier** : `pom.xml`
```xml
io.quarkus
quarkus-messaging-kafka
io.quarkus
quarkus-smallrye-reactive-messaging-kafka
```
### 2. KafkaEventProducer
**Fichier** : `src/main/java/dev/lions/unionflow/server/messaging/KafkaEventProducer.java`
**Méthodes** :
- `publishApprovalPending(UUID approvalId, String organizationId, Map approvalData)`
- `publishApprovalApproved(...)`
- `publishApprovalRejected(...)`
- `publishDashboardStatsUpdate(...)`
- `publishKpiUpdate(...)`
- `publishUserNotification(...)`
- `publishBroadcastNotification(...)`
- `publishMemberCreated(...)`
- `publishMemberUpdated(...)`
- `publishContributionPaid(...)`
**Usage dans un service métier** :
```java
@ApplicationScoped
public class FinanceWorkflowService {
@Inject
KafkaEventProducer kafkaProducer;
public void approveTransaction(UUID approvalId) {
// ... logique métier ...
// Publier event Kafka
var approvalData = Map.of(
"id", approval.getId().toString(),
"transactionType", approval.getTransactionType().name(),
"amount", approval.getAmount(),
"currency", approval.getCurrency(),
"approvedBy", approval.getApprovedBy(),
"approvedAt", approval.getApprovedAt().toString()
);
kafkaProducer.publishApprovalApproved(
approvalId,
approval.getOrganizationId(),
approvalData
);
}
}
```
### 3. KafkaEventConsumer
**Fichier** : `src/main/java/dev/lions/unionflow/server/messaging/KafkaEventConsumer.java`
**Méthodes** :
- `consumeFinanceApprovals(Record record)` - Topic: `unionflow.finance.approvals`
- `consumeDashboardStats(...)` - Topic: `unionflow.dashboard.stats`
- `consumeNotifications(...)` - Topic: `unionflow.notifications.user`
- `consumeMembersEvents(...)` - Topic: `unionflow.members.events`
- `consumeContributionsEvents(...)` - Topic: `unionflow.contributions.events`
**Chaque consumer** :
1. Reçoit l'event depuis Kafka
2. Log l'event
3. Broadcast via `WebSocketBroadcastService`
### 4. Configuration Kafka
**Fichier** : `src/main/resources/application.properties`
**Channels configurés** :
- 5 channels Producer (outgoing) : `*-out`
- 5 channels Consumer (incoming) : `*-in`
- Group ID : `unionflow-websocket-server`
- Bootstrap servers : `localhost:9092` (dev) / `KAFKA_BOOTSTRAP_SERVERS` env var (prod)
### 5. WebSocket Endpoint (existait déjà)
**Fichier** : `src/main/java/dev/lions/unionflow/server/resource/DashboardWebSocketEndpoint.java`
**Endpoint** : `ws://localhost:8085/ws/dashboard`
**Features** :
- @OnOpen : Connexion client
- @OnTextMessage : Heartbeat (ping/pong)
- @OnClose : Déconnexion
### 6. WebSocketBroadcastService (existait déjà)
**Fichier** : `src/main/java/dev/lions/unionflow/server/service/WebSocketBroadcastService.java`
**Méthodes** :
- `broadcast(String message)` - Broadcast à tous les clients connectés
- `broadcastStatsUpdate(String jsonData)`
- `broadcastNewActivity(String jsonData)`
- `broadcastEventUpdate(String jsonData)`
- `broadcastNotification(String jsonData)`
---
## 📱 Mobile - Composants implémentés
### 1. WebSocketService
**Fichier** : `lib/core/websocket/websocket_service.dart`
**Features** :
- ✅ Connexion WebSocket avec URL auto-détectée depuis `AppConfig.backendBaseUrl`
- ✅ Reconnexion automatique avec backoff exponentiel (2^n secondes, max 60s)
- ✅ Heartbeat (ping toutes les 30s)
- ✅ Stream des events typés (`Stream`)
- ✅ Stream statut connexion (`Stream`)
- ✅ Parsing events avec factory pattern
**Types d'events** :
- `FinanceApprovalEvent` - Workflow approbations
- `DashboardStatsEvent` - Stats dashboard
- `NotificationEvent` - Notifications
- `MemberEvent` - Events membres
- `ContributionEvent` - Cotisations
- `GenericEvent` - Events génériques
**Usage** :
```dart
// Injection
@singleton
class DashboardBloc extends Bloc {
final WebSocketService webSocketService;
DashboardBloc({required this.webSocketService}) {
// Écouter les events WebSocket
webSocketService.eventStream.listen((event) {
if (event is DashboardStatsEvent) {
add(RefreshDashboardFromWebSocket(event.data));
}
});
// Écouter le statut de connexion
webSocketService.connectionStatusStream.listen((isConnected) {
if (isConnected) {
print('✅ WebSocket connecté');
} else {
print('❌ WebSocket déconnecté');
}
});
// Connexion au WebSocket
webSocketService.connect();
}
@override
Future close() {
webSocketService.disconnect();
return super.close();
}
}
```
### 2. Enregistrement DI
Le `WebSocketService` est annoté `@singleton`, donc automatiquement enregistré par injectable.
**Génération code** :
```bash
flutter pub run build_runner build --delete-conflicting-outputs
```
### 3. Intégration dans DashboardBloc ✅
**Fichier** : `lib/features/dashboard/presentation/bloc/dashboard_bloc.dart`
**Nouveaux events** :
```dart
// dashboard_event.dart
class RefreshDashboardFromWebSocket extends DashboardEvent {
final Map data;
const RefreshDashboardFromWebSocket(this.data);
@override
List