/// Service WebSocket pour temps réel (Kafka events) library websocket_service; import 'dart:async'; import 'dart:convert'; import 'package:injectable/injectable.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import 'package:web_socket_channel/status.dart' as status; import '../config/environment.dart'; import '../utils/logger.dart'; /// Events WebSocket typés abstract class WebSocketEvent { final String eventType; final DateTime timestamp; final Map data; WebSocketEvent({ required this.eventType, required this.timestamp, required this.data, }); factory WebSocketEvent.fromJson(Map json) { final eventType = json['eventType'] as String; final timestamp = json['timestamp'] != null ? DateTime.parse(json['timestamp'] as String) : DateTime.now(); final data = (json['data'] as Map?) ?? {}; switch (eventType) { case 'APPROVAL_PENDING': case 'APPROVAL_APPROVED': case 'APPROVAL_REJECTED': return FinanceApprovalEvent( eventType: eventType, timestamp: timestamp, data: data, organizationId: json['organizationId'] as String?, ); case 'DASHBOARD_STATS_UPDATED': case 'KPI_UPDATED': return DashboardStatsEvent( eventType: eventType, timestamp: timestamp, data: data, organizationId: json['organizationId'] as String?, ); case 'USER_NOTIFICATION': case 'BROADCAST_NOTIFICATION': return NotificationEvent( eventType: eventType, timestamp: timestamp, data: data, userId: json['userId'] as String?, organizationId: json['organizationId'] as String?, ); case 'MEMBER_CREATED': case 'MEMBER_UPDATED': return MemberEvent( eventType: eventType, timestamp: timestamp, data: data, organizationId: json['organizationId'] as String?, ); case 'CONTRIBUTION_PAID': return ContributionEvent( eventType: eventType, timestamp: timestamp, data: data, organizationId: json['organizationId'] as String?, ); case 'NOUVEAU_MESSAGE': case 'MESSAGE_SUPPRIME': case 'CONVERSATION_LUE': return ChatMessageEvent( eventType: eventType, timestamp: timestamp, data: data, conversationId: json['conversationId'] as String?, organizationId: json['organizationId'] as String?, ); default: return GenericEvent( eventType: eventType, timestamp: timestamp, data: data, ); } } } class FinanceApprovalEvent extends WebSocketEvent { final String? organizationId; FinanceApprovalEvent({ required super.eventType, required super.timestamp, required super.data, this.organizationId, }); } class DashboardStatsEvent extends WebSocketEvent { final String? organizationId; DashboardStatsEvent({ required super.eventType, required super.timestamp, required super.data, this.organizationId, }); } class NotificationEvent extends WebSocketEvent { final String? userId; final String? organizationId; NotificationEvent({ required super.eventType, required super.timestamp, required super.data, this.userId, this.organizationId, }); } class MemberEvent extends WebSocketEvent { final String? organizationId; MemberEvent({ required super.eventType, required super.timestamp, required super.data, this.organizationId, }); } class ContributionEvent extends WebSocketEvent { final String? organizationId; ContributionEvent({ required super.eventType, required super.timestamp, required super.data, this.organizationId, }); } class ChatMessageEvent extends WebSocketEvent { final String? conversationId; final String? organizationId; ChatMessageEvent({ required super.eventType, required super.timestamp, required super.data, this.conversationId, this.organizationId, }); } class GenericEvent extends WebSocketEvent { GenericEvent({ required super.eventType, required super.timestamp, required super.data, }); } /// Service WebSocket pour recevoir les events temps réel du backend @singleton class WebSocketService { WebSocketChannel? _channel; Timer? _reconnectTimer; Timer? _heartbeatTimer; final StreamController _eventController = StreamController.broadcast(); final StreamController _connectionStatusController = StreamController.broadcast(); bool _isConnected = false; bool _shouldReconnect = true; int _reconnectAttempts = 0; /// Stream des events WebSocket typés Stream get eventStream => _eventController.stream; /// Stream du statut de connexion Stream get connectionStatusStream => _connectionStatusController.stream; /// Statut de connexion actuel bool get isConnected => _isConnected; /// Connexion au WebSocket void connect() { if (_isConnected || _channel != null) { AppLogger.info('WebSocket déjà connecté'); return; } try { final wsUrl = _buildWebSocketUrl(); AppLogger.info('Connexion WebSocket à $wsUrl...'); _channel = WebSocketChannel.connect(Uri.parse(wsUrl)); _channel!.stream.listen( _onMessage, onError: _onError, onDone: _onDone, cancelOnError: false, ); _isConnected = true; _reconnectAttempts = 0; _connectionStatusController.add(true); // Heartbeat toutes les 30 secondes _startHeartbeat(); AppLogger.info('✅ WebSocket connecté avec succès'); } catch (e) { AppLogger.error('Erreur connexion WebSocket', error: e); _scheduleReconnect(); } } /// Déconnexion du WebSocket void disconnect() { AppLogger.info('Déconnexion WebSocket...'); _shouldReconnect = false; _stopHeartbeat(); _stopReconnectTimer(); _channel?.sink.close(status.normalClosure); _channel = null; _isConnected = false; _connectionStatusController.add(false); } /// Dispose des ressources void dispose() { disconnect(); _eventController.close(); _connectionStatusController.close(); } /// Construit l'URL WebSocket depuis l'URL backend String _buildWebSocketUrl() { var baseUrl = AppConfig.apiBaseUrl; // Remplacer http/https par ws/wss if (baseUrl.startsWith('https://')) { baseUrl = baseUrl.replaceFirst('https://', 'wss://'); } else if (baseUrl.startsWith('http://')) { baseUrl = baseUrl.replaceFirst('http://', 'ws://'); } return '$baseUrl/ws/dashboard'; } // ───────────────────────────────────────────────────────────────────────── // Helpers de conversion de types (Flutter Web / dart2js compatibility) // ───────────────────────────────────────────────────────────────────────── /// Convertit récursivement un objet JSON (potentiellement JS) en types Dart natifs. /// Nécessaire sur Flutter Web où jsonDecode peut retourner des LegacyJavaScriptObject. static dynamic _toDart(dynamic value) { if (value is Map) { return Map.fromEntries( (value as Map).entries.map( (e) => MapEntry(e.key.toString(), _toDart(e.value)), ), ); } if (value is List) { return (value as List).map(_toDart).toList(); } return value; } /// Gestion des messages reçus void _onMessage(dynamic message) { try { if (AppConfig.enableLogging) { AppLogger.debug('WebSocket message reçu: $message'); } // Sur Flutter Web (web_socket_channel ^3.0.x avec package:web), les messages // text peuvent arriver comme JSString/LegacyJavaScriptObject plutôt que String. // toString() fonctionne pour les strings JS primitifs. final String rawMessage = message is String ? message : message.toString(); if (rawMessage.isEmpty) return; // Convertir en types Dart natifs pour éviter les LegacyJavaScriptObject imbriqués final dynamic decoded = jsonDecode(rawMessage); if (decoded is! Map) { AppLogger.warning('WebSocket: message ignoré (non-objet): type=${decoded.runtimeType}'); return; } final json = _toDart(decoded) as Map; final type = json['type'] as String?; // Gérer les messages système if (type == 'connected') { final connectedMsg = (json['data'] as Map?)?['message'] ?? 'WebSocket connecté'; AppLogger.info('🔗 WebSocket: $connectedMsg'); return; } if (type == 'pong') { if (AppConfig.enableLogging) { AppLogger.debug('WebSocket heartbeat pong reçu'); } return; } if (type == 'ack') { return; // Accusé de réception, ignoré } // Event métier (Kafka) if (json.containsKey('eventType')) { final event = WebSocketEvent.fromJson(json); _eventController.add(event); AppLogger.info('📨 Event reçu: ${event.eventType}'); } } catch (e) { AppLogger.error('Erreur parsing message WebSocket', error: e); } } /// Gestion des erreurs void _onError(dynamic error) { AppLogger.error('WebSocket error', error: error); _isConnected = false; _stopHeartbeat(); _channel = null; _connectionStatusController.add(false); _scheduleReconnect(); } /// Gestion de la fermeture de connexion void _onDone() { if (!_isConnected) return; // Déjà traité par _onError AppLogger.info('WebSocket connexion fermée'); _isConnected = false; _channel = null; _connectionStatusController.add(false); _stopHeartbeat(); _scheduleReconnect(); } /// Planifier une reconnexion avec backoff exponentiel void _scheduleReconnect() { if (!_shouldReconnect) { return; } if (_reconnectTimer != null) { return; // Reconnexion déjà planifiée } _stopReconnectTimer(); // Backoff exponentiel : 2^attempts secondes (max 60s) final delaySeconds = (2 << _reconnectAttempts).clamp(1, 60); _reconnectAttempts++; AppLogger.info('⏳ Reconnexion WebSocket dans ${delaySeconds}s (tentative $_reconnectAttempts)'); _reconnectTimer = Timer(Duration(seconds: delaySeconds), () { _channel = null; AppLogger.info('🔄 Tentative de reconnexion WebSocket...'); connect(); }); } /// Arrêter le timer de reconnexion void _stopReconnectTimer() { _reconnectTimer?.cancel(); _reconnectTimer = null; } /// Démarrer le heartbeat (ping toutes les 30s) void _startHeartbeat() { _stopHeartbeat(); _heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) { if (_isConnected && _channel != null) { try { _channel!.sink.add(jsonEncode({'type': 'ping'})); if (AppConfig.enableLogging) { AppLogger.debug('WebSocket heartbeat ping envoyé'); } } catch (e) { AppLogger.error('Erreur envoi heartbeat', error: e); } } }); } /// Arrêter le heartbeat void _stopHeartbeat() { _heartbeatTimer?.cancel(); _heartbeatTimer = null; } }