359 lines
9.1 KiB
Dart
359 lines
9.1 KiB
Dart
/// Service WebSocket pour temps réel (Kafka events)
|
|
library websocket_service;
|
|
|
|
import 'dart:async';
|
|
import 'dart:convert';
|
|
|
|
import 'package:injectable/injectable.dart';
|
|
import 'package:web_socket_channel/web_socket_channel.dart';
|
|
import 'package:web_socket_channel/status.dart' as status;
|
|
|
|
import '../config/environment.dart';
|
|
import '../utils/logger.dart';
|
|
|
|
/// Events WebSocket typés
|
|
abstract class WebSocketEvent {
|
|
final String eventType;
|
|
final DateTime timestamp;
|
|
final Map<String, dynamic> data;
|
|
|
|
WebSocketEvent({
|
|
required this.eventType,
|
|
required this.timestamp,
|
|
required this.data,
|
|
});
|
|
|
|
factory WebSocketEvent.fromJson(Map<String, dynamic> json) {
|
|
final eventType = json['eventType'] as String;
|
|
final timestamp = DateTime.parse(json['timestamp'] as String);
|
|
final data = json['data'] as Map<String, dynamic>;
|
|
|
|
switch (eventType) {
|
|
case 'APPROVAL_PENDING':
|
|
case 'APPROVAL_APPROVED':
|
|
case 'APPROVAL_REJECTED':
|
|
return FinanceApprovalEvent(
|
|
eventType: eventType,
|
|
timestamp: timestamp,
|
|
data: data,
|
|
organizationId: json['organizationId'] as String?,
|
|
);
|
|
|
|
case 'DASHBOARD_STATS_UPDATED':
|
|
case 'KPI_UPDATED':
|
|
return DashboardStatsEvent(
|
|
eventType: eventType,
|
|
timestamp: timestamp,
|
|
data: data,
|
|
organizationId: json['organizationId'] as String?,
|
|
);
|
|
|
|
case 'USER_NOTIFICATION':
|
|
case 'BROADCAST_NOTIFICATION':
|
|
return NotificationEvent(
|
|
eventType: eventType,
|
|
timestamp: timestamp,
|
|
data: data,
|
|
userId: json['userId'] as String?,
|
|
organizationId: json['organizationId'] as String?,
|
|
);
|
|
|
|
case 'MEMBER_CREATED':
|
|
case 'MEMBER_UPDATED':
|
|
return MemberEvent(
|
|
eventType: eventType,
|
|
timestamp: timestamp,
|
|
data: data,
|
|
organizationId: json['organizationId'] as String?,
|
|
);
|
|
|
|
case 'CONTRIBUTION_PAID':
|
|
return ContributionEvent(
|
|
eventType: eventType,
|
|
timestamp: timestamp,
|
|
data: data,
|
|
organizationId: json['organizationId'] as String?,
|
|
);
|
|
|
|
default:
|
|
return GenericEvent(
|
|
eventType: eventType,
|
|
timestamp: timestamp,
|
|
data: data,
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
class FinanceApprovalEvent extends WebSocketEvent {
|
|
final String? organizationId;
|
|
|
|
FinanceApprovalEvent({
|
|
required super.eventType,
|
|
required super.timestamp,
|
|
required super.data,
|
|
this.organizationId,
|
|
});
|
|
}
|
|
|
|
class DashboardStatsEvent extends WebSocketEvent {
|
|
final String? organizationId;
|
|
|
|
DashboardStatsEvent({
|
|
required super.eventType,
|
|
required super.timestamp,
|
|
required super.data,
|
|
this.organizationId,
|
|
});
|
|
}
|
|
|
|
class NotificationEvent extends WebSocketEvent {
|
|
final String? userId;
|
|
final String? organizationId;
|
|
|
|
NotificationEvent({
|
|
required super.eventType,
|
|
required super.timestamp,
|
|
required super.data,
|
|
this.userId,
|
|
this.organizationId,
|
|
});
|
|
}
|
|
|
|
class MemberEvent extends WebSocketEvent {
|
|
final String? organizationId;
|
|
|
|
MemberEvent({
|
|
required super.eventType,
|
|
required super.timestamp,
|
|
required super.data,
|
|
this.organizationId,
|
|
});
|
|
}
|
|
|
|
class ContributionEvent extends WebSocketEvent {
|
|
final String? organizationId;
|
|
|
|
ContributionEvent({
|
|
required super.eventType,
|
|
required super.timestamp,
|
|
required super.data,
|
|
this.organizationId,
|
|
});
|
|
}
|
|
|
|
class GenericEvent extends WebSocketEvent {
|
|
GenericEvent({
|
|
required super.eventType,
|
|
required super.timestamp,
|
|
required super.data,
|
|
});
|
|
}
|
|
|
|
/// Service WebSocket pour recevoir les events temps réel du backend
|
|
@singleton
|
|
class WebSocketService {
|
|
WebSocketChannel? _channel;
|
|
Timer? _reconnectTimer;
|
|
Timer? _heartbeatTimer;
|
|
|
|
final StreamController<WebSocketEvent> _eventController = StreamController.broadcast();
|
|
final StreamController<bool> _connectionStatusController = StreamController.broadcast();
|
|
|
|
bool _isConnected = false;
|
|
bool _shouldReconnect = true;
|
|
int _reconnectAttempts = 0;
|
|
|
|
/// Stream des events WebSocket typés
|
|
Stream<WebSocketEvent> get eventStream => _eventController.stream;
|
|
|
|
/// Stream du statut de connexion
|
|
Stream<bool> get connectionStatusStream => _connectionStatusController.stream;
|
|
|
|
/// Statut de connexion actuel
|
|
bool get isConnected => _isConnected;
|
|
|
|
/// Connexion au WebSocket
|
|
void connect() {
|
|
if (_isConnected || _channel != null) {
|
|
AppLogger.info('WebSocket déjà connecté');
|
|
return;
|
|
}
|
|
|
|
try {
|
|
final wsUrl = _buildWebSocketUrl();
|
|
AppLogger.info('Connexion WebSocket à $wsUrl...');
|
|
|
|
_channel = WebSocketChannel.connect(Uri.parse(wsUrl));
|
|
|
|
_channel!.stream.listen(
|
|
_onMessage,
|
|
onError: _onError,
|
|
onDone: _onDone,
|
|
cancelOnError: false,
|
|
);
|
|
|
|
_isConnected = true;
|
|
_reconnectAttempts = 0;
|
|
_connectionStatusController.add(true);
|
|
|
|
// Heartbeat toutes les 30 secondes
|
|
_startHeartbeat();
|
|
|
|
AppLogger.info('✅ WebSocket connecté avec succès');
|
|
} catch (e) {
|
|
AppLogger.error('Erreur connexion WebSocket', error: e);
|
|
_scheduleReconnect();
|
|
}
|
|
}
|
|
|
|
/// Déconnexion du WebSocket
|
|
void disconnect() {
|
|
AppLogger.info('Déconnexion WebSocket...');
|
|
_shouldReconnect = false;
|
|
_stopHeartbeat();
|
|
_stopReconnectTimer();
|
|
|
|
_channel?.sink.close(status.normalClosure);
|
|
_channel = null;
|
|
|
|
_isConnected = false;
|
|
_connectionStatusController.add(false);
|
|
}
|
|
|
|
/// Dispose des ressources
|
|
void dispose() {
|
|
disconnect();
|
|
_eventController.close();
|
|
_connectionStatusController.close();
|
|
}
|
|
|
|
/// Construit l'URL WebSocket depuis l'URL backend
|
|
String _buildWebSocketUrl() {
|
|
var baseUrl = AppConfig.apiBaseUrl;
|
|
|
|
// Remplacer http/https par ws/wss
|
|
if (baseUrl.startsWith('https://')) {
|
|
baseUrl = baseUrl.replaceFirst('https://', 'wss://');
|
|
} else if (baseUrl.startsWith('http://')) {
|
|
baseUrl = baseUrl.replaceFirst('http://', 'ws://');
|
|
}
|
|
|
|
return '$baseUrl/ws/dashboard';
|
|
}
|
|
|
|
/// Gestion des messages reçus
|
|
void _onMessage(dynamic message) {
|
|
try {
|
|
if (AppConfig.enableLogging) {
|
|
AppLogger.debug('WebSocket message reçu: $message');
|
|
}
|
|
|
|
final json = jsonDecode(message as String) as Map<String, dynamic>;
|
|
final type = json['type'] as String?;
|
|
|
|
// Gérer les messages système
|
|
if (type == 'connected') {
|
|
AppLogger.info('🔗 WebSocket: ${json['data']['message']}');
|
|
return;
|
|
}
|
|
|
|
if (type == 'pong') {
|
|
if (AppConfig.enableLogging) {
|
|
AppLogger.debug('WebSocket heartbeat pong reçu');
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (type == 'ack') {
|
|
return; // Accusé de réception, ignoré
|
|
}
|
|
|
|
// Event métier (Kafka)
|
|
if (json.containsKey('eventType')) {
|
|
final event = WebSocketEvent.fromJson(json);
|
|
_eventController.add(event);
|
|
AppLogger.info('📨 Event reçu: ${event.eventType}');
|
|
}
|
|
} catch (e) {
|
|
AppLogger.error('Erreur parsing message WebSocket', error: e);
|
|
}
|
|
}
|
|
|
|
/// Gestion des erreurs
|
|
void _onError(dynamic error) {
|
|
AppLogger.error('WebSocket error', error: error);
|
|
_isConnected = false;
|
|
_stopHeartbeat();
|
|
_channel = null;
|
|
_connectionStatusController.add(false);
|
|
_scheduleReconnect();
|
|
}
|
|
|
|
/// Gestion de la fermeture de connexion
|
|
void _onDone() {
|
|
if (!_isConnected) return; // Déjà traité par _onError
|
|
AppLogger.info('WebSocket connexion fermée');
|
|
_isConnected = false;
|
|
_channel = null;
|
|
_connectionStatusController.add(false);
|
|
_stopHeartbeat();
|
|
_scheduleReconnect();
|
|
}
|
|
|
|
/// Planifier une reconnexion avec backoff exponentiel
|
|
void _scheduleReconnect() {
|
|
if (!_shouldReconnect) {
|
|
return;
|
|
}
|
|
|
|
if (_reconnectTimer != null) {
|
|
return; // Reconnexion déjà planifiée
|
|
}
|
|
|
|
_stopReconnectTimer();
|
|
|
|
// Backoff exponentiel : 2^attempts secondes (max 60s)
|
|
final delaySeconds = (2 << _reconnectAttempts).clamp(1, 60);
|
|
_reconnectAttempts++;
|
|
|
|
AppLogger.info('⏳ Reconnexion WebSocket dans ${delaySeconds}s (tentative $_reconnectAttempts)');
|
|
|
|
_reconnectTimer = Timer(Duration(seconds: delaySeconds), () {
|
|
_channel = null;
|
|
AppLogger.info('🔄 Tentative de reconnexion WebSocket...');
|
|
connect();
|
|
});
|
|
}
|
|
|
|
/// Arrêter le timer de reconnexion
|
|
void _stopReconnectTimer() {
|
|
_reconnectTimer?.cancel();
|
|
_reconnectTimer = null;
|
|
}
|
|
|
|
/// Démarrer le heartbeat (ping toutes les 30s)
|
|
void _startHeartbeat() {
|
|
_stopHeartbeat();
|
|
|
|
_heartbeatTimer = Timer.periodic(const Duration(seconds: 30), (timer) {
|
|
if (_isConnected && _channel != null) {
|
|
try {
|
|
_channel!.sink.add(jsonEncode({'type': 'ping'}));
|
|
if (AppConfig.enableLogging) {
|
|
AppLogger.debug('WebSocket heartbeat ping envoyé');
|
|
}
|
|
} catch (e) {
|
|
AppLogger.error('Erreur envoi heartbeat', error: e);
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Arrêter le heartbeat
|
|
void _stopHeartbeat() {
|
|
_heartbeatTimer?.cancel();
|
|
_heartbeatTimer = null;
|
|
}
|
|
}
|