Files
unionflow-mobile-apps/docs/WEBSOCKET_IMPLEMENTATION.md
dahoud d094d6db9c Initial commit: unionflow-mobile-apps
Application Flutter complète (sans build artifacts).

Signed-off-by: lions dev Team
2026-03-15 16:30:08 +00:00

598 lines
15 KiB
Markdown

# WebSocket + Kafka - Implémentation Complète
**Date** : 2026-03-14
**Statut** : ✅ **Implémenté**
---
## 📊 Architecture End-to-End
```
Backend Services
KafkaEventProducer (publier events)
Kafka Topics (unionflow.*)
KafkaEventConsumer (consumer)
WebSocketBroadcastService (broadcast)
DashboardWebSocketEndpoint (/ws/dashboard)
Mobile WebSocketService (Flutter)
DashboardBloc (écouter events)
UI Auto-refresh
```
---
## 🔧 Backend - Composants implémentés
### 1. Dépendances Maven
**Fichier** : `pom.xml`
```xml
<!-- Kafka Event Streaming -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
```
### 2. KafkaEventProducer
**Fichier** : `src/main/java/dev/lions/unionflow/server/messaging/KafkaEventProducer.java`
**Méthodes** :
- `publishApprovalPending(UUID approvalId, String organizationId, Map<String, Object> approvalData)`
- `publishApprovalApproved(...)`
- `publishApprovalRejected(...)`
- `publishDashboardStatsUpdate(...)`
- `publishKpiUpdate(...)`
- `publishUserNotification(...)`
- `publishBroadcastNotification(...)`
- `publishMemberCreated(...)`
- `publishMemberUpdated(...)`
- `publishContributionPaid(...)`
**Usage dans un service métier** :
```java
@ApplicationScoped
public class FinanceWorkflowService {
@Inject
KafkaEventProducer kafkaProducer;
public void approveTransaction(UUID approvalId) {
// ... logique métier ...
// Publier event Kafka
var approvalData = Map.of(
"id", approval.getId().toString(),
"transactionType", approval.getTransactionType().name(),
"amount", approval.getAmount(),
"currency", approval.getCurrency(),
"approvedBy", approval.getApprovedBy(),
"approvedAt", approval.getApprovedAt().toString()
);
kafkaProducer.publishApprovalApproved(
approvalId,
approval.getOrganizationId(),
approvalData
);
}
}
```
### 3. KafkaEventConsumer
**Fichier** : `src/main/java/dev/lions/unionflow/server/messaging/KafkaEventConsumer.java`
**Méthodes** :
- `consumeFinanceApprovals(Record<String, String> record)` - Topic: `unionflow.finance.approvals`
- `consumeDashboardStats(...)` - Topic: `unionflow.dashboard.stats`
- `consumeNotifications(...)` - Topic: `unionflow.notifications.user`
- `consumeMembersEvents(...)` - Topic: `unionflow.members.events`
- `consumeContributionsEvents(...)` - Topic: `unionflow.contributions.events`
**Chaque consumer** :
1. Reçoit l'event depuis Kafka
2. Log l'event
3. Broadcast via `WebSocketBroadcastService`
### 4. Configuration Kafka
**Fichier** : `src/main/resources/application.properties`
**Channels configurés** :
- 5 channels Producer (outgoing) : `*-out`
- 5 channels Consumer (incoming) : `*-in`
- Group ID : `unionflow-websocket-server`
- Bootstrap servers : `localhost:9092` (dev) / `KAFKA_BOOTSTRAP_SERVERS` env var (prod)
### 5. WebSocket Endpoint (existait déjà)
**Fichier** : `src/main/java/dev/lions/unionflow/server/resource/DashboardWebSocketEndpoint.java`
**Endpoint** : `ws://localhost:8085/ws/dashboard`
**Features** :
- @OnOpen : Connexion client
- @OnTextMessage : Heartbeat (ping/pong)
- @OnClose : Déconnexion
### 6. WebSocketBroadcastService (existait déjà)
**Fichier** : `src/main/java/dev/lions/unionflow/server/service/WebSocketBroadcastService.java`
**Méthodes** :
- `broadcast(String message)` - Broadcast à tous les clients connectés
- `broadcastStatsUpdate(String jsonData)`
- `broadcastNewActivity(String jsonData)`
- `broadcastEventUpdate(String jsonData)`
- `broadcastNotification(String jsonData)`
---
## 📱 Mobile - Composants implémentés
### 1. WebSocketService
**Fichier** : `lib/core/websocket/websocket_service.dart`
**Features** :
- ✅ Connexion WebSocket avec URL auto-détectée depuis `AppConfig.backendBaseUrl`
- ✅ Reconnexion automatique avec backoff exponentiel (2^n secondes, max 60s)
- ✅ Heartbeat (ping toutes les 30s)
- ✅ Stream des events typés (`Stream<WebSocketEvent>`)
- ✅ Stream statut connexion (`Stream<bool>`)
- ✅ Parsing events avec factory pattern
**Types d'events** :
- `FinanceApprovalEvent` - Workflow approbations
- `DashboardStatsEvent` - Stats dashboard
- `NotificationEvent` - Notifications
- `MemberEvent` - Events membres
- `ContributionEvent` - Cotisations
- `GenericEvent` - Events génériques
**Usage** :
```dart
// Injection
@singleton
class DashboardBloc extends Bloc<DashboardEvent, DashboardState> {
final WebSocketService webSocketService;
DashboardBloc({required this.webSocketService}) {
// Écouter les events WebSocket
webSocketService.eventStream.listen((event) {
if (event is DashboardStatsEvent) {
add(RefreshDashboardFromWebSocket(event.data));
}
});
// Écouter le statut de connexion
webSocketService.connectionStatusStream.listen((isConnected) {
if (isConnected) {
print('✅ WebSocket connecté');
} else {
print('❌ WebSocket déconnecté');
}
});
// Connexion au WebSocket
webSocketService.connect();
}
@override
Future<void> close() {
webSocketService.disconnect();
return super.close();
}
}
```
### 2. Enregistrement DI
Le `WebSocketService` est annoté `@singleton`, donc automatiquement enregistré par injectable.
**Génération code** :
```bash
flutter pub run build_runner build --delete-conflicting-outputs
```
### 3. Intégration dans DashboardBloc ✅
**Fichier** : `lib/features/dashboard/presentation/bloc/dashboard_bloc.dart`
**Nouveaux events** :
```dart
// dashboard_event.dart
class RefreshDashboardFromWebSocket extends DashboardEvent {
final Map<String, dynamic> data;
const RefreshDashboardFromWebSocket(this.data);
@override
List<Object> get props => [data];
}
class WebSocketConnectionChanged extends DashboardEvent {
final bool isConnected;
const WebSocketConnectionChanged(this.isConnected);
@override
List<Object> get props => [isConnected];
}
```
**Implémentation DashboardBloc** :
```dart
@injectable
class DashboardBloc extends Bloc<DashboardEvent, DashboardState> {
final WebSocketService webSocketService;
StreamSubscription<WebSocketEvent>? _webSocketEventSubscription;
StreamSubscription<bool>? _webSocketConnectionSubscription;
DashboardBloc({
required this.getDashboardData,
required this.getDashboardStats,
required this.getRecentActivities,
required this.getUpcomingEvents,
required this.webSocketService,
}) : super(DashboardInitial()) {
on<RefreshDashboardFromWebSocket>(_onRefreshDashboardFromWebSocket);
on<WebSocketConnectionChanged>(_onWebSocketConnectionChanged);
// Initialiser WebSocket
_initializeWebSocket();
}
void _initializeWebSocket() {
// Connexion au WebSocket
webSocketService.connect();
// Écouter les events WebSocket
_webSocketEventSubscription = webSocketService.eventStream.listen(
(event) {
// Dispatcher uniquement les events pertinents
if (event is DashboardStatsEvent ||
event is FinanceApprovalEvent ||
event is MemberEvent ||
event is ContributionEvent) {
add(RefreshDashboardFromWebSocket(event.data));
}
},
);
// Écouter le statut de connexion
_webSocketConnectionSubscription = webSocketService.connectionStatusStream.listen(
(isConnected) {
add(WebSocketConnectionChanged(isConnected));
},
);
}
Future<void> _onRefreshDashboardFromWebSocket(
RefreshDashboardFromWebSocket event,
Emitter<DashboardState> emit,
) async {
// Rafraîchir uniquement les stats (optimisation)
if (state is DashboardLoaded) {
final result = await getDashboardStats(...);
result.fold(
(failure) => {}, // Garder les données actuelles
(stats) {
final updatedData = currentData.copyWith(stats: stats);
emit(DashboardLoaded(updatedData));
},
);
}
}
@override
Future<void> close() {
_webSocketEventSubscription?.cancel();
_webSocketConnectionSubscription?.cancel();
webSocketService.disconnect();
return super.close();
}
}
```
**Résultat** : Le dashboard se rafraîchit automatiquement en temps réel lorsqu'un event Kafka est reçu via WebSocket.
---
## 🚀 Utilisation End-to-End
### Scénario 1 : Approbation Finance
#### Backend
```java
// FinanceWorkflowResource.java
@POST
@Path("/approvals/{id}/approve")
public Response approveTransaction(@PathParam("id") UUID id) {
// 1. Logique métier
transactionApprovalService.approve(id);
// 2. Publier event Kafka
var approval = repository.findById(id);
kafkaProducer.publishApprovalApproved(
id,
approval.getOrganizationId(),
Map.of(
"id", approval.getId().toString(),
"transactionType", approval.getTransactionType().name(),
"amount", approval.getAmount(),
"approvedBy", approval.getApprovedBy()
)
);
return Response.ok().build();
}
```
#### Flux
```
1. POST /api/v1/finance/approvals/{id}/approve
2. Backend → Kafka topic "unionflow.finance.approvals"
3. KafkaEventConsumer consomme event
4. WebSocketBroadcastService → Broadcast à tous les clients WS
5. Mobile WebSocketService reçoit event
6. DashboardBloc reçoit FinanceApprovalEvent
7. UI auto-refresh
```
### Scénario 2 : Dashboard Stats Update
#### Backend
```java
// DashboardService.java
@Scheduled(every = "10s")
public void updateDashboardStats() {
organizations.forEach(org -> {
var stats = calculateStats(org.getId());
// Publier stats via Kafka
kafkaProducer.publishDashboardStatsUpdate(
org.getId(),
Map.of(
"totalMembers", stats.getTotalMembers(),
"totalContributions", stats.getTotalContributions(),
"pendingApprovals", stats.getPendingApprovals()
)
);
});
}
```
#### Mobile
```dart
// DashboardBloc
on<RefreshDashboardFromWebSocket>((event, emit) {
final stats = DashboardStats.fromJson(event.data);
emit(DashboardLoaded(stats: stats));
});
// Écoute automatique dans le constructeur
webSocketService.eventStream.listen((event) {
if (event is DashboardStatsEvent) {
add(RefreshDashboardFromWebSocket(event.data));
}
});
```
---
## 🧪 Tests
### Backend - Test Kafka Producer
```java
@QuarkusTest
class KafkaEventProducerTest {
@Inject
KafkaEventProducer producer;
@Test
void shouldPublishApprovalEvent() {
var approvalData = Map.of("id", UUID.randomUUID().toString());
producer.publishApprovalPending(UUID.randomUUID(), "org-123", approvalData);
// Vérifier que l'event est publié dans Kafka
// (nécessite un test consumer ou Kafka testcontainer)
}
}
```
### Mobile - Test WebSocketService
```dart
void main() {
group('WebSocketService', () {
late WebSocketService service;
setUp(() {
service = WebSocketService();
});
tearDown(() {
service.dispose();
});
test('should connect to WebSocket', () async {
service.connect();
await Future.delayed(const Duration(milliseconds: 500));
expect(service.isConnected, true);
});
test('should receive events', () async {
service.connect();
final events = <WebSocketEvent>[];
service.eventStream.listen((event) {
events.add(event);
});
// Simuler event depuis backend
// ...
await Future.delayed(const Duration(seconds: 2));
expect(events.isNotEmpty, true);
});
test('should reconnect on disconnection', () async {
service.connect();
await Future.delayed(const Duration(milliseconds: 500));
// Forcer déconnexion
service.disconnect();
expect(service.isConnected, false);
// Vérifier reconnexion automatique
await Future.delayed(const Duration(seconds: 3));
// La reconnexion devrait avoir eu lieu
});
});
}
```
---
## 🔧 Configuration Production
### Backend
**Kubernetes ConfigMap** :
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: unionflow-backend-config
data:
KAFKA_BOOTSTRAP_SERVERS: "kafka-service.kafka.svc.cluster.local:9092"
```
**Deployment** :
```yaml
env:
- name: KAFKA_BOOTSTRAP_SERVERS
valueFrom:
configMapKeyRef:
name: unionflow-backend-config
key: KAFKA_BOOTSTRAP_SERVERS
```
### Mobile
**AppConfig automatique** :
```dart
static String get backendBaseUrl {
switch (environment) {
case 'prod':
return 'https://api.lions.dev/unionflow';
// ...
}
}
// WebSocket URL dérivée automatiquement:
// https://api.lions.dev/unionflow → wss://api.lions.dev/unionflow/ws/dashboard
```
---
## 📋 Checklist Déploiement
### Backend
- [x] Dépendances Kafka ajoutées au pom.xml
- [x] KafkaEventProducer créé
- [x] KafkaEventConsumer créé
- [x] Configuration Kafka dans application.properties
- [x] WebSocketEndpoint existe (déjà fait)
- [x] WebSocketBroadcastService existe (déjà fait)
- [ ] Docker Compose avec Kafka (à tester localement)
- [ ] Tests Kafka Producer/Consumer
- [ ] Intégration dans services métier (FinanceWorkflowService, etc.)
### Mobile
- [x] Package web_socket_channel dans pubspec.yaml
- [x] WebSocketService créé
- [x] Events typés (FinanceApprovalEvent, DashboardStatsEvent, etc.)
- [x] Reconnexion automatique
- [x] Heartbeat
- [x] **Intégration dans DashboardBloc**
- [ ] Tests WebSocketService
- [ ] Tests intégration E2E
---
## 🚀 Prochaines étapes
1. **Démarrer Kafka localement** :
```bash
cd unionflow
docker-compose up -d kafka zookeeper
```
2. **Tester backend** :
```bash
cd unionflow-server-impl-quarkus
./mvnw quarkus:dev
```
3. **Tester mobile** :
```bash
cd unionflow-mobile-apps
flutter pub run build_runner build --delete-conflicting-outputs
flutter run --dart-define=ENV=dev
```
4. **Publier un event test** (via Swagger UI) :
- POST `/api/v1/finance/approvals/{id}/approve`
- Vérifier que l'event arrive sur mobile
5. **Intégrer dans DashboardBloc** :
- Écouter `webSocketService.eventStream`
- Dispatch events vers le BLoC
---
## ✅ Résultat attendu
- ✅ Backend publie events dans Kafka
- ✅ Kafka consumer broadcast via WebSocket
- ✅ Mobile reçoit events en temps réel
- ✅ Dashboard auto-refresh sans pull manuel
- ✅ Notifications push instantanées
- ✅ Reconnexion automatique si déconnexion
---
**Implémenté par** : Claude Sonnet 4.5
**Date** : 2026-03-14
**Status** : ✅ **Backend + Mobile + DashboardBloc intégration COMPLETS - Prêt pour tests end-to-end**