#!/usr/bin/env python3
"""
Script to fix and (re)create the UnionFlow users.
"""
|
import requests
|
|
import json
|
|
|
|
class UserFixer:
    """Delete and recreate the UnionFlow accounts through an admin REST API.

    The endpoint layout used here (``/realms/<realm>/protocol/openid-connect/token``,
    ``/admin/realms/<realm>/users``, the ``admin-cli`` client) matches Keycloak's
    Admin REST API. NOTE(review): that is inferred from the URLs only — confirm
    against the server this script targets.

    Progress is reported with ``print``; every public method returns a boolean
    instead of raising.
    """

    # Accounts recreated by fix_all_users() when the caller gives no list.
    # Keys match the keyword parameters of create_user_complete().
    DEFAULT_USERS = (
        {
            "username": "superadmin",
            "email": "superadmin@unionflow.com",
            "first_name": "Super",
            "last_name": "Admin",
            "password": "SuperAdmin123!",
            "role": "SUPER_ADMINISTRATEUR",
        },
        {
            "username": "marie.active",
            "email": "marie.active@unionflow.com",
            "first_name": "Marie",
            "last_name": "Active",
            "password": "Marie123!",
            "role": "MEMBRE_ACTIF",
        },
        {
            "username": "jean.simple",
            "email": "jean.simple@unionflow.com",
            "first_name": "Jean",
            "last_name": "Simple",
            "password": "Jean123!",
            "role": "MEMBRE_SIMPLE",
        },
        {
            "username": "tech.lead",
            "email": "tech.lead@unionflow.com",
            "first_name": "Tech",
            "last_name": "Lead",
            "password": "TechLead123!",
            "role": "RESPONSABLE_TECHNIQUE",
        },
        {
            "username": "rh.manager",
            "email": "rh.manager@unionflow.com",
            "first_name": "RH",
            "last_name": "Manager",
            "password": "RhManager123!",
            "role": "RESPONSABLE_MEMBRES",
        },
    )

    def __init__(self, base_url: str = "http://localhost:8180",
                 admin_username: str = "admin", admin_password: str = "admin",
                 timeout: float = 10.0):
        """Prepare an HTTP session; no request is sent here.

        Args:
            base_url: Root URL of the identity server.
            admin_username: Login used to obtain the admin token.
            admin_password: Password used to obtain the admin token.
            timeout: Seconds to wait on each HTTP call before giving up.
        """
        self.base_url = base_url
        self.admin_username = admin_username
        self.admin_password = admin_password
        self.timeout = timeout
        self.session = requests.Session()
        self.admin_token = None  # set by get_admin_token()

    # ------------------------------------------------------------------ helpers

    def _auth_headers(self) -> dict:
        """Return the bearer-token header for admin calls."""
        return {"Authorization": f"Bearer {self.admin_token}"}

    def _json_headers(self) -> dict:
        """Return the admin headers for calls that send a JSON body."""
        headers = self._auth_headers()
        headers["Content-Type"] = "application/json"
        return headers

    def _search_users(self, realm_name: str, username: str):
        """Query the realm's users by username and return the raw response.

        The username travels through ``params`` so it is URL-encoded, and
        ``exact=true`` asks the server not to do a substring match.
        """
        return self.session.get(
            f"{self.base_url}/admin/realms/{realm_name}/users",
            params={"username": username, "exact": "true"},
            headers=self._auth_headers(),
            timeout=self.timeout,
        )

    @staticmethod
    def _match_user_id(users, username: str):
        """Return the id of the entry whose username equals *username*, else None.

        The comparison ignores case. Filtering here (rather than taking the
        first hit) guards against servers that return partial matches, e.g.
        "superadmin" when "admin" was requested.
        """
        wanted = username.lower()
        for entry in users:
            if str(entry.get("username", "")).lower() == wanted:
                return entry.get("id")
        return None

    def _set_password(self, realm_name: str, user_id: str, password: str) -> bool:
        """Set a permanent password on the user; a failure is only a warning."""
        response = self.session.put(
            f"{self.base_url}/admin/realms/{realm_name}/users/{user_id}/reset-password",
            json={"type": "password", "value": password, "temporary": False},
            headers=self._json_headers(),
            timeout=self.timeout,
        )
        if response.status_code == 204:
            print(" ✓ Mot de passe défini")
            return True
        print(f" ⚠ Erreur mot de passe: {response.status_code}")
        return False

    def _assign_realm_role(self, realm_name: str, user_id: str, role: str) -> bool:
        """Fetch the realm role by name and map it onto the user."""
        role_response = self.session.get(
            f"{self.base_url}/admin/realms/{realm_name}/roles/{role}",
            headers=self._auth_headers(),
            timeout=self.timeout,
        )
        if role_response.status_code != 200:
            print(f" ⚠ Rôle {role} non trouvé")
            return False

        # The mapping endpoint takes a list of role representations.
        assign_response = self.session.post(
            f"{self.base_url}/admin/realms/{realm_name}/users/{user_id}/role-mappings/realm",
            json=[role_response.json()],
            headers=self._json_headers(),
            timeout=self.timeout,
        )
        if assign_response.status_code in (204, 200):
            print(f" ✓ Rôle {role} assigné")
            return True
        print(f" ⚠ Erreur assignation rôle: {assign_response.status_code}")
        return False

    def _check_login(self, realm_name: str, username: str, password: str) -> bool:
        """Try a password grant as the new user and report whether it works."""
        test_response = self.session.post(
            f"{self.base_url}/realms/{realm_name}/protocol/openid-connect/token",
            data={
                "username": username,
                "password": password,
                "grant_type": "password",
                "client_id": "unionflow-mobile",
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=self.timeout,
        )
        if test_response.status_code == 200 and "access_token" in test_response.json():
            print(" ✅ Test d'authentification RÉUSSI")
            return True
        print(f" ❌ Test d'authentification ÉCHOUÉ: {test_response.status_code}")
        print(f" Réponse: {test_response.text[:100]}")
        return False

    # --------------------------------------------------------------- public API

    def get_admin_token(self) -> bool:
        """Obtain an admin access token and store it in ``self.admin_token``.

        Returns:
            True when the server answered 200 with an ``access_token``;
            False on any other status or on any exception (which is printed).
        """
        credentials = {
            "username": self.admin_username,
            "password": self.admin_password,
            "grant_type": "password",
            "client_id": "admin-cli",
        }
        try:
            response = self.session.post(
                f"{self.base_url}/realms/master/protocol/openid-connect/token",
                data=credentials,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                timeout=self.timeout,
            )
            if response.status_code == 200:
                self.admin_token = response.json().get("access_token")
                return self.admin_token is not None
            # Surface the status so a bad password is distinguishable from a
            # network problem.
            print(f"Erreur obtention token: HTTP {response.status_code}")
        except Exception as e:  # best-effort script boundary: report, don't crash
            print(f"Erreur obtention token: {e}")
        return False

    def delete_user_if_exists(self, realm_name: str, username: str) -> bool:
        """Delete the user named exactly *username* from the realm, if present.

        Returns:
            True only when a user was found and the delete call returned 204.
            False when no such user exists, when a call fails, or on exception.
        """
        try:
            response = self._search_users(realm_name, username)
            if response.status_code != 200:
                return False

            user_id = self._match_user_id(response.json(), username)
            if user_id is None:
                return False  # nothing to delete

            delete_response = self.session.delete(
                f"{self.base_url}/admin/realms/{realm_name}/users/{user_id}",
                headers=self._auth_headers(),
                timeout=self.timeout,
            )
            if delete_response.status_code == 204:
                print(f" ✓ Utilisateur {username} supprimé")
                return True
            print(f" ⚠ Erreur suppression {username}: HTTP {delete_response.status_code}")
        except Exception as e:  # best-effort: the caller proceeds to creation anyway
            print(f" ⚠ Erreur suppression {username}: {e}")
        return False

    def create_user_complete(self, realm_name: str, username: str, email: str,
                             first_name: str, last_name: str, password: str,
                             role: str) -> bool:
        """Recreate one user: delete, create, set password, assign role, test login.

        Password and role failures are printed as warnings and do not abort the
        sequence; the return value reflects only the final login test.

        Args:
            realm_name: Realm that owns the user.
            username: Login of the account to (re)create.
            email: E-mail stored on the account (marked verified).
            first_name: Given name stored on the account.
            last_name: Family name stored on the account.
            password: Permanent password to set.
            role: Name of the realm role to map onto the user.

        Returns:
            True when the user was created and the login test succeeded;
            False on a creation/lookup failure, a failed login, or an exception.
        """
        print(f"🔧 Création complète de {username}...")

        # 1. Remove any previous account so creation starts from a clean state.
        self.delete_user_if_exists(realm_name, username)

        # 2. Create the account, enabled and with a verified e-mail.
        user_data = {
            "username": username,
            "email": email,
            "firstName": first_name,
            "lastName": last_name,
            "enabled": True,
            "emailVerified": True,
        }

        try:
            response = self.session.post(
                f"{self.base_url}/admin/realms/{realm_name}/users",
                json=user_data,
                headers=self._json_headers(),
                timeout=self.timeout,
            )
            if response.status_code != 201:
                print(f" ✗ Erreur création: {response.status_code} - {response.text}")
                return False
            print(" ✓ Utilisateur créé")

            # 3. Look the new account up again to learn its id.
            response = self._search_users(realm_name, username)
            if response.status_code != 200:
                print(" ✗ Impossible de récupérer l'ID")
                return False

            user_id = self._match_user_id(response.json(), username)
            if user_id is None:
                print(" ✗ Utilisateur non trouvé après création")
                return False
            print(f" ✓ ID utilisateur: {user_id}")

            # 4. and 5. Password and role: failures are warnings only.
            self._set_password(realm_name, user_id, password)
            self._assign_realm_role(realm_name, user_id, role)

            # 6. The login test decides the overall result.
            return self._check_login(realm_name, username, password)

        except Exception as e:  # best-effort: one broken user must not stop the run
            print(f" ✗ Exception: {e}")
            return False

    def fix_all_users(self, realm_name: str = "unionflow", users=None) -> bool:
        """Recreate every user in *users* and print a summary.

        Args:
            realm_name: Realm in which the accounts are recreated.
            users: Iterable of dicts whose keys are the keyword parameters of
                create_user_complete(). Defaults to ``DEFAULT_USERS``.

        Returns:
            True when every user was recreated and passed the login test;
            False otherwise, including when no admin token could be obtained.
        """
        users = list(self.DEFAULT_USERS if users is None else users)

        print("=" * 80)
        print("🔧 CORRECTION DES UTILISATEURS UNIONFLOW")
        print("=" * 80)
        print()

        if not self.get_admin_token():
            print("❌ Impossible d'obtenir le token admin")
            return False

        print("✅ Token admin obtenu")
        print()

        success_count = 0
        for user in users:
            if self.create_user_complete(realm_name, **user):
                success_count += 1
            print()

        print("=" * 80)
        print(f"📊 RÉSULTAT: {success_count}/{len(users)} utilisateurs créés avec succès")
        print("=" * 80)

        all_ok = success_count == len(users)
        if all_ok:
            print("🎉 TOUS LES COMPTES FONCTIONNENT !")
            print()
            print("🚀 Testez maintenant avec: python test_auth.py")
        else:
            print("⚠️ Certains comptes ont des problèmes")

        return all_ok
|
|
|
|
|
|
def main():
    """Run the user fix with a default-configured UserFixer.

    Returns:
        0 when fix_all_users() reports success, 1 otherwise, so the value can
        be used directly as the process exit status.
    """
    fixer = UserFixer()
    return 0 if fixer.fix_all_users() else 1


if __name__ == "__main__":
    # Propagate the outcome to the shell so CI and wrapper scripts can detect a
    # failed run; previously the process always exited with status 0.
    raise SystemExit(main())
|