Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,19 @@ mcp = [
"nilscript[sdk]",
"mcp>=1.2",
]
# SaaS multi-tenancy: the JWT claim verifier (mcp/auth.py) + the encrypted per-tenant secret vault
# (nilscript/secrets/). The kernel core stays dependency-free; these load only with the [saas] extra.
saas = [
"pyjwt[crypto]>=2.8", # verify keycloak JWTs (workspace claim); [crypto] pulls RS/ES + JWKS support
"cryptography>=42", # Fernet — the secret vault's encrypt-at-rest
]
# Tenant-scoped DURABLE execution (durable_temporal.py) — optional Temporal worker integration.
temporal = ["temporalio>=1.20"]
dev = [
"nilscript[sdk]",
"nilscript[cli]",
"nilscript[saas]",
"nilscript[temporal]",
"mcp>=1.2", # the MCP server's unit tests import nilscript.mcp
"pytest",
"pytest-asyncio>=0.24",
Expand Down
56 changes: 52 additions & 4 deletions src/nilscript/controlplane/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,10 @@ async def ingest(
return {"ok": True, "new": new}

@app.get("/api/events")
def events(limit: int = 100) -> dict[str, Any]:
return {"events": store.recent(limit)}
def events(limit: int = 100, workspace: str | None = None) -> dict[str, Any]:
    """Return the most recent events, optionally narrowed to one workspace.

    `limit` and `workspace` are forwarded unchanged to `store.recent`. When `workspace` is
    omitted the store is asked for the unscoped (operator/global) timeline.
    """
    # NOTE(review): `workspace` is taken straight from the query string; this handler does no
    # authentication itself, so tenant isolation presumably relies on the calling BFF — confirm.
    timeline = store.recent(limit, workspace=workspace)
    return {"events": timeline}

@app.get("/api/events/{event_id}")
def event_detail(event_id: int) -> Any:
Expand Down Expand Up @@ -302,8 +304,9 @@ async def post_decision(proposal_id: str, request: Request) -> Any:
return result

@app.get("/api/pending")
def pending() -> dict[str, Any]:
return {"pending": store.pending()}
def pending(workspace: str | None = None) -> dict[str, Any]:
    """Return held (pending) proposals, optionally narrowed to one workspace.

    `workspace` is forwarded unchanged to `store.pending`; omitting it requests the unscoped
    (global) list.
    """
    held = store.pending(workspace=workspace)
    return {"pending": held}

@app.get("/api/adapters")
def adapters() -> dict[str, Any]:
Expand Down Expand Up @@ -369,6 +372,51 @@ async def register_adapter(request: Request, authorization: str | None = Header(
)
return {"ok": True, "adapter": _redact(rec)}

@app.post("/tenants/provision")
async def provision_tenant(request: Request, authorization: str | None = Header(default=None)) -> Any:
    """One-call onboarding for a company: save its secrets (encrypted), then register and
    activate its adapter.

    Auth-protected by the registry token (`_registry_authed`).

    Request body must be a JSON object with:
      * ``workspace`` — required, non-empty string.
      * ``secrets``   — optional object; stored via `store.put_secrets` when non-empty.
      * ``adapter``   — optional object; registered and activated only when both
        ``adapter_id`` and ``url`` are present (``label``/``bearer``/``system`` are optional).

    Responses:
      * 401 when the registry token check fails.
      * 400 for malformed JSON, a non-object body, a missing/non-string workspace, or a
        ``secrets``/``adapter`` value that is not an object.
      * 503 when the secret vault rejects the write (``RuntimeError`` from `put_secrets`).
      * 200 ``{"ok": True, "workspace": ..., "provisioned": {...}}`` otherwise.
    """
    if not _registry_authed(authorization):
        return JSONResponse({"error": "unauthorized"}, status_code=401)
    try:
        body = await request.json()
    except (ValueError, TypeError):
        return JSONResponse({"error": "bad json"}, status_code=400)
    # Valid JSON is not necessarily an object (it may be a list, string, number or null);
    # without this guard the `.get` calls below would raise and surface as a 500.
    if not isinstance(body, dict):
        return JSONResponse({"error": "body must be a JSON object"}, status_code=400)
    ws = body.get("workspace") or ""
    if not isinstance(ws, str) or not ws:
        return JSONResponse({"error": "workspace is required"}, status_code=400)

    # Validate every payload shape BEFORE the first write, so a malformed `adapter` can never
    # leave the tenant half-provisioned (secrets stored, adapter missing).
    secrets = body.get("secrets") or {}
    if not isinstance(secrets, dict):
        return JSONResponse({"error": "secrets must be an object"}, status_code=400)
    adapter = body.get("adapter") or {}
    if not isinstance(adapter, dict):
        return JSONResponse({"error": "adapter must be an object"}, status_code=400)

    steps: dict[str, Any] = {}
    if secrets:
        try:
            store.put_secrets(ws, secrets)  # adapter creds + llm key, encrypted at rest
        except RuntimeError as exc:  # vault disabled (no NIL_VAULT_KEY)
            return JSONResponse({"error": str(exc)}, status_code=503)
        # Report only the secret NAMES that were stored, never their values.
        steps["secrets"] = sorted(secrets)
    # An adapter entry lacking adapter_id or url is skipped (not an error); its absence from
    # `provisioned` is how the caller can tell nothing was registered.
    if adapter.get("adapter_id") and adapter.get("url"):
        adapter_id = adapter["adapter_id"]
        store.register_adapter(
            ws,
            adapter_id,
            label=adapter.get("label", "") or "",
            url=adapter["url"],
            bearer=adapter.get("bearer", "") or "",
            system=adapter.get("system", "") or "",
        )
        store.activate_adapter(ws, adapter_id)
        steps["adapter"] = f"{adapter_id} registered+activated"
    return {"ok": True, "workspace": ws, "provisioned": steps}

@app.get("/tenants/{workspace}/secret/{name}")
def get_tenant_secret(workspace: str, name: str, authorization: str | None = Header(default=None)) -> Any:
    """Server-to-server fetch of one decrypted tenant secret.

    Gated by the registry token (`_registry_authed`).

    Responses:
      * 401 when the registry token check fails.
      * 503 when the store has no secret vault configured — the same status
        `/tenants/provision` uses for that condition, so a caller can tell a platform
        misconfiguration from a secret that really is absent.
      * 404 when the workspace has no secret called `name`.
      * 200 ``{"workspace": ..., "name": ..., "value": ...}`` with ``Cache-Control: no-store``.

    Decryption errors raised by the store are not caught here and propagate to the framework.
    """
    if not _registry_authed(authorization):
        return JSONResponse({"error": "unauthorized"}, status_code=401)
    # With the vault disabled `store.get_secret` returns None, which would otherwise be
    # indistinguishable from a missing secret and reported as a misleading 404.
    if not store.vault_enabled:
        return JSONResponse({"error": "secret vault disabled"}, status_code=503)
    value = store.get_secret(workspace, name)
    if value is None:
        return JSONResponse({"error": "no such secret"}, status_code=404)
    # The payload carries a decrypted credential: forbid any intermediary from caching it.
    return JSONResponse(
        {"workspace": workspace, "name": name, "value": value},
        headers={"Cache-Control": "no-store"},
    )

@app.post("/adapters/{workspace}/{adapter_id}/activate")
def activate_adapter(workspace: str, adapter_id: str, authorization: str | None = Header(default=None)) -> Any:
"""Make this adapter the active backend for the workspace (auth-protected)."""
Expand Down
92 changes: 84 additions & 8 deletions src/nilscript/controlplane/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@
PRIMARY KEY (workspace, adapter_id)
);

-- Per-tenant secret vault: a company's adapter creds + LLM key, saved ONCE at onboarding. The value
-- is ENCRYPTED at rest (Fernet, master key from NIL_VAULT_KEY) — a leaked row is ciphertext, never
-- credentials. Keyed by workspace; the control-plane decrypts only to use, never echoes to the browser.
CREATE TABLE IF NOT EXISTS tenant_secrets (
workspace TEXT NOT NULL PRIMARY KEY,
ciphertext BLOB NOT NULL,
updated_at TEXT NOT NULL
);

-- Automation Registry (SSOT): one row per VERSION of one automation. Append-only — "editing" an
-- automation writes a new version and archives the prior one (superseded_by). `content_hash` is the
-- version lock (sha256 over the validated plan). See docs/PLAN-dynamic-automation-ssot.md.
Expand Down Expand Up @@ -155,6 +164,15 @@ def __init__(self, path: str | None = None) -> None:
self._conn = sqlite3.connect(self._path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._lock = threading.Lock()
# The secret vault is enabled only when a master key is configured; otherwise secret storage
# fails closed (a tenant's creds are never stored in plaintext as a fallback).
self._vault = None
try:
from nilscript.secrets import SecretVault

self._vault = SecretVault.from_env() # NIL_VAULT_KEY; raises if unset → stays None
except Exception: # noqa: BLE001 — no/invalid key ⇒ vault disabled, not crashed
self._vault = None
with self._lock:
self._conn.executescript(_DDL)
# Existing DBs (volume) predate event_id — add it idempotently.
Expand Down Expand Up @@ -201,12 +219,16 @@ def ingest(self, envelope: dict[str, Any], sequence: int | None, *, source: str
self._conn.commit()
return True

def recent(self, limit: int = 100) -> list[dict[str, Any]]:
def recent(self, limit: int = 100, workspace: str | None = None) -> list[dict[str, Any]]:
# SaaS isolation: when a workspace is given, return ONLY that tenant's events. Pass None only
# for the operator/global timeline.
where = "WHERE workspace = ? " if workspace is not None else ""
params: tuple[Any, ...] = (workspace, max(1, min(limit, 1000))) if workspace is not None else (max(1, min(limit, 1000)),)
with self._lock:
rows = self._conn.execute(
"SELECT id, received_at, workspace, sequence, grant_id, source, performative, "
"event, proposal, verb, tier, severity, envelope FROM events ORDER BY id DESC LIMIT ?",
(max(1, min(limit, 1000)),),
f"event, proposal, verb, tier, severity, envelope FROM events {where}ORDER BY id DESC LIMIT ?",
params,
).fetchall()
# An executed/refused event omits verb/tier and the human preview (those live on the
# proposal). Pull them from each row's matching `proposed` event in ONE query so the timeline
Expand Down Expand Up @@ -475,12 +497,21 @@ def decide(self, proposal_id: str, status: str, *, actor: str = "", reason: str
self._conn.commit()
return cur.rowcount > 0

def pending(self) -> list[dict[str, Any]]:
def pending(self, workspace: str | None = None) -> list[dict[str, Any]]:
# Return every approval whose status is 'pending', newest first (ORDER BY created_at DESC),
# as plain dicts with keys proposal_id, verb, tier, preview, created_at.
# SaaS isolation: the approvals table has no workspace column (the gate's hold call sends none),
# so scope by JOINing each held proposal to its `proposed` event's workspace. A tenant sees only
# its own held proposals; None = operator/global view.
if workspace is None:
# Unscoped view: no bind parameters are needed.
sql = ("SELECT proposal_id, verb, tier, preview, created_at FROM approvals "
"WHERE status = 'pending' ORDER BY created_at DESC")
params: tuple[Any, ...] = ()
else:
# EXISTS keeps at most one output row per approval even when several events carry the same
# proposal id. The subquery matches ANY event row with that proposal + workspace; it does not
# filter on the event type. The workspace is bound as a `?` parameter, never interpolated.
sql = ("SELECT a.proposal_id, a.verb, a.tier, a.preview, a.created_at FROM approvals a "
"WHERE a.status = 'pending' AND EXISTS (SELECT 1 FROM events e "
"WHERE e.proposal = a.proposal_id AND e.workspace = ?) ORDER BY a.created_at DESC")
params = (workspace,)
# The query runs while holding self._lock.
with self._lock:
rows = self._conn.execute(
"SELECT proposal_id, verb, tier, preview, created_at FROM approvals "
"WHERE status = 'pending' ORDER BY created_at DESC"
).fetchall()
rows = self._conn.execute(sql, params).fetchall()
# Convert each fetched row into a plain dict for the caller.
return [dict(r) for r in rows]

# ── active-adapter registry (multi-tenant routing) ───────────────────────────────────────
Expand All @@ -502,6 +533,51 @@ def register_adapter(
self._conn.commit()
return self._adapter(workspace, adapter_id) or {}

# ── per-tenant secret vault (encrypted at rest) ──────────────────────────────────────────────
@property
def vault_enabled(self) -> bool:
    """Whether this store holds a secret vault (i.e. `self._vault` is set)."""
    vault = self._vault
    return vault is not None

def put_secrets(self, workspace: str, secrets: dict[str, Any]) -> None:
    """Save (replace) a workspace's secret bundle, encrypted.

    The bundle is serialised to compact JSON, encrypted with the vault's Fernet key and
    upserted into `tenant_secrets` (one row per workspace), then committed.

    Raises:
        RuntimeError: the vault is not configured — secrets are never stored in plaintext as
            a fallback.
        ValueError: `workspace` is empty, or `secrets` is not a dict.
    """
    if self._vault is None:
        raise RuntimeError("secret vault disabled — set NIL_VAULT_KEY to store tenant secrets")
    if not workspace:
        raise ValueError("workspace is required")
    # `get_secret` looks names up with `bundle.get(...)`; persisting a list/str/number here
    # would make every later lookup for this workspace fail, so reject it before writing.
    if not isinstance(secrets, dict):
        raise ValueError("secrets must be a dict of name -> value")
    payload = json.dumps(secrets, separators=(",", ":")).encode("utf-8")
    blob = self._vault._fernet.encrypt(payload)  # encrypt here, persist ciphertext only
    with self._lock:
        self._conn.execute(
            "INSERT INTO tenant_secrets (workspace, ciphertext, updated_at) VALUES (?,?,?) "
            "ON CONFLICT(workspace) DO UPDATE SET ciphertext=excluded.ciphertext, "
            "updated_at=excluded.updated_at",
            (workspace, blob, _now()),
        )
        self._conn.commit()

def get_secrets(self, workspace: str) -> dict[str, Any] | None:
    """Load and decrypt the secret bundle stored for `workspace`.

    Returns None when no vault is configured or when the workspace has no row in
    `tenant_secrets`. Errors raised while decrypting or JSON-decoding are not caught here and
    propagate to the caller.
    """
    vault = self._vault
    if vault is None:
        return None
    query = "SELECT ciphertext FROM tenant_secrets WHERE workspace = ?"
    with self._lock:
        found = self._conn.execute(query, (workspace,)).fetchone()
    if found is None:
        return None
    # Decryption happens outside the lock; only the DB read is serialised.
    plaintext = vault._fernet.decrypt(found["ciphertext"])
    return json.loads(plaintext.decode("utf-8"))

def get_secret(self, workspace: str, name: str) -> Any | None:
    """Return the single secret `name` from the workspace's bundle.

    Yields None when `get_secrets` returns nothing (or an empty bundle), or when the bundle has
    no entry called `name`.
    """
    bundle = self.get_secrets(workspace)
    if not bundle:
        return None
    return bundle.get(name)

def delete_secrets(self, workspace: str) -> None:
    """Delete the stored secret bundle for `workspace` and commit.

    Runs regardless of whether a vault is configured; deleting a workspace that has no row is a
    no-op at the SQL level.
    """
    statement = "DELETE FROM tenant_secrets WHERE workspace = ?"
    with self._lock:
        self._conn.execute(statement, (workspace,))
        self._conn.commit()

def activate_adapter(self, workspace: str, adapter_id: str) -> bool:
"""Make `adapter_id` the active backend for `workspace`, deactivating its siblings.
Returns False if no such adapter is registered (so the caller can 404)."""
Expand Down
66 changes: 66 additions & 0 deletions src/nilscript/durable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Tenant-scoped durable execution — the isolation LAYER Temporal plugs into (Phase 6).

Full Temporal worker/activity integration is a separate build; what multi-tenancy needs FIRST is that
durable work is isolated and fair per tenant. This module provides exactly that, with no Temporal
dependency, so it is testable today and the worker wires onto it later:

• `tenant_workflow_id` — deterministic, tenant-prefixed workflow ids → idempotent replay AND no
cross-tenant id collision (a re-delivered fire for tenant A can never touch tenant B's workflow).
• `tenant_namespace` — a Temporal NAMESPACE per tenant (the platform's hard isolation boundary;
a worker polling tenant A's namespace never sees B's tasks).
• `TenantDurablePolicy` — per-tenant rate + concurrency admission for durable activities (the Odoo-429
fairness lesson, durable edition): a tenant's bulk job is throttled to its own budget and cannot
starve the others, reusing the per-tenant rate limiter.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field

from nilscript.governance_quota import TenantRateLimiter

_SAFE = re.compile(r"[^a-zA-Z0-9._-]")


def _slug(value: str) -> str:
"""A Temporal-safe id segment (Temporal ids/namespaces disallow arbitrary chars)."""
return _SAFE.sub("-", value.strip()) or "_"


def tenant_workflow_id(tenant: str, kind: str, key: str) -> str:
    """Build the deterministic workflow id ``<tenant>:<kind>:<key>``.

    Each part is passed through `_slug`; ``:`` is not in the slug alphabet, so the separators
    stay unambiguous. The same (tenant, kind, key) always yields the same id.

    Raises:
        ValueError: `tenant` is empty or whitespace-only. A blank tenant would otherwise slug
            to the shared placeholder segment and pool unrelated callers under one prefix.
    """
    if not tenant or not tenant.strip():
        raise ValueError("tenant is required for a tenant-scoped workflow id")
    return f"{_slug(tenant)}:{_slug(kind)}:{_slug(key)}"


def tenant_namespace(tenant: str, base: str = "nil") -> str:
    """Build the per-tenant namespace name ``<base>-<tenant>``.

    Both parts are passed through `_slug`.

    Raises:
        ValueError: `tenant` is empty or whitespace-only. A blank tenant would otherwise slug
            to the shared placeholder segment, giving every such caller the same namespace.
    """
    if not tenant or not tenant.strip():
        raise ValueError("tenant is required for a tenant namespace")
    return f"{_slug(base)}-{_slug(tenant)}"


@dataclass
class TenantDurablePolicy:
    """Per-tenant admission for durable activities: a rate check plus an optional concurrency cap.

    `admit()` returns False when the tenant is at its concurrency cap or the limiter refuses it.
    Counters are kept per tenant, so one tenant's load never changes another tenant's result.
    Each successful `admit()` should be paired with one `release()` when the work finishes.

    No locking is done here. NOTE(review): callers presumably serialise access or run on a
    single thread — confirm.
    """

    # Rate limiter consulted as `limiter.allow(tenant, kind)`.
    limiter: TenantRateLimiter
    max_concurrent: int = 0  # 0 = unbounded
    # In-flight count per tenant; tenants with nothing running have no entry.
    _running: dict[str, int] = field(default_factory=dict)

    def admit(self, tenant: str, kind: str = "activity") -> bool:
        """Try to admit one unit of work for `tenant`; return True if it may run now."""
        in_flight = self._running.get(tenant, 0)
        # Check the concurrency cap first, so a tenant already at its cap does not also spend
        # a rate-limiter token on a request that is refused anyway.
        if self.max_concurrent and in_flight >= self.max_concurrent:
            return False
        if not self.limiter.allow(tenant, kind):
            return False
        self._running[tenant] = in_flight + 1
        return True

    def release(self, tenant: str) -> None:
        """Record that one admitted unit of work for `tenant` has finished.

        Releasing a tenant with nothing in flight is a no-op. When the count reaches zero the
        entry is removed, so the table does not grow with every tenant ever admitted.
        """
        remaining = self._running.get(tenant, 0) - 1
        if remaining > 0:
            self._running[tenant] = remaining
        else:
            self._running.pop(tenant, None)
Loading
Loading