Files
unionflow-mobile-apps/lib/core/websocket/websocket_service.dart
dahoud 07b8488714 feat(core): refonte architecture transverse (cache, network, websocket, DI)
- lib/app : app.dart, router mis à jour (routes nouveaux modules)
- lib/core/cache : cache_service + cached_datasource_decorator
- lib/core/network : api_client, offline_manager, retry_policy
- lib/core/websocket : websocket service (reconnexion exponentielle, heartbeat)
- lib/core/di : injection + register_module
- lib/core/storage : pending_operations_store (offline support)
- lib/core/navigation : main_navigation_layout (onglets par rôle)
- lib/core/config : environment, lcb_ft_constants
- lib/core/utils : error_formatter, validators
- pubspec.yaml/lock : dépendances mises à jour
2026-04-15 20:26:20 +00:00

418 lines
12 KiB
Dart

/// Service WebSocket pour temps réel (Kafka events)
library websocket_service;
import 'dart:async';
import 'dart:convert';
import 'package:injectable/injectable.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;
import '../config/environment.dart';
import '../utils/logger.dart';
/// Base type of the typed events received over the WebSocket.
///
/// Use [WebSocketEvent.fromJson] to turn a decoded JSON message into the
/// concrete subclass matching its `eventType`.
abstract class WebSocketEvent {
  /// Discriminator read from the message's `eventType` field.
  final String eventType;

  /// Value parsed from the message's `timestamp` field, or the local time at
  /// which the message was parsed when that field is missing or unusable.
  final DateTime timestamp;

  /// Content of the message's `data` field; empty when absent.
  final Map<String, dynamic> data;

  WebSocketEvent({
    required this.eventType,
    required this.timestamp,
    required this.data,
  });

  /// Builds the event subclass matching `json['eventType']`.
  ///
  /// Unknown event types yield a [GenericEvent]. A missing, non-string or
  /// unparseable `timestamp` falls back to [DateTime.now], and a missing or
  /// non-map `data` falls back to an empty map, so that a problem in this
  /// metadata does not cause the whole event to be lost.
  ///
  /// Still throws when `eventType` (or one of the optional identifier
  /// fields) is present but is not a string.
  factory WebSocketEvent.fromJson(Map<String, dynamic> json) {
    final eventType = json['eventType'] as String;
    final timestamp = _parseTimestamp(json['timestamp']);
    final data = _parseData(json['data']);
    switch (eventType) {
      case 'APPROVAL_PENDING':
      case 'APPROVAL_APPROVED':
      case 'APPROVAL_REJECTED':
        return FinanceApprovalEvent(
          eventType: eventType,
          timestamp: timestamp,
          data: data,
          organizationId: json['organizationId'] as String?,
        );
      case 'DASHBOARD_STATS_UPDATED':
      case 'KPI_UPDATED':
        return DashboardStatsEvent(
          eventType: eventType,
          timestamp: timestamp,
          data: data,
          organizationId: json['organizationId'] as String?,
        );
      case 'USER_NOTIFICATION':
      case 'BROADCAST_NOTIFICATION':
        return NotificationEvent(
          eventType: eventType,
          timestamp: timestamp,
          data: data,
          userId: json['userId'] as String?,
          organizationId: json['organizationId'] as String?,
        );
      case 'MEMBER_CREATED':
      case 'MEMBER_UPDATED':
        return MemberEvent(
          eventType: eventType,
          timestamp: timestamp,
          data: data,
          organizationId: json['organizationId'] as String?,
        );
      case 'CONTRIBUTION_PAID':
        return ContributionEvent(
          eventType: eventType,
          timestamp: timestamp,
          data: data,
          organizationId: json['organizationId'] as String?,
        );
      case 'NOUVEAU_MESSAGE':
      case 'MESSAGE_SUPPRIME':
      case 'CONVERSATION_LUE':
        return ChatMessageEvent(
          eventType: eventType,
          timestamp: timestamp,
          data: data,
          conversationId: json['conversationId'] as String?,
          organizationId: json['organizationId'] as String?,
        );
      default:
        return GenericEvent(
          eventType: eventType,
          timestamp: timestamp,
          data: data,
        );
    }
  }

