# 💻 Exemples d'Implémentation - Temps Réel avec Kafka
## 📦 Étape 1 : Ajouter les Dépendances
### pom.xml
```xml
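<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-websockets-next</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-messaging-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.quarkiverse.reactivemessaginghttp</groupId>
    <artifactId>quarkus-reactive-messaging-http</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-jsonb</artifactId>
</dependency>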
```
---
## 🔧 Étape 2 : Configuration application.properties
```properties
# ============================================
# Kafka Configuration
# ============================================
# Broker list: taken from KAFKA_BOOTSTRAP_SERVERS, defaulting to localhost:9092.
kafka.bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
# Outgoing channel "notifications" -> topic "notifications" (String key, JSON-B value)
mp.messaging.outgoing.notifications.connector=smallrye-kafka
mp.messaging.outgoing.notifications.topic=notifications
mp.messaging.outgoing.notifications.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.notifications.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
# Outgoing channel "chat-messages" -> topic "chat.messages" (String key, JSON-B value)
mp.messaging.outgoing.chat-messages.connector=smallrye-kafka
mp.messaging.outgoing.chat-messages.topic=chat.messages
mp.messaging.outgoing.chat-messages.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.chat-messages.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
# Outgoing channel "reactions" -> topic "reactions" (likes, comments)
mp.messaging.outgoing.reactions.connector=smallrye-kafka
mp.messaging.outgoing.reactions.topic=reactions
mp.messaging.outgoing.reactions.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.reactions.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
# Outgoing channel "presence" -> topic "presence.updates"
mp.messaging.outgoing.presence.connector=smallrye-kafka
mp.messaging.outgoing.presence.topic=presence.updates
mp.messaging.outgoing.presence.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.presence.value.serializer=io.quarkus.kafka.client.serialization.JsonbSerializer
# ============================================
# Kafka → WebSocket Bridge (Incoming)
# ============================================
# Consume from Kafka and route to WebSocket.
# NOTE(review): io.quarkus.kafka.client.serialization.JsonbDeserializer appears to need the
# target class as a constructor argument, so a per-type subclass (e.g. one extending
# JsonbDeserializer<NotificationEvent>) may have to be referenced in value.deserializer
# instead of the base class — confirm against the Quarkus Kafka guide.
mp.messaging.incoming.kafka-notifications.connector=smallrye-kafka
mp.messaging.incoming.kafka-notifications.topic=notifications
mp.messaging.incoming.kafka-notifications.group.id=websocket-notifications-bridge
mp.messaging.incoming.kafka-notifications.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.kafka-notifications.value.deserializer=io.quarkus.kafka.client.serialization.JsonbDeserializer
mp.messaging.incoming.kafka-notifications.enable.auto.commit=true
# Incoming channel "kafka-chat" <- topic "chat.messages", consumer group "websocket-chat-bridge"
mp.messaging.incoming.kafka-chat.connector=smallrye-kafka
mp.messaging.incoming.kafka-chat.topic=chat.messages
mp.messaging.incoming.kafka-chat.group.id=websocket-chat-bridge
mp.messaging.incoming.kafka-chat.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.kafka-chat.value.deserializer=io.quarkus.kafka.client.serialization.JsonbDeserializer
mp.messaging.incoming.kafka-chat.enable.auto.commit=true
# ============================================
# WebSocket Configuration
# ============================================
quarkus.websockets-next.server.enabled=true
```
---
## 📝 Étape 3 : DTOs pour les Événements Kafka
### NotificationEvent.java
```java
package com.lions.dev.dto.events;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.Map;
import java.util.UUID;
/**
 * Notification event published to Kafka.
 *
 * <p>Mutable bean: Lombok generates the accessors, the no-args constructor and the
 * all-args constructor {@code (userId, type, data, timestamp)}.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class NotificationEvent {

    /** Id of the targeted user; designated by the author as the Kafka key (for routing). */
    private String userId;

    /** Notification type, e.g. friend_request, event_reminder, message_alert. */
    private String type;

    /** Free-form payload; parameterized so callers and serializers do not work with a raw Map. */
    private Map<String, Object> data;

    /** Set to {@link System#currentTimeMillis()} by the three-argument constructor. */
    private Long timestamp;

    /**
     * Creates an event stamped with the current time.
     *
     * @param userId id of the targeted user
     * @param type   notification type
     * @param data   payload attached to the notification
     */
    public NotificationEvent(String userId, String type, Map<String, Object> data) {
        this.userId = userId;
        this.type = type;
        this.data = data;
        this.timestamp = System.currentTimeMillis();
    }
}
```
### ChatMessageEvent.java
```java
package com.lions.dev.dto.events;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.util.UUID;
/**
 * Chat message event published to Kafka.
 *
 * <p>Plain mutable bean; Lombok supplies the constructors and accessors. Field order
 * defines the parameter order of the generated all-args constructor.
 */
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
public class ChatMessageEvent {

    /** Conversation id; designated by the author as the Kafka key. */
    private String conversationId;

    /** Id of the sender. */
    private String senderId;

    /** Id of the recipient. */
    private String recipientId;

    /** Message text. */
    private String content;

    /** Id of the message. */
    private String messageId;

    /** Event timestamp. */
    private Long timestamp;
}
```
### ReactionEvent.java
```java
package com.lions.dev.dto.events;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
 * Reaction event (like, comment) published to Kafka.
 *
 * <p>Mutable bean: Lombok generates the accessors, the no-args constructor and the
 * all-args constructor {@code (postId, userId, reactionType, data, timestamp)}.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class ReactionEvent {

    /** Post id; designated by the author as the Kafka key. */
    private String postId;

    /** Id of the user who reacted. */
    private String userId;

    /** Reaction kind: like, comment, share. */
    private String reactionType;

    /**
     * Free-form payload. Fully qualified because this snippet's import list has no
     * {@code java.util.Map}, and parameterized to avoid a raw type.
     */
    private java.util.Map<String, Object> data;

    /** Event timestamp. */
    private Long timestamp;
}
```
---
## 🔌 Étape 4 : WebSocket avec WebSockets Next
### NotificationWebSocketNext.java
```java
package com.lions.dev.websocket;
import io.quarkus.logging.Log;
import io.quarkus.websockets.next.OnClose;
import io.quarkus.websockets.next.OnOpen;
import io.quarkus.websockets.next.OnTextMessage;
import io.quarkus.websockets.next.WebSocket;
import io.quarkus.websockets.next.WebSocketConnection;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* WebSocket endpoint pour les notifications en temps réel (WebSockets Next).
*
* Architecture:
* Services → Kafka → Bridge → WebSocket → Client
*/
@WebSocket(path = "/notifications/{userId}")
@ApplicationScoped
public class NotificationWebSocketNext {
// Stockage des connexions actives par utilisateur (multi-device support)
private static final Map> userConnections = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(WebSocketConnection connection, String userId) {
try {
UUID userUUID = UUID.fromString(userId);
// Ajouter la connexion Ă l'ensemble des connexions de l'utilisateur
userConnections.computeIfAbsent(userUUID, k -> ConcurrentHashMap.newKeySet())
.add(connection);
Log.info("[WS-NEXT] Connexion ouverte pour l'utilisateur: " + userId +
" (Total: " + userConnections.get(userUUID).size() + ")");
// Envoyer confirmation
connection.sendText("{\"type\":\"connected\",\"timestamp\":" +
System.currentTimeMillis() + "}");
} catch (IllegalArgumentException e) {
Log.error("[WS-NEXT] UUID invalide: " + userId, e);
connection.close();
}
}
@OnClose
public void onClose(String userId) {
try {
UUID userUUID = UUID.fromString(userId);
Set connections = userConnections.get(userUUID);
if (connections != null) {
connections.removeIf(conn -> !conn.isOpen());
if (connections.isEmpty()) {
userConnections.remove(userUUID);
Log.info("[WS-NEXT] Toutes les connexions fermées pour: " + userId);
} else {
Log.info("[WS-NEXT] Connexion fermée pour: " + userId +
" (Restantes: " + connections.size() + ")");
}
}
} catch (Exception e) {
Log.error("[WS-NEXT] Erreur lors de la fermeture", e);
}
}
@OnTextMessage
public void onMessage(String message, String userId) {
try {
Log.debug("[WS-NEXT] Message reçu de " + userId + ": " + message);
// Parser le message JSON
com.fasterxml.jackson.databind.ObjectMapper mapper =
new com.fasterxml.jackson.databind.ObjectMapper();
Map messageData = mapper.readValue(message, Map.class);
String type = (String) messageData.get("type");
switch (type) {
case "ping":
handlePing(userId);
break;
case "ack":
handleAck(messageData, userId);
break;
default:
Log.warn("[WS-NEXT] Type de message inconnu: " + type);
}
} catch (Exception e) {
Log.error("[WS-NEXT] Erreur traitement message", e);
}
}
private void handlePing(String userId) {
UUID userUUID = UUID.fromString(userId);
Set connections = userConnections.get(userUUID);
if (connections != null) {
String pong = "{\"type\":\"pong\",\"timestamp\":" +
System.currentTimeMillis() + "}";
connections.forEach(conn -> {
if (conn.isOpen()) {
conn.sendText(pong);
}
});
}
}
private void handleAck(Map messageData, String userId) {
String notificationId = (String) messageData.get("notificationId");
Log.debug("[WS-NEXT] ACK reçu pour notification " + notificationId +
" de " + userId);
}
/**
* Envoie une notification à un utilisateur spécifique.
* Appelé par le bridge Kafka → WebSocket.
*/
public static void sendToUser(UUID userId, String message) {
Set connections = userConnections.get(userId);
if (connections == null || connections.isEmpty()) {
Log.debug("[WS-NEXT] Utilisateur " + userId + " non connecté");
return;
}
int success = 0;
int failed = 0;
for (WebSocketConnection conn : connections) {
if (conn.isOpen()) {
try {
conn.sendText(message);
success++;
} catch (Exception e) {
failed++;
Log.error("[WS-NEXT] Erreur envoi Ă " + userId, e);
}
} else {
failed++;
}
}
Log.info("[WS-NEXT] Notification envoyée à " + userId +
" (Succès: " + success + ", Échec: " + failed + ")");
}
}
```
---
## 🌉 Étape 5 : Bridge Kafka → WebSocket
### NotificationKafkaBridge.java
```java
package com.lions.dev.websocket;
import com.lions.dev.dto.events.NotificationEvent;
import io.quarkus.logging.Log;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import java.util.UUID;
/**
 * Bridge that consumes from Kafka and forwards over WebSocket.
 *
 * <p>Architecture:
 * Kafka topic (notifications) → Bridge → WebSocket (NotificationWebSocketNext)
 */
@ApplicationScoped
public class NotificationKafkaBridge {

    /** Shared JSON writer, created once instead of once per event. */
    private static final com.fasterxml.jackson.databind.ObjectMapper JSON =
            new com.fasterxml.jackson.databind.ObjectMapper();

    /**
     * Consumes an event from the {@code kafka-notifications} channel and routes it to the
     * WebSocket connections of the targeted user.
     *
     * <p>The ack/nack stage is returned (not dropped) so the messaging runtime can track
     * the outcome; a method receiving a {@code Message} has to return such a stage.
     *
     * @param message incoming message wrapping the notification event
     * @return the acknowledgement stage, or the negative-acknowledgement stage on failure
     */
    @Incoming("kafka-notifications")
    public java.util.concurrent.CompletionStage<Void> processNotification(
            Message<NotificationEvent> message) {
        try {
            NotificationEvent event = message.getPayload();
            Log.debug("[KAFKA-BRIDGE] Événement reçu: " + event.getType() +
                    " pour utilisateur: " + event.getUserId());

            UUID userId = UUID.fromString(event.getUserId());
            // Build the JSON frame, then push it over WebSocket
            String wsMessage = buildWebSocketMessage(event);
            NotificationWebSocketNext.sendToUser(userId, wsMessage);

            return message.ack();
        } catch (Exception e) {
            Log.error("[KAFKA-BRIDGE] Erreur traitement événement", e);
            return message.nack(e);
        }
    }

    /**
     * Serializes the event to the JSON frame sent to clients
     * ({@code type}, {@code data}, {@code timestamp}).
     *
     * @return the JSON text, or a fixed error frame if serialization fails
     */
    private String buildWebSocketMessage(NotificationEvent event) {
        // A LinkedHashMap accepts null values; Map.of would throw if data or timestamp
        // were absent from the event.
        java.util.Map<String, Object> wsMessage = new java.util.LinkedHashMap<>();
        wsMessage.put("type", event.getType());
        wsMessage.put("data", event.getData());
        wsMessage.put("timestamp", event.getTimestamp());
        try {
            return JSON.writeValueAsString(wsMessage);
        } catch (Exception e) {
            Log.error("[KAFKA-BRIDGE] Erreur construction message", e);
            return "{\"type\":\"error\",\"message\":\"Erreur de traitement\"}";
        }
    }
}
```
---
## 📤 Étape 6 : Services Publient dans Kafka
### FriendshipService (Modifié)
```java
package com.lions.dev.service;
import com.lions.dev.dto.events.NotificationEvent;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import jakarta.inject.Inject;
import jakarta.enterprise.context.ApplicationScoped;
@ApplicationScoped
public class FriendshipService {
@Inject
@Channel("notifications")
Emitter notificationEmitter;
// ... autres dépendances ...
/**
* Envoie une demande d'amitié (publie dans Kafka).
*/
@Transactional
public FriendshipCreateOneResponseDTO sendFriendRequest(
FriendshipCreateOneRequestDTO request) {
// ... logique métier existante ...
// âś… NOUVEAU: Publier dans Kafka au lieu d'appeler directement WebSocket
try {
NotificationEvent event = new NotificationEvent(
request.getFriendId().toString(), // userId destinataire
"friend_request",
java.util.Map.of(
"fromUserId", request.getUserId().toString(),
"fromFirstName", user.getFirstName(),
"fromLastName", user.getLastName(),
"requestId", response.getFriendshipId().toString()
)
);
notificationEmitter.send(event);
logger.info("[LOG] Événement friend_request publié dans Kafka pour: " +
request.getFriendId());
} catch (Exception e) {
logger.error("[ERROR] Erreur publication Kafka", e);
// Ne pas bloquer la demande d'amitié si Kafka échoue
}
return response;
}
/**
* Accepte une demande d'amitié (publie dans Kafka).
*/
@Transactional
public FriendshipCreateOneResponseDTO acceptFriendRequest(UUID friendshipId) {
// ... logique métier existante ...
// âś… NOUVEAU: Publier dans Kafka
try {
NotificationEvent event = new NotificationEvent(
originalRequest.getUserId().toString(), // userId émetteur
"friend_request_accepted",
java.util.Map.of(
"friendId", friend.getId().toString(),
"friendFirstName", friend.getFirstName(),
"friendLastName", friend.getLastName(),
"friendshipId", response.getFriendshipId().toString()
)
);
notificationEmitter.send(event);
logger.info("[LOG] Événement friend_request_accepted publié dans Kafka");
} catch (Exception e) {
logger.error("[ERROR] Erreur publication Kafka", e);
}
return response;
}
}
```
### MessageService (Modifié)
```java
package com.lions.dev.service;
import com.lions.dev.dto.events.ChatMessageEvent;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import jakarta.inject.Inject;
@ApplicationScoped
public class MessageService {
@Inject
@Channel("chat-messages")
Emitter chatMessageEmitter;
/**
* Envoie un message (publie dans Kafka).
*/
@Transactional
public MessageResponseDTO sendMessage(SendMessageRequestDTO request) {
// ... logique métier existante ...
// âś… NOUVEAU: Publier dans Kafka
try {
ChatMessageEvent event = new ChatMessageEvent();
event.setConversationId(conversation.getId().toString());
event.setSenderId(senderId.toString());
event.setRecipientId(recipientId.toString());
event.setContent(request.getContent());
event.setMessageId(message.getId().toString());
event.setTimestamp(System.currentTimeMillis());
// Utiliser conversationId comme clé Kafka pour garantir l'ordre
chatMessageEmitter.send(org.eclipse.microprofile.reactive.messaging.Message.of(
event,
() -> CompletableFuture.completedFuture(null), // ack
throwable -> {
logger.error("[ERROR] Erreur envoi Kafka", throwable);
return CompletableFuture.completedFuture(null); // nack
}
).addMetadata(org.eclipse.microprofile.reactive.messaging.OutgoingMessageMetadata.builder()
.withKey(conversation.getId().toString())
.build()));
logger.info("[LOG] Message publié dans Kafka: " + message.getId());
} catch (Exception e) {
logger.error("[ERROR] Erreur publication Kafka", e);
// Ne pas bloquer l'envoi du message si Kafka échoue
}
return response;
}
}
```
### SocialPostService (Modifié pour les Réactions)
```java
package com.lions.dev.service;
import com.lions.dev.dto.events.ReactionEvent;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import jakarta.inject.Inject;
@ApplicationScoped
public class SocialPostService {
@Inject
@Channel("reactions")
Emitter reactionEmitter;
/**
* Like un post (publie dans Kafka).
*/
@Transactional
public SocialPost likePost(UUID postId, UUID userId) {
// ... logique métier existante ...
// ✅ NOUVEAU: Publier dans Kafka pour notifier en temps réel
try {
ReactionEvent event = new ReactionEvent();
event.setPostId(postId.toString());
event.setUserId(userId.toString());
event.setReactionType("like");
event.setData(java.util.Map.of(
"postId", postId.toString(),
"userId", userId.toString(),
"likesCount", post.getLikesCount()
));
event.setTimestamp(System.currentTimeMillis());
reactionEmitter.send(event);
logger.info("[LOG] Réaction like publiée dans Kafka pour post: " + postId);
} catch (Exception e) {
logger.error("[ERROR] Erreur publication Kafka", e);
}
return post;
}
}
```
---
## 🎨 Frontend : Amélioration du Service WebSocket
### realtime_notification_service_v2.dart
```dart
import 'dart:async';
import 'dart:convert';
import 'package:flutter/foundation.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as status;
class RealtimeNotificationServiceV2 extends ChangeNotifier {
RealtimeNotificationServiceV2(this.userId, this.authToken);
final String userId;
final String authToken;
WebSocketChannel? _channel;
StreamSubscription? _subscription;
Timer? _heartbeatTimer;
Timer? _reconnectTimer;
bool _isConnected = false;
bool get isConnected => _isConnected;
int _reconnectAttempts = 0;
static const int _maxReconnectAttempts = 5;
static const Duration _heartbeatInterval = Duration(seconds: 30);
static const Duration _reconnectDelay = Duration(seconds: 5);
// Streams pour différents types d'événements
final _friendRequestController = StreamController