Files
unionflow-mobile-apps/docs/WEBSOCKET_IMPLEMENTATION.md
dahoud d094d6db9c Initial commit: unionflow-mobile-apps
Application Flutter complète (sans build artifacts).

Signed-off-by: lions dev Team
2026-03-15 16:30:08 +00:00

15 KiB

WebSocket + Kafka - Implémentation Complète

Date : 2026-03-14 Statut : Implémenté


📊 Architecture End-to-End

Backend Services
     ↓
KafkaEventProducer (publier events)
     ↓
Kafka Topics (unionflow.*)
     ↓
KafkaEventConsumer (consumer)
     ↓
WebSocketBroadcastService (broadcast)
     ↓
DashboardWebSocketEndpoint (/ws/dashboard)
     ↓
Mobile WebSocketService (Flutter)
     ↓
DashboardBloc (écouter events)
     ↓
UI Auto-refresh

🔧 Backend - Composants implémentés

1. Dépendances Maven

Fichier : pom.xml

<!-- Kafka Event Streaming -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

2. KafkaEventProducer

Fichier : src/main/java/dev/lions/unionflow/server/messaging/KafkaEventProducer.java

Méthodes :

  • publishApprovalPending(UUID approvalId, String organizationId, Map<String, Object> approvalData)
  • publishApprovalApproved(...)
  • publishApprovalRejected(...)
  • publishDashboardStatsUpdate(...)
  • publishKpiUpdate(...)
  • publishUserNotification(...)
  • publishBroadcastNotification(...)
  • publishMemberCreated(...)
  • publishMemberUpdated(...)
  • publishContributionPaid(...)

Usage dans un service métier :

@ApplicationScoped
public class FinanceWorkflowService {

    @Inject
    KafkaEventProducer kafkaProducer;

    public void approveTransaction(UUID approvalId) {
        // ... logique métier ...

        // Publier event Kafka
        var approvalData = Map.of(
            "id", approval.getId().toString(),
            "transactionType", approval.getTransactionType().name(),
            "amount", approval.getAmount(),
            "currency", approval.getCurrency(),
            "approvedBy", approval.getApprovedBy(),
            "approvedAt", approval.getApprovedAt().toString()
        );

        kafkaProducer.publishApprovalApproved(
            approvalId,
            approval.getOrganizationId(),
            approvalData
        );
    }
}

3. KafkaEventConsumer

Fichier : src/main/java/dev/lions/unionflow/server/messaging/KafkaEventConsumer.java

Méthodes :

  • consumeFinanceApprovals(Record<String, String> record) - Topic: unionflow.finance.approvals
  • consumeDashboardStats(...) - Topic: unionflow.dashboard.stats
  • consumeNotifications(...) - Topic: unionflow.notifications.user
  • consumeMembersEvents(...) - Topic: unionflow.members.events
  • consumeContributionsEvents(...) - Topic: unionflow.contributions.events

Chaque consumer :

  1. Reçoit l'event depuis Kafka
  2. Log l'event
  3. Broadcast via WebSocketBroadcastService

4. Configuration Kafka

Fichier : src/main/resources/application.properties

Channels configurés :

  • 5 channels Producer (outgoing) : *-out
  • 5 channels Consumer (incoming) : *-in
  • Group ID : unionflow-websocket-server
  • Bootstrap servers : localhost:9092 (dev) / KAFKA_BOOTSTRAP_SERVERS env var (prod)

5. WebSocket Endpoint (existait déjà)

Fichier : src/main/java/dev/lions/unionflow/server/resource/DashboardWebSocketEndpoint.java

Endpoint : ws://localhost:8085/ws/dashboard

Features :

  • @OnOpen : Connexion client
  • @OnTextMessage : Heartbeat (ping/pong)
  • @OnClose : Déconnexion

6. WebSocketBroadcastService (existait déjà)

