"""
Keycloak Service — manages realms, clients and users per tenant.

Each tenant gets its own Keycloak realm, providing full SSO isolation.
Users can optionally configure a custom OIDC/SAML IdP within their realm.
"""
|
|
import logging
|
|
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
|
|
from ..core.config import get_settings
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def _admin_client() -> KeycloakAdmin:
    """
    Build a KeycloakAdmin client bound to the configured master realm.

    Credentials and server URL are read from the application settings.
    """
    settings = get_settings()
    # NOTE(review): TLS certificate verification is switched off here
    # (verify=False) — confirm this is intended outside local development.
    return KeycloakAdmin(
        connection=KeycloakOpenIDConnection(
            server_url=settings.KEYCLOAK_URL,
            username=settings.KEYCLOAK_ADMIN_USER,
            password=settings.KEYCLOAK_ADMIN_PASSWORD,
            realm_name=settings.KEYCLOAK_MASTER_REALM,
            verify=False,
        )
    )
|
|
|
|
|
|
def realm_name(tenant_slug: str) -> str:
    """Return the Keycloak realm name for a tenant: ``tenant-<slug>``."""
    return "tenant-{}".format(tenant_slug)
|
|
|
|
|
|
async def create_tenant_realm(tenant_slug: str, company_name: str, admin_email: str) -> str:
    """
    Create an isolated Keycloak realm for the tenant.

    Provisions, in order: the realm itself, the public ``platform-portal``
    OpenID Connect client, and the initial tenant admin user, who is granted
    the ``realm-admin`` role of the realm's ``realm-management`` client.
    If a realm with the derived name already exists, nothing is created.

    Args:
        tenant_slug: Tenant identifier; the realm is named ``tenant-<slug>``.
        company_name: Used as the realm's display name.
        admin_email: Email (also used as username) of the initial admin user.

    Returns the realm name.
    """
    s = get_settings()
    rname = realm_name(tenant_slug)
    ka = _admin_client()

    if any(r["realm"] == rname for r in ka.get_realms()):
        log.info("Realm %s already exists", rname)
        return rname

    ka.create_realm({
        "realm": rname,
        "displayName": company_name,
        "enabled": True,
        "registrationAllowed": False,
        "resetPasswordAllowed": True,
        "rememberMe": True,
        "loginWithEmailAllowed": True,
        "duplicateEmailsAllowed": False,
        "sslRequired": "external",
        "bruteForceProtected": True,
        "accessTokenLifespan": 3600,
        "ssoSessionIdleTimeout": 86400,
    })

    # Admin client that operates on the new realm. The admin account lives in
    # the master realm, so the token must be requested there
    # (user_realm_name); without it the login is attempted against the
    # freshly created tenant realm, where that account does not exist.
    ka_realm = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        user_realm_name=s.KEYCLOAK_MASTER_REALM,
        verify=False,
    )

    # Create the platform-portal client in the new realm
    app_origin = f"https://app.{s.DOMAIN}"
    data_origin = f"https://data.{s.DOMAIN}"
    ka_realm.create_client({
        "clientId": "platform-portal",
        "name": "Platform Portal",
        "enabled": True,
        "publicClient": True,
        "standardFlowEnabled": True,
        "directAccessGrantsEnabled": True,
        "redirectUris": [f"{app_origin}/*", f"{data_origin}/*"],
        "webOrigins": [app_origin, data_origin],
        "protocol": "openid-connect",
    })

    # Create the initial tenant admin user
    admin_user_id = ka_realm.create_user({
        "email": admin_email,
        "username": admin_email,
        "enabled": True,
        "emailVerified": True,
    })

    # Role mappings passed in the create-user payload are not applied by
    # Keycloak, and "realm-admin" is a client role of the built-in
    # "realm-management" client rather than a realm role, so assign it
    # explicitly.
    mgmt_client_id = ka_realm.get_client_id("realm-management")
    admin_role = ka_realm.get_client_role(mgmt_client_id, "realm-admin")
    ka_realm.assign_client_role(admin_user_id, mgmt_client_id, [admin_role])

    log.info("Created realm %s", rname)
    return rname
