516 lines
19 KiB
Python
516 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Automatic Keycloak configuration script for UnionFlow.

Creates the realm, the roles, the client and all the users that are required.
|
|
"""
|
|
|
|
import requests
|
|
import json
|
|
import time
|
|
import sys
|
|
from typing import Dict, List, Optional
|
|
|
|
class KeycloakSetup:
    """Provision a Keycloak server for UnionFlow through its admin REST API.

    ``setup_complete`` drives the whole workflow: wait for the server, obtain
    an admin token, create the realm, the mobile client, the realm roles and
    the demo users, then check that those users can log in.

    NOTE(review): the built-in passwords, the wildcard redirect URIs and the
    disabled brute-force protection look like development defaults — confirm
    this script is never pointed at a production server.
    """

    # Per-request timeout (seconds) for admin/token calls. Without an explicit
    # timeout a stalled server would block the script indefinitely.
    REQUEST_TIMEOUT = 10

    # Realm roles created by ``create_roles``.
    ROLES = (
        "SUPER_ADMINISTRATEUR",
        "RESPONSABLE_TECHNIQUE",
        "RESPONSABLE_MEMBRES",
        "MEMBRE_ACTIF",
        "MEMBRE_SIMPLE",
    )

    # Single source of truth for the demo accounts: used to create the users,
    # to test their login and to print the final summary. Keys match the
    # keyword parameters of ``create_user``.
    USERS = (
        {
            "username": "superadmin",
            "email": "superadmin@unionflow.com",
            "first_name": "Super",
            "last_name": "Admin",
            "password": "SuperAdmin123!",
            "role": "SUPER_ADMINISTRATEUR",
        },
        {
            "username": "marie.active",
            "email": "marie.active@unionflow.com",
            "first_name": "Marie",
            "last_name": "Active",
            "password": "Marie123!",
            "role": "MEMBRE_ACTIF",
        },
        {
            "username": "jean.simple",
            "email": "jean.simple@unionflow.com",
            "first_name": "Jean",
            "last_name": "Simple",
            "password": "Jean123!",
            "role": "MEMBRE_SIMPLE",
        },
        {
            "username": "tech.lead",
            "email": "tech.lead@unionflow.com",
            "first_name": "Tech",
            "last_name": "Lead",
            "password": "TechLead123!",
            "role": "RESPONSABLE_TECHNIQUE",
        },
        {
            "username": "rh.manager",
            "email": "rh.manager@unionflow.com",
            "first_name": "RH",
            "last_name": "Manager",
            "password": "RhManager123!",
            "role": "RESPONSABLE_MEMBRES",
        },
    )

    def __init__(self, base_url: str = "http://localhost:8180", admin_user: str = "admin", admin_password: str = "admin123"):
        """Store the connection settings and open an HTTP session.

        Args:
            base_url: Root URL of the Keycloak server.
            admin_user: Admin username tried first when requesting a token.
            admin_password: Password for ``admin_user``.
        """
        # Drop a trailing slash so f"{base_url}/admin/..." never contains "//".
        self.base_url = base_url.rstrip("/")
        self.admin_user = admin_user
        self.admin_password = admin_password
        self.admin_token: Optional[str] = None
        self.session = requests.Session()

    def print_status(self, message: str, status: str = "INFO"):
        """Print ``message`` prefixed with the icon matching ``status``."""
        icons = {"INFO": "🔍", "SUCCESS": "✅", "ERROR": "❌", "WARNING": "⚠️"}
        print(f"{icons.get(status, '📋')} {message}")

    def _auth_headers(self, json_body: bool = True) -> Dict[str, str]:
        """Return the bearer-token headers, plus a JSON content type if asked."""
        headers = {"Authorization": f"Bearer {self.admin_token}"}
        if json_body:
            headers["Content-Type"] = "application/json"
        return headers

    def wait_for_keycloak(self, max_attempts: int = 30) -> bool:
        """Poll the server root until it answers HTTP 200.

        Args:
            max_attempts: Number of probes; probes are 2 seconds apart.

        Returns:
            True as soon as a probe returns 200, False once attempts run out.
        """
        self.print_status("Attente de la disponibilité de Keycloak...")

        for attempt in range(max_attempts):
            try:
                response = self.session.get(self.base_url, timeout=5)
                if response.status_code == 200:
                    self.print_status("Keycloak est disponible", "SUCCESS")
                    return True
            except requests.exceptions.RequestException:
                # Server not reachable yet: fall through to the pause and retry.
                pass

            # No pause after the final probe.
            if attempt < max_attempts - 1:
                time.sleep(2)

        self.print_status("Keycloak n'est pas disponible", "ERROR")
        return False

    def _request_token(self, username: str, password: str) -> Optional[str]:
        """Ask the master realm for an ``admin-cli`` password-grant token.

        Returns the access token, or None when the server refuses the login
        or the response carries no token.
        """
        response = self.session.post(
            f"{self.base_url}/realms/master/protocol/openid-connect/token",
            data={
                "username": username,
                "password": password,
                "grant_type": "password",
                "client_id": "admin-cli",
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            return None
        payload = response.json()
        if not isinstance(payload, dict):
            return None
        return payload.get("access_token") or None

    def _try_admin_credentials(self) -> bool:
        """Try the configured credentials, then two common defaults.

        Stores the first token obtained in ``self.admin_token``.
        """
        candidates = [
            (self.admin_user, self.admin_password),
            ("admin", "admin"),
            ("admin", "password"),
        ]
        # dict.fromkeys removes duplicates while keeping the order.
        for username, password in dict.fromkeys(candidates):
            try:
                token = self._request_token(username, password)
            except (requests.exceptions.RequestException, ValueError) as exc:
                self.print_status(f"Échec de la demande de token pour {username}: {exc}", "WARNING")
                continue
            if token:
                self.admin_token = token
                # The password is deliberately left out of the log line.
                self.print_status(f"Token obtenu avec l'utilisateur {username}", "SUCCESS")
                return True
        return False

    def get_admin_token(self) -> bool:
        """Obtain an admin token, creating an admin account as a last resort.

        Returns:
            True when ``self.admin_token`` has been set, False otherwise.
        """
        self.print_status("Obtention du token administrateur...")

        if self._try_admin_credentials():
            return True

        # None of the known credentials worked: try to create an admin.
        self.print_status("Tentative de création d'un compte admin...", "WARNING")
        return self._create_initial_admin()

    def _create_initial_admin(self) -> bool:
        """Try to create the admin user without a token, then log in again.

        Both the current and the legacy ``/auth`` URL prefixes are tried. A
        201 or 409 (already exists) answer triggers one more login attempt;
        this method never calls ``get_admin_token`` so the two cannot recurse
        into each other.
        """
        admin_data = {
            "username": self.admin_user,
            "password": self.admin_password,
            "enabled": True,
        }
        endpoints = [
            f"{self.base_url}/admin/realms/master/users",
            f"{self.base_url}/auth/admin/realms/master/users",
        ]

        for endpoint in endpoints:
            try:
                response = self.session.post(
                    endpoint,
                    json=admin_data,
                    headers={"Content-Type": "application/json"},
                    timeout=self.REQUEST_TIMEOUT,
                )
            except requests.exceptions.RequestException as exc:
                self.print_status(f"Endpoint {endpoint} injoignable: {exc}", "WARNING")
                continue

            if response.status_code in (201, 409) and self._try_admin_credentials():
                return True

        self.print_status("Impossible d'obtenir un token admin. Configuration manuelle requise.", "ERROR")
        return False

    def create_realm(self, realm_name: str = "unionflow") -> bool:
        """Create the realm; an already existing realm (409) counts as success.

        Returns:
            True on 201 or 409, False on any other status or transport error.
        """
        self.print_status(f"Création du realm {realm_name}...")

        realm_data = {
            "realm": realm_name,
            "enabled": True,
            "displayName": "UnionFlow",
            "loginWithEmailAllowed": True,
            "duplicateEmailsAllowed": False,
            "resetPasswordAllowed": True,
            "editUsernameAllowed": False,
            "bruteForceProtected": False,
            "registrationAllowed": False,
            "rememberMe": True,
            "verifyEmail": False,
            "loginTheme": "keycloak",
            "accountTheme": "keycloak",
            "adminTheme": "keycloak",
            "emailTheme": "keycloak",
        }

        try:
            response = self.session.post(
                f"{self.base_url}/admin/realms",
                json=realm_data,
                headers=self._auth_headers(),
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.exceptions.RequestException as e:
            self.print_status(f"Exception lors de la création du realm: {e}", "ERROR")
            return False

        if response.status_code == 201:
            self.print_status(f"Realm {realm_name} créé avec succès", "SUCCESS")
            return True
        if response.status_code == 409:
            self.print_status(f"Realm {realm_name} existe déjà", "WARNING")
            return True

        self.print_status(f"Erreur lors de la création du realm: {response.status_code}", "ERROR")
        return False

    def create_client(self, realm_name: str = "unionflow", client_id: str = "unionflow-mobile") -> bool:
        """Create the public OpenID Connect client used by the mobile app.

        Returns:
            True on 201 or 409 (already exists), False otherwise.
        """
        self.print_status(f"Création du client {client_id}...")

        client_data = {
            "clientId": client_id,
            "enabled": True,
            "publicClient": True,
            "directAccessGrantsEnabled": True,
            "standardFlowEnabled": True,
            "implicitFlowEnabled": False,
            "serviceAccountsEnabled": False,
            "authorizationServicesEnabled": False,
            # NOTE(review): wildcard redirect URIs / web origins accept any
            # caller — presumably acceptable for local development only.
            "redirectUris": ["*"],
            "webOrigins": ["*"],
            "protocol": "openid-connect",
            "attributes": {
                "access.token.lifespan": "300",
                "client.secret.creation.time": "0",
            },
        }

        try:
            response = self.session.post(
                f"{self.base_url}/admin/realms/{realm_name}/clients",
                json=client_data,
                headers=self._auth_headers(),
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.exceptions.RequestException as e:
            self.print_status(f"Exception lors de la création du client: {e}", "ERROR")
            return False

        if response.status_code == 201:
            self.print_status(f"Client {client_id} créé avec succès", "SUCCESS")
            return True
        if response.status_code == 409:
            self.print_status(f"Client {client_id} existe déjà", "WARNING")
            return True

        self.print_status(f"Erreur lors de la création du client: {response.status_code}", "ERROR")
        return False

    def create_roles(self, realm_name: str = "unionflow") -> bool:
        """Create every role listed in ``ROLES`` in the given realm.

        All roles are attempted even if one fails.

        Returns:
            True only when each role answered 201 or 409 (already exists).
        """
        self.print_status("Création des rôles...")

        success_count = 0
        for role in self.ROLES:
            role_data = {
                "name": role,
                "description": f"Rôle {role} pour UnionFlow",
            }
            try:
                response = self.session.post(
                    f"{self.base_url}/admin/realms/{realm_name}/roles",
                    json=role_data,
                    headers=self._auth_headers(),
                    timeout=self.REQUEST_TIMEOUT,
                )
            except requests.exceptions.RequestException as e:
                self.print_status(f" ✗ Exception pour le rôle {role}: {e}", "ERROR")
                continue

            if response.status_code in (201, 409):  # 201 = created, 409 = already exists
                self.print_status(f" ✓ Rôle {role} configuré", "SUCCESS")
                success_count += 1
            else:
                self.print_status(f" ✗ Erreur pour le rôle {role}: {response.status_code}", "ERROR")

        return success_count == len(self.ROLES)

    def _find_user_id(self, realm_name: str, username: str) -> Optional[str]:
        """Return the id of the user whose username is exactly ``username``.

        The lookup asks for an exact match and additionally filters the
        answer, so a user whose name merely contains ``username`` is never
        selected. Returns None (after logging) when the lookup fails.
        """
        response = self.session.get(
            f"{self.base_url}/admin/realms/{realm_name}/users",
            # Passed as params so the value is URL-encoded by the HTTP client.
            params={"username": username, "exact": "true"},
            headers=self._auth_headers(json_body=False),
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            self.print_status(f" ✗ Impossible de récupérer l'ID de {username}", "ERROR")
            return None

        wanted = username.lower()
        matches = [
            user for user in (response.json() or [])
            if isinstance(user, dict)
            and str(user.get("username", "")).lower() == wanted
            and user.get("id")
        ]
        if not matches:
            self.print_status(f" ✗ Utilisateur {username} non trouvé", "ERROR")
            return None
        return matches[0]["id"]

    def _reset_password(self, realm_name: str, user_id: str, username: str, password: str) -> bool:
        """Set a permanent password for the user; log a warning on failure."""
        response = self.session.put(
            f"{self.base_url}/admin/realms/{realm_name}/users/{user_id}/reset-password",
            json={"type": "password", "value": password, "temporary": False},
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        if response.status_code in (204, 200):
            return True
        self.print_status(
            f" ⚠ Mot de passe non réinitialisé pour {username}: {response.status_code}", "WARNING"
        )
        return False

    def _assign_realm_role(self, realm_name: str, user_id: str, username: str, role: str) -> bool:
        """Map the realm role ``role`` onto the user; log the outcome."""
        role_response = self.session.get(
            f"{self.base_url}/admin/realms/{realm_name}/roles/{role}",
            headers=self._auth_headers(json_body=False),
            timeout=self.REQUEST_TIMEOUT,
        )
        if role_response.status_code != 200:
            self.print_status(f" ⚠ Rôle {role} introuvable pour {username}", "WARNING")
            return False

        assign_response = self.session.post(
            f"{self.base_url}/admin/realms/{realm_name}/users/{user_id}/role-mappings/realm",
            json=[role_response.json()],
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        if assign_response.status_code in (204, 200):
            self.print_status(f" ✓ Rôle {role} assigné à {username}", "SUCCESS")
            return True

        self.print_status(f" ⚠ Erreur assignation rôle à {username}", "WARNING")
        return False

    def create_user(self, realm_name: str, username: str, email: str, first_name: str,
                    last_name: str, password: str, role: str) -> bool:
        """Create a user, (re)set its password and assign it a realm role.

        An already existing user (409) is reused. Password reset and role
        assignment are best effort: a failure there is logged as a warning
        but does not change the return value.

        Returns:
            True when the user exists and its id could be resolved, False if
            creation or lookup failed.
        """
        self.print_status(f"Création de l'utilisateur {username}...")

        user_data = {
            "username": username,
            "email": email,
            "firstName": first_name,
            "lastName": last_name,
            "enabled": True,
            "emailVerified": True,
            "credentials": [{
                "type": "password",
                "value": password,
                "temporary": False,
            }],
        }

        try:
            # 1. Create the user.
            response = self.session.post(
                f"{self.base_url}/admin/realms/{realm_name}/users",
                json=user_data,
                headers=self._auth_headers(),
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code == 201:
                self.print_status(f" ✓ Utilisateur {username} créé", "SUCCESS")
            elif response.status_code == 409:
                self.print_status(f" ⚠ Utilisateur {username} existe déjà", "WARNING")
            else:
                self.print_status(f" ✗ Erreur création utilisateur {username}: {response.status_code}", "ERROR")
                return False

            # 2. Resolve the user's id.
            user_id = self._find_user_id(realm_name, username)
            if user_id is None:
                return False

            # 3. Set the password again, so a pre-existing user gets it too.
            self._reset_password(realm_name, user_id, username, password)

            # 4. Assign the role.
            self._assign_realm_role(realm_name, user_id, username, role)
            return True

        except (requests.exceptions.RequestException, ValueError) as e:
            self.print_status(f" ✗ Exception pour {username}: {e}", "ERROR")
            return False

    def create_all_users(self, realm_name: str = "unionflow") -> bool:
        """Create every account listed in ``USERS``.

        All accounts are attempted even if one fails.

        Returns:
            True only when ``create_user`` succeeded for every account.
        """
        self.print_status("Création de tous les utilisateurs...")
        outcomes = [self.create_user(realm_name, **user) for user in self.USERS]
        return all(outcomes)

    def test_authentication(self, realm_name: str = "unionflow", client_id: str = "unionflow-mobile") -> bool:
        """Check that every account in ``USERS`` can obtain an access token.

        Returns:
            True only when each password-grant login returned a token.
        """
        self.print_status("Test d'authentification des comptes...")
        success_count = 0

        for account in self.USERS:
            username = account["username"]
            try:
                response = self.session.post(
                    f"{self.base_url}/realms/{realm_name}/protocol/openid-connect/token",
                    data={
                        "username": username,
                        "password": account["password"],
                        "grant_type": "password",
                        "client_id": client_id,
                    },
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                    timeout=self.REQUEST_TIMEOUT,
                )
                authenticated = response.status_code == 200 and "access_token" in response.json()
            except (requests.exceptions.RequestException, ValueError) as e:
                self.print_status(f" ✗ {username} : Exception {e}", "ERROR")
                continue

            if authenticated:
                self.print_status(f" ✓ {username} : AUTHENTIFICATION RÉUSSIE", "SUCCESS")
                success_count += 1
            else:
                self.print_status(f" ✗ {username} : Échec d'authentification", "ERROR")

        return success_count == len(self.USERS)

    def setup_complete(self) -> bool:
        """Run the whole configuration workflow.

        Stops at the first failing step. A failed login check at the end is
        only reported as a warning.

        Returns:
            True when realm, client, roles and users were all configured.
        """
        self.print_status("=" * 80)
        self.print_status("🚀 CONFIGURATION AUTOMATIQUE KEYCLOAK UNIONFLOW")
        self.print_status("=" * 80)

        # 1. Wait for Keycloak.
        if not self.wait_for_keycloak():
            return False

        # 2. Obtain the admin token.
        if not self.get_admin_token():
            self.print_status("Configuration manuelle requise:", "ERROR")
            # Point at the server actually configured, not a fixed URL.
            self.print_status(f"1. Ouvrez {self.base_url}", "INFO")
            self.print_status("2. Créez un compte admin", "INFO")
            self.print_status("3. Relancez ce script", "INFO")
            return False

        # 3-6. Realm, client, roles, users — each step requires the previous.
        steps = (
            self.create_realm,
            self.create_client,
            self.create_roles,
            self.create_all_users,
        )
        for step in steps:
            if not step():
                return False

        # 7. Test the logins after a short pause to let the server settle.
        time.sleep(2)
        if not self.test_authentication():
            self.print_status("Certains comptes ne fonctionnent pas encore", "WARNING")

        self.print_status("=" * 80)
        self.print_status("✅ CONFIGURATION TERMINÉE AVEC SUCCÈS !")
        self.print_status("=" * 80)
        self.print_status("")
        self.print_status("🎯 COMPTES CRÉÉS :")
        for account in self.USERS:
            self.print_status(
                f" • {account['username']} / {account['password']} ({account['role']})"
            )
        self.print_status("")
        self.print_status("🚀 PRÊT POUR L'APPLICATION MOBILE UNIONFLOW !")
        self.print_status(" Testez avec: python test_auth.py")

        return True
|
|
|
|
|
|
def main():
    """Run the full Keycloak setup and exit with status 0 on success, 1 otherwise."""
    configurator = KeycloakSetup()

    try:
        succeeded = configurator.setup_complete()
    except KeyboardInterrupt:
        # Ctrl-C from the operator: report it and exit with a failure status.
        print("\n❌ Configuration interrompue par l'utilisateur")
        sys.exit(1)
    except Exception as exc:
        # Last-resort boundary: report the error instead of a raw traceback.
        print(f"❌ Erreur inattendue: {exc}")
        sys.exit(1)

    exit_code = 0 if succeeded else 1
    sys.exit(exit_code)
|
|
|
|
|
|
# Script entry point: run the setup only when executed directly, not on import.
if __name__ == "__main__":
    main()
|