Files
unionflow-mobile-apps/setup_keycloak.py
2025-09-19 16:09:21 +00:00

516 lines
19 KiB
Python

#!/usr/bin/env python3
"""
Script de configuration automatique Keycloak pour UnionFlow
Crée le realm, les rôles, le client et tous les utilisateurs nécessaires
"""
import requests
import json
import time
import sys
from typing import Dict, List, Optional
class KeycloakSetup:
    """Provision a Keycloak server for UnionFlow through its admin REST API.

    The full run (:meth:`setup_complete`) waits for the server, obtains an
    admin token, then creates the realm, the mobile client, the realm roles
    and a fixed set of demo users, and finally checks that a few of those
    users can log in. Every step reports progress on stdout through
    :meth:`print_status` and signals success or failure with a boolean.

    NOTE(review): all credentials in this class are hard-coded defaults and
    the final summary prints the demo passwords; presumably the script is
    meant for local/dev instances only -- confirm before using it elsewhere.
    """

    # Seconds allowed for each admin/token HTTP call, so that a stalled
    # server cannot hang the script forever.
    REQUEST_TIMEOUT = 10

    # Prefix printed in front of each message, keyed by status name.
    _STATUS_ICONS = {"INFO": "🔍", "SUCCESS": "✅", "ERROR": "❌", "WARNING": "⚠️"}

    # Realm roles created by create_roles().
    DEFAULT_ROLES = [
        "SUPER_ADMINISTRATEUR",
        "RESPONSABLE_TECHNIQUE",
        "RESPONSABLE_MEMBRES",
        "MEMBRE_ACTIF",
        "MEMBRE_SIMPLE",
    ]

    # Demo accounts created by create_all_users(); the keys match the
    # keyword parameters of create_user().
    DEFAULT_USERS: List[Dict[str, str]] = [
        {
            "username": "superadmin",
            "email": "superadmin@unionflow.com",
            "first_name": "Super",
            "last_name": "Admin",
            "password": "SuperAdmin123!",
            "role": "SUPER_ADMINISTRATEUR",
        },
        {
            "username": "marie.active",
            "email": "marie.active@unionflow.com",
            "first_name": "Marie",
            "last_name": "Active",
            "password": "Marie123!",
            "role": "MEMBRE_ACTIF",
        },
        {
            "username": "jean.simple",
            "email": "jean.simple@unionflow.com",
            "first_name": "Jean",
            "last_name": "Simple",
            "password": "Jean123!",
            "role": "MEMBRE_SIMPLE",
        },
        {
            "username": "tech.lead",
            "email": "tech.lead@unionflow.com",
            "first_name": "Tech",
            "last_name": "Lead",
            "password": "TechLead123!",
            "role": "RESPONSABLE_TECHNIQUE",
        },
        {
            "username": "rh.manager",
            "email": "rh.manager@unionflow.com",
            "first_name": "RH",
            "last_name": "Manager",
            "password": "RhManager123!",
            "role": "RESPONSABLE_MEMBRES",
        },
    ]

    def __init__(self, base_url: str = "http://localhost:8180", admin_user: str = "admin",
                 admin_password: str = "admin123"):
        """Store the connection settings and open an HTTP session.

        Args:
            base_url: Root URL of the Keycloak server; a trailing slash is
                removed so that paths can be appended directly.
            admin_user: Admin username tried first when requesting a token.
            admin_password: Password tried together with ``admin_user``.
        """
        self.base_url = base_url.rstrip("/")
        self.admin_user = admin_user
        self.admin_password = admin_password
        self.admin_token: Optional[str] = None
        self.session = requests.Session()

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    def _admin_request(self, method: str, path: str, **kwargs):
        """Send a bearer-authenticated request to ``base_url + path``.

        Extra keyword arguments (``json``, ``params``...) are forwarded to
        the session. Returns the response object; exceptions raised by the
        session propagate to the caller.
        """
        headers = {"Authorization": f"Bearer {self.admin_token}"}
        if "json" in kwargs:
            headers["Content-Type"] = "application/json"
        return self.session.request(
            method,
            f"{self.base_url}{path}",
            headers=headers,
            timeout=self.REQUEST_TIMEOUT,
            **kwargs,
        )

    def _create_resource(self, path: str, payload: dict, kind: str, name: str) -> bool:
        """POST ``payload`` to ``path`` and report the outcome.

        ``kind`` ("Realm" / "Client") and ``name`` are only used to build
        the status messages. Both 201 (created) and 409 (already exists)
        count as success.
        """
        try:
            response = self._admin_request("POST", path, json=payload)
        except Exception as exc:
            self.print_status(f"Exception lors de la création du {kind.lower()}: {exc}", "ERROR")
            return False

        if response.status_code == 201:
            self.print_status(f"{kind} {name} créé avec succès", "SUCCESS")
            return True
        if response.status_code == 409:
            self.print_status(f"{kind} {name} existe déjà", "WARNING")
            return True
        self.print_status(
            f"Erreur lors de la création du {kind.lower()}: {response.status_code}", "ERROR")
        return False

    def _request_admin_token(self) -> bool:
        """Try each known admin credential pair once against the master realm.

        Stores the first access token obtained in ``self.admin_token`` and
        returns True; returns False when no pair yields a token.
        """
        candidates = [
            (self.admin_user, self.admin_password),
            ("admin", "admin"),
            ("admin", "password"),
        ]
        # dict.fromkeys drops duplicate pairs while keeping the order.
        for username, password in dict.fromkeys(candidates):
            try:
                response = self.session.post(
                    f"{self.base_url}/realms/master/protocol/openid-connect/token",
                    data={
                        "username": username,
                        "password": password,
                        "grant_type": "password",
                        "client_id": "admin-cli",
                    },
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code != 200:
                    continue
                token = response.json().get("access_token")
            except Exception as exc:
                self.print_status(f"Requête de token échouée pour {username}: {exc}", "WARNING")
                continue
            if token:
                self.admin_token = token
                # Never echo the password: this output may end up in logs.
                self.print_status(f"Token obtenu pour l'utilisateur {username}", "SUCCESS")
                return True
        return False

    def _find_user_id(self, realm_name: str, username: str) -> Optional[str]:
        """Return the id of the user named exactly ``username``, or None.

        The server-side search is asked for an exact match and the result
        is filtered again locally, so a user whose name merely contains
        ``username`` is never selected. Failures are reported before
        returning None.
        """
        response = self._admin_request(
            "GET",
            f"/admin/realms/{realm_name}/users",
            params={"username": username, "exact": "true"},
        )
        if response.status_code != 200:
            self.print_status(f" ✗ Impossible de récupérer l'ID de {username}", "ERROR")
            return None

        wanted = username.lower()
        for candidate in response.json() or []:
            if str(candidate.get("username", "")).lower() == wanted:
                return candidate["id"]
        self.print_status(f" ✗ Utilisateur {username} non trouvé", "ERROR")
        return None

    def _assign_realm_role(self, realm_name: str, user_id: str, username: str, role: str) -> None:
        """Map the realm role ``role`` onto the user; report but tolerate failures."""
        role_response = self._admin_request("GET", f"/admin/realms/{realm_name}/roles/{role}")
        if role_response.status_code != 200:
            self.print_status(f" ⚠ Rôle {role} introuvable pour {username}", "WARNING")
            return

        assign_response = self._admin_request(
            "POST",
            f"/admin/realms/{realm_name}/users/{user_id}/role-mappings/realm",
            json=[role_response.json()],
        )
        if assign_response.status_code in (200, 204):
            self.print_status(f" ✓ Rôle {role} assigné à {username}", "SUCCESS")
        else:
            self.print_status(f" ⚠ Erreur assignation rôle à {username}", "WARNING")

    # ------------------------------------------------------------------
    # Public steps
    # ------------------------------------------------------------------

    def print_status(self, message: str, status: str = "INFO"):
        """Print ``message`` prefixed with the icon of ``status``.

        Unknown status names fall back to a generic clipboard icon.
        """
        print(f"{self._STATUS_ICONS.get(status, '📋')} {message}")

    def wait_for_keycloak(self, max_attempts: int = 30, retry_delay: float = 2) -> bool:
        """Poll the server root until it answers with HTTP 200.

        Args:
            max_attempts: Number of GET attempts before giving up.
            retry_delay: Seconds slept between two attempts.

        Returns:
            True as soon as a 200 is received, False once all attempts
            have been used.
        """
        self.print_status("Attente de la disponibilité de Keycloak...")
        for attempt in range(max_attempts):
            try:
                response = self.session.get(self.base_url, timeout=5)
                if response.status_code == 200:
                    self.print_status("Keycloak est disponible", "SUCCESS")
                    return True
            except requests.exceptions.RequestException:
                # Server not reachable yet; this is the expected case while
                # it is still starting, so just retry.
                pass
            if attempt < max_attempts - 1:
                time.sleep(retry_delay)
        self.print_status("Keycloak n'est pas disponible", "ERROR")
        return False

    def get_admin_token(self) -> bool:
        """Obtain an admin access token and store it in ``self.admin_token``.

        The known credential pairs are tried first; if none works, a
        best-effort attempt is made to create the admin account and the
        credentials are tried one more time.

        Returns:
            True when a token was obtained, False otherwise.
        """
        self.print_status("Obtention du token administrateur...")
        if self._request_admin_token():
            return True
        self.print_status("Tentative de création d'un compte admin...", "WARNING")
        return self._create_initial_admin()

    def _create_initial_admin(self) -> bool:
        """Best-effort creation of the admin account, then one token retry.

        NOTE(review): the POST is sent without authentication, so it
        presumably only succeeds on servers that allow it -- confirm.

        Returns:
            True when a token could be obtained afterwards, False otherwise
            (an error message is printed in that case).
        """
        admin_data = {
            "username": self.admin_user,
            "password": self.admin_password,
            "enabled": True,
        }
        # Both the current and the legacy "/auth"-prefixed admin paths.
        endpoints = [
            f"{self.base_url}/admin/realms/master/users",
            f"{self.base_url}/auth/admin/realms/master/users",
        ]
        for endpoint in endpoints:
            try:
                response = self.session.post(
                    endpoint,
                    json=admin_data,
                    headers={"Content-Type": "application/json"},
                    timeout=self.REQUEST_TIMEOUT,
                )
            except Exception as exc:
                self.print_status(f"Création d'admin impossible via {endpoint}: {exc}", "WARNING")
                continue
            if response.status_code in (201, 409):  # 409 = already exists
                # Retry the credentials exactly once. Going back through
                # get_admin_token() here could recurse without bound.
                if self._request_admin_token():
                    return True
                break
        self.print_status(
            "Impossible d'obtenir un token admin. Configuration manuelle requise.", "ERROR")
        return False

    def create_realm(self, realm_name: str = "unionflow") -> bool:
        """Create the realm ``realm_name``.

        Returns:
            True when the realm was created or already exists, False on
            any other status code or on a request exception.
        """
        self.print_status(f"Création du realm {realm_name}...")
        realm_data = {
            "realm": realm_name,
            "enabled": True,
            "displayName": "UnionFlow",
            "loginWithEmailAllowed": True,
            "duplicateEmailsAllowed": False,
            "resetPasswordAllowed": True,
            "editUsernameAllowed": False,
            # NOTE(review): brute-force protection is switched off; fine for
            # a dev realm, should be reconsidered anywhere else.
            "bruteForceProtected": False,
            "registrationAllowed": False,
            "rememberMe": True,
            "verifyEmail": False,
            "loginTheme": "keycloak",
            "accountTheme": "keycloak",
            "adminTheme": "keycloak",
            "emailTheme": "keycloak",
        }
        return self._create_resource("/admin/realms", realm_data, "Realm", realm_name)

    def create_client(self, realm_name: str = "unionflow", client_id: str = "unionflow-mobile") -> bool:
        """Create the public OpenID Connect client used by the mobile app.

        Returns:
            True when the client was created or already exists, False on
            any other status code or on a request exception.
        """
        self.print_status(f"Création du client {client_id}...")
        client_data = {
            "clientId": client_id,
            "enabled": True,
            "publicClient": True,
            "directAccessGrantsEnabled": True,
            "standardFlowEnabled": True,
            "implicitFlowEnabled": False,
            "serviceAccountsEnabled": False,
            "authorizationServicesEnabled": False,
            # NOTE(review): wildcard redirect URIs / web origins on a public
            # client accept any redirect target; acceptable for local
            # development only.
            "redirectUris": ["*"],
            "webOrigins": ["*"],
            "protocol": "openid-connect",
            "attributes": {
                "access.token.lifespan": "300",
                "client.secret.creation.time": "0",
            },
        }
        return self._create_resource(
            f"/admin/realms/{realm_name}/clients", client_data, "Client", client_id)

    def create_roles(self, realm_name: str = "unionflow") -> bool:
        """Create every role of ``DEFAULT_ROLES`` in the realm.

        A failure on one role does not stop the others from being tried.

        Returns:
            True only when every role was created or already existed.
        """
        self.print_status("Création des rôles...")
        success_count = 0
        for role in self.DEFAULT_ROLES:
            role_data = {"name": role, "description": f"Rôle {role} pour UnionFlow"}
            try:
                response = self._admin_request(
                    "POST", f"/admin/realms/{realm_name}/roles", json=role_data)
            except Exception as exc:
                self.print_status(f" ✗ Exception pour le rôle {role}: {exc}", "ERROR")
                continue
            if response.status_code in (201, 409):  # created / already exists
                self.print_status(f" ✓ Rôle {role} configuré", "SUCCESS")
                success_count += 1
            else:
                self.print_status(
                    f" ✗ Erreur pour le rôle {role}: {response.status_code}", "ERROR")
        return success_count == len(self.DEFAULT_ROLES)

    def create_user(self, realm_name: str, username: str, email: str, first_name: str,
                    last_name: str, password: str, role: str) -> bool:
        """Create a user, (re)set its password and give it a realm role.

        An already existing user (409) is reused: its password is reset and
        the role is mapped again. Password reset and role assignment are
        best-effort: a failure there is reported as a warning but does not
        change the return value.

        Returns:
            True when the user exists and its id could be resolved, False
            when creation failed, the user could not be found, or a request
            raised an exception.
        """
        self.print_status(f"Création de l'utilisateur {username}...")
        credential = {"type": "password", "value": password, "temporary": False}
        user_data = {
            "username": username,
            "email": email,
            "firstName": first_name,
            "lastName": last_name,
            "enabled": True,
            "emailVerified": True,
            "credentials": [credential],
        }
        users_path = f"/admin/realms/{realm_name}/users"
        try:
            # 1. Create the user.
            response = self._admin_request("POST", users_path, json=user_data)
            if response.status_code == 201:
                self.print_status(f" ✓ Utilisateur {username} créé", "SUCCESS")
            elif response.status_code == 409:
                self.print_status(f" ⚠ Utilisateur {username} existe déjà", "WARNING")
            else:
                self.print_status(
                    f" ✗ Erreur création utilisateur {username}: {response.status_code}", "ERROR")
                return False

            # 2. Resolve the id of exactly this user.
            user_id = self._find_user_id(realm_name, username)
            if user_id is None:
                return False

            # 3. Set the password again, which also covers users that
            #    already existed with a different password.
            reset_response = self._admin_request(
                "PUT", f"{users_path}/{user_id}/reset-password", json=credential)
            if reset_response.status_code not in (200, 204):
                self.print_status(
                    f" ⚠ Mot de passe non réinitialisé pour {username}: "
                    f"{reset_response.status_code}", "WARNING")

            # 4. Map the realm role.
            self._assign_realm_role(realm_name, user_id, username, role)
            return True
        except Exception as exc:
            self.print_status(f" ✗ Exception pour {username}: {exc}", "ERROR")
            return False

    def create_all_users(self, realm_name: str = "unionflow") -> bool:
        """Create every account of ``DEFAULT_USERS``.

        Returns:
            True only when :meth:`create_user` succeeded for all of them.
        """
        self.print_status("Création de tous les utilisateurs...")
        success_count = 0
        for user in self.DEFAULT_USERS:
            if self.create_user(realm_name, **user):
                success_count += 1
        return success_count == len(self.DEFAULT_USERS)

    def test_authentication(self, realm_name: str = "unionflow", client_id: str = "unionflow-mobile") -> bool:
        """Log in three of the demo accounts with the password grant.

        Returns:
            True only when every tested account received an access token.
        """
        test_accounts = [
            ("marie.active", "Marie123!"),
            ("superadmin", "SuperAdmin123!"),
            ("jean.simple", "Jean123!"),
        ]
        self.print_status("Test d'authentification des comptes...")
        success_count = 0
        for username, password in test_accounts:
            try:
                response = self.session.post(
                    f"{self.base_url}/realms/{realm_name}/protocol/openid-connect/token",
                    data={
                        "username": username,
                        "password": password,
                        "grant_type": "password",
                        "client_id": client_id,
                    },
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                    timeout=self.REQUEST_TIMEOUT,
                )
                if response.status_code == 200 and "access_token" in response.json():
                    self.print_status(f"{username} : AUTHENTIFICATION RÉUSSIE", "SUCCESS")
                    success_count += 1
                else:
                    self.print_status(f"{username} : Échec d'authentification", "ERROR")
            except Exception as exc:
                self.print_status(f"{username} : Exception {exc}", "ERROR")
        return success_count == len(test_accounts)

    def setup_complete(self) -> bool:
        """Run the whole provisioning sequence.

        Stops at the first failing step. A failed login check at the end
        only produces a warning.

        Returns:
            True when realm, client, roles and users were all configured,
            False as soon as one of the mandatory steps fails.
        """
        banner = "=" * 80
        self.print_status(banner)
        self.print_status("🚀 CONFIGURATION AUTOMATIQUE KEYCLOAK UNIONFLOW")
        self.print_status(banner)

        # 1. Wait for the server.
        if not self.wait_for_keycloak():
            return False

        # 2. Admin token.
        if not self.get_admin_token():
            self.print_status("Configuration manuelle requise:", "ERROR")
            # Point the operator at the server actually configured, not at
            # the default address.
            self.print_status(f"1. Ouvrez {self.base_url}", "INFO")
            self.print_status("2. Créez un compte admin", "INFO")
            self.print_status("3. Relancez ce script", "INFO")
            return False

        # 3-6. Realm, client, roles, users; all() stops at the first
        # failing step.
        steps = (self.create_realm, self.create_client, self.create_roles, self.create_all_users)
        if not all(step() for step in steps):
            return False

        # 7. Login check, after a short pause to let the server settle.
        time.sleep(2)
        if not self.test_authentication():
            self.print_status("Certains comptes ne fonctionnent pas encore", "WARNING")

        self.print_status(banner)
        self.print_status("✅ CONFIGURATION TERMINÉE AVEC SUCCÈS !")
        self.print_status(banner)
        self.print_status("")
        self.print_status("🎯 COMPTES CRÉÉS :")
        for user in self.DEFAULT_USERS:
            self.print_status(f" • {user['username']} / {user['password']} ({user['role']})")
        self.print_status("")
        self.print_status("🚀 PRÊT POUR L'APPLICATION MOBILE UNIONFLOW !")
        self.print_status(" Testez avec: python test_auth.py")
        return True
def main():
    """Run the complete Keycloak setup and exit with a shell status code.

    Exits with 0 when the setup reports success, and with 1 when it
    reports failure, is interrupted from the keyboard, or raises an
    unexpected exception (the exception text is printed first).
    """
    configurator = KeycloakSetup()
    try:
        exit_code = 0 if configurator.setup_complete() else 1
    except KeyboardInterrupt:
        print("\n❌ Configuration interrompue par l'utilisateur")
        exit_code = 1
    except Exception as exc:
        print(f"❌ Erreur inattendue: {exc}")
        exit_code = 1
    sys.exit(exit_code)


# Only run the setup when the file is executed as a script.
if __name__ == "__main__":
    main()