  /// Parses [raw] as a date string, falling back to the current time when it
  /// is null, not a string, or not in a format [DateTime.tryParse] accepts.
  static DateTime _parseTimestamp(Object? raw) {
    if (raw is String) {
      return DateTime.tryParse(raw) ?? DateTime.now();
    }
    return DateTime.now();
  }

  /// Returns [raw] as a string-keyed map.
  ///
  /// Maps with another static key type are copied with stringified keys;
  /// anything that is not a map yields an empty map.
  static Map<String, dynamic> _parseData(Object? raw) {
    if (raw is Map<String, dynamic>) return raw;
    if (raw is Map) {
      return raw.map<String, dynamic>(
        (key, value) => MapEntry('$key', value),
      );
    }
    return <String, dynamic>{};
  }
}
/// Event built by [WebSocketEvent.fromJson] for the `APPROVAL_PENDING`,
/// `APPROVAL_APPROVED` and `APPROVAL_REJECTED` event types.
class FinanceApprovalEvent extends WebSocketEvent {
  FinanceApprovalEvent({
    required super.eventType,
    required super.timestamp,
    required super.data,
    this.organizationId,
  });

  /// Top-level `organizationId` of the message, when provided.
  final String? organizationId;
}
/// Event built by [WebSocketEvent.fromJson] for the
/// `DASHBOARD_STATS_UPDATED` and `KPI_UPDATED` event types.
class DashboardStatsEvent extends WebSocketEvent {
  DashboardStatsEvent({
    required super.eventType,
    required super.timestamp,
    required super.data,
    this.organizationId,
  });

  /// Top-level `organizationId` of the message, when provided.
  final String? organizationId;
}
/// Event built by [WebSocketEvent.fromJson] for the `USER_NOTIFICATION` and
/// `BROADCAST_NOTIFICATION` event types.
class NotificationEvent extends WebSocketEvent {
  NotificationEvent({
    required super.eventType,
    required super.timestamp,
    required super.data,
    this.userId,
    this.organizationId,
  });

  /// Top-level `userId` of the message, when provided.
  final String? userId;

  /// Top-level `organizationId` of the message, when provided.
  final String? organizationId;
}
/// Event built by [WebSocketEvent.fromJson] for the `MEMBER_CREATED` and
/// `MEMBER_UPDATED` event types.
class MemberEvent extends WebSocketEvent {
  MemberEvent({
    required super.eventType,
    required super.timestamp,
    required super.data,
    this.organizationId,
  });

  /// Top-level `organizationId` of the message, when provided.
  final String? organizationId;
}
/// Event built by [WebSocketEvent.fromJson] for the `CONTRIBUTION_PAID`
/// event type.
class ContributionEvent extends WebSocketEvent {
  ContributionEvent({
    required super.eventType,
    required super.timestamp,
    required super.data,
    this.organizationId,
  });

  /// Top-level `organizationId` of the message, when provided.
  final String? organizationId;
}
/// Event built by [WebSocketEvent.fromJson] for the `NOUVEAU_MESSAGE`,
/// `MESSAGE_SUPPRIME` and `CONVERSATION_LUE` event types.
class ChatMessageEvent extends WebSocketEvent {
  ChatMessageEvent({
    required super.eventType,
    required super.timestamp,
    required super.data,
    this.conversationId,
    this.organizationId,
  });

  /// Top-level `conversationId` of the message, when provided.
  final String? conversationId;

