This commit is contained in:
Platform CI
2026-02-21 17:54:44 -05:00
commit f9bfc3afbd
18 changed files with 2020 additions and 0 deletions

13
Dockerfile Normal file
View File

@@ -0,0 +1,13 @@
# Control-plane API image: Python 3.12 on slim Debian.
FROM python:3.12-slim
WORKDIR /app
# git is installed at image level — presumably for the GitOps provisioner's
# clone/push operations (confirm); apt lists removed to keep the layer small.
RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/*
# Copy requirements first so source-only changes do not bust the pip layer cache.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app/ app/
EXPOSE 8000
# Serve the FastAPI app with two uvicorn workers on all interfaces.
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]

0
app/__init__.py Normal file
View File

0
app/core/__init__.py Normal file
View File

43
app/core/config.py Normal file
View File

@@ -0,0 +1,43 @@
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
    """Control-plane configuration; every field is overridable via the
    environment or the .env file (pydantic-settings).

    SECURITY NOTE(review): live-looking credentials are committed below as
    defaults (DB password, Keycloak admin password, Gitea credentials inside
    GITOPS_REPO_URL). They should be supplied exclusively via env/secret
    mounts and rotated, since this file is in source control.
    """
    # Database
    # Fix: the password contains '@', which must be percent-encoded (%40) in a
    # SQLAlchemy URL so the parser can split userinfo from host. %40 decodes
    # back to '@', so the effective password is unchanged.
    DATABASE_URL: str = "postgresql+asyncpg://platform:Platform%40DB2024!@platform-db.platform-db:5432/platform"
    # Keycloak
    KEYCLOAK_URL: str = "https://auth.platform.dlytica.com"
    KEYCLOAK_ADMIN_USER: str = "admin"
    KEYCLOAK_ADMIN_PASSWORD: str = "Architecture@9988#"
    KEYCLOAK_MASTER_REALM: str = "master"
    KEYCLOAK_PLATFORM_CLIENT: str = "platform-api"
    KEYCLOAK_PLATFORM_SECRET: str = ""
    # Platform
    DOMAIN: str = "platform.dlytica.com"
    DATA_PLANE_DOMAIN: str = "platform.dlytica.com"
    DEFAULT_TRIAL_DAYS: int = 30
    SECRET_KEY: str = "change-me-in-production"
    # Kubernetes (in-cluster)
    VCLUSTER_NAMESPACE: str = "tenant-vclusters"
    DATA_PLANE_KUBECONFIG: str = ""  # path to DP kubeconfig; empty = in-cluster
    GITOPS_REPO_URL: str = "http://gitea_admin:ChangeMe123!@gitea.platform.dlytica.com/platform/gitops.git"
    GITOPS_BRANCH: str = "main"
    GITOPS_LOCAL_PATH: str = "/tmp/gitops"
    # Email
    SMTP_HOST: str = ""
    SMTP_PORT: int = 587
    SMTP_USER: str = ""
    SMTP_PASSWORD: str = ""
    SMTP_FROM: str = "noreply@dlytica.com"

    class Config:
        env_file = ".env"
@lru_cache
def get_settings() -> Settings:
    """Return the process-wide Settings instance (constructed once, cached)."""
    return Settings()

35
app/core/database.py Normal file
View File

@@ -0,0 +1,35 @@
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from .config import get_settings
class Base(DeclarativeBase):
    """Declarative base shared by all ORM models."""
    pass
def get_engine():
    """Create a new async engine from the configured DATABASE_URL.

    Called once per process via get_session_factory(); pool_size/max_overflow
    allow up to 30 concurrent connections per worker.
    """
    settings = get_settings()
    return create_async_engine(settings.DATABASE_URL, pool_size=10, max_overflow=20, echo=False)
# Lazily-initialized module singletons: one engine + session factory per process.
_engine = None
_session_factory = None


def get_session_factory():
    """Return the process-wide async session factory, creating it on first use."""
    global _engine, _session_factory
    if _session_factory is None:
        _engine = get_engine()
        # expire_on_commit=False keeps loaded attributes usable after commit
        # (needed because handlers return ORM data post-commit).
        _session_factory = async_sessionmaker(_engine, expire_on_commit=False, class_=AsyncSession)
    return _session_factory
async def get_db():
    """FastAPI dependency that yields a request-scoped AsyncSession.

    Commits after the request handler completes without error; on any
    exception the transaction is rolled back and the error re-raised.
    """
    session_factory = get_session_factory()
    async with session_factory() as db:
        try:
            yield db
            await db.commit()
        except Exception:
            await db.rollback()
            raise

58
app/main.py Normal file
View File

@@ -0,0 +1,58 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from .routers import auth, tenants, admin
from .services.trial_monitor import check_trials
from .core.config import get_settings
# Single process-wide scheduler for background jobs.
scheduler = AsyncIOScheduler()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """App lifespan: run the trial monitor hourly while the app is up."""
    # Start trial monitor (runs every hour)
    scheduler.add_job(check_trials, "interval", hours=1, id="trial_monitor")
    scheduler.start()
    yield
    scheduler.shutdown()
s = get_settings()

# Control-plane FastAPI application; lifespan hooks manage the scheduler.
app = FastAPI(
    title="Platform API",
    description="Managed Data Platform — Control Plane API",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS: allow the app/data web frontends plus a local dev server.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        f"https://app.{s.DOMAIN}",
        f"https://data.{s.DOMAIN}",
        "http://localhost:3000",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(auth.router)
app.include_router(tenants.router)
app.include_router(admin.router)


@app.get("/health")
async def health():
    """Liveness/readiness probe endpoint."""
    return {"status": "ok"}


@app.get("/")
async def root():
    """Service banner with pointers to docs and health."""
    return {
        "service": "Platform API",
        "docs": "/docs",
        "health": "/health",
    }

0
app/models/__init__.py Normal file
View File

129
app/models/tenant.py Normal file
View File

@@ -0,0 +1,129 @@
import uuid
from datetime import datetime
from sqlalchemy import String, Boolean, Integer, Numeric, Text, DateTime, ForeignKey, JSON
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.dialects.postgresql import UUID, JSONB, INET
from ..core.database import Base
class Plan(Base):
    """Subscription plan: limits, per-tool feature flags, and pricing."""
    __tablename__ = "plans"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    display_name: Mapped[str] = mapped_column(String(100), nullable=False)
    trial_days: Mapped[int] = mapped_column(Integer, nullable=False, default=30)
    max_users: Mapped[int] = mapped_column(Integer, nullable=False, default=5)
    max_vclusters: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
    # Feature flags consumed by the routers, e.g. {"superset": true, "custom_sso": false}
    features: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
    price_monthly: Mapped[float] = mapped_column(Numeric(10, 2), nullable=False, default=0)
    is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    # NOTE(review): datetime.utcnow is naive while the columns are timezone-aware — confirm driver behavior.
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)
    tenants: Mapped[list["Tenant"]] = relationship("Tenant", back_populates="plan")
class Tenant(Base):
    """A customer organization: plan, trial window, vCluster and SSO state."""
    __tablename__ = "tenants"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    # DNS-safe identifier, also used in vcluster/realm/tool-URL names
    slug: Mapped[str] = mapped_column(String(63), unique=True, nullable=False)
    company_name: Mapped[str] = mapped_column(String(255), nullable=False)
    plan_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("plans.id"), nullable=False)
    # Lifecycle values used by the routers: pending / trial / active / suspended / deleted
    status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending")
    # Trial
    trial_started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
    trial_ends_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
    trial_duration_days: Mapped[int] = mapped_column(Integer, nullable=False, default=30)
    # vCluster
    vcluster_name: Mapped[str | None] = mapped_column(String(63), nullable=True)
    vcluster_status: Mapped[str] = mapped_column(String(20), nullable=False, default="not_created")
    vcluster_url: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Keycloak / SSO
    keycloak_realm: Mapped[str | None] = mapped_column(String(63), nullable=True)
    sso_type: Mapped[str] = mapped_column(String(20), nullable=False, default="system")
    custom_sso_config: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
    # Tools
    tools_enabled: Mapped[dict] = mapped_column(JSONB, nullable=False, default=lambda: {"superset": True})
    # Contact
    billing_email: Mapped[str | None] = mapped_column(String(255), nullable=True)
    domain: Mapped[str | None] = mapped_column(String(255), nullable=True)
    # Stored under column name "metadata"; attribute renamed to avoid the
    # SQLAlchemy Base.metadata clash.
    metadata_: Mapped[dict] = mapped_column("metadata", JSONB, nullable=False, default=dict)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)
    # Soft-delete marker; live-tenant queries filter on deleted_at IS NULL
    deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
    plan: Mapped["Plan"] = relationship("Plan", back_populates="tenants")
    users: Mapped[list["User"]] = relationship("User", back_populates="tenant")
    tools: Mapped[list["TenantTool"]] = relationship("TenantTool", back_populates="tenant")
    jobs: Mapped[list["ProvisioningJob"]] = relationship("ProvisioningJob", back_populates="tenant")
class User(Base):
    """A member of a tenant; mirrors the corresponding Keycloak user."""
    __tablename__ = "users"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False)
    # Nullable: the signup owner row is created before the Keycloak user exists
    keycloak_user_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
    email: Mapped[str] = mapped_column(String(255), nullable=False)
    first_name: Mapped[str | None] = mapped_column(String(100), nullable=True)
    last_name: Mapped[str | None] = mapped_column(String(100), nullable=True)
    # Roles seen in the routers: owner / member
    role: Mapped[str] = mapped_column(String(20), nullable=False, default="member")
    status: Mapped[str] = mapped_column(String(20), nullable=False, default="active")
    last_login_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)
    tenant: Mapped["Tenant"] = relationship("Tenant", back_populates="users")
class TenantTool(Base):
    """Per-tenant, per-tool enablement row (superset, airflow, trino, ...)."""
    __tablename__ = "tenant_tools"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False)
    tool_name: Mapped[str] = mapped_column(String(50), nullable=False)
    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    replicas: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
    config: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)
    tenant: Mapped["Tenant"] = relationship("Tenant", back_populates="tools")
class ProvisioningJob(Base):
    """Async provisioning work item (e.g. create_vcluster) with status tracking."""
    __tablename__ = "provisioning_jobs"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False)
    job_type: Mapped[str] = mapped_column(String(50), nullable=False)
    # Values written by the auth router: pending / running / succeeded / failed
    status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending")
    payload: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
    error: Mapped[str | None] = mapped_column(Text, nullable=True)
    started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
    completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow)
    tenant: Mapped["Tenant"] = relationship("Tenant", back_populates="jobs")
class AuditLog(Base):
    """Append-only audit trail; tenant/user ids are plain columns (no FK)."""
    __tablename__ = "audit_log"
    id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    tenant_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), nullable=True)
    user_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), nullable=True)
    # Dotted action name, e.g. "tenant.signup", "admin.trial_extended"
    action: Mapped[str] = mapped_column(String(100), nullable=False)
    resource: Mapped[str | None] = mapped_column(String(100), nullable=True)
    details: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow)
class PlatformSetting(Base):
    """Simple key/value store for runtime-tunable platform settings."""
    __tablename__ = "platform_settings"
    key: Mapped[str] = mapped_column(String(100), primary_key=True)
    value: Mapped[str] = mapped_column(Text, nullable=False)
    description: Mapped[str | None] = mapped_column(Text, nullable=True)
    updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow)

0
app/routers/__init__.py Normal file
View File

216
app/routers/admin.py Normal file
View File

@@ -0,0 +1,216 @@
"""
/admin — Admin panel: manage tenants, trials, plans, platform settings.
Protected by a simple bearer token (admin API key).
"""
import os
import secrets
from datetime import datetime, timedelta
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, status, Header
from sqlalchemy import select, update as sa_update, func
from sqlalchemy.ext.asyncio import AsyncSession

from ..core.database import get_db
from ..core.config import get_settings
from ..models.tenant import (
    Tenant, User, Plan, ProvisioningJob, AuditLog, PlatformSetting
)
from ..schemas.tenant import (
    AdminTrialUpdateRequest, AdminPlanUpdateRequest, PlatformSettingUpdate
)
from ..services import keycloak_service, provisioner
router = APIRouter(prefix="/admin", tags=["admin"])

# Admin API key, overridable via environment. SECURITY NOTE(review): the
# hard-coded fallback is committed to source control — rotate it and remove
# the fallback for production deployments.
ADMIN_API_KEY = os.environ.get("ADMIN_API_KEY", "Platform-Admin-2024!")


def _require_admin(x_admin_key: str = Header(...)):
    """Dependency: reject any request whose X-Admin-Key header doesn't match.

    Uses a constant-time comparison so the key can't be probed via timing.
    Raises HTTPException(403) on mismatch.
    """
    if not secrets.compare_digest(x_admin_key, ADMIN_API_KEY):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized")
# ── Dashboard ─────────────────────────────────────────────────────────────────
@router.get("/dashboard", dependencies=[Depends(_require_admin)])
async def dashboard(db: AsyncSession = Depends(get_db)):
    """Aggregate tenant and user counts for the admin dashboard."""
    async def _tenant_count(*conditions):
        # Count tenants matching the given filter conditions (all if none).
        stmt = select(func.count()).select_from(Tenant)
        if conditions:
            stmt = stmt.where(*conditions)
        return (await db.execute(stmt)).scalar()

    total = await _tenant_count()
    active = await _tenant_count(Tenant.status == "active")
    trial = await _tenant_count(Tenant.status == "trial")
    suspended = await _tenant_count(Tenant.status == "suspended")
    cutoff = datetime.utcnow() + timedelta(days=7)
    expiring = await _tenant_count(Tenant.status == "trial", Tenant.trial_ends_at <= cutoff)
    users_total = (await db.execute(select(func.count()).select_from(User))).scalar()
    return {
        "tenants": {"total": total, "active": active, "trial": trial, "suspended": suspended},
        "expiring_trials_7d": expiring,
        "total_users": users_total,
    }
# ── Tenant management ─────────────────────────────────────────────────────────
@router.get("/tenants", dependencies=[Depends(_require_admin)])
async def list_tenants(
    status: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
):
    """List tenants joined with their plan name, optionally filtered by status."""
    stmt = select(Tenant, Plan.name.label("plan_name")).join(Plan, Tenant.plan_id == Plan.id)
    if status:
        stmt = stmt.where(Tenant.status == status)
    result = await db.execute(stmt)
    listing = []
    for tenant, plan_name in result.all():
        listing.append({
            "id": str(tenant.id),
            "slug": tenant.slug,
            "company_name": tenant.company_name,
            "status": tenant.status,
            "plan": plan_name,
            "trial_ends_at": tenant.trial_ends_at.isoformat() if tenant.trial_ends_at else None,
            "vcluster_status": tenant.vcluster_status,
            "created_at": tenant.created_at.isoformat(),
        })
    return listing
@router.get("/tenants/{tenant_id}", dependencies=[Depends(_require_admin)])
async def get_tenant(tenant_id: UUID, db: AsyncSession = Depends(get_db)):
    """Tenant detail: the row itself, its 5 most recent provisioning jobs,
    and the count of active users. Raises 404 if the tenant doesn't exist."""
    t = await db.get(Tenant, tenant_id)
    if not t:
        raise HTTPException(status_code=404, detail="Not found")
    jobs = (await db.execute(
        select(ProvisioningJob).where(ProvisioningJob.tenant_id == tenant_id)
        .order_by(ProvisioningJob.created_at.desc()).limit(5)
    )).scalars().all()
    users = (await db.execute(
        select(User).where(User.tenant_id == tenant_id, User.status == "active")
    )).scalars().all()
    # NOTE(review): returns the raw ORM object; relies on FastAPI's default
    # serialization — consider a response schema.
    return {
        "tenant": t,
        "jobs": [{"type": j.job_type, "status": j.status, "created": j.created_at} for j in jobs],
        "user_count": len(users),
    }
@router.post("/tenants/{tenant_id}/extend-trial", dependencies=[Depends(_require_admin)])
async def extend_trial(
    tenant_id: UUID,
    payload: AdminTrialUpdateRequest,
    db: AsyncSession = Depends(get_db),
):
    """Extend a tenant's trial by payload.trial_days and force status back to "trial".

    The extension is added to the later of (current trial end, now), so an
    already-expired trial restarts counting from the present.
    """
    t = await db.get(Tenant, tenant_id)
    if not t:
        raise HTTPException(status_code=404, detail="Not found")
    # NOTE(review): naive datetime.utcnow() is compared with a value loaded
    # from a timezone-aware column — confirm the driver returns naive UTC here.
    base = max(t.trial_ends_at or datetime.utcnow(), datetime.utcnow())
    new_end = base + timedelta(days=payload.trial_days)
    await db.execute(
        sa_update(Tenant).where(Tenant.id == tenant_id)
        .values(trial_ends_at=new_end, status="trial")
    )
    db.add(AuditLog(tenant_id=tenant_id, action="admin.trial_extended",
                    details={"extra_days": payload.trial_days, "new_end": new_end.isoformat()}))
    await db.commit()
    return {"new_trial_end": new_end.isoformat()}
@router.post("/tenants/{tenant_id}/suspend", dependencies=[Depends(_require_admin)])
async def suspend_tenant(tenant_id: UUID, db: AsyncSession = Depends(get_db)):
    """Mark the tenant suspended and record an audit entry; 404 if missing."""
    tenant = await db.get(Tenant, tenant_id)
    if tenant is None:
        raise HTTPException(status_code=404, detail="Not found")
    stmt = sa_update(Tenant).where(Tenant.id == tenant_id).values(status="suspended")
    await db.execute(stmt)
    db.add(AuditLog(tenant_id=tenant_id, action="admin.tenant_suspended"))
    await db.commit()
    return {"status": "suspended"}
@router.post("/tenants/{tenant_id}/activate", dependencies=[Depends(_require_admin)])
async def activate_tenant(tenant_id: UUID, db: AsyncSession = Depends(get_db)):
    """Mark the tenant active and record an audit entry; 404 if missing."""
    tenant = await db.get(Tenant, tenant_id)
    if tenant is None:
        raise HTTPException(status_code=404, detail="Not found")
    stmt = sa_update(Tenant).where(Tenant.id == tenant_id).values(status="active")
    await db.execute(stmt)
    db.add(AuditLog(tenant_id=tenant_id, action="admin.tenant_activated"))
    await db.commit()
    return {"status": "active"}
@router.delete("/tenants/{tenant_id}", dependencies=[Depends(_require_admin)])
async def delete_tenant(tenant_id: UUID, db: AsyncSession = Depends(get_db)):
    """Full deprovisioning: remove vCluster + Keycloak realm.

    External resources are removed first; the DB row is then soft-deleted
    (status="deleted", deleted_at set). If either external call raises, the
    row is left untouched so the operation can be retried.
    """
    t = await db.get(Tenant, tenant_id)
    if not t:
        raise HTTPException(status_code=404, detail="Not found")
    # Remove GitOps manifests (triggers ArgoCD prune)
    await provisioner.deprovision_tenant(t.slug)
    # Remove Keycloak realm
    await keycloak_service.delete_tenant_realm(t.slug)
    await db.execute(
        sa_update(Tenant).where(Tenant.id == tenant_id)
        .values(status="deleted", deleted_at=datetime.utcnow(), vcluster_status="deleted")
    )
    db.add(AuditLog(tenant_id=tenant_id, action="admin.tenant_deleted"))
    await db.commit()
    return {"status": "deleted"}
@router.post("/tenants/{tenant_id}/change-plan", dependencies=[Depends(_require_admin)])
async def change_plan(
    tenant_id: UUID,
    payload: AdminPlanUpdateRequest,
    db: AsyncSession = Depends(get_db),
):
    """Move a tenant to the plan named in the payload; 404 for unknown plans.

    NOTE(review): the tenant id itself is not checked — an unknown tenant_id
    updates zero rows and still returns 200. Confirm whether that is intended.
    """
    plan = (await db.execute(select(Plan).where(Plan.name == payload.plan_name))).scalar_one_or_none()
    if not plan:
        raise HTTPException(status_code=404, detail=f"Plan '{payload.plan_name}' not found")
    await db.execute(
        sa_update(Tenant).where(Tenant.id == tenant_id).values(plan_id=plan.id)
    )
    db.add(AuditLog(tenant_id=tenant_id, action="admin.plan_changed",
                    details={"new_plan": payload.plan_name}))
    await db.commit()
    return {"plan": payload.plan_name}
# ── Platform Settings ─────────────────────────────────────────────────────────
@router.get("/settings", dependencies=[Depends(_require_admin)])
async def list_settings(db: AsyncSession = Depends(get_db)):
    """Return every platform setting keyed by its name."""
    result = await db.execute(select(PlatformSetting))
    return {
        setting.key: {"value": setting.value, "description": setting.description}
        for setting in result.scalars()
    }
@router.put("/settings/{key}", dependencies=[Depends(_require_admin)])
async def update_setting(key: str, payload: PlatformSettingUpdate, db: AsyncSession = Depends(get_db)):
    """Overwrite an existing platform setting's value; 404 for unknown keys."""
    setting = await db.get(PlatformSetting, key)
    if setting is None:
        raise HTTPException(status_code=404, detail=f"Setting '{key}' not found")
    setting.value = payload.value
    await db.commit()
    return {"key": key, "value": payload.value}
# ── Plans ──────────────────────────────────────────────────────────────────────
@router.get("/plans", dependencies=[Depends(_require_admin)])
async def list_plans(db: AsyncSession = Depends(get_db)):
    """Return every plan row (serialized by FastAPI's default encoder)."""
    result = await db.execute(select(Plan))
    return result.scalars().all()
# ── Audit Log ─────────────────────────────────────────────────────────────────
@router.get("/audit", dependencies=[Depends(_require_admin)])
async def audit_log(
    tenant_id: Optional[UUID] = None,
    limit: int = 100,
    db: AsyncSession = Depends(get_db),
):
    """Return the most recent audit entries, optionally for a single tenant."""
    stmt = select(AuditLog)
    if tenant_id:
        stmt = stmt.where(AuditLog.tenant_id == tenant_id)
    stmt = stmt.order_by(AuditLog.created_at.desc()).limit(limit)
    result = await db.execute(stmt)
    return result.scalars().all()

210
app/routers/auth.py Normal file
View File

@@ -0,0 +1,210 @@
"""
/auth — Sign-up, sign-in (delegates to Keycloak)
"""
from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from datetime import datetime, timedelta
from slugify import slugify
import secrets
from ..core.database import get_db
from ..core.config import get_settings
from ..models.tenant import Tenant, User, Plan, TenantTool, ProvisioningJob, AuditLog
from ..schemas.tenant import SignUpRequest, TenantResponse
from ..services import keycloak_service, provisioner
router = APIRouter(prefix="/auth", tags=["auth"])


async def _default_plan(db: AsyncSession) -> Plan:
    """Return the plan assigned to new signups.

    Bug fix: the previous query compared Plan.name against
    settings.DEFAULT_TRIAL_DAYS (an int) combined with a Python `or`, which
    calls bool() on a SQLAlchemy column expression and raises TypeError at
    request time. Both the primary query and its fallback effectively
    resolved to the "trial" plan, so query it directly.

    Raises NoResultFound if the "trial" plan row is missing (same as the
    original fallback's scalar_one()).
    """
    result = await db.execute(select(Plan).where(Plan.name == "trial"))
    return result.scalar_one()
@router.post("/signup", response_model=TenantResponse, status_code=status.HTTP_201_CREATED)
async def signup(
    payload: SignUpRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
    """
    User signs up with company name → creates Tenant + Keycloak realm + starts trial.
    vCluster provisioning is triggered asynchronously.

    Flow: derive a slug from the company name (409 if taken), create the
    tenant + owner user + per-tool rows + provisioning job in one commit,
    then hand realm/vCluster creation to a background task.
    """
    # (Removed an unused `s = get_settings()` local.)
    slug = slugify(payload.company_name, max_length=40, separator="-")
    # Check slug uniqueness
    existing = await db.execute(select(Tenant).where(Tenant.slug == slug))
    if existing.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Company name '{payload.company_name}' is already registered."
        )
    plan = await _default_plan(db)
    now = datetime.utcnow()
    trial_days = plan.trial_days
    # Create tenant record
    tenant = Tenant(
        slug=slug,
        company_name=payload.company_name,
        plan_id=plan.id,
        status="trial",
        trial_started_at=now,
        trial_ends_at=now + timedelta(days=trial_days),
        trial_duration_days=trial_days,
        vcluster_status="provisioning",
        tools_enabled=plan.features,
        billing_email=payload.email,
        keycloak_realm=f"tenant-{slug}",
        sso_type="system",
    )
    db.add(tenant)
    # Flush so tenant.id is assigned before the dependent rows below.
    await db.flush()
    # Create owner user record
    user = User(
        tenant_id=tenant.id,
        email=payload.email,
        first_name=payload.first_name,
        last_name=payload.last_name,
        role="owner",
        status="active",
    )
    db.add(user)
    # Default tool rows
    for tool, enabled in plan.features.items():
        db.add(TenantTool(tenant_id=tenant.id, tool_name=tool, enabled=bool(enabled)))
    # Provisioning job
    job = ProvisioningJob(
        tenant_id=tenant.id,
        job_type="create_vcluster",
        payload={
            "slug": slug,
            "company_name": payload.company_name,
            "plan": plan.name,
            "tools": plan.features,
            "admin_email": payload.email,
        }
    )
    db.add(job)
    db.add(AuditLog(
        tenant_id=tenant.id,
        action="tenant.signup",
        details={"email": payload.email, "plan": plan.name},
    ))
    await db.commit()
    await db.refresh(tenant)
    # Async: create Keycloak realm + provision vCluster
    background_tasks.add_task(
        _provision_async,
        tenant_id=str(tenant.id),
        job_id=str(job.id),
        slug=slug,
        company_name=payload.company_name,
        admin_email=payload.email,
        plan_name=plan.name,
        tools=plan.features,
    )
    return _to_response(tenant, plan.name)
async def _provision_async(tenant_id, job_id, slug, company_name,
                           admin_email, plan_name, tools):
    """Background task: create the Keycloak realm and write GitOps manifests.

    Runs outside the request cycle with its own session. On success the
    tenant's vCluster fields are filled and the job marked succeeded; on
    failure the job is marked failed and vcluster_status reset.
    """
    from ..core.database import get_session_factory
    from sqlalchemy import update as sa_update
    from datetime import datetime
    import logging
    log = logging.getLogger(__name__)
    factory = get_session_factory()
    async with factory() as db:
        try:
            # Mark job running
            await db.execute(
                sa_update(ProvisioningJob)
                .where(ProvisioningJob.id == job_id)
                .values(status="running", started_at=datetime.utcnow())
            )
            await db.commit()
            # 1. Create Keycloak realm
            await keycloak_service.create_tenant_realm(slug, company_name, admin_email)
            # 2. Write gitops manifests
            result = await provisioner.provision_tenant(
                tenant_slug=slug,
                company_name=company_name,
                plan_name=plan_name,
                tools_enabled=tools,
            )
            # 3. Update tenant with vcluster info
            await db.execute(
                sa_update(Tenant)
                .where(Tenant.id == tenant_id)
                .values(
                    vcluster_name=result["vcluster_name"],
                    vcluster_url=result["vcluster_url"],
                    vcluster_status="ready",
                    status="trial",
                )
            )
            await db.execute(
                sa_update(ProvisioningJob)
                .where(ProvisioningJob.id == job_id)
                .values(status="succeeded", completed_at=datetime.utcnow())
            )
            await db.commit()
            log.info("Provisioned tenant %s", slug)
        except Exception as exc:
            log.exception("Provisioning failed for %s", slug)
            # Robustness fix: if the failure came from the DB itself, the
            # session is in a failed transaction and the status updates below
            # would also fail — roll back first.
            await db.rollback()
            await db.execute(
                sa_update(ProvisioningJob)
                .where(ProvisioningJob.id == job_id)
                .values(status="failed", error=str(exc), completed_at=datetime.utcnow())
            )
            await db.execute(
                sa_update(Tenant)
                .where(Tenant.id == tenant_id)
                .values(vcluster_status="not_created")
            )
            await db.commit()
def _to_response(tenant: Tenant, plan_name: str) -> TenantResponse:
    """Map a Tenant row to the public TenantResponse schema.

    data_plane_url is exposed as soon as provisioning starts (status
    "provisioning" or "ready"); otherwise it is None.
    """
    s = get_settings()
    dp_url = (
        f"https://data.{s.DATA_PLANE_DOMAIN}?tenant={tenant.slug}"
        if tenant.vcluster_status in ("ready", "provisioning")
        else None
    )
    return TenantResponse(
        id=tenant.id,
        slug=tenant.slug,
        company_name=tenant.company_name,
        status=tenant.status,
        plan_name=plan_name,
        trial_ends_at=tenant.trial_ends_at,
        vcluster_status=tenant.vcluster_status,
        vcluster_url=tenant.vcluster_url,
        data_plane_url=dp_url,
        tools_enabled=tenant.tools_enabled,
        keycloak_realm=tenant.keycloak_realm,
        sso_type=tenant.sso_type,
        created_at=tenant.created_at,
    )

213
app/routers/tenants.py Normal file
View File

@@ -0,0 +1,213 @@
"""
/tenants — Tenant self-service: tools, SSO, users
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update as sa_update
from uuid import UUID
from datetime import datetime
from ..core.database import get_db
from ..core.config import get_settings
from ..models.tenant import Tenant, User, TenantTool, AuditLog, Plan
from ..schemas.tenant import (
TenantResponse, ToolUpdateRequest, CustomSSORequest, UserInviteRequest
)
from ..services import keycloak_service, provisioner
router = APIRouter(prefix="/tenants", tags=["tenants"])


async def _get_tenant(tenant_id: UUID, db: AsyncSession) -> Tenant:
    """Fetch a live (non-soft-deleted) tenant by id, or raise 404."""
    stmt = select(Tenant).where(Tenant.id == tenant_id, Tenant.deleted_at.is_(None))
    tenant = (await db.execute(stmt)).scalar_one_or_none()
    if tenant is None:
        raise HTTPException(status_code=404, detail="Tenant not found")
    return tenant
# ── GET tenant info ──────────────────────────────────────────────────────────
@router.get("/by-slug/{slug}", response_model=TenantResponse)
async def get_tenant_by_slug(slug: str, db: AsyncSession = Depends(get_db)):
    """Resolve a live tenant by its URL slug; 404 if absent or soft-deleted."""
    stmt = select(Tenant).where(Tenant.slug == slug, Tenant.deleted_at.is_(None))
    tenant = (await db.execute(stmt)).scalar_one_or_none()
    if tenant is None:
        raise HTTPException(status_code=404, detail="Tenant not found")
    plan = await db.get(Plan, tenant.plan_id)
    plan_name = plan.name if plan else "unknown"
    return _to_response(tenant, plan_name)
@router.get("/{tenant_id}", response_model=TenantResponse)
async def get_tenant(tenant_id: UUID, db: AsyncSession = Depends(get_db)):
    """Fetch a live tenant by id; 404 if absent or soft-deleted."""
    tenant = await _get_tenant(tenant_id, db)
    plan = await db.get(Plan, tenant.plan_id)
    plan_name = plan.name if plan else "unknown"
    return _to_response(tenant, plan_name)
# ── Tool management ──────────────────────────────────────────────────────────
@router.patch("/{tenant_id}/tools")
async def update_tools(
    tenant_id: UUID,
    payload: ToolUpdateRequest,
    db: AsyncSession = Depends(get_db),
):
    """Enable/disable/scale data tools. Superset is always on by default.

    Plan gating: enabling a tool not flagged in plan.features returns 402.
    After committing the DB change, the GitOps repo is updated and the
    per-tool URLs for enabled tools are returned.
    """
    t = await _get_tenant(tenant_id, db)
    plan = await db.get(Plan, t.plan_id)
    # Enforce plan feature gating
    new_tools = payload.to_dict(t.tools_enabled)
    if plan:
        for tool, enabled in new_tools.items():
            if enabled and not plan.features.get(tool, False):
                raise HTTPException(
                    status_code=402,
                    detail=f"Tool '{tool}' is not available on plan '{plan.name}'. Please upgrade."
                )
    # Always keep Superset on (default tool)
    new_tools["superset"] = True
    # Update DB
    await db.execute(
        sa_update(Tenant).where(Tenant.id == tenant_id).values(tools_enabled=new_tools)
    )
    # Upsert one TenantTool row per tool.
    for tool, enabled in new_tools.items():
        existing = await db.execute(
            select(TenantTool).where(TenantTool.tenant_id == tenant_id, TenantTool.tool_name == tool)
        )
        row = existing.scalar_one_or_none()
        if row:
            row.enabled = bool(enabled)
        else:
            db.add(TenantTool(tenant_id=tenant_id, tool_name=tool, enabled=bool(enabled)))
    db.add(AuditLog(tenant_id=tenant_id, action="tools.updated", details={"tools": new_tools}))
    await db.commit()
    # Push gitops update
    # NOTE(review): if this push fails, the DB is already committed — the two
    # can drift. Confirm whether a retry mechanism exists.
    s = get_settings()
    await provisioner.update_tenant_tools(t.slug, new_tools)
    tool_urls = {
        tool: f"https://{t.slug}-{tool}.{s.DATA_PLANE_DOMAIN}"
        for tool, en in new_tools.items() if en
    }
    return {"tools_enabled": new_tools, "tool_urls": tool_urls}
# ── SSO management ───────────────────────────────────────────────────────────
@router.get("/{tenant_id}/sso")
async def get_sso_config(tenant_id: UUID, db: AsyncSession = Depends(get_db)):
    """Describe the tenant's current SSO setup (system realm URL + custom config)."""
    tenant = await _get_tenant(tenant_id, db)
    settings = get_settings()
    realm_url = f"{settings.KEYCLOAK_URL}/realms/{tenant.keycloak_realm}"
    return {
        "sso_type": tenant.sso_type,
        "system_sso_url": realm_url,
        "custom_config": tenant.custom_sso_config,
    }
@router.post("/{tenant_id}/sso/custom")
async def configure_custom_sso(
    tenant_id: UUID,
    payload: CustomSSORequest,
    db: AsyncSession = Depends(get_db),
):
    """Configure a custom OIDC/SAML IdP within the tenant's Keycloak realm.

    Gated on the plan's "custom_sso" feature flag (402 otherwise). The IdP is
    created in Keycloak first; only then is the DB updated.
    """
    t = await _get_tenant(tenant_id, db)
    plan = await db.get(Plan, t.plan_id)
    if not (plan and plan.features.get("custom_sso")):
        raise HTTPException(status_code=402, detail="Custom SSO requires Enterprise plan.")
    await keycloak_service.add_custom_idp(t.slug, payload.model_dump())
    # NOTE(review): payload.model_dump() includes client_secret, stored as
    # plaintext JSONB in custom_sso_config — confirm this is acceptable.
    await db.execute(
        sa_update(Tenant)
        .where(Tenant.id == tenant_id)
        .values(sso_type="custom", custom_sso_config=payload.model_dump())
    )
    db.add(AuditLog(tenant_id=tenant_id, action="sso.custom_idp_added",
                    details={"alias": payload.alias}))
    await db.commit()
    return {"status": "Custom SSO configured", "alias": payload.alias}
@router.delete("/{tenant_id}/sso/custom")
async def remove_custom_sso(tenant_id: UUID, db: AsyncSession = Depends(get_db)):
    """Revert the tenant to system SSO and clear the stored custom config.

    NOTE(review): unlike configure_custom_sso, this only updates the DB — the
    IdP is not removed from the Keycloak realm here. Confirm whether that is
    intentional or a missing keycloak_service call.
    """
    t = await _get_tenant(tenant_id, db)
    await db.execute(
        sa_update(Tenant).where(Tenant.id == tenant_id)
        .values(sso_type="system", custom_sso_config=None)
    )
    db.add(AuditLog(tenant_id=tenant_id, action="sso.reverted_to_system"))
    await db.commit()
    return {"status": "Reverted to system SSO"}
# ── User management ──────────────────────────────────────────────────────────
@router.get("/{tenant_id}/users")
async def list_users(tenant_id: UUID, db: AsyncSession = Depends(get_db)):
    """List the tenant's users as reported by Keycloak."""
    tenant = await _get_tenant(tenant_id, db)
    members = await keycloak_service.get_tenant_users(tenant.slug)
    return {"users": members}
@router.post("/{tenant_id}/users", status_code=201)
async def invite_user(
    tenant_id: UUID,
    payload: UserInviteRequest,
    db: AsyncSession = Depends(get_db),
):
    """Create the user in the tenant's Keycloak realm, then mirror it locally.

    Keycloak creation happens first; if it fails, no local row is written.
    """
    t = await _get_tenant(tenant_id, db)
    kc_id = await keycloak_service.create_tenant_user(
        t.slug, payload.email, payload.first_name, payload.last_name, payload.role
    )
    user = User(
        tenant_id=tenant_id,
        keycloak_user_id=kc_id,
        email=payload.email,
        first_name=payload.first_name,
        last_name=payload.last_name,
        role=payload.role,
    )
    db.add(user)
    db.add(AuditLog(tenant_id=tenant_id, action="user.invited",
                    details={"email": payload.email, "role": payload.role}))
    await db.commit()
    return {"status": "invited", "keycloak_id": kc_id}
@router.delete("/{tenant_id}/users/{kc_user_id}")
async def remove_user(tenant_id: UUID, kc_user_id: str, db: AsyncSession = Depends(get_db)):
    """Delete the user from Keycloak, then soft-mark the local row as deleted."""
    t = await _get_tenant(tenant_id, db)
    await keycloak_service.delete_tenant_user(t.slug, kc_user_id)
    await db.execute(
        sa_update(User)
        .where(User.tenant_id == tenant_id, User.keycloak_user_id == kc_user_id)
        .values(status="deleted")
    )
    db.add(AuditLog(tenant_id=tenant_id, action="user.removed",
                    details={"kc_user_id": kc_user_id}))
    await db.commit()
    return {"status": "removed"}
def _to_response(tenant: Tenant, plan_name: str) -> TenantResponse:
    """Map a Tenant row to the public TenantResponse schema.

    data_plane_url is present as soon as provisioning starts ("provisioning"
    or "ready"); otherwise it is None.
    """
    settings = get_settings()
    if tenant.vcluster_status in ("ready", "provisioning"):
        dp_url = f"https://data.{settings.DATA_PLANE_DOMAIN}?tenant={tenant.slug}"
    else:
        dp_url = None
    return TenantResponse(
        id=tenant.id,
        slug=tenant.slug,
        company_name=tenant.company_name,
        status=tenant.status,
        plan_name=plan_name,
        trial_ends_at=tenant.trial_ends_at,
        vcluster_status=tenant.vcluster_status,
        vcluster_url=tenant.vcluster_url,
        data_plane_url=dp_url,
        tools_enabled=tenant.tools_enabled,
        keycloak_realm=tenant.keycloak_realm,
        sso_type=tenant.sso_type,
        created_at=tenant.created_at,
    )

91
app/schemas/tenant.py Normal file
View File

@@ -0,0 +1,91 @@
from pydantic import BaseModel, EmailStr, Field, field_validator
from typing import Optional
from datetime import datetime
from uuid import UUID
import re
def _validate_slug(v: str) -> str:
slug = re.sub(r"[^a-z0-9-]", "-", v.lower()).strip("-")
if len(slug) < 2:
raise ValueError("Company name too short")
if len(slug) > 40:
raise ValueError("Company name too long (max 40 chars)")
return slug
class SignUpRequest(BaseModel):
    """Payload for the self-service tenant sign-up endpoint."""
    email: EmailStr
    first_name: str = Field(..., min_length=1, max_length=100)
    last_name: str = Field(..., min_length=1, max_length=100)
    company_name: str = Field(..., min_length=2, max_length=100)
    password: str = Field(..., min_length=8)

    @field_validator("company_name")
    @classmethod
    def validate_company(cls, v: str) -> str:
        # Intentional pass-through: the raw company name is kept as entered;
        # the URL slug is derived server-side (see _validate_slug above).
        return v  # raw company name; slug derived server-side
class TenantResponse(BaseModel):
    """Public API representation of a tenant and its provisioning state."""
    id: UUID
    slug: str
    company_name: str
    status: str
    plan_name: Optional[str] = None
    trial_ends_at: Optional[datetime] = None
    vcluster_status: str
    vcluster_url: Optional[str] = None
    data_plane_url: Optional[str] = None
    tools_enabled: dict
    keycloak_realm: Optional[str] = None
    sso_type: str
    created_at: datetime

    class Config:
        # Allow constructing this model directly from ORM rows.
        from_attributes = True
class ToolUpdateRequest(BaseModel):
    """Partial set of per-tenant tool toggles; ``None`` means "leave unchanged"."""

    superset: Optional[bool] = None
    airflow: Optional[bool] = None
    trino: Optional[bool] = None
    nifi: Optional[bool] = None
    spark: Optional[bool] = None

    def to_dict(self, current: dict) -> dict:
        """Merge the non-None toggles over *current* (which is not mutated)."""
        overrides = {
            name: flag
            for name in ("superset", "airflow", "trino", "nifi", "spark")
            if (flag := getattr(self, name)) is not None
        }
        return {**current, **overrides}
class CustomSSORequest(BaseModel):
    """Tenant-supplied OIDC/SAML identity-provider configuration."""
    alias: str = Field(..., min_length=1)
    display_name: str = Field(..., min_length=1)
    provider_id: str = Field(default="oidc", pattern="^(oidc|saml)$")
    authorization_url: str = Field(...)
    token_url: str = Field(...)
    client_id: str = Field(...)
    client_secret: str = Field(...)
class UserInviteRequest(BaseModel):
    """Payload for inviting a user into a tenant's realm."""
    email: EmailStr
    first_name: str = Field(..., min_length=1)
    last_name: str = Field(..., min_length=1)
    role: str = Field(default="member", pattern="^(admin|member|viewer)$")
class AdminTrialUpdateRequest(BaseModel):
    """Admin request to set a tenant's trial length (1..365 days)."""
    trial_days: int = Field(..., ge=1, le=365)
class AdminPlanUpdateRequest(BaseModel):
    """Admin request to switch a tenant onto a named plan."""
    plan_name: str
class PlatformSettingUpdate(BaseModel):
    """New value for a single platform-wide key/value setting."""
    value: str

0
app/services/__init__.py Normal file
View File

View File

@@ -0,0 +1,182 @@
"""
Keycloak Service — manages realms, clients and users per tenant.
Each tenant gets its own Keycloak realm, providing full SSO isolation.
Users can optionally configure a custom OIDC/SAML IdP within their realm.
"""
import logging
from keycloak import KeycloakAdmin, KeycloakOpenIDConnection
from ..core.config import get_settings
log = logging.getLogger(__name__)
def _admin_client() -> KeycloakAdmin:
    """Build an admin client authenticated against the Keycloak master realm."""
    s = get_settings()
    conn = KeycloakOpenIDConnection(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=s.KEYCLOAK_MASTER_REALM,
        # NOTE(review): TLS verification is disabled — acceptable only for
        # in-cluster/self-signed setups; confirm before external exposure.
        verify=False,
    )
    return KeycloakAdmin(connection=conn)
def realm_name(tenant_slug: str) -> str:
    """Deterministic Keycloak realm name for a tenant slug."""
    return "tenant-" + tenant_slug
async def create_tenant_realm(tenant_slug: str, company_name: str, admin_email: str) -> str:
    """
    Create an isolated Keycloak realm for the tenant.
    Returns the realm name.

    Idempotent: returns early if the realm already exists. Side effects:
    also creates the public "platform-portal" client and the initial
    tenant admin user inside the new realm.
    """
    rname = realm_name(tenant_slug)
    ka = _admin_client()
    existing = [r["realm"] for r in ka.get_realms()]
    if rname in existing:
        log.info("Realm %s already exists", rname)
        return rname
    ka.create_realm({
        "realm": rname,
        "displayName": company_name,
        "enabled": True,
        "registrationAllowed": False,
        "resetPasswordAllowed": True,
        "rememberMe": True,
        "loginWithEmailAllowed": True,
        "duplicateEmailsAllowed": False,
        "sslRequired": "external",
        "bruteForceProtected": True,
        "accessTokenLifespan": 3600,      # seconds (1 h)
        "ssoSessionIdleTimeout": 86400,   # seconds (24 h)
    })
    # Create platform-api client in the new realm — a second admin client,
    # scoped to the tenant realm instead of master.
    ka_realm = KeycloakAdmin(
        server_url=get_settings().KEYCLOAK_URL,
        username=get_settings().KEYCLOAK_ADMIN_USER,
        password=get_settings().KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        # NOTE(review): TLS verification disabled — confirm in-cluster-only use.
        verify=False,
    )
    s = get_settings()
    base = f"https://app.{s.DOMAIN}"
    ka_realm.create_client({
        "clientId": "platform-portal",
        "name": "Platform Portal",
        "enabled": True,
        "publicClient": True,
        "standardFlowEnabled": True,
        "directAccessGrantsEnabled": True,
        "redirectUris": [f"{base}/*", f"https://data.{s.DOMAIN}/*"],
        "webOrigins": [f"https://app.{s.DOMAIN}", f"https://data.{s.DOMAIN}"],
        "protocol": "openid-connect",
    })
    # Create the initial tenant admin user.
    # NOTE(review): the Keycloak admin API typically ignores "realmRoles" in
    # the create-user payload — verify the admin role is actually granted.
    ka_realm.create_user({
        "email": admin_email,
        "username": admin_email,
        "enabled": True,
        "emailVerified": True,
        "realmRoles": ["realm-admin"],
    })
    log.info("Created realm %s", rname)
    return rname
async def delete_tenant_realm(tenant_slug: str) -> None:
    """Remove the tenant's realm if it exists (no-op otherwise)."""
    rname = realm_name(tenant_slug)
    ka = _admin_client()
    if any(r["realm"] == rname for r in ka.get_realms()):
        ka.delete_realm(rname)
        log.info("Deleted realm %s", rname)
async def add_custom_idp(tenant_slug: str, idp_config: dict) -> None:
    """
    Add a custom OIDC/SAML Identity Provider to the tenant realm.
    idp_config keys: alias, display_name, provider_id (oidc/saml),
    authorization_url, token_url, client_id, client_secret

    Missing keys fall back to the defaults below; missing URLs/credentials
    default to empty strings (the IdP would then be misconfigured but created).
    """
    rname = realm_name(tenant_slug)
    ka = KeycloakAdmin(
        server_url=get_settings().KEYCLOAK_URL,
        username=get_settings().KEYCLOAK_ADMIN_USER,
        password=get_settings().KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        verify=False,  # NOTE(review): TLS verification disabled — confirm in-cluster-only use
    )
    ka.create_idp({
        "alias": idp_config.get("alias", "custom-sso"),
        "displayName": idp_config.get("display_name", "Company SSO"),
        "providerId": idp_config.get("provider_id", "oidc"),
        "enabled": True,
        # Trust the external IdP's email claim without re-verification.
        "trustEmail": True,
        "config": {
            "authorizationUrl": idp_config.get("authorization_url", ""),
            "tokenUrl": idp_config.get("token_url", ""),
            "clientId": idp_config.get("client_id", ""),
            "clientSecret": idp_config.get("client_secret", ""),
            "defaultScope": "openid email profile",
            # IMPORT: user data is imported at first login, not kept in sync.
            "syncMode": "IMPORT",
        },
    })
async def create_tenant_user(tenant_slug: str, email: str, first_name: str,
                             last_name: str, role: str = "member",
                             temp_password: str | None = None) -> str:
    """Create a user in the tenant's realm. Returns KC user ID.

    If *temp_password* is given it is set as a temporary password the user
    must change at first login; otherwise Keycloak sends a verification
    email instead.
    """
    rname = realm_name(tenant_slug)
    ka = KeycloakAdmin(
        server_url=get_settings().KEYCLOAK_URL,
        username=get_settings().KEYCLOAK_ADMIN_USER,
        password=get_settings().KEYCLOAK_ADMIN_PASSWORD,
        realm_name=rname,
        verify=False,  # NOTE(review): TLS verification disabled — confirm in-cluster-only use
    )
    user_id = ka.create_user({
        "email": email,
        "username": email,
        "firstName": first_name,
        "lastName": last_name,
        "enabled": True,
        "emailVerified": False,
        # The platform role is stored as a custom attribute, not a realm role.
        "attributes": {"platform_role": [role]},
    })
    if temp_password:
        ka.set_user_password(user_id, temp_password, temporary=True)
    else:
        ka.send_verify_email(user_id)
    return user_id
async def get_tenant_users(tenant_slug: str) -> list[dict]:
    """Return every user registered in the tenant's Keycloak realm."""
    s = get_settings()
    admin = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=realm_name(tenant_slug),
        verify=False,
    )
    return admin.get_users()
async def delete_tenant_user(tenant_slug: str, kc_user_id: str) -> None:
    """Permanently delete a user from the tenant's Keycloak realm."""
    s = get_settings()
    admin = KeycloakAdmin(
        server_url=s.KEYCLOAK_URL,
        username=s.KEYCLOAK_ADMIN_USER,
        password=s.KEYCLOAK_ADMIN_PASSWORD,
        realm_name=realm_name(tenant_slug),
        verify=False,
    )
    admin.delete_user(kc_user_id)

717
app/services/provisioner.py Normal file
View File

@@ -0,0 +1,717 @@
"""
Tenant Provisioner — manages vCluster + tool lifecycle via GitOps.
Flow:
1. Tenant signs up → platform-api calls provision_tenant()
2. provisioner writes two files to the gitops repo:
data-plane/tenants/<slug>/vcluster.yaml ← ArgoCD Application (vCluster)
data-plane/tenants/<slug>/appset.yaml ← ApplicationSet (tool apps)
3. For each enabled tool it writes:
data-plane/tenants/<slug>/apps/<tool>.yaml ← ArgoCD Application (Helm chart)
4. Git push → ArgoCD on data-plane picks it all up automatically.
5. deprovision_tenant() removes the directory → ArgoCD prunes everything.
"""
import logging
import os
import re
import secrets
import shutil
import subprocess
from pathlib import Path
from ..core.config import get_settings
log = logging.getLogger(__name__)
# ── Tool Helm chart templates ────────────────────────────────────────────────
def _superset_app(tenant: str, domain: str, kc_base: str, kc_secret: str) -> str:
    """Render the ArgoCD Application YAML deploying Superset for *tenant*.

    A fresh SECRET_KEY is generated per call, so re-rendering rotates it.
    NOTE(review): the chart admin password is hardcoded and WTF_CSRF /
    Talisman are disabled in the config overrides — confirm this is an
    accepted trade-off before production use.
    """
    secret_key = secrets.token_urlsafe(42)
    return f"""\
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: {tenant}-superset
  namespace: argocd
  annotations:
    argocd.argoproj.io/sync-wave: "3"
spec:
  project: default
  source:
    repoURL: https://apache.github.io/superset
    chart: superset
    targetRevision: "0.15.2"
    helm:
      valuesObject:
        replicaCount: 1
        supersetNode:
          replicaCount: 1
        supersetCeleryBeat:
          enabled: false
        supersetWorker:
          replicaCount: 0
        init:
          adminUser:
            username: admin
            firstname: Admin
            lastname: {tenant}
            email: admin@{domain}
            password: "Architecture@9988#"
          loadExamples: false
        postgresql:
          enabled: false
        redis:
          enabled: false
        service:
          type: ClusterIP
          port: 8088
        ingress:
          enabled: true
          ingressClassName: nginx
          annotations:
            cert-manager.io/cluster-issuer: letsencrypt-prod
            nginx.ingress.kubernetes.io/proxy-buffer-size: "16k"
          hosts:
            - host: {tenant}-superset.{domain}
              paths:
                - path: /
                  pathType: Prefix
          tls:
            - hosts:
                - {tenant}-superset.{domain}
              secretName: {tenant}-superset-tls
        # initContainer installs psycopg2 (missing from apache/superset:5.x image)
        initContainers:
          - name: install-drivers
            image: apachesuperset.docker.scarf.sh/apache/superset:5.0.0
            command:
              - sh
              - -c
              - pip install psycopg2-binary Authlib --user --no-cache-dir --quiet
            volumeMounts:
              - name: pip-packages
                mountPath: /app/superset_home/.local
        extraVolumes:
          - name: pip-packages
            emptyDir: {{}}
        extraVolumeMounts:
          - name: pip-packages
            mountPath: /app/superset_home/.local
        extraEnv:
          PYTHONPATH: "/app/superset_home/.local/lib/python3.10/site-packages"
          DB_HOST: "{tenant}-superset-pg"
          DB_PORT: "5432"
          DB_USER: "superset"
          DB_PASS: "superset-pass"
          DB_NAME: "superset"
          REDIS_HOST: "{tenant}-superset-redis"
          REDIS_PORT: "6379"
        bootstrapScript: |
          #!/bin/bash
          echo "Drivers pre-installed via initContainer"
        configOverrides:
          keycloak_sso: |
            from flask_appbuilder.security.manager import AUTH_OAUTH
            from superset.security import SupersetSecurityManager
            import logging
            log = logging.getLogger(__name__)
            AUTH_TYPE = AUTH_OAUTH
            AUTH_USER_REGISTRATION = True
            AUTH_USER_REGISTRATION_ROLE = "Gamma"
            AUTH_ROLES_SYNC_AT_LOGIN = True
            AUTH_ROLES_MAPPING = {{"admin": ["Admin"], "platform_admin": ["Admin"]}}
            OAUTH_PROVIDERS = [{{
                "name": "keycloak",
                "icon": "fa-key",
                "token_key": "access_token",
                "remote_app": {{
                    "client_id": "superset",
                    "client_secret": "{kc_secret}",
                    "server_metadata_url": "{kc_base}/realms/{tenant}/.well-known/openid-configuration",
                    "api_base_url": "{kc_base}/realms/{tenant}/protocol/openid-connect",
                    "client_kwargs": {{"scope": "openid email profile roles"}},
                }},
            }}]
            class KeycloakSM(SupersetSecurityManager):
                def oauth_user_info(self, provider, response=None):
                    if provider == "keycloak":
                        me = self.appbuilder.sm.oauth_remoteapp.get("userinfo").json()
                        roles = me.get("realm_access", {{}}).get("roles", [])
                        return {{
                            "username": me.get("preferred_username", me.get("email")),
                            "first_name": me.get("given_name", ""),
                            "last_name": me.get("family_name", ""),
                            "email": me.get("email", ""),
                            "role_keys": roles,
                        }}
                    return {{}}
            CUSTOM_SECURITY_MANAGER = KeycloakSM
          secret: |
            SECRET_KEY = '{secret_key}'
            SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://superset:superset-pass@{tenant}-superset-pg:5432/superset'
            REDIS_HOST = '{tenant}-superset-redis'
            REDIS_PORT = 6379
            DATA_CACHE_CONFIG = {{'CACHE_TYPE': 'SimpleCache'}}
            CACHE_CONFIG = {{'CACHE_TYPE': 'SimpleCache'}}
            WTF_CSRF_ENABLED = False
            TALISMAN_ENABLED = False
            SESSION_COOKIE_SAMESITE = None
            SESSION_COOKIE_SECURE = True
            ENABLE_PROXY_FIX = True
            PREFERRED_URL_SCHEME = 'https'
  destination:
    server: https://kubernetes.default.svc
    namespace: {tenant}-superset
  syncPolicy:
    automated: {{prune: true, selfHeal: true}}
    syncOptions: [CreateNamespace=true]
"""
def _superset_infra(tenant: str) -> str:
    """Standalone PostgreSQL + Redis for Superset (using registry-accessible images).

    Emits raw Kubernetes manifests (Secret, StatefulSet, Deployment, Services);
    two alias Services make the Helm chart's default hostnames resolve.
    """
    return f"""\
apiVersion: v1
kind: Secret
metadata:
  name: {tenant}-superset-pg-secret
  namespace: {tenant}-superset
stringData:
  POSTGRES_DB: superset
  POSTGRES_USER: superset
  POSTGRES_PASSWORD: "superset-pass"
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: {tenant}-superset-pg
  namespace: {tenant}-superset
spec:
  serviceName: {tenant}-superset-pg
  replicas: 1
  selector:
    matchLabels:
      app: {tenant}-superset-pg
  template:
    metadata:
      labels:
        app: {tenant}-superset-pg
    spec:
      containers:
        - name: postgres
          image: postgres:16-alpine
          envFrom:
            - secretRef:
                name: {tenant}-superset-pg-secret
          ports:
            - containerPort: 5432
          volumeMounts:
            - name: data
              mountPath: /var/lib/postgresql/data
              subPath: pgdata
  volumeClaimTemplates:
    - metadata:
        name: data
      spec:
        accessModes: [ReadWriteOnce]
        resources:
          requests:
            storage: 5Gi
---
apiVersion: v1
kind: Service
metadata:
  name: {tenant}-superset-pg
  namespace: {tenant}-superset
spec:
  selector:
    app: {tenant}-superset-pg
  ports:
    - port: 5432
      targetPort: 5432
---
# Alias so the Superset Helm chart's default 'superset-postgresql' hostname resolves
apiVersion: v1
kind: Service
metadata:
  name: superset-postgresql
  namespace: {tenant}-superset
spec:
  selector:
    app: {tenant}-superset-pg
  ports:
    - port: 5432
      targetPort: 5432
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {tenant}-superset-redis
  namespace: {tenant}-superset
spec:
  replicas: 1
  selector:
    matchLabels:
      app: {tenant}-superset-redis
  template:
    metadata:
      labels:
        app: {tenant}-superset-redis
    spec:
      containers:
        - name: redis
          image: redis:7-alpine
          ports:
            - containerPort: 6379
---
apiVersion: v1
kind: Service
metadata:
  name: {tenant}-superset-redis
  namespace: {tenant}-superset
spec:
  selector:
    app: {tenant}-superset-redis
  ports:
    - port: 6379
      targetPort: 6379
---
# Alias for Helm chart default hostname
apiVersion: v1
kind: Service
metadata:
  name: superset-redis-headless
  namespace: {tenant}-superset
spec:
  selector:
    app: {tenant}-superset-redis
  ports:
    - port: 6379
      targetPort: 6379
"""
def _airflow_app(tenant: str, domain: str, kc_base: str) -> str:
    """Render the ArgoCD Application YAML deploying Airflow for *tenant*.

    ``kc_base`` is currently unused in the template (kept for the uniform
    generator signature in TOOL_GENERATORS).
    """
    return f"""\
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: {tenant}-airflow
  namespace: argocd
  annotations:
    argocd.argoproj.io/sync-wave: "3"
spec:
  project: default
  source:
    repoURL: https://airflow.apache.org
    chart: airflow
    targetRevision: "1.14.0"
    helm:
      valuesObject:
        images:
          airflow:
            repository: quay.io/dlytica_dev/airflow
            tag: v2-airflow-nifi-pipeline-utils-2.0.1
            pullPolicy: IfNotPresent
        executor: KubernetesExecutor
        config:
          core:
            load_examples: "False"
        postgresql:
          enabled: true
          image:
            registry: quay.io
            repository: dlytica_dev/postgresql
            tag: "16.1"
        redis:
          enabled: true
        ingress:
          web:
            enabled: true
            ingressClassName: nginx
            hosts:
              - name: {tenant}-airflow.{domain}
                tls:
                  enabled: true
                  secretName: {tenant}-airflow-tls
            annotations:
              cert-manager.io/cluster-issuer: letsencrypt-prod
  destination:
    server: https://kubernetes.default.svc
    namespace: {tenant}-airflow
  syncPolicy:
    automated: {{prune: true, selfHeal: true}}
    syncOptions: [CreateNamespace=true]
"""
def _trino_app(tenant: str, domain: str) -> str:
    """Render the ArgoCD Application YAML deploying Trino for *tenant*."""
    return f"""\
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: {tenant}-trino
  namespace: argocd
  annotations:
    argocd.argoproj.io/sync-wave: "3"
spec:
  project: default
  source:
    repoURL: https://trinodb.github.io/charts
    chart: trino
    targetRevision: "0.13.0"
    helm:
      valuesObject:
        image:
          repository: quay.io/dlytica_dev/trino
          tag: "476"
        server:
          workers: 1
        ingress:
          enabled: true
          className: nginx
          hosts:
            - host: {tenant}-trino.{domain}
              paths:
                - path: /
                  pathType: Prefix
          tls:
            - hosts: [{tenant}-trino.{domain}]
              secretName: {tenant}-trino-tls
          annotations:
            cert-manager.io/cluster-issuer: letsencrypt-prod
  destination:
    server: https://kubernetes.default.svc
    namespace: {tenant}-trino
  syncPolicy:
    automated: {{prune: true, selfHeal: true}}
    syncOptions: [CreateNamespace=true]
"""
def _gitea_app(tenant: str, domain: str) -> str:
    """Render the ArgoCD Application YAML deploying Gitea for *tenant*.

    NOTE(review): the Gitea admin password is hardcoded in the rendered
    values — confirm whether it should come from settings/secret storage.
    """
    return f"""\
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: {tenant}-gitea
  namespace: argocd
  annotations:
    argocd.argoproj.io/sync-wave: "2"
spec:
  project: default
  source:
    repoURL: https://dl.gitea.com/charts
    chart: gitea
    targetRevision: "10.6.0"
    helm:
      valuesObject:
        gitea:
          admin:
            username: admin
            password: "Architecture@9988#"
            email: admin@{domain}
        ingress:
          enabled: true
          className: nginx
          annotations:
            cert-manager.io/cluster-issuer: letsencrypt-prod
          hosts:
            - host: {tenant}-git.{domain}
              paths:
                - path: /
                  pathType: Prefix
          tls:
            - hosts: [{tenant}-git.{domain}]
              secretName: {tenant}-gitea-tls
        postgresql-ha:
          enabled: false
        postgresql:
          enabled: true
          image:
            registry: quay.io
            repository: dlytica_dev/postgresql
            tag: "16.1"
  destination:
    server: https://kubernetes.default.svc
    namespace: {tenant}-gitea
  syncPolicy:
    automated: {{prune: true, selfHeal: true}}
    syncOptions: [CreateNamespace=true]
"""
# Map tool names to their generator functions.
# Every entry is normalized to a uniform (tenant, domain, kc_base, kc_secret)
# call signature; generators needing fewer inputs simply ignore the extras.
TOOL_GENERATORS = {
    "superset": lambda tenant, domain, kc_base, kc_secret: _superset_app(tenant, domain, kc_base, kc_secret),
    "airflow": lambda tenant, domain, kc_base, kc_secret: _airflow_app(tenant, domain, kc_base),
    "trino": lambda tenant, domain, kc_base, kc_secret: _trino_app(tenant, domain),
    "gitea": lambda tenant, domain, kc_base, kc_secret: _gitea_app(tenant, domain),
}
# Tools that need companion infra manifests (written as <tool>/infra.yaml)
INFRA_GENERATORS = {
    "superset": _superset_infra,
}
# Default tools for every new tenant (always on); merged under the
# tenant's own toggles in provision_tenant/update_tenant_tools.
DEFAULT_TOOLS = {"superset": True}
# ── vCluster template ────────────────────────────────────────────────────────
def _vcluster_app(tenant: str, domain: str, namespace: str, gitops_repo: str, gitops_branch: str) -> str:
    """Render the ArgoCD Application deploying the tenant's vCluster.

    ``gitops_repo``/``gitops_branch`` are currently unused in the template
    (the ApplicationSet carries the repo wiring) but kept for call-site parity.
    """
    return f"""\
# vCluster for tenant: {tenant}
# ArgoCD on the data-plane deploys this. Once the vCluster is up, the
# ApplicationSet below drives all tool deployments inside it.
apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: vcluster-{tenant}
  namespace: argocd
  labels:
    tenant: {tenant}
    managed-by: platform
  annotations:
    argocd.argoproj.io/sync-wave: "1"
spec:
  project: default
  source:
    repoURL: https://charts.loft.sh
    chart: vcluster
    targetRevision: "0.20.0"
    helm:
      valuesObject:
        sync:
          ingresses:
            enabled: true
        exportKubeConfig:
          secret:
            name: vcluster-{tenant}-kubeconfig
        controlPlane:
          ingress:
            enabled: true
            host: vc-{tenant}.{domain}
            annotations:
              cert-manager.io/cluster-issuer: letsencrypt-prod
        init:
          manifests: |
            apiVersion: v1
            kind: Namespace
            metadata:
              name: argocd
  destination:
    server: https://kubernetes.default.svc
    namespace: {namespace}
  syncPolicy:
    automated: {{prune: true, selfHeal: true}}
    syncOptions: [CreateNamespace=true]
"""
def _appset(tenant: str, gitops_repo: str, gitops_branch: str) -> str:
    """ApplicationSet that watches data-plane/tenants/<slug>/apps/ and deploys each app.

    One ArgoCD Application is templated per directory under apps/, named
    "<tenant>-<dirname>" and targeting namespace "<tenant>-<dirname>".
    """
    return f"""\
# ApplicationSet for tenant: {tenant}
# Watches gitops/data-plane/tenants/{tenant}/apps/* and creates one ArgoCD Application per tool.
# Add/remove a file → ArgoCD adds/removes the tool automatically.
apiVersion: argoproj.io/v1alpha1
kind: ApplicationSet
metadata:
  name: {tenant}-tools
  namespace: argocd
  labels:
    tenant: {tenant}
    managed-by: platform
spec:
  generators:
    - git:
        repoURL: {gitops_repo}
        revision: {gitops_branch}
        directories:
          - path: data-plane/tenants/{tenant}/apps/*
  template:
    metadata:
      name: "{tenant}-{{{{path.basename}}}}"
      namespace: argocd
      labels:
        tenant: {tenant}
    spec:
      project: default
      source:
        repoURL: {gitops_repo}
        targetRevision: {gitops_branch}
        path: "{{{{path}}}}"
      destination:
        server: https://kubernetes.default.svc
        namespace: "{tenant}-{{{{path.basename}}}}"
      syncPolicy:
        automated: {{prune: true, selfHeal: true}}
        syncOptions: [CreateNamespace=true]
        retry:
          limit: 5
          backoff: {{duration: 30s, factor: 2, maxDuration: 5m}}
"""
# ── Git helpers ──────────────────────────────────────────────────────────────
def _git_clone_or_pull(repo_url: str, local_path: str, branch: str) -> None:
    """Ensure a checkout of *branch* exists at *local_path* and is up to date."""
    if Path(local_path).exists():
        cmd = ["git", "-C", local_path, "pull", "--rebase", "origin", branch]
    else:
        cmd = ["git", "clone", "--depth=1", "-b", branch, repo_url, local_path]
    # check=True raises CalledProcessError (with captured output) on failure.
    subprocess.run(cmd, check=True, capture_output=True)
def _git_push(local_path: str, message: str, branch: str) -> None:
    """Stage everything, commit (even if nothing changed) and push to origin/<branch>."""
    git = ["git", "-C", local_path]
    for args in (
        ["config", "user.email", "platform-api@dlytica.com"],
        ["config", "user.name", "Platform API"],
        ["add", "-A"],
        ["commit", "-m", message, "--allow-empty"],
        ["push", "origin", branch],
    ):
        subprocess.run(git + args, check=True, capture_output=True)
def _slug_safe(s: str) -> str:
return re.sub(r"[^a-z0-9-]", "-", s.lower()).strip("-")[:40]
# ── Public API ───────────────────────────────────────────────────────────────
async def provision_tenant(
    tenant_slug: str,
    company_name: str,
    plan_name: str,
    tools_enabled: dict,
    domain: str | None = None,
    kc_client_secret: str = "",
) -> dict:
    """
    Write vCluster + ApplicationSet + tool Applications to gitops repo.
    ArgoCD on the data-plane picks them up automatically.
    Returns dict with vcluster_name, vcluster_url, tool_urls.

    ``plan_name`` is currently unused here — tool selection arrives via
    ``tools_enabled`` (merged over DEFAULT_TOOLS).
    """
    s = get_settings()
    base_domain = domain or s.DATA_PLANE_DOMAIN
    kc_base = s.KEYCLOAK_URL
    tenant = _slug_safe(tenant_slug)
    namespace = s.VCLUSTER_NAMESPACE
    local = s.GITOPS_LOCAL_PATH
    _git_clone_or_pull(s.GITOPS_REPO_URL, local, s.GITOPS_BRANCH)
    tenant_dir = Path(local) / "data-plane" / "tenants" / tenant
    tenant_dir.mkdir(parents=True, exist_ok=True)
    apps_dir = tenant_dir / "apps"
    apps_dir.mkdir(exist_ok=True)
    # 1. vCluster Application
    (tenant_dir / "vcluster.yaml").write_text(
        _vcluster_app(tenant, base_domain, namespace, s.GITOPS_REPO_URL, s.GITOPS_BRANCH)
    )
    # 2. ApplicationSet (auto-discovers apps/)
    (tenant_dir / "appset.yaml").write_text(
        _appset(tenant, s.GITOPS_REPO_URL, s.GITOPS_BRANCH)
    )
    # 3. Tool Applications — always include superset; add others per plan
    merged = {**DEFAULT_TOOLS, **tools_enabled}
    tool_urls = {}
    for tool, gen in TOOL_GENERATORS.items():
        tool_dir = apps_dir / tool
        # NOTE(review): apps/<tool>-infra.yaml is never written by this code —
        # infra goes to <tool>/infra.yaml below. The unlink in the else-branch
        # looks like legacy-layout cleanup; confirm and drop once obsolete.
        infra_path = apps_dir / f"{tool}-infra.yaml"
        if merged.get(tool):
            tool_dir.mkdir(exist_ok=True)
            (tool_dir / "application.yaml").write_text(
                gen(tenant, base_domain, kc_base, kc_client_secret)
            )
            # Write companion infra manifests if needed (e.g. postgres + redis for superset)
            if tool in INFRA_GENERATORS:
                (tool_dir / "infra.yaml").write_text(INFRA_GENERATORS[tool](tenant))
            tool_urls[tool] = f"https://{tenant}-{tool}.{base_domain}"
        else:
            # Remove if disabled
            if tool_dir.exists():
                shutil.rmtree(tool_dir)
            infra_path.unlink(missing_ok=True)
    _git_push(local, f"feat: provision tenant {tenant} ({company_name})", s.GITOPS_BRANCH)
    return {
        "vcluster_name": f"vcluster-{tenant}",
        "vcluster_url": f"https://vc-{tenant}.{base_domain}",
        "tool_urls": tool_urls,
        "data_plane_url": f"https://data.{base_domain}?tenant={tenant}",
    }
async def deprovision_tenant(tenant_slug: str) -> None:
    """
    Remove tenant directory from gitops.
    ArgoCD prunes the vCluster and all tool Applications automatically.
    """
    s = get_settings()
    tenant = _slug_safe(tenant_slug)
    local = s.GITOPS_LOCAL_PATH
    _git_clone_or_pull(s.GITOPS_REPO_URL, local, s.GITOPS_BRANCH)
    tenant_dir = Path(local, "data-plane", "tenants", tenant)
    if tenant_dir.exists():
        shutil.rmtree(tenant_dir)
    # Push unconditionally (--allow-empty upstream) so the run is recorded.
    _git_push(local, f"feat: deprovision tenant {tenant}", s.GITOPS_BRANCH)
    log.info("Deprovisioned tenant %s", tenant)
async def update_tenant_tools(
    tenant_slug: str,
    tools_enabled: dict,
    domain: str | None = None,
    kc_client_secret: str = "",
) -> dict:
    """
    Enable/disable tools for a tenant by adding/removing Application files.
    ArgoCD prunes disabled tools automatically.

    Returns a mapping of enabled tool name -> public URL. Because
    DEFAULT_TOOLS is merged underneath, always-on tools (superset)
    cannot be disabled through this call.
    """
    s = get_settings()
    base_domain = domain or s.DATA_PLANE_DOMAIN
    kc_base = s.KEYCLOAK_URL
    tenant = _slug_safe(tenant_slug)
    local = s.GITOPS_LOCAL_PATH
    _git_clone_or_pull(s.GITOPS_REPO_URL, local, s.GITOPS_BRANCH)
    apps_dir = Path(local) / "data-plane" / "tenants" / tenant / "apps"
    apps_dir.mkdir(parents=True, exist_ok=True)
    merged = {**DEFAULT_TOOLS, **tools_enabled}
    tool_urls = {}
    for tool, gen in TOOL_GENERATORS.items():
        tool_dir = apps_dir / tool
        if merged.get(tool):
            tool_dir.mkdir(exist_ok=True)
            (tool_dir / "application.yaml").write_text(
                gen(tenant, base_domain, kc_base, kc_client_secret)
            )
            # Companion infra manifests (e.g. postgres + redis for superset)
            if tool in INFRA_GENERATORS:
                (tool_dir / "infra.yaml").write_text(INFRA_GENERATORS[tool](tenant))
            tool_urls[tool] = f"https://{tenant}-{tool}.{base_domain}"
        else:
            # Disabled: removing the directory makes ArgoCD prune the app.
            if tool_dir.exists():
                shutil.rmtree(tool_dir)
    _git_push(local, f"chore: update tools for tenant {tenant}", s.GITOPS_BRANCH)
    return tool_urls

View File

@@ -0,0 +1,96 @@
"""
Trial Monitor — APScheduler job that runs every hour.
- Warns tenants 7 days before trial expiry
- Suspends vClusters when trial expires
- Deletes vClusters after a grace period (configurable)
"""
import logging
from datetime import datetime, timedelta
from sqlalchemy import select, update as sa_update
from ..core.database import get_session_factory
from ..models.tenant import Tenant, PlatformSetting, ProvisioningJob
from ..services import provisioner
log = logging.getLogger(__name__)
async def _get_setting(db, key: str, default: str) -> str:
    """Look up a PlatformSetting row by primary key; fall back to *default* when absent."""
    row = await db.get(PlatformSetting, key)
    if not row:
        return default
    return row.value
async def check_trials():
    """Called by APScheduler every hour.

    Three passes over the tenants table:
      1. log a warning for trials expiring within ``trial_warning_days``
      2. suspend tenants whose trial has ended and enqueue a suspend job
      3. deprovision tenants suspended longer than ``suspension_grace_days``
    """
    factory = get_session_factory()
    async with factory() as db:
        # NOTE(review): naive UTC timestamp; assumes trial_ends_at/updated_at
        # are stored naive-UTC as well — confirm against the model/migrations.
        now = datetime.utcnow()
        warn_days_str = await _get_setting(db, "trial_warning_days", "7")
        warn_days = int(warn_days_str)
        # ── Warn approaching expiry ───────────────────────────────
        warn_before = now + timedelta(days=warn_days)
        approaching = (await db.execute(
            select(Tenant)
            .where(
                Tenant.status == "trial",
                Tenant.trial_ends_at <= warn_before,
                Tenant.trial_ends_at > now,
            )
        )).scalars().all()
        for t in approaching:
            days_left = max(0, (t.trial_ends_at - now).days)
            # Re-logged on every hourly run until expiry; no dedup yet.
            log.info("Trial warning: %s expires in %d days", t.slug, days_left)
            # TODO: send email notification
        # ── Expire trials ─────────────────────────────────────────
        expired = (await db.execute(
            select(Tenant)
            .where(
                Tenant.status == "trial",
                Tenant.trial_ends_at <= now,
                Tenant.vcluster_status == "ready",
            )
        )).scalars().all()
        for t in expired:
            log.info("Trial expired: %s — suspending vCluster", t.slug)
            await db.execute(
                sa_update(Tenant)
                .where(Tenant.id == t.id)
                .values(status="suspended", vcluster_status="suspended")
            )
            # The actual vCluster suspension is carried out asynchronously
            # by whatever consumes ProvisioningJob rows.
            db.add(ProvisioningJob(
                tenant_id=t.id,
                job_type="suspend_vcluster",
                payload={"slug": t.slug, "reason": "trial_expired"},
            ))
        await db.commit()
        # ── Delete suspended vclusters after grace period (7 days) ─
        grace_days = int(await _get_setting(db, "suspension_grace_days", "7"))
        grace_before = now - timedelta(days=grace_days)
        old_suspended = (await db.execute(
            select(Tenant)
            .where(
                Tenant.status == "suspended",
                Tenant.updated_at <= grace_before,
                Tenant.vcluster_status == "suspended",
            )
        )).scalars().all()
        for t in old_suspended:
            log.info("Grace period ended: %s — deprovisioning", t.slug)
            try:
                await provisioner.deprovision_tenant(t.slug)
                await db.execute(
                    sa_update(Tenant)
                    .where(Tenant.id == t.id)
                    .values(vcluster_status="deleted")
                )
            except Exception as exc:
                # Best-effort: log and continue with the remaining tenants.
                log.error("Failed to deprovision %s: %s", t.slug, exc)
        await db.commit()

17
requirements.txt Normal file
View File

@@ -0,0 +1,17 @@
fastapi>=0.111.0
uvicorn[standard]>=0.30.0
asyncpg>=0.29.0
sqlalchemy[asyncio]>=2.0.30
alembic>=1.13.0
pydantic>=2.7.0
pydantic-settings>=2.2.0
python-keycloak>=3.9.0
httpx>=0.27.0
python-jose[cryptography]>=3.3.0
passlib[bcrypt]>=1.7.4
python-multipart>=0.0.9
APScheduler>=3.10.4
kubernetes>=29.0.0
python-slugify>=8.0.4
jinja2>=3.1.4
emails>=0.6