Fichier : src/main/java/dev/lions/unionflow/server/service/WebSocketBroadcastService.java

Méthodes :

  • broadcast(String message) - Broadcast à tous les clients connectés
  • broadcastStatsUpdate(String jsonData)
  • broadcastNewActivity(String jsonData)
  • broadcastEventUpdate(String jsonData)
  • broadcastNotification(String jsonData)

📱 Mobile - Composants implémentés

1. WebSocketService

Fichier : lib/core/websocket/websocket_service.dart

Features :

  • Connexion WebSocket avec URL auto-détectée depuis AppConfig.backendBaseUrl
  • Reconnexion automatique avec backoff exponentiel (2^n secondes, max 60s)
  • Heartbeat (ping toutes les 30s)
  • Stream des events typés (Stream<WebSocketEvent>)
  • Stream statut connexion (Stream<bool>)
  • Parsing events avec factory pattern

Types d'events :

  • FinanceApprovalEvent - Workflow approbations
  • DashboardStatsEvent - Stats dashboard
  • NotificationEvent - Notifications
  • MemberEvent - Events membres
  • ContributionEvent - Cotisations
  • GenericEvent - Events génériques

Usage :

// Injection
@singleton
class DashboardBloc extends Bloc<DashboardEvent, DashboardState> {
  final WebSocketService webSocketService;

  DashboardBloc({required this.webSocketService}) {
    // Écouter les events WebSocket
    webSocketService.eventStream.listen((event) {
      if (event is DashboardStatsEvent) {
        add(RefreshDashboardFromWebSocket(event.data));
      }
    });

    // Écouter le statut de connexion
    webSocketService.connectionStatusStream.listen((isConnected) {
      if (isConnected) {
        print('✅ WebSocket connecté');
      } else {
        print('❌ WebSocket déconnecté');
      }
    });

    // Connexion au WebSocket
    webSocketService.connect();
  }

  @override
  Future<void> close() {
    webSocketService.disconnect();
    return super.close();
  }
}

2. Enregistrement DI

Le WebSocketService est annoté @singleton, donc automatiquement enregistré par injectable.

Génération code :

flutter pub run build_runner build --delete-conflicting-outputs

3. Intégration dans DashboardBloc

Fichier : lib/features/dashboard/presentation/bloc/dashboard_bloc.dart

Nouveaux events :

// dashboard_event.dart
class RefreshDashboardFromWebSocket extends DashboardEvent {
  final Map<String, dynamic> data;
  const RefreshDashboardFromWebSocket(this.data);
  @override
  List<Object> get props => [data];
}

class WebSocketConnectionChanged extends DashboardEvent {
  final bool isConnected;
  const WebSocketConnectionChanged(this.isConnected);
  @override
  List<Object> get props => [isConnected];
}

Implémentation DashboardBloc :

@injectable
class DashboardBloc extends Bloc<DashboardEvent, DashboardState> {
  final WebSocketService webSocketService;
  StreamSubscription<WebSocketEvent>? _webSocketEventSubscription;
  StreamSubscription<bool>? _webSocketConnectionSubscription;

  DashboardBloc({
    required this.getDashboardData,
    required this.getDashboardStats,
    required this.getRecentActivities,
    required this.getUpcomingEvents,
    required this.webSocketService,
  }) : super(DashboardInitial()) {
    on<RefreshDashboardFromWebSocket>(_onRefreshDashboardFromWebSocket);
    on<WebSocketConnectionChanged>(_onWebSocketConnectionChanged);

    // Initialiser WebSocket
    _initializeWebSocket();
  }

