Files
platform-api/app/services/keycloak_service.py
Platform CI f9bfc3afbd init
2026-02-21 17:54:44 -05:00

183 lines
5.8 KiB
Python

"""
Keycloak Service — manages realms, clients and users per tenant.
Each tenant gets its own Keycloak realm, providing full SSO isolation.
Users can optionally configure a custom OIDC/SAML IdP within their realm.
"""
import logging
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from ..core.config import get_settings
# Module-level logger, named after this module so log records can be
# filtered per module.
log = logging.getLogger(__name__)
def _admin_client() -> KeycloakAdmin:
    """
    Build a KeycloakAdmin bound to the configured master realm.

    Server URL, admin credentials and the master realm name are all read
    from the application settings on every call; nothing is cached here.
    """
    settings = get_settings()
    # NOTE(review): verify=False presumably switches off TLS certificate
    # verification for the Keycloak connection -- confirm this is intended
    # outside of local/dev environments.
    connection_kwargs = {
        "server_url": settings.KEYCLOAK_URL,
        "username": settings.KEYCLOAK_ADMIN_USER,
        "password": settings.KEYCLOAK_ADMIN_PASSWORD,
        "realm_name": settings.KEYCLOAK_MASTER_REALM,
        "verify": False,
    }
    master_connection = KeycloakOpenIDConnection(**connection_kwargs)
    return KeycloakAdmin(connection=master_connection)
def realm_name(tenant_slug: str) -> str:
    """Return the Keycloak realm name for a tenant: the slug prefixed with ``tenant-``."""
    return "tenant-{}".format(tenant_slug)
async def create_tenant_realm(tenant_slug: str, company_name: str, admin_email: str) -> str:
    """
    Create an isolated Keycloak realm for the tenant.

    Provisions, in order: the realm itself, the public ``platform-portal``
    OIDC client, and the initial tenant admin user (granted the
    ``realm-admin`` role of the realm's ``realm-management`` client).
    If a realm with the derived name already exists, nothing is created
    or modified and the name is returned as-is.

    Args:
        tenant_slug: Tenant identifier; the realm is named ``tenant-<slug>``.
        company_name: Used as the realm's display name.
        admin_email: Email (also used as username) of the initial admin user.

    Returns:
        The realm name.

    NOTE(review): the Keycloak client calls below are synchronous and run
    directly inside this coroutine -- confirm that blocking the event loop
    here is acceptable.
    """
    s = get_settings()
    rname = realm_name(tenant_slug)
    ka = _admin_client()
    existing = {r["realm"] for r in ka.get_realms()}
    if rname in existing:
        log.info("Realm %s already exists", rname)
        return rname
    ka.create_realm({
        "realm": rname,
        "displayName": company_name,
        "enabled": True,
        "registrationAllowed": False,
        "resetPasswordAllowed": True,
        "rememberMe": True,
        "loginWithEmailAllowed": True,
        "duplicateEmailsAllowed": False,
        "sslRequired": "external",
        "bruteForceProtected": True,
        "accessTokenLifespan": 3600,
        "ssoSessionIdleTimeout": 86400,
    })
    # Operate on the new realm, but authenticate against the master realm:
    # the platform admin account lives there, not in the freshly created
    # tenant realm. Without user_realm_name the login would be attempted
    # against the tenant realm and fail.
    ka_realm = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        user_realm_name=s.KEYCLOAK_MASTER_REALM,
        verify=False,
    )
    # Create platform-api client in the new realm
    app_origin = f"https://app.{s.DOMAIN}"
    data_origin = f"https://data.{s.DOMAIN}"
    ka_realm.create_client({
        "clientId": "platform-portal",
        "name": "Platform Portal",
        "enabled": True,
        "publicClient": True,
        "standardFlowEnabled": True,
        "directAccessGrantsEnabled": True,
        "redirectUris": [f"{app_origin}/*", f"{data_origin}/*"],
        "webOrigins": [app_origin, data_origin],
        "protocol": "openid-connect",
    })
    # Create the initial tenant admin user
    admin_user_id = ka_realm.create_user({
        "email": admin_email,
        "username": admin_email,
        "enabled": True,
        "emailVerified": True,
    })
    # "realm-admin" is a client role of the built-in "realm-management"
    # client, and role lists embedded in the create-user payload are not
    # applied by Keycloak, so the role is mapped explicitly here.
    mgmt_client_id = ka_realm.get_client_id("realm-management")
    if mgmt_client_id is None:
        log.warning(
            "realm-management client not found in realm %s; "
            "initial admin was created without the realm-admin role",
            rname,
        )
    else:
        realm_admin_role = ka_realm.get_client_role(mgmt_client_id, "realm-admin")
        ka_realm.assign_client_role(admin_user_id, mgmt_client_id, [realm_admin_role])
    log.info("Created realm %s", rname)
    return rname
