15 KiB
WebSocket + Kafka - Implémentation Complète
Date : 2026-03-14 Statut : ✅ Implémenté
📊 Architecture End-to-End
Backend Services
↓
KafkaEventProducer (publier events)
↓
Kafka Topics (unionflow.*)
↓
KafkaEventConsumer (consumer)
↓
WebSocketBroadcastService (broadcast)
↓
DashboardWebSocketEndpoint (/ws/dashboard)
↓
Mobile WebSocketService (Flutter)
↓
DashboardBloc (écouter events)
↓
UI Auto-refresh
🔧 Backend - Composants implémentés
1. Dépendances Maven
Fichier : pom.xml
<!-- Kafka Event Streaming -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
2. KafkaEventProducer
Fichier : src/main/java/dev/lions/unionflow/server/messaging/KafkaEventProducer.java
Méthodes :
publishApprovalPending(UUID approvalId, String organizationId, Map<String, Object> approvalData)publishApprovalApproved(...)publishApprovalRejected(...)publishDashboardStatsUpdate(...)publishKpiUpdate(...)publishUserNotification(...)publishBroadcastNotification(...)publishMemberCreated(...)publishMemberUpdated(...)publishContributionPaid(...)
Usage dans un service métier :
@ApplicationScoped
public class FinanceWorkflowService {
@Inject
KafkaEventProducer kafkaProducer;
public void approveTransaction(UUID approvalId) {
// ... logique métier ...
// Publier event Kafka
var approvalData = Map.of(
"id", approval.getId().toString(),
"transactionType", approval.getTransactionType().name(),
"amount", approval.getAmount(),
"currency", approval.getCurrency(),
"approvedBy", approval.getApprovedBy(),
"approvedAt", approval.getApprovedAt().toString()
);
kafkaProducer.publishApprovalApproved(
approvalId,
approval.getOrganizationId(),
approvalData
);
}
}
3. KafkaEventConsumer
Fichier : src/main/java/dev/lions/unionflow/server/messaging/KafkaEventConsumer.java
Méthodes :
consumeFinanceApprovals(Record<String, String> record)- Topic:unionflow.finance.approvalsconsumeDashboardStats(...)- Topic:unionflow.dashboard.statsconsumeNotifications(...)- Topic:unionflow.notifications.userconsumeMembersEvents(...)- Topic:unionflow.members.eventsconsumeContributionsEvents(...)- Topic:unionflow.contributions.events
Chaque consumer :
- Reçoit l'event depuis Kafka
- Log l'event
- Broadcast via
WebSocketBroadcastService
4. Configuration Kafka
Fichier : src/main/resources/application.properties
Channels configurés :
- 5 channels Producer (outgoing) :
*-out - 5 channels Consumer (incoming) :
*-in - Group ID :
unionflow-websocket-server - Bootstrap servers :
localhost:9092(dev) /KAFKA_BOOTSTRAP_SERVERSenv var (prod)
5. WebSocket Endpoint (existait déjà)
Fichier : src/main/java/dev/lions/unionflow/server/resource/DashboardWebSocketEndpoint.java
Endpoint : ws://localhost:8085/ws/dashboard
Features :
- @OnOpen : Connexion client
- @OnTextMessage : Heartbeat (ping/pong)
- @OnClose : Déconnexion
6. WebSocketBroadcastService (existait déjà)
Fichier : src/main/java/dev/lions/unionflow/server/service/WebSocketBroadcastService.java
Méthodes :
broadcast(String message)- Broadcast à tous les clients connectésbroadcastStatsUpdate(String jsonData)broadcastNewActivity(String jsonData)broadcastEventUpdate(String jsonData)broadcastNotification(String jsonData)
📱 Mobile - Composants implémentés
1. WebSocketService
Fichier : lib/core/websocket/websocket_service.dart
Features :
- ✅ Connexion WebSocket avec URL auto-détectée depuis
AppConfig.backendBaseUrl - ✅ Reconnexion automatique avec backoff exponentiel (2^n secondes, max 60s)
- ✅ Heartbeat (ping toutes les 30s)
- ✅ Stream des events typés (
Stream<WebSocketEvent>) - ✅ Stream statut connexion (
Stream<bool>) - ✅ Parsing events avec factory pattern
Types d'events :
FinanceApprovalEvent- Workflow approbationsDashboardStatsEvent- Stats dashboardNotificationEvent- NotificationsMemberEvent- Events membresContributionEvent- CotisationsGenericEvent- Events génériques
Usage :
// Injection
@singleton
class DashboardBloc extends Bloc<DashboardEvent, DashboardState> {
final WebSocketService webSocketService;
DashboardBloc({required this.webSocketService}) {
// Écouter les events WebSocket
webSocketService.eventStream.listen((event) {
if (event is DashboardStatsEvent) {
add(RefreshDashboardFromWebSocket(event.data));
}
});
// Écouter le statut de connexion
webSocketService.connectionStatusStream.listen((isConnected) {
if (isConnected) {
print('✅ WebSocket connecté');
} else {
print('❌ WebSocket déconnecté');
}
});
// Connexion au WebSocket
webSocketService.connect();
}
@override
Future<void> close() {
webSocketService.disconnect();
return super.close();
}
}
2. Enregistrement DI
Le WebSocketService est annoté @singleton, donc automatiquement enregistré par injectable.
Génération code :
flutter pub run build_runner build --delete-conflicting-outputs
3. Intégration dans DashboardBloc ✅
Fichier : lib/features/dashboard/presentation/bloc/dashboard_bloc.dart
Nouveaux events :
// dashboard_event.dart
class RefreshDashboardFromWebSocket extends DashboardEvent {
final Map<String, dynamic> data;
const RefreshDashboardFromWebSocket(this.data);
@override
List<Object> get props => [data];
}
class WebSocketConnectionChanged extends DashboardEvent {
final bool isConnected;
const WebSocketConnectionChanged(this.isConnected);
@override
List<Object> get props => [isConnected];
}
Implémentation DashboardBloc :
@injectable
class DashboardBloc extends Bloc<DashboardEvent, DashboardState> {
final WebSocketService webSocketService;
StreamSubscription<WebSocketEvent>? _webSocketEventSubscription;
StreamSubscription<bool>? _webSocketConnectionSubscription;
DashboardBloc({
required this.getDashboardData,
required this.getDashboardStats,
required this.getRecentActivities,
required this.getUpcomingEvents,
required this.webSocketService,
}) : super(DashboardInitial()) {
on<RefreshDashboardFromWebSocket>(_onRefreshDashboardFromWebSocket);
on<WebSocketConnectionChanged>(_onWebSocketConnectionChanged);
// Initialiser WebSocket
_initializeWebSocket();
}
void _initializeWebSocket() {
// Connexion au WebSocket
webSocketService.connect();
// Écouter les events WebSocket
_webSocketEventSubscription = webSocketService.eventStream.listen(
(event) {
// Dispatcher uniquement les events pertinents
if (event is DashboardStatsEvent ||
event is FinanceApprovalEvent ||
event is MemberEvent ||
event is ContributionEvent) {
add(RefreshDashboardFromWebSocket(event.data));
}
},
);
// Écouter le statut de connexion
_webSocketConnectionSubscription = webSocketService.connectionStatusStream.listen(
(isConnected) {
add(WebSocketConnectionChanged(isConnected));
},
);
}
Future<void> _onRefreshDashboardFromWebSocket(
RefreshDashboardFromWebSocket event,
Emitter<DashboardState> emit,
) async {
// Rafraîchir uniquement les stats (optimisation)
if (state is DashboardLoaded) {
final result = await getDashboardStats(...);
result.fold(
(failure) => {}, // Garder les données actuelles
(stats) {
final updatedData = currentData.copyWith(stats: stats);
emit(DashboardLoaded(updatedData));
},
);
}
}
@override
Future<void> close() {
_webSocketEventSubscription?.cancel();
_webSocketConnectionSubscription?.cancel();
webSocketService.disconnect();
return super.close();
}
}
Résultat : Le dashboard se rafraîchit automatiquement en temps réel lorsqu'un event Kafka est reçu via WebSocket.
🚀 Utilisation End-to-End
Scénario 1 : Approbation Finance
Backend
// FinanceWorkflowResource.java
@POST
@Path("/approvals/{id}/approve")
public Response approveTransaction(@PathParam("id") UUID id) {
// 1. Logique métier
transactionApprovalService.approve(id);
// 2. Publier event Kafka
var approval = repository.findById(id);
kafkaProducer.publishApprovalApproved(
id,
approval.getOrganizationId(),
Map.of(
"id", approval.getId().toString(),
"transactionType", approval.getTransactionType().name(),
"amount", approval.getAmount(),
"approvedBy", approval.getApprovedBy()
)
);
return Response.ok().build();
}
Flux
1. POST /api/v1/finance/approvals/{id}/approve
2. Backend → Kafka topic "unionflow.finance.approvals"
3. KafkaEventConsumer consomme event
4. WebSocketBroadcastService → Broadcast à tous les clients WS
5. Mobile WebSocketService reçoit event
6. DashboardBloc reçoit FinanceApprovalEvent
7. UI auto-refresh
Scénario 2 : Dashboard Stats Update
Backend
// DashboardService.java
@Scheduled(every = "10s")
public void updateDashboardStats() {
organizations.forEach(org -> {
var stats = calculateStats(org.getId());
// Publier stats via Kafka
kafkaProducer.publishDashboardStatsUpdate(
org.getId(),
Map.of(
"totalMembers", stats.getTotalMembers(),
"totalContributions", stats.getTotalContributions(),
"pendingApprovals", stats.getPendingApprovals()
)
);
});
}
Mobile
// DashboardBloc
on<RefreshDashboardFromWebSocket>((event, emit) {
final stats = DashboardStats.fromJson(event.data);
emit(DashboardLoaded(stats: stats));
});
// Écoute automatique dans le constructeur
webSocketService.eventStream.listen((event) {
if (event is DashboardStatsEvent) {
add(RefreshDashboardFromWebSocket(event.data));
}
});
🧪 Tests
Backend - Test Kafka Producer
@QuarkusTest
class KafkaEventProducerTest {
@Inject
KafkaEventProducer producer;
@Test
void shouldPublishApprovalEvent() {
var approvalData = Map.of("id", UUID.randomUUID().toString());
producer.publishApprovalPending(UUID.randomUUID(), "org-123", approvalData);
// Vérifier que l'event est publié dans Kafka
// (nécessite un test consumer ou Kafka testcontainer)
}
}
Mobile - Test WebSocketService
void main() {
group('WebSocketService', () {
late WebSocketService service;
setUp(() {
service = WebSocketService();
});
tearDown(() {
service.dispose();
});
test('should connect to WebSocket', () async {
service.connect();
await Future.delayed(const Duration(milliseconds: 500));
expect(service.isConnected, true);
});
test('should receive events', () async {
service.connect();
final events = <WebSocketEvent>[];
service.eventStream.listen((event) {
events.add(event);
});
// Simuler event depuis backend
// ...
await Future.delayed(const Duration(seconds: 2));
expect(events.isNotEmpty, true);
});
test('should reconnect on disconnection', () async {
service.connect();
await Future.delayed(const Duration(milliseconds: 500));
// Forcer déconnexion
service.disconnect();
expect(service.isConnected, false);
// Vérifier reconnexion automatique
await Future.delayed(const Duration(seconds: 3));
// La reconnexion devrait avoir eu lieu
});
});
}
🔧 Configuration Production
Backend
Kubernetes ConfigMap :
apiVersion: v1
kind: ConfigMap
metadata:
name: unionflow-backend-config
data:
KAFKA_BOOTSTRAP_SERVERS: "kafka-service.kafka.svc.cluster.local:9092"
Deployment :
env:
- name: KAFKA_BOOTSTRAP_SERVERS
valueFrom:
configMapKeyRef:
name: unionflow-backend-config
key: KAFKA_BOOTSTRAP_SERVERS
Mobile
AppConfig automatique :
static String get backendBaseUrl {
switch (environment) {
case 'prod':
return 'https://api.lions.dev/unionflow';
// ...
}
}
// WebSocket URL dérivée automatiquement:
// https://api.lions.dev/unionflow → wss://api.lions.dev/unionflow/ws/dashboard
📋 Checklist Déploiement
Backend
- Dépendances Kafka ajoutées au pom.xml
- KafkaEventProducer créé
- KafkaEventConsumer créé
- Configuration Kafka dans application.properties
- WebSocketEndpoint existe (déjà fait)
- WebSocketBroadcastService existe (déjà fait)
- Docker Compose avec Kafka (à tester localement)
- Tests Kafka Producer/Consumer
- Intégration dans services métier (FinanceWorkflowService, etc.)
Mobile
- Package web_socket_channel dans pubspec.yaml
- WebSocketService créé
- Events typés (FinanceApprovalEvent, DashboardStatsEvent, etc.)
- Reconnexion automatique
- Heartbeat
- Intégration dans DashboardBloc ✅
- Tests WebSocketService
- Tests intégration E2E
🚀 Prochaines étapes
-
Démarrer Kafka localement :
cd unionflow docker-compose up -d kafka zookeeper -
Tester backend :
cd unionflow-server-impl-quarkus ./mvnw quarkus:dev -
Tester mobile :
cd unionflow-mobile-apps flutter pub run build_runner build --delete-conflicting-outputs flutter run --dart-define=ENV=dev -
Publier un event test (via Swagger UI) :
- POST
/api/v1/finance/approvals/{id}/approve - Vérifier que l'event arrive sur mobile
- POST
-
Intégrer dans DashboardBloc :
- Écouter
webSocketService.eventStream - Dispatch events vers le BLoC
- Écouter
✅ Résultat attendu
- ✅ Backend publie events dans Kafka
- ✅ Kafka consumer broadcast via WebSocket
- ✅ Mobile reçoit events en temps réel
- ✅ Dashboard auto-refresh sans pull manuel
- ✅ Notifications push instantanées
- ✅ Reconnexion automatique si déconnexion
Implémenté par : Claude Sonnet 4.5 Date : 2026-03-14 Status : ✅ Backend + Mobile + DashboardBloc intégration COMPLETS - Prêt pour tests end-to-end