  void _initializeWebSocket() {
    // Connexion au WebSocket
    webSocketService.connect();

    // Écouter les events WebSocket
    _webSocketEventSubscription = webSocketService.eventStream.listen(
      (event) {
        // Dispatcher uniquement les events pertinents
        if (event is DashboardStatsEvent ||
            event is FinanceApprovalEvent ||
            event is MemberEvent ||
            event is ContributionEvent) {
          add(RefreshDashboardFromWebSocket(event.data));
        }
      },
    );

    // Écouter le statut de connexion
    _webSocketConnectionSubscription = webSocketService.connectionStatusStream.listen(
      (isConnected) {
        add(WebSocketConnectionChanged(isConnected));
      },
    );
  }

  Future<void> _onRefreshDashboardFromWebSocket(
    RefreshDashboardFromWebSocket event,
    Emitter<DashboardState> emit,
  ) async {
    // Rafraîchir uniquement les stats (optimisation)
    if (state is DashboardLoaded) {
      final result = await getDashboardStats(...);
      result.fold(
        (failure) => {}, // Garder les données actuelles
        (stats) {
          final updatedData = currentData.copyWith(stats: stats);
          emit(DashboardLoaded(updatedData));
        },
      );
    }
  }

  @override
  Future<void> close() {
    _webSocketEventSubscription?.cancel();
    _webSocketConnectionSubscription?.cancel();
    webSocketService.disconnect();
    return super.close();
  }
}

Résultat : Le dashboard se rafraîchit automatiquement en temps réel lorsqu'un event Kafka est reçu via WebSocket.


🚀 Utilisation End-to-End

Scénario 1 : Approbation Finance

Backend

// FinanceWorkflowResource.java
@POST
@Path("/approvals/{id}/approve")
public Response approveTransaction(@PathParam("id") UUID id) {
    // 1. Logique métier
    transactionApprovalService.approve(id);

    // 2. Publier event Kafka
    var approval = repository.findById(id);
    kafkaProducer.publishApprovalApproved(
        id,
        approval.getOrganizationId(),
        Map.of(
            "id", approval.getId().toString(),
            "transactionType", approval.getTransactionType().name(),
            "amount", approval.getAmount(),
            "approvedBy", approval.getApprovedBy()
        )
    );

    return Response.ok().build();
}

Flux

1. POST /api/v1/finance/approvals/{id}/approve
2. Backend → Kafka topic "unionflow.finance.approvals"
3. KafkaEventConsumer consomme event
4. WebSocketBroadcastService → Broadcast à tous les clients WS
5. Mobile WebSocketService reçoit event
6. DashboardBloc reçoit FinanceApprovalEvent
7. UI auto-refresh

Scénario 2 : Dashboard Stats Update

Backend

// DashboardService.java
@Scheduled(every = "10s")
public void updateDashboardStats() {
    organizations.forEach(org -> {
        var stats = calculateStats(org.getId());

        // Publier stats via Kafka
        kafkaProducer.publishDashboardStatsUpdate(
            org.getId(),
            Map.of(
                "totalMembers", stats.getTotalMembers(),
                "totalContributions", stats.getTotalContributions(),
                "pendingApprovals", stats.getPendingApprovals()
            )
        );
    });
}

Mobile

// DashboardBloc
on<RefreshDashboardFromWebSocket>((event, emit) {
  final stats = DashboardStats.fromJson(event.data);
  emit(DashboardLoaded(stats: stats));
});

// Écoute automatique dans le constructeur
webSocketService.eventStream.listen((event) {
  if (event is DashboardStatsEvent) {
    add(RefreshDashboardFromWebSocket(event.data));
  }
});

🧪 Tests

Backend - Test Kafka Producer

@QuarkusTest
class KafkaEventProducerTest {

    @Inject
    KafkaEventProducer producer;

    @Test
    void shouldPublishApprovalEvent() {
        var approvalData = Map.of("id", UUID.randomUUID().toString());
        producer.publishApprovalPending(UUID.randomUUID(), "org-123", approvalData);

        // Vérifier que l'event est publié dans Kafka
        // (nécessite un test consumer ou Kafka testcontainer)
    }
}

Mobile - Test WebSocketService