async def delete_tenant_realm(tenant_slug: str) -> None:
    """
    Delete the tenant's Keycloak realm if it exists.

    The realm list is fetched through the master-realm admin client; when
    no realm with the tenant's derived name is found, nothing happens and
    nothing is logged.
    """
    target = realm_name(tenant_slug)
    admin = _admin_client()
    known_realm_names = [entry["realm"] for entry in admin.get_realms()]
    if target not in known_realm_names:
        return
    admin.delete_realm(target)
    log.info("Deleted realm %s", target)
async def add_custom_idp(tenant_slug: str, idp_config: dict) -> None:
    """
    Add a custom OIDC/SAML Identity Provider to the tenant realm.

    Args:
        tenant_slug: Tenant identifier; the target realm is ``tenant-<slug>``.
        idp_config: Provider settings. Recognised keys (all optional):
            ``alias`` (default ``"custom-sso"``),
            ``display_name`` (default ``"Company SSO"``),
            ``provider_id`` -- ``oidc``/``saml`` (default ``"oidc"``),
            ``authorization_url``, ``token_url``, ``client_id``,
            ``client_secret`` (each defaulting to an empty string).

    NOTE(review): the ``config`` keys sent below (authorizationUrl, tokenUrl,
    clientId, clientSecret) are OIDC-shaped; a ``saml`` provider presumably
    needs different keys -- confirm before relying on SAML here.
    """
    s = get_settings()
    rname = realm_name(tenant_slug)
    # Target the tenant realm but log in via the master realm, where the
    # platform admin account is defined; otherwise authentication would be
    # attempted against the tenant realm and fail.
    ka = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        user_realm_name=s.KEYCLOAK_MASTER_REALM,
        verify=False,
    )
    ka.create_idp({
        "alias": idp_config.get("alias", "custom-sso"),
        "displayName": idp_config.get("display_name", "Company SSO"),
        "providerId": idp_config.get("provider_id", "oidc"),
        "enabled": True,
        "trustEmail": True,
        "config": {
            "authorizationUrl": idp_config.get("authorization_url", ""),
            "tokenUrl": idp_config.get("token_url", ""),
            "clientId": idp_config.get("client_id", ""),
            "clientSecret": idp_config.get("client_secret", ""),
            "defaultScope": "openid email profile",
            "syncMode": "IMPORT",
        },
    })
async def create_tenant_user(tenant_slug: str, email: str, first_name: str,
                             last_name: str, role: str = "member",
                             temp_password: "str | None" = None) -> str:
    """
    Create a user in the tenant's realm.

    The email doubles as the username, and ``role`` is stored in the user's
    ``platform_role`` attribute. When ``temp_password`` is given (non-empty)
    it is set as a temporary password; otherwise a verification email is
    requested for the new user.

    Args:
        tenant_slug: Tenant identifier; the target realm is ``tenant-<slug>``.
        email: User email, also used as username.
        first_name: User's first name.
        last_name: User's last name.
        role: Value stored under the ``platform_role`` user attribute.
        temp_password: Optional temporary password.

    Returns:
        The Keycloak user ID.
    """
    s = get_settings()
    rname = realm_name(tenant_slug)
    # Target the tenant realm but log in via the master realm, where the
    # platform admin account is defined; otherwise authentication would be
    # attempted against the tenant realm and fail.
    ka = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        user_realm_name=s.KEYCLOAK_MASTER_REALM,
        verify=False,
    )
    user_id = ka.create_user({
        "email": email,
        "username": email,
        "firstName": first_name,
        "lastName": last_name,
        "enabled": True,
        "emailVerified": False,
        "attributes": {"platform_role": [role]},
    })
    if temp_password:
        ka.set_user_password(user_id, temp_password, temporary=True)
    else:
        ka.send_verify_email(user_id)
    return user_id
async def get_tenant_users(tenant_slug: str) -> list[dict]:
    """
    Return the users of the tenant's realm as reported by Keycloak.

    Args:
        tenant_slug: Tenant identifier; the target realm is ``tenant-<slug>``.
    """
    s = get_settings()
    rname = realm_name(tenant_slug)
    # Target the tenant realm but log in via the master realm, where the
    # platform admin account is defined; otherwise authentication would be
    # attempted against the tenant realm and fail.
    ka = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        user_realm_name=s.KEYCLOAK_MASTER_REALM,
        verify=False,
    )
    return ka.get_users()
async def delete_tenant_user(tenant_slug: str, kc_user_id: str) -> None:
    """
    Delete a user from the tenant's realm.

    Args:
        tenant_slug: Tenant identifier; the target realm is ``tenant-<slug>``.
        kc_user_id: Keycloak ID of the user to delete.
    """
    s = get_settings()
    rname = realm_name(tenant_slug)
    # Target the tenant realm but log in via the master realm, where the
    # platform admin account is defined; otherwise authentication would be
    # attempted against the tenant realm and fail.
    ka = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        user_realm_name=s.KEYCLOAK_MASTER_REALM,
        verify=False,
    )
    ka.delete_user(kc_user_id)