  /// Top-level `organizationId` of the message, when provided.
  final String? organizationId;
}
/// Fallback event built by [WebSocketEvent.fromJson] when the `eventType`
/// matches none of the dedicated event classes.
///
/// Carries only the fields common to every [WebSocketEvent].
class GenericEvent extends WebSocketEvent {
  GenericEvent({
    required super.eventType,
    required super.timestamp,
    required super.data,
  });
}
/// WebSocket client that receives the backend's real-time events.
///
/// Incoming business messages are parsed into [WebSocketEvent]s and published
/// on [eventStream]; connection state changes are published on
/// [connectionStatusStream]. While connected, a JSON `ping` is sent every
/// 30 seconds. When the connection is lost, a reconnection is scheduled with
/// an exponentially growing delay until [disconnect] is called.
@singleton
class WebSocketService {
  /// Delay before the first reconnection attempt; doubled after each failure.
  static const int _initialReconnectDelaySeconds = 2;

  /// Upper bound of the reconnection delay.
  static const int _maxReconnectDelaySeconds = 60;

  WebSocketChannel? _channel;

  /// Subscription to the current channel's stream, kept so that a dropped
  /// channel can no longer deliver callbacks.
  StreamSubscription<dynamic>? _subscription;
  Timer? _reconnectTimer;
  Timer? _heartbeatTimer;
  final StreamController<WebSocketEvent> _eventController =
      StreamController.broadcast();
  final StreamController<bool> _connectionStatusController =
      StreamController.broadcast();
  bool _isConnected = false;
  bool _shouldReconnect = true;

  /// Number of reconnections scheduled since the last successful handshake.
  int _reconnectAttempts = 0;

  /// Stream of typed WebSocket events.
  Stream<WebSocketEvent> get eventStream => _eventController.stream;

  /// Stream of connection status changes (`true` = connected).
  Stream<bool> get connectionStatusStream => _connectionStatusController.stream;

  /// Whether the WebSocket handshake has completed and the socket is open.
  bool get isConnected => _isConnected;

  /// Opens the WebSocket connection.
  ///
  /// Does nothing when a connection is already open or being opened. The
  /// service only reports itself as connected once the channel's handshake
  /// has completed; a failed attempt schedules a reconnection.
  void connect() {
    if (_isConnected || _channel != null) {
      AppLogger.info('WebSocket déjà connecté');
      return;
    }
    // An explicit connect() re-enables the automatic reconnection that a
    // previous disconnect() switched off.
    _shouldReconnect = true;
    // Drop any pending retry so it cannot open a second socket later on.
    _stopReconnectTimer();
    try {
      final wsUrl = _buildWebSocketUrl();
      AppLogger.info('Connexion WebSocket à $wsUrl...');
      final channel = WebSocketChannel.connect(Uri.parse(wsUrl));
      _channel = channel;
      _subscription = channel.stream.listen(
        _onMessage,
        onError: _onError,
        onDone: _onDone,
        cancelOnError: false,
      );
      // WebSocketChannel.connect returns before the handshake is done, so
      // success is only known once `ready` completes.
      channel.ready.then(
        (_) => _onReady(channel),
        onError: (Object error) => _onConnectFailed(channel, error),
      );
    } catch (e) {
      AppLogger.error('Erreur connexion WebSocket', error: e);
      _releaseChannel();
      _scheduleReconnect();
    }
  }

  /// Closes the connection and disables automatic reconnection until the
  /// next call to [connect].
  void disconnect() {
    AppLogger.info('Déconnexion WebSocket...');
    _shouldReconnect = false;
    _stopReconnectTimer();
    _releaseChannel(closeCode: status.normalClosure);
    _isConnected = false;
    _emitStatus(false);
  }

  /// Disconnects and closes both public streams.
  ///
  /// Safe to call more than once.
  void dispose() {
    disconnect();
    _eventController.close();
    _connectionStatusController.close();
  }