void main() {
  group('WebSocketService', () {
    late WebSocketService service;

    setUp(() {
      service = WebSocketService();
    });

    tearDown(() {
      service.dispose();
    });

    test('should connect to WebSocket', () async {
      service.connect();
      await Future.delayed(const Duration(milliseconds: 500));

      expect(service.isConnected, true);
    });

    test('should receive events', () async {
      service.connect();

      final events = <WebSocketEvent>[];
      service.eventStream.listen((event) {
        events.add(event);
      });

      // Simuler event depuis backend
      // ...

      await Future.delayed(const Duration(seconds: 2));
      expect(events.isNotEmpty, true);
    });

    test('should reconnect on disconnection', () async {
      service.connect();
      await Future.delayed(const Duration(milliseconds: 500));

      // Forcer déconnexion
      service.disconnect();
      expect(service.isConnected, false);

      // Vérifier reconnexion automatique
      await Future.delayed(const Duration(seconds: 3));
      // La reconnexion devrait avoir eu lieu
    });
  });
}

🔧 Configuration Production

Backend

Kubernetes ConfigMap :

apiVersion: v1
kind: ConfigMap
metadata:
  name: unionflow-backend-config
data:
  KAFKA_BOOTSTRAP_SERVERS: "kafka-service.kafka.svc.cluster.local:9092"

Deployment :

env:
  - name: KAFKA_BOOTSTRAP_SERVERS
    valueFrom:
      configMapKeyRef:
        name: unionflow-backend-config
        key: KAFKA_BOOTSTRAP_SERVERS

Mobile

AppConfig automatique :

static String get backendBaseUrl {
  switch (environment) {
    case 'prod':
      return 'https://api.lions.dev/unionflow';
    // ...
  }
}

// WebSocket URL dérivée automatiquement:
// https://api.lions.dev/unionflow → wss://api.lions.dev/unionflow/ws/dashboard

📋 Checklist Déploiement

Backend

  • Dépendances Kafka ajoutées au pom.xml
  • KafkaEventProducer créé
  • KafkaEventConsumer créé
  • Configuration Kafka dans application.properties
  • WebSocketEndpoint existe (déjà fait)
  • WebSocketBroadcastService existe (déjà fait)
  • Docker Compose avec Kafka (à tester localement)
  • Tests Kafka Producer/Consumer
  • Intégration dans services métier (FinanceWorkflowService, etc.)

Mobile

  • Package web_socket_channel dans pubspec.yaml
  • WebSocketService créé
  • Events typés (FinanceApprovalEvent, DashboardStatsEvent, etc.)
  • Reconnexion automatique
  • Heartbeat
  • Intégration dans DashboardBloc
  • Tests WebSocketService
  • Tests intégration E2E

🚀 Prochaines étapes

  1. Démarrer Kafka localement :

    cd unionflow
    docker-compose up -d kafka zookeeper
    
  2. Tester backend :

    cd unionflow-server-impl-quarkus
    ./mvnw quarkus:dev
    
  3. Tester mobile :

    cd unionflow-mobile-apps
    flutter pub run build_runner build --delete-conflicting-outputs
    flutter run --dart-define=ENV=dev
    
  4. Publier un event test (via Swagger UI) :

    • POST /api/v1/finance/approvals/{id}/approve
    • Vérifier que l'event arrive sur mobile
  5. Intégrer dans DashboardBloc :

    • Écouter webSocketService.eventStream
    • Dispatch events vers le BLoC

Résultat attendu

  • Backend publie events dans Kafka
  • Kafka consumer broadcast via WebSocket
  • Mobile reçoit events en temps réel
  • Dashboard auto-refresh sans pull manuel
  • Notifications push instantanées
  • Reconnexion automatique si déconnexion

Implémenté par : Claude Sonnet 4.5 Date : 2026-03-14 Status : Backend + Mobile + DashboardBloc intégration COMPLETS - Prêt pour tests end-to-end