#!/usr/bin/env python3 """ Script de configuration automatique Keycloak pour UnionFlow Crée le realm, les rôles, le client et tous les utilisateurs nécessaires """ import requests import json import time import sys from typing import Dict, List, Optional class KeycloakSetup: def __init__(self, base_url: str = "http://localhost:8180", admin_user: str = "admin", admin_password: str = "admin123"): self.base_url = base_url self.admin_user = admin_user self.admin_password = admin_password self.admin_token = None self.session = requests.Session() def print_status(self, message: str, status: str = "INFO"): """Affiche un message avec un statut coloré""" icons = {"INFO": "🔍", "SUCCESS": "✅", "ERROR": "❌", "WARNING": "⚠️"} print(f"{icons.get(status, '📋')} {message}") def wait_for_keycloak(self, max_attempts: int = 30) -> bool: """Attend que Keycloak soit disponible""" self.print_status("Attente de la disponibilité de Keycloak...") for attempt in range(max_attempts): try: response = self.session.get(f"{self.base_url}", timeout=5) if response.status_code == 200: self.print_status("Keycloak est disponible", "SUCCESS") return True except requests.exceptions.RequestException: pass if attempt < max_attempts - 1: time.sleep(2) self.print_status("Keycloak n'est pas disponible", "ERROR") return False def get_admin_token(self) -> bool: """Obtient le token d'administration""" self.print_status("Obtention du token administrateur...") # Essayons d'abord avec les credentials par défaut credentials_to_try = [ (self.admin_user, self.admin_password), ("admin", "admin"), ("admin", "password"), ] for username, password in credentials_to_try: try: data = { "username": username, "password": password, "grant_type": "password", "client_id": "admin-cli" } response = self.session.post( f"{self.base_url}/realms/master/protocol/openid-connect/token", data=data, headers={"Content-Type": "application/x-www-form-urlencoded"} ) if response.status_code == 200: token_data = response.json() self.admin_token = token_data.get("access_token") if self.admin_token: self.print_status(f"Token obtenu avec {username}/{password}", "SUCCESS") return True except Exception as e: continue # Si aucun credential ne fonctionne, essayons de créer un admin self.print_status("Tentative de création d'un compte admin...", "WARNING") return self._create_initial_admin() def _create_initial_admin(self) -> bool: """Tente de créer un compte admin initial""" try: # En mode dev, Keycloak peut permettre la création d'admin via l'API admin_data = { "username": self.admin_user, "password": self.admin_password, "enabled": True } # Essayons plusieurs endpoints possibles endpoints = [ f"{self.base_url}/admin/realms/master/users", f"{self.base_url}/auth/admin/realms/master/users" ] for endpoint in endpoints: try: response = self.session.post( endpoint, json=admin_data, headers={"Content-Type": "application/json"} ) if response.status_code in [201, 409]: # 409 = already exists return self.get_admin_token() except: continue except Exception as e: pass self.print_status("Impossible d'obtenir un token admin. Configuration manuelle requise.", "ERROR") return False def create_realm(self, realm_name: str = "unionflow") -> bool: """Crée le realm UnionFlow""" self.print_status(f"Création du realm {realm_name}...") realm_data = { "realm": realm_name, "enabled": True, "displayName": "UnionFlow", "loginWithEmailAllowed": True, "duplicateEmailsAllowed": False, "resetPasswordAllowed": True, "editUsernameAllowed": False, "bruteForceProtected": False, "registrationAllowed": False, "rememberMe": True, "verifyEmail": False, "loginTheme": "keycloak", "accountTheme": "keycloak", "adminTheme": "keycloak", "emailTheme": "keycloak" } try: response = self.session.post( f"{self.base_url}/admin/realms", json=realm_data, headers={ "Authorization": f"Bearer {self.admin_token}", "Content-Type": "application/json" } ) if response.status_code == 201: self.print_status(f"Realm {realm_name} créé avec succès", "SUCCESS") return True elif response.status_code == 409: self.print_status(f"Realm {realm_name} existe déjà", "WARNING") return True else: self.print_status(f"Erreur lors de la création du realm: {response.status_code}", "ERROR") return False except Exception as e: self.print_status(f"Exception lors de la création du realm: {e}", "ERROR") return False def create_client(self, realm_name: str = "unionflow", client_id: str = "unionflow-mobile") -> bool: """Crée le client pour l'application mobile""" self.print_status(f"Création du client {client_id}...") client_data = { "clientId": client_id, "enabled": True, "publicClient": True, "directAccessGrantsEnabled": True, "standardFlowEnabled": True, "implicitFlowEnabled": False, "serviceAccountsEnabled": False, "authorizationServicesEnabled": False, "redirectUris": ["*"], "webOrigins": ["*"], "protocol": "openid-connect", "attributes": { "access.token.lifespan": "300", "client.secret.creation.time": "0" } } try: response = self.session.post( f"{self.base_url}/admin/realms/{realm_name}/clients", json=client_data, headers={ "Authorization": f"Bearer {self.admin_token}", "Content-Type": "application/json" } ) if response.status_code == 201: self.print_status(f"Client {client_id} créé avec succès", "SUCCESS") return True elif response.status_code == 409: self.print_status(f"Client {client_id} existe déjà", "WARNING") return True else: self.print_status(f"Erreur lors de la création du client: {response.status_code}", "ERROR") return False except Exception as e: self.print_status(f"Exception lors de la création du client: {e}", "ERROR") return False def create_roles(self, realm_name: str = "unionflow") -> bool: """Crée tous les rôles nécessaires""" roles = [ "SUPER_ADMINISTRATEUR", "RESPONSABLE_TECHNIQUE", "RESPONSABLE_MEMBRES", "MEMBRE_ACTIF", "MEMBRE_SIMPLE" ] self.print_status("Création des rôles...") success_count = 0 for role in roles: role_data = { "name": role, "description": f"Rôle {role} pour UnionFlow" } try: response = self.session.post( f"{self.base_url}/admin/realms/{realm_name}/roles", json=role_data, headers={ "Authorization": f"Bearer {self.admin_token}", "Content-Type": "application/json" } ) if response.status_code in [201, 409]: # 201 = created, 409 = already exists self.print_status(f" ✓ Rôle {role} configuré", "SUCCESS") success_count += 1 else: self.print_status(f" ✗ Erreur pour le rôle {role}: {response.status_code}", "ERROR") except Exception as e: self.print_status(f" ✗ Exception pour le rôle {role}: {e}", "ERROR") return success_count == len(roles) def create_user(self, realm_name: str, username: str, email: str, first_name: str, last_name: str, password: str, role: str) -> bool: """Crée un utilisateur avec son mot de passe et son rôle""" self.print_status(f"Création de l'utilisateur {username}...") # 1. Créer l'utilisateur user_data = { "username": username, "email": email, "firstName": first_name, "lastName": last_name, "enabled": True, "emailVerified": True, "credentials": [{ "type": "password", "value": password, "temporary": False }] } try: # Créer l'utilisateur response = self.session.post( f"{self.base_url}/admin/realms/{realm_name}/users", json=user_data, headers={ "Authorization": f"Bearer {self.admin_token}", "Content-Type": "application/json" } ) if response.status_code == 201: self.print_status(f" ✓ Utilisateur {username} créé", "SUCCESS") elif response.status_code == 409: self.print_status(f" ⚠ Utilisateur {username} existe déjà", "WARNING") else: self.print_status(f" ✗ Erreur création utilisateur {username}: {response.status_code}", "ERROR") return False # 2. Obtenir l'ID de l'utilisateur response = self.session.get( f"{self.base_url}/admin/realms/{realm_name}/users?username={username}", headers={"Authorization": f"Bearer {self.admin_token}"} ) if response.status_code != 200: self.print_status(f" ✗ Impossible de récupérer l'ID de {username}", "ERROR") return False users = response.json() if not users: self.print_status(f" ✗ Utilisateur {username} non trouvé", "ERROR") return False user_id = users[0]["id"] # 3. Définir le mot de passe (au cas où) password_data = { "type": "password", "value": password, "temporary": False } self.session.put( f"{self.base_url}/admin/realms/{realm_name}/users/{user_id}/reset-password", json=password_data, headers={ "Authorization": f"Bearer {self.admin_token}", "Content-Type": "application/json" } ) # 4. Assigner le rôle role_response = self.session.get( f"{self.base_url}/admin/realms/{realm_name}/roles/{role}", headers={"Authorization": f"Bearer {self.admin_token}"} ) if role_response.status_code == 200: role_data = role_response.json() assign_response = self.session.post( f"{self.base_url}/admin/realms/{realm_name}/users/{user_id}/role-mappings/realm", json=[role_data], headers={ "Authorization": f"Bearer {self.admin_token}", "Content-Type": "application/json" } ) if assign_response.status_code in [204, 200]: self.print_status(f" ✓ Rôle {role} assigné à {username}", "SUCCESS") else: self.print_status(f" ⚠ Erreur assignation rôle à {username}", "WARNING") return True except Exception as e: self.print_status(f" ✗ Exception pour {username}: {e}", "ERROR") return False def create_all_users(self, realm_name: str = "unionflow") -> bool: """Crée tous les utilisateurs nécessaires""" users = [ { "username": "superadmin", "email": "superadmin@unionflow.com", "first_name": "Super", "last_name": "Admin", "password": "SuperAdmin123!", "role": "SUPER_ADMINISTRATEUR" }, { "username": "marie.active", "email": "marie.active@unionflow.com", "first_name": "Marie", "last_name": "Active", "password": "Marie123!", "role": "MEMBRE_ACTIF" }, { "username": "jean.simple", "email": "jean.simple@unionflow.com", "first_name": "Jean", "last_name": "Simple", "password": "Jean123!", "role": "MEMBRE_SIMPLE" }, { "username": "tech.lead", "email": "tech.lead@unionflow.com", "first_name": "Tech", "last_name": "Lead", "password": "TechLead123!", "role": "RESPONSABLE_TECHNIQUE" }, { "username": "rh.manager", "email": "rh.manager@unionflow.com", "first_name": "RH", "last_name": "Manager", "password": "RhManager123!", "role": "RESPONSABLE_MEMBRES" } ] self.print_status("Création de tous les utilisateurs...") success_count = 0 for user in users: if self.create_user(realm_name, **user): success_count += 1 return success_count == len(users) def test_authentication(self, realm_name: str = "unionflow", client_id: str = "unionflow-mobile") -> bool: """Teste l'authentification de tous les comptes""" test_accounts = [ ("marie.active", "Marie123!"), ("superadmin", "SuperAdmin123!"), ("jean.simple", "Jean123!") ] self.print_status("Test d'authentification des comptes...") success_count = 0 for username, password in test_accounts: try: data = { "username": username, "password": password, "grant_type": "password", "client_id": client_id } response = self.session.post( f"{self.base_url}/realms/{realm_name}/protocol/openid-connect/token", data=data, headers={"Content-Type": "application/x-www-form-urlencoded"} ) if response.status_code == 200 and "access_token" in response.json(): self.print_status(f" ✓ {username} : AUTHENTIFICATION RÉUSSIE", "SUCCESS") success_count += 1 else: self.print_status(f" ✗ {username} : Échec d'authentification", "ERROR") except Exception as e: self.print_status(f" ✗ {username} : Exception {e}", "ERROR") return success_count == len(test_accounts) def setup_complete(self) -> bool: """Exécute la configuration complète""" self.print_status("=" * 80) self.print_status("🚀 CONFIGURATION AUTOMATIQUE KEYCLOAK UNIONFLOW") self.print_status("=" * 80) # 1. Attendre Keycloak if not self.wait_for_keycloak(): return False # 2. Obtenir le token admin if not self.get_admin_token(): self.print_status("Configuration manuelle requise:", "ERROR") self.print_status("1. Ouvrez http://localhost:8180", "INFO") self.print_status("2. Créez un compte admin", "INFO") self.print_status("3. Relancez ce script", "INFO") return False # 3. Créer le realm if not self.create_realm(): return False # 4. Créer le client if not self.create_client(): return False # 5. Créer les rôles if not self.create_roles(): return False # 6. Créer les utilisateurs if not self.create_all_users(): return False # 7. Tester l'authentification time.sleep(2) # Attendre un peu pour que tout soit prêt if not self.test_authentication(): self.print_status("Certains comptes ne fonctionnent pas encore", "WARNING") self.print_status("=" * 80) self.print_status("✅ CONFIGURATION TERMINÉE AVEC SUCCÈS !") self.print_status("=" * 80) self.print_status("") self.print_status("🎯 COMPTES CRÉÉS :") self.print_status(" • superadmin / SuperAdmin123! (SUPER_ADMINISTRATEUR)") self.print_status(" • marie.active / Marie123! (MEMBRE_ACTIF)") self.print_status(" • jean.simple / Jean123! (MEMBRE_SIMPLE)") self.print_status(" • tech.lead / TechLead123! (RESPONSABLE_TECHNIQUE)") self.print_status(" • rh.manager / RhManager123! (RESPONSABLE_MEMBRES)") self.print_status("") self.print_status("🚀 PRÊT POUR L'APPLICATION MOBILE UNIONFLOW !") self.print_status(" Testez avec: python test_auth.py") return True def main(): """Fonction principale""" setup = KeycloakSetup() try: success = setup.setup_complete() sys.exit(0 if success else 1) except KeyboardInterrupt: print("\n❌ Configuration interrompue par l'utilisateur") sys.exit(1) except Exception as e: print(f"❌ Erreur inattendue: {e}") sys.exit(1) if __name__ == "__main__": main()