  /// Derives the WebSocket URL from the backend base URL by switching the
  /// scheme to `ws`/`wss` and appending `/ws/dashboard`.
  String _buildWebSocketUrl() {
    var baseUrl = AppConfig.apiBaseUrl;
    // Avoid a double slash when the configured base URL ends with '/'.
    while (baseUrl.endsWith('/')) {
      baseUrl = baseUrl.substring(0, baseUrl.length - 1);
    }
    if (baseUrl.startsWith('https://')) {
      baseUrl = baseUrl.replaceFirst('https://', 'wss://');
    } else if (baseUrl.startsWith('http://')) {
      baseUrl = baseUrl.replaceFirst('http://', 'ws://');
    }
    return '$baseUrl/ws/dashboard';
  }

  // ─────────────────────────────────────────────────────────────────────────
  // Type conversion helpers (Flutter Web / dart2js compatibility)
  // ─────────────────────────────────────────────────────────────────────────

  /// Recursively copies a decoded JSON value into plain Dart collections:
  /// maps become `Map<String, dynamic>` (keys stringified), lists become
  /// `List<dynamic>`, and any other value is returned unchanged.
  ///
  /// Per the original author's note, this is needed on Flutter Web where
  /// decoded values may be JavaScript-backed objects.
  static dynamic _toDart(dynamic value) {
    if (value is Map) {
      return <String, dynamic>{
        for (final entry in value.entries)
          entry.key.toString(): _toDart(entry.value),
      };
    }
    if (value is List) {
      return <dynamic>[for (final item in value) _toDart(item)];
    }
    return value;
  }

  /// Marks the service as connected once [channel]'s handshake succeeded.
  void _onReady(WebSocketChannel channel) {
    // The channel may have been closed or replaced while connecting.
    if (!identical(_channel, channel)) return;
    _isConnected = true;
    _reconnectAttempts = 0;
    _emitStatus(true);
    // Heartbeat every 30 seconds.
    _startHeartbeat();
    AppLogger.info('✅ WebSocket connecté avec succès');
  }

  /// Handles a failed handshake of [channel].
  void _onConnectFailed(WebSocketChannel channel, Object error) {
    // Ignore the failure if it was already handled through the stream or if
    // the channel is no longer the current one.
    if (!identical(_channel, channel)) return;
    AppLogger.error('Erreur connexion WebSocket', error: error);
    _handleConnectionLost();
  }

  /// Handles one incoming message.
  ///
  /// System messages (`connected`, `pong`, `ack`) are consumed here; messages
  /// carrying an `eventType` key are published on [eventStream]. Anything
  /// that cannot be parsed is logged and dropped.
  void _onMessage(dynamic message) {
    try {
      if (AppConfig.enableLogging) {
        AppLogger.debug('WebSocket message reçu: $message');
      }
      // Per the original author's note: on Flutter Web, text frames may not
      // arrive as a Dart String, so fall back to toString().
      final String rawMessage =
          message is String ? message : message.toString();
      if (rawMessage.isEmpty) return;
      final dynamic decoded = jsonDecode(rawMessage);
      if (decoded is! Map) {
        AppLogger.warning(
            'WebSocket: message ignoré (non-objet): type=${decoded.runtimeType}');
        return;
      }
      // Normalize to plain Dart collections before reading any field.
      final json = _toDart(decoded) as Map<String, dynamic>;
      final type = json['type'];

      // System messages.
      if (type == 'connected') {
        final payload = json['data'];
        final connectedMsg =
            (payload is Map ? payload['message'] : null) ??
                'WebSocket connecté';
        AppLogger.info('🔗 WebSocket: $connectedMsg');
        return;
      }
      if (type == 'pong') {
        if (AppConfig.enableLogging) {
          AppLogger.debug('WebSocket heartbeat pong reçu');
        }
        return;
      }
      if (type == 'ack') {
        return; // Acknowledgement, nothing to do.
      }

      // Business event.
      if (json.containsKey('eventType')) {
        final event = WebSocketEvent.fromJson(json);
        if (_eventController.isClosed) return;
        _eventController.add(event);
        AppLogger.info('📨 Event reçu: ${event.eventType}');
      }
    } catch (e) {
      AppLogger.error('Erreur parsing message WebSocket', error: e);
    }
  }