|
|
|
|
|
|
async def delete_tenant_realm(tenant_slug: str) -> None:
    """
    Delete the tenant's Keycloak realm if it exists.

    Does nothing when no realm with the derived name is present.
    """
    target = realm_name(tenant_slug)
    admin = _admin_client()
    known_realms = [entry["realm"] for entry in admin.get_realms()]
    if target not in known_realms:
        return
    admin.delete_realm(target)
    log.info("Deleted realm %s", target)
|
|
|
|
|
|
async def add_custom_idp(tenant_slug: str, idp_config: dict) -> None:
    """
    Add a custom OIDC/SAML Identity Provider to the tenant realm.

    idp_config keys: alias, display_name, provider_id (oidc/saml),
    authorization_url, token_url, client_id, client_secret

    Missing keys fall back to defaults: alias ``custom-sso``, display name
    ``Company SSO``, provider ``oidc``, and empty strings for the URLs and
    client credentials.
    """
    s = get_settings()
    rname = realm_name(tenant_slug)
    # Operate on the tenant realm but authenticate in the master realm, where
    # the admin account lives; without user_realm_name the token would be
    # requested from the tenant realm itself.
    ka = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        user_realm_name=s.KEYCLOAK_MASTER_REALM,
        verify=False,
    )
    ka.create_idp({
        "alias": idp_config.get("alias", "custom-sso"),
        "displayName": idp_config.get("display_name", "Company SSO"),
        "providerId": idp_config.get("provider_id", "oidc"),
        "enabled": True,
        "trustEmail": True,
        "config": {
            "authorizationUrl": idp_config.get("authorization_url", ""),
            "tokenUrl": idp_config.get("token_url", ""),
            "clientId": idp_config.get("client_id", ""),
            "clientSecret": idp_config.get("client_secret", ""),
            "defaultScope": "openid email profile",
            "syncMode": "IMPORT",
        },
    })
|
|
|
|
|
|
async def create_tenant_user(tenant_slug: str, email: str, first_name: str,
                             last_name: str, role: str = "member",
                             temp_password: str = None) -> str:
    """
    Create a user in the tenant's realm. Returns KC user ID.

    The email doubles as the username, and ``role`` is stored in the
    ``platform_role`` user attribute. When ``temp_password`` is given
    (non-empty) it is set as a temporary password; otherwise a
    verification email is sent to the new user.
    """
    s = get_settings()
    rname = realm_name(tenant_slug)
    # Operate on the tenant realm but authenticate in the master realm, where
    # the admin account lives; without user_realm_name the token would be
    # requested from the tenant realm itself.
    ka = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        user_realm_name=s.KEYCLOAK_MASTER_REALM,
        verify=False,
    )
    user_id = ka.create_user({
        "email": email,
        "username": email,
        "firstName": first_name,
        "lastName": last_name,
        "enabled": True,
        "emailVerified": False,
        "attributes": {"platform_role": [role]},
    })
    if temp_password:
        ka.set_user_password(user_id, temp_password, temporary=True)
    else:
        ka.send_verify_email(user_id)
    return user_id
|
|
|
|
|
|
async def get_tenant_users(tenant_slug: str) -> list[dict]:
    """Return the users of the tenant's realm as reported by Keycloak."""
    s = get_settings()
    rname = realm_name(tenant_slug)
    # Operate on the tenant realm but authenticate in the master realm, where
    # the admin account lives; without user_realm_name the token would be
    # requested from the tenant realm itself.
    ka = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        user_realm_name=s.KEYCLOAK_MASTER_REALM,
        verify=False,
    )
    return ka.get_users()
|
|
|
|
|
|
async def delete_tenant_user(tenant_slug: str, kc_user_id: str) -> None:
    """Delete the user with Keycloak ID ``kc_user_id`` from the tenant's realm."""
    s = get_settings()
    rname = realm_name(tenant_slug)
    # Operate on the tenant realm but authenticate in the master realm, where
    # the admin account lives; without user_realm_name the token would be
    # requested from the tenant realm itself.
    ka = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        user_realm_name=s.KEYCLOAK_MASTER_REALM,
        verify=False,
    )
    ka.delete_user(kc_user_id)
|