From f9bfc3afbd34659fea211990169ece77d1971e94 Mon Sep 17 00:00:00 2001 From: Platform CI Date: Sat, 21 Feb 2026 17:54:44 -0500 Subject: [PATCH] init --- Dockerfile | 13 + app/__init__.py | 0 app/core/__init__.py | 0 app/core/config.py | 43 ++ app/core/database.py | 35 ++ app/main.py | 58 +++ app/models/__init__.py | 0 app/models/tenant.py | 129 ++++++ app/routers/__init__.py | 0 app/routers/admin.py | 216 ++++++++++ app/routers/auth.py | 210 +++++++++ app/routers/tenants.py | 213 +++++++++ app/schemas/tenant.py | 91 ++++ app/services/__init__.py | 0 app/services/keycloak_service.py | 182 ++++++++ app/services/provisioner.py | 717 +++++++++++++++++++++++++++++++ app/services/trial_monitor.py | 96 +++++ requirements.txt | 17 + 18 files changed, 2020 insertions(+) create mode 100644 Dockerfile create mode 100644 app/__init__.py create mode 100644 app/core/__init__.py create mode 100644 app/core/config.py create mode 100644 app/core/database.py create mode 100644 app/main.py create mode 100644 app/models/__init__.py create mode 100644 app/models/tenant.py create mode 100644 app/routers/__init__.py create mode 100644 app/routers/admin.py create mode 100644 app/routers/auth.py create mode 100644 app/routers/tenants.py create mode 100644 app/schemas/tenant.py create mode 100644 app/services/__init__.py create mode 100644 app/services/keycloak_service.py create mode 100644 app/services/provisioner.py create mode 100644 app/services/trial_monitor.py create mode 100644 requirements.txt diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e8dda56 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.12-slim + +WORKDIR /app + +RUN apt-get update && apt-get install -y git && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY app/ app/ + +EXPOSE 8000 +CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"] diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/core/__init__.py b/app/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/core/config.py b/app/core/config.py new file mode 100644 index 0000000..b309219 --- /dev/null +++ b/app/core/config.py @@ -0,0 +1,43 @@ +from pydantic_settings import BaseSettings +from functools import lru_cache + + +class Settings(BaseSettings): + # Database + DATABASE_URL: str = "postgresql+asyncpg://platform:Platform@DB2024!@platform-db.platform-db:5432/platform" + + # Keycloak + KEYCLOAK_URL: str = "https://auth.platform.dlytica.com" + KEYCLOAK_ADMIN_USER: str = "admin" + KEYCLOAK_ADMIN_PASSWORD: str = "Architecture@9988#" + KEYCLOAK_MASTER_REALM: str = "master" + KEYCLOAK_PLATFORM_CLIENT: str = "platform-api" + KEYCLOAK_PLATFORM_SECRET: str = "" + + # Platform + DOMAIN: str = "platform.dlytica.com" + DATA_PLANE_DOMAIN: str = "platform.dlytica.com" + DEFAULT_TRIAL_DAYS: int = 30 + SECRET_KEY: str = "change-me-in-production" + + # Kubernetes (in-cluster) + VCLUSTER_NAMESPACE: str = "tenant-vclusters" + DATA_PLANE_KUBECONFIG: str = "" # path to DP kubeconfig; empty = in-cluster + GITOPS_REPO_URL: str = "http://gitea_admin:ChangeMe123!@gitea.platform.dlytica.com/platform/gitops.git" + GITOPS_BRANCH: str = "main" + GITOPS_LOCAL_PATH: str = "/tmp/gitops" + + # Email + SMTP_HOST: str = "" + SMTP_PORT: int = 587 + SMTP_USER: str = "" + SMTP_PASSWORD: str = "" + SMTP_FROM: str = "noreply@dlytica.com" + + class Config: + env_file = ".env" + + +@lru_cache +def get_settings() -> 
Settings: + return Settings() diff --git a/app/core/database.py b/app/core/database.py new file mode 100644 index 0000000..b2722b0 --- /dev/null +++ b/app/core/database.py @@ -0,0 +1,35 @@ +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker +from sqlalchemy.orm import DeclarativeBase +from .config import get_settings + + +class Base(DeclarativeBase): + pass + + +def get_engine(): + settings = get_settings() + return create_async_engine(settings.DATABASE_URL, pool_size=10, max_overflow=20, echo=False) + + +_engine = None +_session_factory = None + + +def get_session_factory(): + global _engine, _session_factory + if _session_factory is None: + _engine = get_engine() + _session_factory = async_sessionmaker(_engine, expire_on_commit=False, class_=AsyncSession) + return _session_factory + + +async def get_db(): + factory = get_session_factory() + async with factory() as session: + try: + yield session + await session.commit() + except Exception: + await session.rollback() + raise diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..6d562bd --- /dev/null +++ b/app/main.py @@ -0,0 +1,58 @@ +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from contextlib import asynccontextmanager +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +from .routers import auth, tenants, admin +from .services.trial_monitor import check_trials +from .core.config import get_settings + +scheduler = AsyncIOScheduler() + + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Start trial monitor (runs every hour) + scheduler.add_job(check_trials, "interval", hours=1, id="trial_monitor") + scheduler.start() + yield + scheduler.shutdown() + + +s = get_settings() +app = FastAPI( + title="Platform API", + description="Managed Data Platform — Control Plane API", + version="1.0.0", + lifespan=lifespan, +) + +app.add_middleware( + CORSMiddleware, + allow_origins=[ + f"https://app.{s.DOMAIN}", + f"https://data.{s.DOMAIN}", + "http://localhost:3000", + ], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +app.include_router(auth.router) +app.include_router(tenants.router) +app.include_router(admin.router) + + +@app.get("/health") +async def health(): + return {"status": "ok"} + + +@app.get("/") +async def root(): + return { + "service": "Platform API", + "docs": "/docs", + "health": "/health", + } diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/models/tenant.py b/app/models/tenant.py new file mode 100644 index 0000000..cd42355 --- /dev/null +++ b/app/models/tenant.py @@ -0,0 +1,129 @@ +import uuid +from datetime import datetime +from sqlalchemy import String, Boolean, Integer, Numeric, Text, DateTime, ForeignKey, JSON +from sqlalchemy.orm import Mapped, mapped_column, relationship +from sqlalchemy.dialects.postgresql import UUID, JSONB, INET +from ..core.database import Base + + +class Plan(Base): + __tablename__ = "plans" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False) + display_name: Mapped[str] = mapped_column(String(100), nullable=False) + trial_days: Mapped[int] = mapped_column(Integer, nullable=False, default=30) + max_users: Mapped[int] = mapped_column(Integer, nullable=False, default=5) + max_vclusters: Mapped[int] = mapped_column(Integer, nullable=False, default=1) + features: 
Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict) + price_monthly: Mapped[float] = mapped_column(Numeric(10, 2), nullable=False, default=0) + is_active: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow) + + tenants: Mapped[list["Tenant"]] = relationship("Tenant", back_populates="plan") + + +class Tenant(Base): + __tablename__ = "tenants" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + slug: Mapped[str] = mapped_column(String(63), unique=True, nullable=False) + company_name: Mapped[str] = mapped_column(String(255), nullable=False) + plan_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("plans.id"), nullable=False) + status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending") + # Trial + trial_started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + trial_ends_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + trial_duration_days: Mapped[int] = mapped_column(Integer, nullable=False, default=30) + # vCluster + vcluster_name: Mapped[str | None] = mapped_column(String(63), nullable=True) + vcluster_status: Mapped[str] = mapped_column(String(20), nullable=False, default="not_created") + vcluster_url: Mapped[str | None] = mapped_column(Text, nullable=True) + # Keycloak / SSO + keycloak_realm: Mapped[str | None] = mapped_column(String(63), nullable=True) + sso_type: Mapped[str] = mapped_column(String(20), nullable=False, default="system") + custom_sso_config: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + # Tools + tools_enabled: Mapped[dict] = mapped_column(JSONB, nullable=False, default=lambda: {"superset": True}) + # Contact + billing_email: Mapped[str | None] = mapped_column(String(255), nullable=True) + domain: Mapped[str | None] = mapped_column(String(255), nullable=True) + metadata_: Mapped[dict] = mapped_column("metadata", JSONB, nullable=False, default=dict) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow) + deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + + plan: Mapped["Plan"] = relationship("Plan", back_populates="tenants") + users: Mapped[list["User"]] = relationship("User", back_populates="tenant") + tools: Mapped[list["TenantTool"]] = relationship("TenantTool", back_populates="tenant") + jobs: Mapped[list["ProvisioningJob"]] = relationship("ProvisioningJob", back_populates="tenant") + + +class User(Base): + __tablename__ = "users" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False) + keycloak_user_id: Mapped[str | None] = mapped_column(String(255), nullable=True) + email: Mapped[str] = mapped_column(String(255), nullable=False) + first_name: Mapped[str | None] = mapped_column(String(100), nullable=True) + last_name: Mapped[str | None] = mapped_column(String(100), nullable=True) + role: Mapped[str] = mapped_column(String(20), nullable=False, default="member") + status: Mapped[str] = 
mapped_column(String(20), nullable=False, default="active") + last_login_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow) + + tenant: Mapped["Tenant"] = relationship("Tenant", back_populates="users") + + +class TenantTool(Base): + __tablename__ = "tenant_tools" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False) + tool_name: Mapped[str] = mapped_column(String(50), nullable=False) + enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) + replicas: Mapped[int] = mapped_column(Integer, nullable=False, default=1) + config: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow) + + tenant: Mapped["Tenant"] = relationship("Tenant", back_populates="tools") + + +class ProvisioningJob(Base): + __tablename__ = "provisioning_jobs" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + tenant_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("tenants.id"), nullable=False) + job_type: Mapped[str] = mapped_column(String(50), nullable=False) + status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending") + payload: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict) + error: Mapped[str | None] = mapped_column(Text, nullable=True) + started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow) + + tenant: Mapped["Tenant"] = relationship("Tenant", back_populates="jobs") + + +class AuditLog(Base): + __tablename__ = "audit_log" + + id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + tenant_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), nullable=True) + user_id: Mapped[uuid.UUID | None] = mapped_column(UUID(as_uuid=True), nullable=True) + action: Mapped[str] = mapped_column(String(100), nullable=False) + resource: Mapped[str | None] = mapped_column(String(100), nullable=True) + details: Mapped[dict] = mapped_column(JSONB, nullable=False, default=dict) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow) + + +class PlatformSetting(Base): + __tablename__ = "platform_settings" + + key: Mapped[str] = mapped_column(String(100), primary_key=True) + value: Mapped[str] = mapped_column(Text, nullable=False) + description: Mapped[str | None] = mapped_column(Text, nullable=True) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow) diff --git a/app/routers/__init__.py b/app/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/routers/admin.py b/app/routers/admin.py new file mode 100644 index 0000000..2c67ca2 --- /dev/null +++ b/app/routers/admin.py @@ -0,0 +1,216 @@ +""" +/admin — Admin panel: manage tenants, trials, plans, platform settings. 
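+Route groups defined below: /admin/dashboard, /admin/tenants/*, /admin/settings,
+/admin/plans, /admin/audit.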
+Protected by a simple bearer token (admin API key). +""" +from fastapi import APIRouter, Depends, HTTPException, status, Header +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, update as sa_update, func +from uuid import UUID +from datetime import datetime, timedelta +from typing import Optional + +from ..core.database import get_db +from ..core.config import get_settings +from ..models.tenant import ( + Tenant, User, Plan, ProvisioningJob, AuditLog, PlatformSetting +) +from ..schemas.tenant import ( + AdminTrialUpdateRequest, AdminPlanUpdateRequest, PlatformSettingUpdate +) +from ..services import keycloak_service, provisioner + +router = APIRouter(prefix="/admin", tags=["admin"]) + +ADMIN_API_KEY = "Platform-Admin-2024!" # in prod: from secret/env + + +def _require_admin(x_admin_key: str = Header(...)): + if x_admin_key != ADMIN_API_KEY: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized") + + +# ── Dashboard ───────────────────────────────────────────────────────────────── + +@router.get("/dashboard", dependencies=[Depends(_require_admin)]) +async def dashboard(db: AsyncSession = Depends(get_db)): + total = (await db.execute(select(func.count()).select_from(Tenant))).scalar() + active = (await db.execute(select(func.count()).select_from(Tenant).where(Tenant.status == "active"))).scalar() + trial = (await db.execute(select(func.count()).select_from(Tenant).where(Tenant.status == "trial"))).scalar() + suspended = (await db.execute(select(func.count()).select_from(Tenant).where(Tenant.status == "suspended"))).scalar() + expiring = (await db.execute( + select(func.count()).select_from(Tenant) + .where(Tenant.status == "trial", Tenant.trial_ends_at <= datetime.utcnow() + timedelta(days=7)) + )).scalar() + users_total = (await db.execute(select(func.count()).select_from(User))).scalar() + return { + "tenants": {"total": total, "active": active, "trial": trial, "suspended": suspended}, + "expiring_trials_7d": expiring, + "total_users": users_total, + } + + +# ── Tenant management ───────────────────────────────────────────────────────── + +@router.get("/tenants", dependencies=[Depends(_require_admin)]) +async def list_tenants( + status: Optional[str] = None, + db: AsyncSession = Depends(get_db), +): + q = select(Tenant, Plan.name.label("plan_name")).join(Plan, Tenant.plan_id == Plan.id) + if status: + q = q.where(Tenant.status == status) + rows = (await db.execute(q)).all() + return [ + { + "id": str(t.id), "slug": t.slug, "company_name": t.company_name, + "status": t.status, "plan": plan_name, + "trial_ends_at": t.trial_ends_at.isoformat() if t.trial_ends_at else None, + "vcluster_status": t.vcluster_status, + "created_at": t.created_at.isoformat(), + } + for t, plan_name in rows + ] + + +@router.get("/tenants/{tenant_id}", dependencies=[Depends(_require_admin)]) +async def get_tenant(tenant_id: UUID, db: AsyncSession = Depends(get_db)): + t = await db.get(Tenant, tenant_id) + if not t: + raise HTTPException(status_code=404, detail="Not found") + jobs = (await db.execute( + select(ProvisioningJob).where(ProvisioningJob.tenant_id == tenant_id) + .order_by(ProvisioningJob.created_at.desc()).limit(5) + )).scalars().all() + users = (await db.execute( + select(User).where(User.tenant_id == tenant_id, User.status == "active") + )).scalars().all() + return { + "tenant": t, + "jobs": [{"type": j.job_type, "status": j.status, "created": j.created_at} for j in jobs], + "user_count": len(users), + } + + 
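+
+# Example requests (illustrative; the API host shown is an example): every /admin
+# route requires the X-Admin-Key header enforced by _require_admin above.
+#
+#   curl -H "X-Admin-Key: $ADMIN_API_KEY" https://api.platform.dlytica.com/admin/dashboard
+#   curl -X POST -H "X-Admin-Key: $ADMIN_API_KEY" -H "Content-Type: application/json" \
+#        -d '{"trial_days": 14}' \
+#        https://api.platform.dlytica.com/admin/tenants/<tenant-id>/extend-trial
+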
+@router.post("/tenants/{tenant_id}/extend-trial", dependencies=[Depends(_require_admin)]) +async def extend_trial( + tenant_id: UUID, + payload: AdminTrialUpdateRequest, + db: AsyncSession = Depends(get_db), +): + t = await db.get(Tenant, tenant_id) + if not t: + raise HTTPException(status_code=404, detail="Not found") + base = max(t.trial_ends_at or datetime.utcnow(), datetime.utcnow()) + new_end = base + timedelta(days=payload.trial_days) + await db.execute( + sa_update(Tenant).where(Tenant.id == tenant_id) + .values(trial_ends_at=new_end, status="trial") + ) + db.add(AuditLog(tenant_id=tenant_id, action="admin.trial_extended", + details={"extra_days": payload.trial_days, "new_end": new_end.isoformat()})) + await db.commit() + return {"new_trial_end": new_end.isoformat()} + + +@router.post("/tenants/{tenant_id}/suspend", dependencies=[Depends(_require_admin)]) +async def suspend_tenant(tenant_id: UUID, db: AsyncSession = Depends(get_db)): + t = await db.get(Tenant, tenant_id) + if not t: + raise HTTPException(status_code=404, detail="Not found") + await db.execute( + sa_update(Tenant).where(Tenant.id == tenant_id).values(status="suspended") + ) + db.add(AuditLog(tenant_id=tenant_id, action="admin.tenant_suspended")) + await db.commit() + return {"status": "suspended"} + + +@router.post("/tenants/{tenant_id}/activate", dependencies=[Depends(_require_admin)]) +async def activate_tenant(tenant_id: UUID, db: AsyncSession = Depends(get_db)): + t = await db.get(Tenant, tenant_id) + if not t: + raise HTTPException(status_code=404, detail="Not found") + await db.execute( + sa_update(Tenant).where(Tenant.id == tenant_id).values(status="active") + ) + db.add(AuditLog(tenant_id=tenant_id, action="admin.tenant_activated")) + await db.commit() + return {"status": "active"} + + +@router.delete("/tenants/{tenant_id}", dependencies=[Depends(_require_admin)]) +async def delete_tenant(tenant_id: UUID, db: AsyncSession = Depends(get_db)): + """Full deprovisioning: remove vCluster + Keycloak realm.""" + t = await db.get(Tenant, tenant_id) + if not t: + raise HTTPException(status_code=404, detail="Not found") + # Remove GitOps manifests (triggers ArgoCD prune) + await provisioner.deprovision_tenant(t.slug) + # Remove Keycloak realm + await keycloak_service.delete_tenant_realm(t.slug) + await db.execute( + sa_update(Tenant).where(Tenant.id == tenant_id) + .values(status="deleted", deleted_at=datetime.utcnow(), vcluster_status="deleted") + ) + db.add(AuditLog(tenant_id=tenant_id, action="admin.tenant_deleted")) + await db.commit() + return {"status": "deleted"} + + +@router.post("/tenants/{tenant_id}/change-plan", dependencies=[Depends(_require_admin)]) +async def change_plan( + tenant_id: UUID, + payload: AdminPlanUpdateRequest, + db: AsyncSession = Depends(get_db), +): + plan = (await db.execute(select(Plan).where(Plan.name == payload.plan_name))).scalar_one_or_none() + if not plan: + raise HTTPException(status_code=404, detail=f"Plan '{payload.plan_name}' not found") + await db.execute( + sa_update(Tenant).where(Tenant.id == tenant_id).values(plan_id=plan.id) + ) + db.add(AuditLog(tenant_id=tenant_id, action="admin.plan_changed", + details={"new_plan": payload.plan_name})) + await db.commit() + return {"plan": payload.plan_name} + + +# ── Platform Settings ───────────────────────────────────────────────────────── + +@router.get("/settings", dependencies=[Depends(_require_admin)]) +async def list_settings(db: AsyncSession = Depends(get_db)): + rows = (await 
db.execute(select(PlatformSetting))).scalars().all() + return {r.key: {"value": r.value, "description": r.description} for r in rows} + + +@router.put("/settings/{key}", dependencies=[Depends(_require_admin)]) +async def update_setting(key: str, payload: PlatformSettingUpdate, db: AsyncSession = Depends(get_db)): + row = await db.get(PlatformSetting, key) + if not row: + raise HTTPException(status_code=404, detail=f"Setting '{key}' not found") + row.value = payload.value + await db.commit() + return {"key": key, "value": payload.value} + + +# ── Plans ────────────────────────────────────────────────────────────────────── + +@router.get("/plans", dependencies=[Depends(_require_admin)]) +async def list_plans(db: AsyncSession = Depends(get_db)): + rows = (await db.execute(select(Plan))).scalars().all() + return rows + + +# ── Audit Log ───────────────────────────────────────────────────────────────── + +@router.get("/audit", dependencies=[Depends(_require_admin)]) +async def audit_log( + tenant_id: Optional[UUID] = None, + limit: int = 100, + db: AsyncSession = Depends(get_db), +): + q = select(AuditLog).order_by(AuditLog.created_at.desc()).limit(limit) + if tenant_id: + q = q.where(AuditLog.tenant_id == tenant_id) + rows = (await db.execute(q)).scalars().all() + return rows diff --git a/app/routers/auth.py b/app/routers/auth.py new file mode 100644 index 0000000..5d813ab --- /dev/null +++ b/app/routers/auth.py @@ -0,0 +1,210 @@ +""" +/auth — Sign-up, sign-in (delegates to Keycloak) +""" +from fastapi import APIRouter, Depends, HTTPException, status, BackgroundTasks +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select +from datetime import datetime, timedelta +from slugify import slugify +import secrets + +from ..core.database import get_db +from ..core.config import get_settings +from ..models.tenant import Tenant, User, Plan, TenantTool, ProvisioningJob, AuditLog +from ..schemas.tenant import SignUpRequest, TenantResponse +from ..services import keycloak_service, provisioner + +router = APIRouter(prefix="/auth", tags=["auth"]) + + +async def _default_plan(db: AsyncSession) -> Plan: + settings = get_settings() + result = await db.execute(select(Plan).where(Plan.name == settings.DEFAULT_TRIAL_DAYS or "trial")) + plan = result.scalar_one_or_none() + if not plan: + result = await db.execute(select(Plan).where(Plan.name == "trial")) + plan = result.scalar_one() + return plan + + +@router.post("/signup", response_model=TenantResponse, status_code=status.HTTP_201_CREATED) +async def signup( + payload: SignUpRequest, + background_tasks: BackgroundTasks, + db: AsyncSession = Depends(get_db), +): + """ + User signs up with company name → creates Tenant + Keycloak realm + starts trial. + vCluster provisioning is triggered asynchronously. + """ + s = get_settings() + slug = slugify(payload.company_name, max_length=40, separator="-") + + # Check slug uniqueness + existing = await db.execute(select(Tenant).where(Tenant.slug == slug)) + if existing.scalar_one_or_none(): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"Company name '{payload.company_name}' is already registered." 
+ ) + + plan = await _default_plan(db) + now = datetime.utcnow() + trial_days = plan.trial_days + + # Create tenant record + tenant = Tenant( + slug=slug, + company_name=payload.company_name, + plan_id=plan.id, + status="trial", + trial_started_at=now, + trial_ends_at=now + timedelta(days=trial_days), + trial_duration_days=trial_days, + vcluster_status="provisioning", + tools_enabled=plan.features, + billing_email=payload.email, + keycloak_realm=f"tenant-{slug}", + sso_type="system", + ) + db.add(tenant) + await db.flush() + + # Create owner user record + user = User( + tenant_id=tenant.id, + email=payload.email, + first_name=payload.first_name, + last_name=payload.last_name, + role="owner", + status="active", + ) + db.add(user) + + # Default tool rows + for tool, enabled in plan.features.items(): + db.add(TenantTool(tenant_id=tenant.id, tool_name=tool, enabled=bool(enabled))) + + # Provisioning job + job = ProvisioningJob( + tenant_id=tenant.id, + job_type="create_vcluster", + payload={ + "slug": slug, + "company_name": payload.company_name, + "plan": plan.name, + "tools": plan.features, + "admin_email": payload.email, + } + ) + db.add(job) + db.add(AuditLog( + tenant_id=tenant.id, + action="tenant.signup", + details={"email": payload.email, "plan": plan.name}, + )) + await db.commit() + await db.refresh(tenant) + + # Async: create Keycloak realm + provision vCluster + background_tasks.add_task( + _provision_async, + tenant_id=str(tenant.id), + job_id=str(job.id), + slug=slug, + company_name=payload.company_name, + admin_email=payload.email, + plan_name=plan.name, + tools=plan.features, + ) + + return _to_response(tenant, plan.name) + + +async def _provision_async(tenant_id, job_id, slug, company_name, + admin_email, plan_name, tools): + """Background task: Keycloak realm + GitOps vCluster write.""" + from ..core.database import get_session_factory + from sqlalchemy import update as sa_update + from datetime import datetime + import logging + log = logging.getLogger(__name__) + + factory = get_session_factory() + async with factory() as db: + try: + # Mark job running + await db.execute( + sa_update(ProvisioningJob) + .where(ProvisioningJob.id == job_id) + .values(status="running", started_at=datetime.utcnow()) + ) + await db.commit() + + # 1. Create Keycloak realm + await keycloak_service.create_tenant_realm(slug, company_name, admin_email) + + # 2. Write gitops manifests + result = await provisioner.provision_tenant( + tenant_slug=slug, + company_name=company_name, + plan_name=plan_name, + tools_enabled=tools, + ) + + # 3. 
Update tenant with vcluster info + await db.execute( + sa_update(Tenant) + .where(Tenant.id == tenant_id) + .values( + vcluster_name=result["vcluster_name"], + vcluster_url=result["vcluster_url"], + vcluster_status="ready", + status="trial", + ) + ) + await db.execute( + sa_update(ProvisioningJob) + .where(ProvisioningJob.id == job_id) + .values(status="succeeded", completed_at=datetime.utcnow()) + ) + await db.commit() + log.info("Provisioned tenant %s", slug) + + except Exception as exc: + log.exception("Provisioning failed for %s", slug) + await db.execute( + sa_update(ProvisioningJob) + .where(ProvisioningJob.id == job_id) + .values(status="failed", error=str(exc), completed_at=datetime.utcnow()) + ) + await db.execute( + sa_update(Tenant) + .where(Tenant.id == tenant_id) + .values(vcluster_status="not_created") + ) + await db.commit() + + +def _to_response(tenant: Tenant, plan_name: str) -> TenantResponse: + s = get_settings() + dp_url = ( + f"https://data.{s.DATA_PLANE_DOMAIN}?tenant={tenant.slug}" + if tenant.vcluster_status in ("ready", "provisioning") + else None + ) + return TenantResponse( + id=tenant.id, + slug=tenant.slug, + company_name=tenant.company_name, + status=tenant.status, + plan_name=plan_name, + trial_ends_at=tenant.trial_ends_at, + vcluster_status=tenant.vcluster_status, + vcluster_url=tenant.vcluster_url, + data_plane_url=dp_url, + tools_enabled=tenant.tools_enabled, + keycloak_realm=tenant.keycloak_realm, + sso_type=tenant.sso_type, + created_at=tenant.created_at, + ) diff --git a/app/routers/tenants.py b/app/routers/tenants.py new file mode 100644 index 0000000..bf9e25e --- /dev/null +++ b/app/routers/tenants.py @@ -0,0 +1,213 @@ +""" +/tenants — Tenant self-service: tools, SSO, users +""" +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select, update as sa_update +from uuid import UUID +from datetime import datetime + +from ..core.database import get_db +from ..core.config import get_settings +from ..models.tenant import Tenant, User, TenantTool, AuditLog, Plan +from ..schemas.tenant import ( + TenantResponse, ToolUpdateRequest, CustomSSORequest, UserInviteRequest +) +from ..services import keycloak_service, provisioner + +router = APIRouter(prefix="/tenants", tags=["tenants"]) + + +async def _get_tenant(tenant_id: UUID, db: AsyncSession) -> Tenant: + r = await db.execute(select(Tenant).where(Tenant.id == tenant_id, Tenant.deleted_at.is_(None))) + t = r.scalar_one_or_none() + if not t: + raise HTTPException(status_code=404, detail="Tenant not found") + return t + + +# ── GET tenant info ────────────────────────────────────────────────────────── + +@router.get("/by-slug/{slug}", response_model=TenantResponse) +async def get_tenant_by_slug(slug: str, db: AsyncSession = Depends(get_db)): + r = await db.execute(select(Tenant).where(Tenant.slug == slug, Tenant.deleted_at.is_(None))) + t = r.scalar_one_or_none() + if not t: + raise HTTPException(status_code=404, detail="Tenant not found") + plan = await db.get(Plan, t.plan_id) + return _to_response(t, plan.name if plan else "unknown") + + +@router.get("/{tenant_id}", response_model=TenantResponse) +async def get_tenant(tenant_id: UUID, db: AsyncSession = Depends(get_db)): + t = await _get_tenant(tenant_id, db) + plan = await db.get(Plan, t.plan_id) + return _to_response(t, plan.name if plan else "unknown") + + +# ── Tool management ────────────────────────────────────────────────────────── + +@router.patch("/{tenant_id}/tools") 
+async def update_tools( + tenant_id: UUID, + payload: ToolUpdateRequest, + db: AsyncSession = Depends(get_db), +): + """Enable/disable/scale data tools. Superset is always on by default.""" + t = await _get_tenant(tenant_id, db) + plan = await db.get(Plan, t.plan_id) + + # Enforce plan feature gating + new_tools = payload.to_dict(t.tools_enabled) + if plan: + for tool, enabled in new_tools.items(): + if enabled and not plan.features.get(tool, False): + raise HTTPException( + status_code=402, + detail=f"Tool '{tool}' is not available on plan '{plan.name}'. Please upgrade." + ) + + # Always keep Superset on (default tool) + new_tools["superset"] = True + + # Update DB + await db.execute( + sa_update(Tenant).where(Tenant.id == tenant_id).values(tools_enabled=new_tools) + ) + for tool, enabled in new_tools.items(): + existing = await db.execute( + select(TenantTool).where(TenantTool.tenant_id == tenant_id, TenantTool.tool_name == tool) + ) + row = existing.scalar_one_or_none() + if row: + row.enabled = bool(enabled) + else: + db.add(TenantTool(tenant_id=tenant_id, tool_name=tool, enabled=bool(enabled))) + + db.add(AuditLog(tenant_id=tenant_id, action="tools.updated", details={"tools": new_tools})) + await db.commit() + + # Push gitops update + s = get_settings() + await provisioner.update_tenant_tools(t.slug, new_tools) + + tool_urls = { + tool: f"https://{t.slug}-{tool}.{s.DATA_PLANE_DOMAIN}" + for tool, en in new_tools.items() if en + } + return {"tools_enabled": new_tools, "tool_urls": tool_urls} + + +# ── SSO management ─────────────────────────────────────────────────────────── + +@router.get("/{tenant_id}/sso") +async def get_sso_config(tenant_id: UUID, db: AsyncSession = Depends(get_db)): + t = await _get_tenant(tenant_id, db) + s = get_settings() + return { + "sso_type": t.sso_type, + "system_sso_url": f"{s.KEYCLOAK_URL}/realms/{t.keycloak_realm}", + "custom_config": t.custom_sso_config, + } + + +@router.post("/{tenant_id}/sso/custom") +async def configure_custom_sso( + tenant_id: UUID, + payload: CustomSSORequest, + db: AsyncSession = Depends(get_db), +): + """Configure a custom OIDC/SAML IdP within the tenant's Keycloak realm.""" + t = await _get_tenant(tenant_id, db) + plan = await db.get(Plan, t.plan_id) + if not (plan and plan.features.get("custom_sso")): + raise HTTPException(status_code=402, detail="Custom SSO requires Enterprise plan.") + + await keycloak_service.add_custom_idp(t.slug, payload.model_dump()) + await db.execute( + sa_update(Tenant) + .where(Tenant.id == tenant_id) + .values(sso_type="custom", custom_sso_config=payload.model_dump()) + ) + db.add(AuditLog(tenant_id=tenant_id, action="sso.custom_idp_added", + details={"alias": payload.alias})) + await db.commit() + return {"status": "Custom SSO configured", "alias": payload.alias} + + +@router.delete("/{tenant_id}/sso/custom") +async def remove_custom_sso(tenant_id: UUID, db: AsyncSession = Depends(get_db)): + t = await _get_tenant(tenant_id, db) + await db.execute( + sa_update(Tenant).where(Tenant.id == tenant_id) + .values(sso_type="system", custom_sso_config=None) + ) + db.add(AuditLog(tenant_id=tenant_id, action="sso.reverted_to_system")) + await db.commit() + return {"status": "Reverted to system SSO"} + + +# ── User management ────────────────────────────────────────────────────────── + +@router.get("/{tenant_id}/users") +async def list_users(tenant_id: UUID, db: AsyncSession = Depends(get_db)): + t = await _get_tenant(tenant_id, db) + kc_users = await keycloak_service.get_tenant_users(t.slug) + return 
{"users": kc_users} + + +@router.post("/{tenant_id}/users", status_code=201) +async def invite_user( + tenant_id: UUID, + payload: UserInviteRequest, + db: AsyncSession = Depends(get_db), +): + t = await _get_tenant(tenant_id, db) + kc_id = await keycloak_service.create_tenant_user( + t.slug, payload.email, payload.first_name, payload.last_name, payload.role + ) + user = User( + tenant_id=tenant_id, + keycloak_user_id=kc_id, + email=payload.email, + first_name=payload.first_name, + last_name=payload.last_name, + role=payload.role, + ) + db.add(user) + db.add(AuditLog(tenant_id=tenant_id, action="user.invited", + details={"email": payload.email, "role": payload.role})) + await db.commit() + return {"status": "invited", "keycloak_id": kc_id} + + +@router.delete("/{tenant_id}/users/{kc_user_id}") +async def remove_user(tenant_id: UUID, kc_user_id: str, db: AsyncSession = Depends(get_db)): + t = await _get_tenant(tenant_id, db) + await keycloak_service.delete_tenant_user(t.slug, kc_user_id) + await db.execute( + sa_update(User) + .where(User.tenant_id == tenant_id, User.keycloak_user_id == kc_user_id) + .values(status="deleted") + ) + db.add(AuditLog(tenant_id=tenant_id, action="user.removed", + details={"kc_user_id": kc_user_id})) + await db.commit() + return {"status": "removed"} + + +def _to_response(tenant: Tenant, plan_name: str) -> TenantResponse: + s = get_settings() + dp_url = ( + f"https://data.{s.DATA_PLANE_DOMAIN}?tenant={tenant.slug}" + if tenant.vcluster_status in ("ready", "provisioning") else None + ) + return TenantResponse( + id=tenant.id, slug=tenant.slug, company_name=tenant.company_name, + status=tenant.status, plan_name=plan_name, + trial_ends_at=tenant.trial_ends_at, + vcluster_status=tenant.vcluster_status, vcluster_url=tenant.vcluster_url, + data_plane_url=dp_url, tools_enabled=tenant.tools_enabled, + keycloak_realm=tenant.keycloak_realm, sso_type=tenant.sso_type, + created_at=tenant.created_at, + ) diff --git a/app/schemas/tenant.py b/app/schemas/tenant.py new file mode 100644 index 0000000..64da40f --- /dev/null +++ b/app/schemas/tenant.py @@ -0,0 +1,91 @@ +from pydantic import BaseModel, EmailStr, Field, field_validator +from typing import Optional +from datetime import datetime +from uuid import UUID +import re + + +def _validate_slug(v: str) -> str: + slug = re.sub(r"[^a-z0-9-]", "-", v.lower()).strip("-") + if len(slug) < 2: + raise ValueError("Company name too short") + if len(slug) > 40: + raise ValueError("Company name too long (max 40 chars)") + return slug + + +class SignUpRequest(BaseModel): + email: EmailStr + first_name: str = Field(..., min_length=1, max_length=100) + last_name: str = Field(..., min_length=1, max_length=100) + company_name: str = Field(..., min_length=2, max_length=100) + password: str = Field(..., min_length=8) + + @field_validator("company_name") + @classmethod + def validate_company(cls, v: str) -> str: + return v # raw company name; slug derived server-side + + +class TenantResponse(BaseModel): + id: UUID + slug: str + company_name: str + status: str + plan_name: Optional[str] = None + trial_ends_at: Optional[datetime] = None + vcluster_status: str + vcluster_url: Optional[str] = None + data_plane_url: Optional[str] = None + tools_enabled: dict + keycloak_realm: Optional[str] = None + sso_type: str + created_at: datetime + + class Config: + from_attributes = True + + +class ToolUpdateRequest(BaseModel): + superset: Optional[bool] = None + airflow: Optional[bool] = None + trino: Optional[bool] = None + nifi: Optional[bool] = None + 
spark: Optional[bool] = None + + def to_dict(self, current: dict) -> dict: + updated = dict(current) + for tool in ["superset", "airflow", "trino", "nifi", "spark"]: + val = getattr(self, tool) + if val is not None: + updated[tool] = val + return updated + + +class CustomSSORequest(BaseModel): + alias: str = Field(..., min_length=1) + display_name: str = Field(..., min_length=1) + provider_id: str = Field(default="oidc", pattern="^(oidc|saml)$") + authorization_url: str = Field(...) + token_url: str = Field(...) + client_id: str = Field(...) + client_secret: str = Field(...) + + +class UserInviteRequest(BaseModel): + email: EmailStr + first_name: str = Field(..., min_length=1) + last_name: str = Field(..., min_length=1) + role: str = Field(default="member", pattern="^(admin|member|viewer)$") + + +class AdminTrialUpdateRequest(BaseModel): + trial_days: int = Field(..., ge=1, le=365) + + +class AdminPlanUpdateRequest(BaseModel): + plan_name: str + + +class PlatformSettingUpdate(BaseModel): + value: str diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/keycloak_service.py b/app/services/keycloak_service.py new file mode 100644 index 0000000..e82e26d --- /dev/null +++ b/app/services/keycloak_service.py @@ -0,0 +1,182 @@ +""" +Keycloak Service — manages realms, clients and users per tenant. + +Each tenant gets its own Keycloak realm, providing full SSO isolation. +Users can optionally configure a custom OIDC/SAML IdP within their realm. +""" +import logging +from keycloak import KeycloakAdmin, KeycloakOpenIDConnection +from ..core.config import get_settings + +log = logging.getLogger(__name__) + + +def _admin_client() -> KeycloakAdmin: + s = get_settings() + conn = KeycloakOpenIDConnection( + server_url=s.KEYCLOAK_URL, + username=s.KEYCLOAK_ADMIN_USER, + password=s.KEYCLOAK_ADMIN_PASSWORD, + realm_name=s.KEYCLOAK_MASTER_REALM, + verify=False, + ) + return KeycloakAdmin(connection=conn) + + +def realm_name(tenant_slug: str) -> str: + return f"tenant-{tenant_slug}" + + +async def create_tenant_realm(tenant_slug: str, company_name: str, admin_email: str) -> str: + """ + Create an isolated Keycloak realm for the tenant. + Returns the realm name. 
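+    For example, a tenant slug "acme" yields the realm "tenant-acme" with a public
+    "platform-portal" client and an initial admin user for admin_email.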
+ """ + rname = realm_name(tenant_slug) + ka = _admin_client() + + existing = [r["realm"] for r in ka.get_realms()] + if rname in existing: + log.info("Realm %s already exists", rname) + return rname + + ka.create_realm({ + "realm": rname, + "displayName": company_name, + "enabled": True, + "registrationAllowed": False, + "resetPasswordAllowed": True, + "rememberMe": True, + "loginWithEmailAllowed": True, + "duplicateEmailsAllowed": False, + "sslRequired": "external", + "bruteForceProtected": True, + "accessTokenLifespan": 3600, + "ssoSessionIdleTimeout": 86400, + }) + + # Create platform-api client in the new realm + ka_realm = KeycloakAdmin( + server_url=get_settings().KEYCLOAK_URL, + username=get_settings().KEYCLOAK_ADMIN_USER, + password=get_settings().KEYCLOAK_ADMIN_PASSWORD, + realm_name=rname, + verify=False, + ) + s = get_settings() + base = f"https://app.{s.DOMAIN}" + ka_realm.create_client({ + "clientId": "platform-portal", + "name": "Platform Portal", + "enabled": True, + "publicClient": True, + "standardFlowEnabled": True, + "directAccessGrantsEnabled": True, + "redirectUris": [f"{base}/*", f"https://data.{s.DOMAIN}/*"], + "webOrigins": [f"https://app.{s.DOMAIN}", f"https://data.{s.DOMAIN}"], + "protocol": "openid-connect", + }) + + # Create the initial tenant admin user + ka_realm.create_user({ + "email": admin_email, + "username": admin_email, + "enabled": True, + "emailVerified": True, + "realmRoles": ["realm-admin"], + }) + + log.info("Created realm %s", rname) + return rname + + +async def delete_tenant_realm(tenant_slug: str) -> None: + rname = realm_name(tenant_slug) + ka = _admin_client() + existing = [r["realm"] for r in ka.get_realms()] + if rname in existing: + ka.delete_realm(rname) + log.info("Deleted realm %s", rname) + + +async def add_custom_idp(tenant_slug: str, idp_config: dict) -> None: + """ + Add a custom OIDC/SAML Identity Provider to the tenant realm. + idp_config keys: alias, display_name, provider_id (oidc/saml), + authorization_url, token_url, client_id, client_secret + """ + rname = realm_name(tenant_slug) + ka = KeycloakAdmin( + server_url=get_settings().KEYCLOAK_URL, + username=get_settings().KEYCLOAK_ADMIN_USER, + password=get_settings().KEYCLOAK_ADMIN_PASSWORD, + realm_name=rname, + verify=False, + ) + ka.create_idp({ + "alias": idp_config.get("alias", "custom-sso"), + "displayName": idp_config.get("display_name", "Company SSO"), + "providerId": idp_config.get("provider_id", "oidc"), + "enabled": True, + "trustEmail": True, + "config": { + "authorizationUrl": idp_config.get("authorization_url", ""), + "tokenUrl": idp_config.get("token_url", ""), + "clientId": idp_config.get("client_id", ""), + "clientSecret": idp_config.get("client_secret", ""), + "defaultScope": "openid email profile", + "syncMode": "IMPORT", + }, + }) + + +async def create_tenant_user(tenant_slug: str, email: str, first_name: str, + last_name: str, role: str = "member", + temp_password: str = None) -> str: + """Create a user in the tenant's realm. 
Returns KC user ID.""" + rname = realm_name(tenant_slug) + ka = KeycloakAdmin( + server_url=get_settings().KEYCLOAK_URL, + username=get_settings().KEYCLOAK_ADMIN_USER, + password=get_settings().KEYCLOAK_ADMIN_PASSWORD, + realm_name=rname, + verify=False, + ) + user_id = ka.create_user({ + "email": email, + "username": email, + "firstName": first_name, + "lastName": last_name, + "enabled": True, + "emailVerified": False, + "attributes": {"platform_role": [role]}, + }) + if temp_password: + ka.set_user_password(user_id, temp_password, temporary=True) + else: + ka.send_verify_email(user_id) + return user_id + + +async def get_tenant_users(tenant_slug: str) -> list[dict]: + rname = realm_name(tenant_slug) + ka = KeycloakAdmin( + server_url=get_settings().KEYCLOAK_URL, + username=get_settings().KEYCLOAK_ADMIN_USER, + password=get_settings().KEYCLOAK_ADMIN_PASSWORD, + realm_name=rname, + verify=False, + ) + return ka.get_users() + + +async def delete_tenant_user(tenant_slug: str, kc_user_id: str) -> None: + rname = realm_name(tenant_slug) + ka = KeycloakAdmin( + server_url=get_settings().KEYCLOAK_URL, + username=get_settings().KEYCLOAK_ADMIN_USER, + password=get_settings().KEYCLOAK_ADMIN_PASSWORD, + realm_name=rname, + verify=False, + ) + ka.delete_user(kc_user_id) diff --git a/app/services/provisioner.py b/app/services/provisioner.py new file mode 100644 index 0000000..34e4e88 --- /dev/null +++ b/app/services/provisioner.py @@ -0,0 +1,717 @@ +""" +Tenant Provisioner — manages vCluster + tool lifecycle via GitOps. + +Flow: + 1. Tenant signs up → platform-api calls provision_tenant() + 2. provisioner writes two files to the gitops repo: + data-plane/tenants//vcluster.yaml ← ArgoCD Application (vCluster) + data-plane/tenants//appset.yaml ← ApplicationSet (tool apps) + 3. For each enabled tool it writes: + data-plane/tenants//apps/.yaml ← ArgoCD Application (Helm chart) + 4. Git push → ArgoCD on data-plane picks it all up automatically. + 5. deprovision_tenant() removes the directory → ArgoCD prunes everything. 
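+
+Illustrative layout for a tenant slug "acme" (example name), matching the code below:
+    data-plane/tenants/acme/vcluster.yaml
+    data-plane/tenants/acme/appset.yaml
+    data-plane/tenants/acme/apps/superset/application.yaml
+    data-plane/tenants/acme/apps/superset/infra.yaml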
+""" +import logging +import os +import re +import secrets +import shutil +import subprocess +from pathlib import Path +from ..core.config import get_settings + +log = logging.getLogger(__name__) + + +# ── Tool Helm chart templates ──────────────────────────────────────────────── + +def _superset_app(tenant: str, domain: str, kc_base: str, kc_secret: str) -> str: + secret_key = secrets.token_urlsafe(42) + return f"""\ +apiVersion: argoproj.io/v1alpha1 +kind: Application +metadata: + name: {tenant}-superset + namespace: argocd + annotations: + argocd.argoproj.io/sync-wave: "3" +spec: + project: default + source: + repoURL: https://apache.github.io/superset + chart: superset + targetRevision: "0.15.2" + helm: + valuesObject: + replicaCount: 1 + supersetNode: + replicaCount: 1 + supersetCeleryBeat: + enabled: false + supersetWorker: + replicaCount: 0 + init: + adminUser: + username: admin + firstname: Admin + lastname: {tenant} + email: admin@{domain} + password: "Architecture@9988#" + loadExamples: false + postgresql: + enabled: false + redis: + enabled: false + service: + type: ClusterIP + port: 8088 + ingress: + enabled: true + ingressClassName: nginx + annotations: + cert-manager.io/cluster-issuer: letsencrypt-prod + nginx.ingress.kubernetes.io/proxy-buffer-size: "16k" + hosts: + - host: {tenant}-superset.{domain} + paths: + - path: / + pathType: Prefix + tls: + - hosts: + - {tenant}-superset.{domain} + secretName: {tenant}-superset-tls + # initContainer installs psycopg2 (missing from apache/superset:5.x image) + initContainers: + - name: install-drivers + image: apachesuperset.docker.scarf.sh/apache/superset:5.0.0 + command: + - sh + - -c + - pip install psycopg2-binary Authlib --user --no-cache-dir --quiet + volumeMounts: + - name: pip-packages + mountPath: /app/superset_home/.local + extraVolumes: + - name: pip-packages + emptyDir: {{}} + extraVolumeMounts: + - name: pip-packages + mountPath: /app/superset_home/.local + extraEnv: + PYTHONPATH: "/app/superset_home/.local/lib/python3.10/site-packages" + DB_HOST: "{tenant}-superset-pg" + DB_PORT: "5432" + DB_USER: "superset" + DB_PASS: "superset-pass" + DB_NAME: "superset" + REDIS_HOST: "{tenant}-superset-redis" + REDIS_PORT: "6379" + bootstrapScript: | + #!/bin/bash + echo "Drivers pre-installed via initContainer" + configOverrides: + keycloak_sso: | + from flask_appbuilder.security.manager import AUTH_OAUTH + from superset.security import SupersetSecurityManager + import logging + log = logging.getLogger(__name__) + AUTH_TYPE = AUTH_OAUTH + AUTH_USER_REGISTRATION = True + AUTH_USER_REGISTRATION_ROLE = "Gamma" + AUTH_ROLES_SYNC_AT_LOGIN = True + AUTH_ROLES_MAPPING = {{"admin": ["Admin"], "platform_admin": ["Admin"]}} + OAUTH_PROVIDERS = [{{ + "name": "keycloak", + "icon": "fa-key", + "token_key": "access_token", + "remote_app": {{ + "client_id": "superset", + "client_secret": "{kc_secret}", + "server_metadata_url": "{kc_base}/realms/{tenant}/.well-known/openid-configuration", + "api_base_url": "{kc_base}/realms/{tenant}/protocol/openid-connect", + "client_kwargs": {{"scope": "openid email profile roles"}}, + }}, + }}] + class KeycloakSM(SupersetSecurityManager): + def oauth_user_info(self, provider, response=None): + if provider == "keycloak": + me = self.appbuilder.sm.oauth_remoteapp.get("userinfo").json() + roles = me.get("realm_access", {{}}).get("roles", []) + return {{ + "username": me.get("preferred_username", me.get("email")), + "first_name": me.get("given_name", ""), + "last_name": me.get("family_name", ""), + "email": 
me.get("email", ""), + "role_keys": roles, + }} + return {{}} + CUSTOM_SECURITY_MANAGER = KeycloakSM + secret: | + SECRET_KEY = '{secret_key}' + SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://superset:superset-pass@{tenant}-superset-pg:5432/superset' + REDIS_HOST = '{tenant}-superset-redis' + REDIS_PORT = 6379 + DATA_CACHE_CONFIG = {{'CACHE_TYPE': 'SimpleCache'}} + CACHE_CONFIG = {{'CACHE_TYPE': 'SimpleCache'}} + WTF_CSRF_ENABLED = False + TALISMAN_ENABLED = False + SESSION_COOKIE_SAMESITE = None + SESSION_COOKIE_SECURE = True + ENABLE_PROXY_FIX = True + PREFERRED_URL_SCHEME = 'https' + destination: + server: https://kubernetes.default.svc + namespace: {tenant}-superset + syncPolicy: + automated: {{prune: true, selfHeal: true}} + syncOptions: [CreateNamespace=true] +""" + + +def _superset_infra(tenant: str) -> str: + """Standalone PostgreSQL + Redis for Superset (using registry-accessible images).""" + return f"""\ +apiVersion: v1 +kind: Secret +metadata: + name: {tenant}-superset-pg-secret + namespace: {tenant}-superset +stringData: + POSTGRES_DB: superset + POSTGRES_USER: superset + POSTGRES_PASSWORD: "superset-pass" +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: {tenant}-superset-pg + namespace: {tenant}-superset +spec: + serviceName: {tenant}-superset-pg + replicas: 1 + selector: + matchLabels: + app: {tenant}-superset-pg + template: + metadata: + labels: + app: {tenant}-superset-pg + spec: + containers: + - name: postgres + image: postgres:16-alpine + envFrom: + - secretRef: + name: {tenant}-superset-pg-secret + ports: + - containerPort: 5432 + volumeMounts: + - name: data + mountPath: /var/lib/postgresql/data + subPath: pgdata + volumeClaimTemplates: + - metadata: + name: data + spec: + accessModes: [ReadWriteOnce] + resources: + requests: + storage: 5Gi +--- +apiVersion: v1 +kind: Service +metadata: + name: {tenant}-superset-pg + namespace: {tenant}-superset +spec: + selector: + app: {tenant}-superset-pg + ports: + - port: 5432 + targetPort: 5432 +--- +# Alias so the Superset Helm chart's default 'superset-postgresql' hostname resolves +apiVersion: v1 +kind: Service +metadata: + name: superset-postgresql + namespace: {tenant}-superset +spec: + selector: + app: {tenant}-superset-pg + ports: + - port: 5432 + targetPort: 5432 +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {tenant}-superset-redis + namespace: {tenant}-superset +spec: + replicas: 1 + selector: + matchLabels: + app: {tenant}-superset-redis + template: + metadata: + labels: + app: {tenant}-superset-redis + spec: + containers: + - name: redis + image: redis:7-alpine + ports: + - containerPort: 6379 +--- +apiVersion: v1 +kind: Service +metadata: + name: {tenant}-superset-redis + namespace: {tenant}-superset +spec: + selector: + app: {tenant}-superset-redis + ports: + - port: 6379 + targetPort: 6379 +--- +# Alias for Helm chart default hostname +apiVersion: v1 +kind: Service +metadata: + name: superset-redis-headless + namespace: {tenant}-superset +spec: + selector: + app: {tenant}-superset-redis + ports: + - port: 6379 + targetPort: 6379 +""" + + +def _airflow_app(tenant: str, domain: str, kc_base: str) -> str: + return f"""\ +apiVersion: argoproj.io/v1alpha1 +kind: Application +metadata: + name: {tenant}-airflow + namespace: argocd + annotations: + argocd.argoproj.io/sync-wave: "3" +spec: + project: default + source: + repoURL: https://airflow.apache.org + chart: airflow + targetRevision: "1.14.0" + helm: + valuesObject: + images: + airflow: + repository: quay.io/dlytica_dev/airflow + tag: 
v2-airflow-nifi-pipeline-utils-2.0.1 + pullPolicy: IfNotPresent + executor: KubernetesExecutor + config: + core: + load_examples: "False" + postgresql: + enabled: true + image: + registry: quay.io + repository: dlytica_dev/postgresql + tag: "16.1" + redis: + enabled: true + ingress: + web: + enabled: true + ingressClassName: nginx + hosts: + - name: {tenant}-airflow.{domain} + tls: + enabled: true + secretName: {tenant}-airflow-tls + annotations: + cert-manager.io/cluster-issuer: letsencrypt-prod + destination: + server: https://kubernetes.default.svc + namespace: {tenant}-airflow + syncPolicy: + automated: {{prune: true, selfHeal: true}} + syncOptions: [CreateNamespace=true] +""" + + +def _trino_app(tenant: str, domain: str) -> str: + return f"""\ +apiVersion: argoproj.io/v1alpha1 +kind: Application +metadata: + name: {tenant}-trino + namespace: argocd + annotations: + argocd.argoproj.io/sync-wave: "3" +spec: + project: default + source: + repoURL: https://trinodb.github.io/charts + chart: trino + targetRevision: "0.13.0" + helm: + valuesObject: + image: + repository: quay.io/dlytica_dev/trino + tag: "476" + server: + workers: 1 + ingress: + enabled: true + className: nginx + hosts: + - host: {tenant}-trino.{domain} + paths: + - path: / + pathType: Prefix + tls: + - hosts: [{tenant}-trino.{domain}] + secretName: {tenant}-trino-tls + annotations: + cert-manager.io/cluster-issuer: letsencrypt-prod + destination: + server: https://kubernetes.default.svc + namespace: {tenant}-trino + syncPolicy: + automated: {{prune: true, selfHeal: true}} + syncOptions: [CreateNamespace=true] +""" + + +def _gitea_app(tenant: str, domain: str) -> str: + return f"""\ +apiVersion: argoproj.io/v1alpha1 +kind: Application +metadata: + name: {tenant}-gitea + namespace: argocd + annotations: + argocd.argoproj.io/sync-wave: "2" +spec: + project: default + source: + repoURL: https://dl.gitea.com/charts + chart: gitea + targetRevision: "10.6.0" + helm: + valuesObject: + gitea: + admin: + username: admin + password: "Architecture@9988#" + email: admin@{domain} + ingress: + enabled: true + className: nginx + annotations: + cert-manager.io/cluster-issuer: letsencrypt-prod + hosts: + - host: {tenant}-git.{domain} + paths: + - path: / + pathType: Prefix + tls: + - hosts: [{tenant}-git.{domain}] + secretName: {tenant}-gitea-tls + postgresql-ha: + enabled: false + postgresql: + enabled: true + image: + registry: quay.io + repository: dlytica_dev/postgresql + tag: "16.1" + destination: + server: https://kubernetes.default.svc + namespace: {tenant}-gitea + syncPolicy: + automated: {{prune: true, selfHeal: true}} + syncOptions: [CreateNamespace=true] +""" + + +# Map tool names to their generator functions +TOOL_GENERATORS = { + "superset": lambda tenant, domain, kc_base, kc_secret: _superset_app(tenant, domain, kc_base, kc_secret), + "airflow": lambda tenant, domain, kc_base, kc_secret: _airflow_app(tenant, domain, kc_base), + "trino": lambda tenant, domain, kc_base, kc_secret: _trino_app(tenant, domain), + "gitea": lambda tenant, domain, kc_base, kc_secret: _gitea_app(tenant, domain), +} + +# Tools that need companion infra manifests +INFRA_GENERATORS = { + "superset": _superset_infra, +} + +# Default tools for every new tenant (always on) +DEFAULT_TOOLS = {"superset": True} + + +# ── vCluster template ──────────────────────────────────────────────────────── + +def _vcluster_app(tenant: str, domain: str, namespace: str, gitops_repo: str, gitops_branch: str) -> str: + return f"""\ +# vCluster for tenant: {tenant} +# ArgoCD on 
the data-plane deploys this. Once the vCluster is up, the +# ApplicationSet below drives all tool deployments inside it. +apiVersion: argoproj.io/v1alpha1 +kind: Application +metadata: + name: vcluster-{tenant} + namespace: argocd + labels: + tenant: {tenant} + managed-by: platform + annotations: + argocd.argoproj.io/sync-wave: "1" +spec: + project: default + source: + repoURL: https://charts.loft.sh + chart: vcluster + targetRevision: "0.20.0" + helm: + valuesObject: + sync: + ingresses: + enabled: true + exportKubeConfig: + secret: + name: vcluster-{tenant}-kubeconfig + controlPlane: + ingress: + enabled: true + host: vc-{tenant}.{domain} + annotations: + cert-manager.io/cluster-issuer: letsencrypt-prod + init: + manifests: | + apiVersion: v1 + kind: Namespace + metadata: + name: argocd + destination: + server: https://kubernetes.default.svc + namespace: {namespace} + syncPolicy: + automated: {{prune: true, selfHeal: true}} + syncOptions: [CreateNamespace=true] +""" + + +def _appset(tenant: str, gitops_repo: str, gitops_branch: str) -> str: + """ApplicationSet that watches data-plane/tenants//apps/ and deploys each app.""" + return f"""\ +# ApplicationSet for tenant: {tenant} +# Watches gitops/data-plane/tenants/{tenant}/apps/* and creates one ArgoCD Application per tool. +# Add/remove a file → ArgoCD adds/removes the tool automatically. +apiVersion: argoproj.io/v1alpha1 +kind: ApplicationSet +metadata: + name: {tenant}-tools + namespace: argocd + labels: + tenant: {tenant} + managed-by: platform +spec: + generators: + - git: + repoURL: {gitops_repo} + revision: {gitops_branch} + directories: + - path: data-plane/tenants/{tenant}/apps/* + template: + metadata: + name: "{tenant}-{{{{path.basename}}}}" + namespace: argocd + labels: + tenant: {tenant} + spec: + project: default + source: + repoURL: {gitops_repo} + targetRevision: {gitops_branch} + path: "{{{{path}}}}" + destination: + server: https://kubernetes.default.svc + namespace: "{tenant}-{{{{path.basename}}}}" + syncPolicy: + automated: {{prune: true, selfHeal: true}} + syncOptions: [CreateNamespace=true] + retry: + limit: 5 + backoff: {{duration: 30s, factor: 2, maxDuration: 5m}} +""" + + +# ── Git helpers ────────────────────────────────────────────────────────────── + +def _git_clone_or_pull(repo_url: str, local_path: str, branch: str) -> None: + p = Path(local_path) + if p.exists(): + subprocess.run( + ["git", "-C", local_path, "pull", "--rebase", "origin", branch], + check=True, capture_output=True, + ) + else: + subprocess.run( + ["git", "clone", "--depth=1", "-b", branch, repo_url, local_path], + check=True, capture_output=True, + ) + + +def _git_push(local_path: str, message: str, branch: str) -> None: + cmds = [ + ["git", "-C", local_path, "config", "user.email", "platform-api@dlytica.com"], + ["git", "-C", local_path, "config", "user.name", "Platform API"], + ["git", "-C", local_path, "add", "-A"], + ["git", "-C", local_path, "commit", "-m", message, "--allow-empty"], + ["git", "-C", local_path, "push", "origin", branch], + ] + for cmd in cmds: + subprocess.run(cmd, check=True, capture_output=True) + + +def _slug_safe(s: str) -> str: + return re.sub(r"[^a-z0-9-]", "-", s.lower()).strip("-")[:40] + + +# ── Public API ─────────────────────────────────────────────────────────────── + +async def provision_tenant( + tenant_slug: str, + company_name: str, + plan_name: str, + tools_enabled: dict, + domain: str = None, + kc_client_secret: str = "", +) -> dict: + """ + Write vCluster + ApplicationSet + tool Applications to gitops 
repo. + ArgoCD on the data-plane picks them up automatically. + Returns dict with vcluster_name, vcluster_url, tool_urls. + """ + s = get_settings() + base_domain = domain or s.DATA_PLANE_DOMAIN + kc_base = s.KEYCLOAK_URL + tenant = _slug_safe(tenant_slug) + namespace = s.VCLUSTER_NAMESPACE + local = s.GITOPS_LOCAL_PATH + + _git_clone_or_pull(s.GITOPS_REPO_URL, local, s.GITOPS_BRANCH) + + tenant_dir = Path(local) / "data-plane" / "tenants" / tenant + tenant_dir.mkdir(parents=True, exist_ok=True) + apps_dir = tenant_dir / "apps" + apps_dir.mkdir(exist_ok=True) + + # 1. vCluster Application + (tenant_dir / "vcluster.yaml").write_text( + _vcluster_app(tenant, base_domain, namespace, s.GITOPS_REPO_URL, s.GITOPS_BRANCH) + ) + + # 2. ApplicationSet (auto-discovers apps/) + (tenant_dir / "appset.yaml").write_text( + _appset(tenant, s.GITOPS_REPO_URL, s.GITOPS_BRANCH) + ) + + # 3. Tool Applications — always include superset; add others per plan + merged = {**DEFAULT_TOOLS, **tools_enabled} + + tool_urls = {} + for tool, gen in TOOL_GENERATORS.items(): + tool_dir = apps_dir / tool + infra_path = apps_dir / f"{tool}-infra.yaml" + if merged.get(tool): + tool_dir.mkdir(exist_ok=True) + (tool_dir / "application.yaml").write_text( + gen(tenant, base_domain, kc_base, kc_client_secret) + ) + # Write companion infra manifests if needed (e.g. postgres + redis for superset) + if tool in INFRA_GENERATORS: + (tool_dir / "infra.yaml").write_text(INFRA_GENERATORS[tool](tenant)) + tool_urls[tool] = f"https://{tenant}-{tool}.{base_domain}" + else: + # Remove if disabled + if tool_dir.exists(): + shutil.rmtree(tool_dir) + infra_path.unlink(missing_ok=True) + + _git_push(local, f"feat: provision tenant {tenant} ({company_name})", s.GITOPS_BRANCH) + + return { + "vcluster_name": f"vcluster-{tenant}", + "vcluster_url": f"https://vc-{tenant}.{base_domain}", + "tool_urls": tool_urls, + "data_plane_url": f"https://data.{base_domain}?tenant={tenant}", + } + + +async def deprovision_tenant(tenant_slug: str) -> None: + """ + Remove tenant directory from gitops. + ArgoCD prunes the vCluster and all tool Applications automatically. + """ + s = get_settings() + tenant = _slug_safe(tenant_slug) + local = s.GITOPS_LOCAL_PATH + _git_clone_or_pull(s.GITOPS_REPO_URL, local, s.GITOPS_BRANCH) + + tenant_dir = Path(local) / "data-plane" / "tenants" / tenant + if tenant_dir.exists(): + shutil.rmtree(tenant_dir) + _git_push(local, f"feat: deprovision tenant {tenant}", s.GITOPS_BRANCH) + log.info("Deprovisioned tenant %s", tenant) + + +async def update_tenant_tools( + tenant_slug: str, + tools_enabled: dict, + domain: str = None, + kc_client_secret: str = "", +) -> dict: + """ + Enable/disable tools for a tenant by adding/removing Application files. + ArgoCD prunes disabled tools automatically. 
+ """ + s = get_settings() + base_domain = domain or s.DATA_PLANE_DOMAIN + kc_base = s.KEYCLOAK_URL + tenant = _slug_safe(tenant_slug) + local = s.GITOPS_LOCAL_PATH + + _git_clone_or_pull(s.GITOPS_REPO_URL, local, s.GITOPS_BRANCH) + + apps_dir = Path(local) / "data-plane" / "tenants" / tenant / "apps" + apps_dir.mkdir(parents=True, exist_ok=True) + + merged = {**DEFAULT_TOOLS, **tools_enabled} + + tool_urls = {} + for tool, gen in TOOL_GENERATORS.items(): + tool_dir = apps_dir / tool + if merged.get(tool): + tool_dir.mkdir(exist_ok=True) + (tool_dir / "application.yaml").write_text( + gen(tenant, base_domain, kc_base, kc_client_secret) + ) + if tool in INFRA_GENERATORS: + (tool_dir / "infra.yaml").write_text(INFRA_GENERATORS[tool](tenant)) + tool_urls[tool] = f"https://{tenant}-{tool}.{base_domain}" + else: + if tool_dir.exists(): + shutil.rmtree(tool_dir) + + _git_push(local, f"chore: update tools for tenant {tenant}", s.GITOPS_BRANCH) + return tool_urls diff --git a/app/services/trial_monitor.py b/app/services/trial_monitor.py new file mode 100644 index 0000000..e3f0f53 --- /dev/null +++ b/app/services/trial_monitor.py @@ -0,0 +1,96 @@ +""" +Trial Monitor — APScheduler job that runs every hour. +- Warns tenants 7 days before trial expiry +- Suspends vClusters when trial expires +- Deletes vClusters after a grace period (configurable) +""" +import logging +from datetime import datetime, timedelta +from sqlalchemy import select, update as sa_update + +from ..core.database import get_session_factory +from ..models.tenant import Tenant, PlatformSetting, ProvisioningJob +from ..services import provisioner + +log = logging.getLogger(__name__) + + +async def _get_setting(db, key: str, default: str) -> str: + row = await db.get(PlatformSetting, key) + return row.value if row else default + + +async def check_trials(): + """Called by APScheduler every hour.""" + factory = get_session_factory() + async with factory() as db: + now = datetime.utcnow() + warn_days_str = await _get_setting(db, "trial_warning_days", "7") + warn_days = int(warn_days_str) + + # ── Warn approaching expiry ─────────────────────────────── + warn_before = now + timedelta(days=warn_days) + approaching = (await db.execute( + select(Tenant) + .where( + Tenant.status == "trial", + Tenant.trial_ends_at <= warn_before, + Tenant.trial_ends_at > now, + ) + )).scalars().all() + + for t in approaching: + days_left = max(0, (t.trial_ends_at - now).days) + log.info("Trial warning: %s expires in %d days", t.slug, days_left) + # TODO: send email notification + + # ── Expire trials ───────────────────────────────────────── + expired = (await db.execute( + select(Tenant) + .where( + Tenant.status == "trial", + Tenant.trial_ends_at <= now, + Tenant.vcluster_status == "ready", + ) + )).scalars().all() + + for t in expired: + log.info("Trial expired: %s — suspending vCluster", t.slug) + await db.execute( + sa_update(Tenant) + .where(Tenant.id == t.id) + .values(status="suspended", vcluster_status="suspended") + ) + db.add(ProvisioningJob( + tenant_id=t.id, + job_type="suspend_vcluster", + payload={"slug": t.slug, "reason": "trial_expired"}, + )) + + await db.commit() + + # ── Delete suspended vclusters after grace period (7 days) ─ + grace_days = int(await _get_setting(db, "suspension_grace_days", "7")) + grace_before = now - timedelta(days=grace_days) + old_suspended = (await db.execute( + select(Tenant) + .where( + Tenant.status == "suspended", + Tenant.updated_at <= grace_before, + Tenant.vcluster_status == "suspended", + ) + 
)).scalars().all() + + for t in old_suspended: + log.info("Grace period ended: %s — deprovisioning", t.slug) + try: + await provisioner.deprovision_tenant(t.slug) + await db.execute( + sa_update(Tenant) + .where(Tenant.id == t.id) + .values(vcluster_status="deleted") + ) + except Exception as exc: + log.error("Failed to deprovision %s: %s", t.slug, exc) + + await db.commit() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..44f24be --- /dev/null +++ b/requirements.txt @@ -0,0 +1,17 @@ +fastapi>=0.111.0 +uvicorn[standard]>=0.30.0 +asyncpg>=0.29.0 +sqlalchemy[asyncio]>=2.0.30 +alembic>=1.13.0 +pydantic>=2.7.0 +pydantic-settings>=2.2.0 +python-keycloak>=3.9.0 +httpx>=0.27.0 +python-jose[cryptography]>=3.3.0 +passlib[bcrypt]>=1.7.4 +python-multipart>=0.0.9 +APScheduler>=3.10.4 +kubernetes>=29.0.0 +python-slugify>=8.0.4 +jinja2>=3.1.4 +emails>=0.6
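
A note on how check_trials gets invoked: its docstring says it is "Called by APScheduler every hour" and APScheduler is pinned in requirements.txt, but the wiring that actually registers the job is outside this excerpt. A minimal sketch of such wiring, assuming an AsyncIOScheduler started from a FastAPI lifespan hook (the lifespan function and scheduler object below are illustrative assumptions, not code from this patch):

    from contextlib import asynccontextmanager

    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from fastapi import FastAPI

    from app.services.trial_monitor import check_trials

    scheduler = AsyncIOScheduler()


    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # Sweep trials hourly; check_trials opens and closes its own DB
        # session, so the job needs no arguments.
        scheduler.add_job(check_trials, "interval", hours=1, id="trial-monitor")
        scheduler.start()
        yield
        scheduler.shutdown(wait=False)


    app = FastAPI(lifespan=lifespan)

Because check_trials is a coroutine, AsyncIOScheduler runs it on the application's event loop. If the API is served by more than one worker process, each process would start its own scheduler, so the job should either tolerate overlapping runs or be registered from a single process.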