  /// Handles an error reported by the channel's stream.
  void _onError(dynamic error) {
    AppLogger.error('WebSocket error', error: error);
    _handleConnectionLost();
  }

  /// Handles the end of the channel's stream.
  void _onDone() {
    if (_channel == null) return; // Already handled.
    AppLogger.info('WebSocket connexion fermée');
    _handleConnectionLost();
  }

  /// Releases the current channel, reports the disconnection and schedules a
  /// reconnection. Does nothing when there is no current channel.
  void _handleConnectionLost() {
    if (_channel == null) return;
    _releaseChannel();
    _isConnected = false;
    _emitStatus(false);
    _scheduleReconnect();
  }

  /// Stops the heartbeat, closes the current channel's sink (with
  /// [closeCode] when given) and cancels the stream subscription.
  void _releaseChannel({int? closeCode}) {
    _stopHeartbeat();
    final channel = _channel;
    final subscription = _subscription;
    _channel = null;
    _subscription = null;
    // Fire-and-forget: the outcome of closing a dropped socket is irrelevant.
    channel?.sink.close(closeCode).ignore();
    subscription?.cancel().ignore();
  }

  /// Publishes [connected] on the status stream unless it has been closed.
  void _emitStatus(bool connected) {
    if (_connectionStatusController.isClosed) return;
    _connectionStatusController.add(connected);
  }

  /// Schedules a reconnection with exponential backoff.
  ///
  /// Does nothing when reconnection is disabled or one is already pending.
  void _scheduleReconnect() {
    if (!_shouldReconnect) {
      return;
    }
    if (_reconnectTimer?.isActive ?? false) {
      return; // A reconnection is already pending.
    }
    final delaySeconds = _nextReconnectDelaySeconds();
    _reconnectAttempts++;
    AppLogger.info(
        '⏳ Reconnexion WebSocket dans ${delaySeconds}s (tentative $_reconnectAttempts)');
    _reconnectTimer = Timer(Duration(seconds: delaySeconds), () {
      // Clear the fired timer so that later failures can schedule a new one.
      _reconnectTimer = null;
      AppLogger.info('🔄 Tentative de reconnexion WebSocket...');
      connect();
    });
  }

  /// Returns the delay for the next attempt: 2s, 4s, 8s, 16s, 32s, then 60s.
  ///
  /// Computed by bounded doubling rather than a bit shift so that a large
  /// attempt count cannot overflow into a tiny or negative delay.
  int _nextReconnectDelaySeconds() {
    var delay = _initialReconnectDelaySeconds;
    for (var i = 0;
        i < _reconnectAttempts && delay < _maxReconnectDelaySeconds;
        i++) {
      delay *= 2;
    }
    return delay > _maxReconnectDelaySeconds
        ? _maxReconnectDelaySeconds
        : delay;
  }

  /// Cancels the pending reconnection, if any.
  void _stopReconnectTimer() {
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
  }

  /// Starts the heartbeat: a JSON `ping` message every 30 seconds.
  void _startHeartbeat() {
    _stopHeartbeat();
    _heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) {
      final channel = _channel;
      if (!_isConnected || channel == null) return;
      try {
        channel.sink.add(jsonEncode({'type': 'ping'}));
        if (AppConfig.enableLogging) {
          AppLogger.debug('WebSocket heartbeat ping envoyé');
        }
      } catch (e) {
        AppLogger.error('Erreur envoi heartbeat', error: e);
      }
    });
  }

  /// Stops the heartbeat.
  void _stopHeartbeat() {
    _heartbeatTimer?.cancel();
    _heartbeatTimer = null;
  }
}