diff --git a/pyproject.toml b/pyproject.toml index 17bbe1a..164d969 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,9 +44,19 @@ mcp = [ "nilscript[sdk]", "mcp>=1.2", ] +# SaaS multi-tenancy: the JWT claim verifier (mcp/auth.py) + the encrypted per-tenant secret vault +# (nilscript/secrets/). The kernel core stays dependency-free; these load only with the [saas] extra. +saas = [ + "pyjwt[crypto]>=2.8", # verify keycloak JWTs (workspace claim); [crypto] pulls RS/ES + JWKS support + "cryptography>=42", # Fernet — the secret vault's encrypt-at-rest +] +# Tenant-scoped DURABLE execution (durable_temporal.py) — optional Temporal worker integration. +temporal = ["temporalio>=1.20"] dev = [ "nilscript[sdk]", "nilscript[cli]", + "nilscript[saas]", + "nilscript[temporal]", "mcp>=1.2", # the MCP server's unit tests import nilscript.mcp "pytest", "pytest-asyncio>=0.24", diff --git a/src/nilscript/controlplane/app.py b/src/nilscript/controlplane/app.py index 505e8c5..c76076b 100644 --- a/src/nilscript/controlplane/app.py +++ b/src/nilscript/controlplane/app.py @@ -223,8 +223,10 @@ async def ingest( return {"ok": True, "new": new} @app.get("/api/events") - def events(limit: int = 100) -> dict[str, Any]: - return {"events": store.recent(limit)} + def events(limit: int = 100, workspace: str | None = None) -> dict[str, Any]: + # SaaS: a workspace query param scopes the timeline to that tenant (the BFF passes the + # authenticated workspace); omitted = operator/global view. + return {"events": store.recent(limit, workspace=workspace)} @app.get("/api/events/{event_id}") def event_detail(event_id: int) -> Any: @@ -302,8 +304,9 @@ async def post_decision(proposal_id: str, request: Request) -> Any: return result @app.get("/api/pending") - def pending() -> dict[str, Any]: - return {"pending": store.pending()} + def pending(workspace: str | None = None) -> dict[str, Any]: + # SaaS: scope held proposals to the tenant (joined to its events' workspace); omitted = global. + return {"pending": store.pending(workspace=workspace)} @app.get("/api/adapters") def adapters() -> dict[str, Any]: @@ -369,6 +372,51 @@ async def register_adapter(request: Request, authorization: str | None = Header( ) return {"ok": True, "adapter": _redact(rec)} + @app.post("/tenants/provision") + async def provision_tenant(request: Request, authorization: str | None = Header(default=None)) -> Any: + """One-call onboarding for a company: save its secrets (encrypted) ONCE, then register + + activate its adapter — a new tenant is stood up in a single privileged call. Auth-protected + (registry token); never called from the browser (the OS BFF brokers it behind keycloak).""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + try: + body = await request.json() + except (ValueError, TypeError): + return JSONResponse({"error": "bad json"}, status_code=400) + ws = body.get("workspace", "") or "" + if not ws: + return JSONResponse({"error": "workspace is required"}, status_code=400) + steps: dict[str, Any] = {} + secrets = body.get("secrets") or {} + if secrets: + try: + store.put_secrets(ws, secrets) # adapter creds + llm key, encrypted at rest + steps["secrets"] = sorted(secrets.keys()) + except RuntimeError as exc: # vault disabled (no NIL_VAULT_KEY) + return JSONResponse({"error": str(exc)}, status_code=503) + adapter = body.get("adapter") or {} + if adapter.get("adapter_id") and adapter.get("url"): + store.register_adapter( + ws, adapter["adapter_id"], label=adapter.get("label", "") or "", + url=adapter["url"], bearer=adapter.get("bearer", "") or "", + system=adapter.get("system", "") or "", + ) + store.activate_adapter(ws, adapter["adapter_id"]) + steps["adapter"] = f"{adapter['adapter_id']} registered+activated" + return {"ok": True, "workspace": ws, "provisioned": steps} + + @app.get("/tenants/{workspace}/secret/{name}") + def get_tenant_secret(workspace: str, name: str, authorization: str | None = Header(default=None)) -> Any: + """Server-to-server secret fetch for the platform (e.g. the MCP needs a tenant's LLM key). + Registry-token-gated; returns the DECRYPTED value to the authenticated platform caller only — + never reachable from the browser, never logged.""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + value = store.get_secret(workspace, name) + if value is None: + return JSONResponse({"error": "no such secret"}, status_code=404) + return {"workspace": workspace, "name": name, "value": value} + @app.post("/adapters/{workspace}/{adapter_id}/activate") def activate_adapter(workspace: str, adapter_id: str, authorization: str | None = Header(default=None)) -> Any: """Make this adapter the active backend for the workspace (auth-protected).""" diff --git a/src/nilscript/controlplane/store.py b/src/nilscript/controlplane/store.py index ab9a00e..6f504cb 100644 --- a/src/nilscript/controlplane/store.py +++ b/src/nilscript/controlplane/store.py @@ -59,6 +59,15 @@ PRIMARY KEY (workspace, adapter_id) ); +-- Per-tenant secret vault: a company's adapter creds + LLM key, saved ONCE at onboarding. The value +-- is ENCRYPTED at rest (Fernet, master key from NIL_VAULT_KEY) — a leaked row is ciphertext, never +-- credentials. Keyed by workspace; the control-plane decrypts only to use, never echoes to the browser. +CREATE TABLE IF NOT EXISTS tenant_secrets ( + workspace TEXT NOT NULL PRIMARY KEY, + ciphertext BLOB NOT NULL, + updated_at TEXT NOT NULL +); + -- Automation Registry (SSOT): one row per VERSION of one automation. Append-only — "editing" an -- automation writes a new version and archives the prior one (superseded_by). `content_hash` is the -- version lock (sha256 over the validated plan). See docs/PLAN-dynamic-automation-ssot.md. @@ -155,6 +164,15 @@ def __init__(self, path: str | None = None) -> None: self._conn = sqlite3.connect(self._path, check_same_thread=False) self._conn.row_factory = sqlite3.Row self._lock = threading.Lock() + # The secret vault is enabled only when a master key is configured; otherwise secret storage + # fails closed (a tenant's creds are never stored in plaintext as a fallback). + self._vault = None + try: + from nilscript.secrets import SecretVault + + self._vault = SecretVault.from_env() # NIL_VAULT_KEY; raises if unset → stays None + except Exception: # noqa: BLE001 — no/invalid key ⇒ vault disabled, not crashed + self._vault = None with self._lock: self._conn.executescript(_DDL) # Existing DBs (volume) predate event_id — add it idempotently. @@ -201,12 +219,16 @@ def ingest(self, envelope: dict[str, Any], sequence: int | None, *, source: str self._conn.commit() return True - def recent(self, limit: int = 100) -> list[dict[str, Any]]: + def recent(self, limit: int = 100, workspace: str | None = None) -> list[dict[str, Any]]: + # SaaS isolation: when a workspace is given, return ONLY that tenant's events. Pass None only + # for the operator/global timeline. + where = "WHERE workspace = ? " if workspace is not None else "" + params: tuple[Any, ...] = (workspace, max(1, min(limit, 1000))) if workspace is not None else (max(1, min(limit, 1000)),) with self._lock: rows = self._conn.execute( "SELECT id, received_at, workspace, sequence, grant_id, source, performative, " - "event, proposal, verb, tier, severity, envelope FROM events ORDER BY id DESC LIMIT ?", - (max(1, min(limit, 1000)),), + f"event, proposal, verb, tier, severity, envelope FROM events {where}ORDER BY id DESC LIMIT ?", + params, ).fetchall() # An executed/refused event omits verb/tier and the human preview (those live on the # proposal). Pull them from each row's matching `proposed` event in ONE query so the timeline @@ -475,12 +497,21 @@ def decide(self, proposal_id: str, status: str, *, actor: str = "", reason: str self._conn.commit() return cur.rowcount > 0 - def pending(self) -> list[dict[str, Any]]: + def pending(self, workspace: str | None = None) -> list[dict[str, Any]]: + # SaaS isolation: the approvals table has no workspace column (the gate's hold call sends none), + # so scope by JOINing each held proposal to its `proposed` event's workspace. A tenant sees only + # its own held proposals; None = operator/global view. + if workspace is None: + sql = ("SELECT proposal_id, verb, tier, preview, created_at FROM approvals " + "WHERE status = 'pending' ORDER BY created_at DESC") + params: tuple[Any, ...] = () + else: + sql = ("SELECT a.proposal_id, a.verb, a.tier, a.preview, a.created_at FROM approvals a " + "WHERE a.status = 'pending' AND EXISTS (SELECT 1 FROM events e " + "WHERE e.proposal = a.proposal_id AND e.workspace = ?) ORDER BY a.created_at DESC") + params = (workspace,) with self._lock: - rows = self._conn.execute( - "SELECT proposal_id, verb, tier, preview, created_at FROM approvals " - "WHERE status = 'pending' ORDER BY created_at DESC" - ).fetchall() + rows = self._conn.execute(sql, params).fetchall() return [dict(r) for r in rows] # ── active-adapter registry (multi-tenant routing) ─────────────────────────────────────── @@ -502,6 +533,51 @@ def register_adapter( self._conn.commit() return self._adapter(workspace, adapter_id) or {} + # ── per-tenant secret vault (encrypted at rest) ────────────────────────────────────────────── + @property + def vault_enabled(self) -> bool: + return self._vault is not None + + def put_secrets(self, workspace: str, secrets: dict[str, Any]) -> None: + """Save (replace) a workspace's secret bundle, ENCRYPTED. Raises if the vault is unconfigured — + the platform never stores a tenant's creds in plaintext as a fallback.""" + if self._vault is None: + raise RuntimeError("secret vault disabled — set NIL_VAULT_KEY to store tenant secrets") + if not workspace: + raise ValueError("workspace is required") + blob = self._vault._fernet.encrypt( # encrypt here, persist ciphertext (vault store is the DB) + json.dumps(secrets, separators=(",", ":")).encode("utf-8") + ) + with self._lock: + self._conn.execute( + "INSERT INTO tenant_secrets (workspace, ciphertext, updated_at) VALUES (?,?,?) " + "ON CONFLICT(workspace) DO UPDATE SET ciphertext=excluded.ciphertext, " + "updated_at=excluded.updated_at", + (workspace, blob, _now()), + ) + self._conn.commit() + + def get_secrets(self, workspace: str) -> dict[str, Any] | None: + """Decrypt a workspace's secret bundle (by-tenant only), or None. Raises on tamper/wrong key.""" + if self._vault is None: + return None + with self._lock: + row = self._conn.execute( + "SELECT ciphertext FROM tenant_secrets WHERE workspace = ?", (workspace,) + ).fetchone() + if row is None: + return None + return json.loads(self._vault._fernet.decrypt(row["ciphertext"]).decode("utf-8")) + + def get_secret(self, workspace: str, name: str) -> Any | None: + bundle = self.get_secrets(workspace) + return bundle.get(name) if bundle else None + + def delete_secrets(self, workspace: str) -> None: + with self._lock: + self._conn.execute("DELETE FROM tenant_secrets WHERE workspace = ?", (workspace,)) + self._conn.commit() + def activate_adapter(self, workspace: str, adapter_id: str) -> bool: """Make `adapter_id` the active backend for `workspace`, deactivating its siblings. Returns False if no such adapter is registered (so the caller can 404).""" diff --git a/src/nilscript/durable.py b/src/nilscript/durable.py new file mode 100644 index 0000000..a174afa --- /dev/null +++ b/src/nilscript/durable.py @@ -0,0 +1,66 @@ +"""Tenant-scoped durable execution — the isolation LAYER Temporal plugs into (Phase 6). + +Full Temporal worker/activity integration is a separate build; what multi-tenancy needs FIRST is that +durable work is isolated and fair per tenant. This module provides exactly that, with no Temporal +dependency, so it is testable today and the worker wires onto it later: + + • `tenant_workflow_id` — deterministic, tenant-prefixed workflow ids → idempotent replay AND no + cross-tenant id collision (a re-delivered fire for tenant A can never touch tenant B's workflow). + • `tenant_namespace` — a Temporal NAMESPACE per tenant (the platform's hard isolation boundary; + a worker polling tenant A's namespace never sees B's tasks). + • `TenantDurablePolicy` — per-tenant rate + concurrency admission for durable activities (the Odoo-429 + fairness lesson, durable edition): a tenant's bulk job is throttled to its own budget and cannot + starve the others, reusing the per-tenant rate limiter. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field + +from nilscript.governance_quota import TenantRateLimiter + +_SAFE = re.compile(r"[^a-zA-Z0-9._-]") + + +def _slug(value: str) -> str: + """A Temporal-safe id segment (Temporal ids/namespaces disallow arbitrary chars).""" + return _SAFE.sub("-", value.strip()) or "_" + + +def tenant_workflow_id(tenant: str, kind: str, key: str) -> str: + """Deterministic, tenant-scoped workflow id. Same (tenant, kind, key) → same id (idempotent + replay); different tenants → different ids (no cross-tenant collision).""" + if not tenant: + raise ValueError("tenant is required for a tenant-scoped workflow id") + return f"{_slug(tenant)}:{_slug(kind)}:{_slug(key)}" + + +def tenant_namespace(tenant: str, base: str = "nil") -> str: + """The Temporal namespace for a tenant — the hard isolation boundary between companies.""" + if not tenant: + raise ValueError("tenant is required for a tenant namespace") + return f"{_slug(base)}-{_slug(tenant)}" + + +@dataclass +class TenantDurablePolicy: + """Per-tenant admission for durable activities: rate (token bucket) + optional concurrency cap. + `admit()` returns False when the tenant is over budget — the executor parks/retries that tenant's + work without affecting any other tenant.""" + + limiter: TenantRateLimiter + max_concurrent: int = 0 # 0 = unbounded + _running: dict[str, int] = field(default_factory=dict) + + def admit(self, tenant: str, kind: str = "activity") -> bool: + if self.max_concurrent and self._running.get(tenant, 0) >= self.max_concurrent: + return False + if not self.limiter.allow(tenant, kind): + return False + self._running[tenant] = self._running.get(tenant, 0) + 1 + return True + + def release(self, tenant: str) -> None: + if self._running.get(tenant, 0) > 0: + self._running[tenant] -= 1 diff --git a/src/nilscript/durable_temporal.py b/src/nilscript/durable_temporal.py new file mode 100644 index 0000000..461a268 --- /dev/null +++ b/src/nilscript/durable_temporal.py @@ -0,0 +1,119 @@ +"""Temporal worker integration onto the tenant-scoped durable layer (durable.py) — Phase 6. + +OPTIONAL: temporalio is imported lazily, so the kernel runs without it. When a Temporal server is +available, heavy/bulk governed writes run as DURABLE workflows that survive crashes and retry with +backoff — the "429 lesson", durable edition — and stay isolated per tenant: + + • per-tenant Temporal NAMESPACE (tenant_namespace) — a worker for tenant A never sees B's tasks; + • tenant-scoped, deterministic WORKFLOW ID (tenant_workflow_id) — idempotent (a re-kicked job with + the same key replays the same workflow) and collision-free across tenants; + • the NIL gate (propose→commit) runs inside an ACTIVITY with a Temporal RetryPolicy, so a throttled/ + transient backend is retried durably instead of losing the work. + +The activity delegates to a registered executor (`register_executor`) — in deployment that calls the +NIL SDK against the tenant's adapter; in tests it's a simple callable, so the workflow is verifiable +end-to-end with temporalio's in-process time-skipping server (no external infra). +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import timedelta +from typing import Any, Awaitable, Callable + +from nilscript.durable import tenant_namespace, tenant_workflow_id + +try: + from temporalio import activity, workflow + from temporalio.common import RetryPolicy + + _HAS_TEMPORAL = True +except ImportError: # temporalio not installed — module imports cleanly, worker APIs raise on use + _HAS_TEMPORAL = False + + +def temporal_available() -> bool: + return _HAS_TEMPORAL + + +@dataclass +class GovernedWriteInput: + """One durable governed write: which tenant, the NIL verb + args, and an idempotency key (→ the + deterministic workflow id, so a redelivered kick never double-commits).""" + + tenant: str + verb: str + args: dict[str, Any] = field(default_factory=dict) + idempotency_key: str = "" + + +# The NIL executor the activity calls — set in the worker process via register_executor. Activities run +# OUTSIDE the workflow sandbox, so a module-level executor is the correct injection point. +_EXECUTOR: Callable[[GovernedWriteInput], Awaitable[dict[str, Any]]] | None = None + + +def register_executor(fn: Callable[[GovernedWriteInput], Awaitable[dict[str, Any]]]) -> None: + """Register the coroutine the durable activity runs (the real NIL propose→commit; a fake in tests).""" + global _EXECUTOR + _EXECUTOR = fn + + +if _HAS_TEMPORAL: + + @activity.defn(name="nil_governed_commit") + async def nil_governed_commit(inp: GovernedWriteInput) -> dict[str, Any]: + if _EXECUTOR is None: + raise RuntimeError("no NIL executor registered for the durable activity") + return await _EXECUTOR(inp) + + @workflow.defn(name="TenantGovernedWrite") + class TenantGovernedWriteWorkflow: + @workflow.run + async def run(self, inp: GovernedWriteInput) -> dict[str, Any]: + # The NIL gate runs in the activity with durable retry/backoff — a throttled (429) or + # transient backend is retried across attempts (and across worker crashes), not dropped. + return await workflow.execute_activity( + nil_governed_commit, inp, + start_to_close_timeout=timedelta(seconds=60), + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=1), backoff_coefficient=2.0, + maximum_attempts=8, + ), + ) + + +def task_queue_for(tenant: str) -> str: + return f"nil-{tenant}" + + +async def start_governed_write(client: Any, inp: GovernedWriteInput, *, task_queue: str | None = None) -> Any: + """Kick a tenant-scoped durable governed-write workflow. The id is deterministic per (tenant, key), + so a re-kick is idempotent; the task queue is per-tenant.""" + if not _HAS_TEMPORAL: + raise RuntimeError("temporalio is not installed — durable workflows unavailable") + return await client.execute_workflow( + TenantGovernedWriteWorkflow.run, inp, + id=tenant_workflow_id(inp.tenant, "governed_write", inp.idempotency_key or inp.verb), + task_queue=task_queue or task_queue_for(inp.tenant), + ) + + +async def run_worker( + tenant: str, *, host: str = "localhost:7233", + executor: Callable[[GovernedWriteInput], Awaitable[dict[str, Any]]] | None = None, +) -> None: + """Start a Temporal worker on the TENANT's namespace + task queue for governed-write workflows. + One worker per tenant namespace is the isolation boundary.""" + if not _HAS_TEMPORAL: + raise RuntimeError("temporalio is not installed — cannot start a worker") + from temporalio.client import Client + from temporalio.worker import Worker + + if executor is not None: + register_executor(executor) + client = await Client.connect(host, namespace=tenant_namespace(tenant)) + worker = Worker( + client, task_queue=task_queue_for(tenant), + workflows=[TenantGovernedWriteWorkflow], activities=[nil_governed_commit], + ) + await worker.run() diff --git a/src/nilscript/governance_quota.py b/src/nilscript/governance_quota.py new file mode 100644 index 0000000..d3e28a6 --- /dev/null +++ b/src/nilscript/governance_quota.py @@ -0,0 +1,69 @@ +"""Per-tenant quotas + rate limiting — a noisy tenant cannot starve the others (the Odoo-429 lesson). + +A token-bucket per (tenant, kind) bounds request rate; a per-tenant daily counter bounds volume (e.g. +exports/writes). Pure + deterministic: the clock is injected, so it is unit-testable without sleeping and +resume-safe. SaaS fairness: tenant A hitting its limit returns a refusal; tenant B is untouched. +""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass, field + + +@dataclass +class _Bucket: + tokens: float + updated: float + + +@dataclass +class TenantRateLimiter: + """Token bucket per (tenant, kind). `rate` tokens/sec refill up to `burst`. `allow()` returns + False when a tenant has spent its bucket — caller turns that into a per-tenant throttle refusal.""" + + rate: float = 5.0 + burst: float = 20.0 + now: Callable[[], float] = lambda: 0.0 # inject a clock; tests pass a fake, prod time.monotonic + _buckets: dict[tuple[str, str], _Bucket] = field(default_factory=dict) + + def allow(self, tenant: str, kind: str = "default", cost: float = 1.0) -> bool: + key = (tenant, kind) + t = self.now() + b = self._buckets.get(key) + if b is None: + b = _Bucket(tokens=self.burst, updated=t) + self._buckets[key] = b + b.tokens = min(self.burst, b.tokens + (t - b.updated) * self.rate) + b.updated = t + if b.tokens >= cost: + b.tokens -= cost + return True + return False + + +@dataclass +class TenantQuota: + """Per-tenant volume caps per day-bucket (e.g. {'export': 100, 'write': 5000}). `charge()` returns + False once a tenant exhausts a kind for the current period; isolated per tenant.""" + + limits: dict[str, int] = field(default_factory=dict) + period: Callable[[], str] = lambda: "static" # inject a period key (e.g. 'YYYY-MM-DD'); tests fix it + _used: dict[tuple[str, str, str], int] = field(default_factory=dict) + + def charge(self, tenant: str, kind: str, amount: int = 1) -> bool: + limit = self.limits.get(kind) + if limit is None: + return True # unmetered kind + key = (tenant, kind, self.period()) + used = self._used.get(key, 0) + if used + amount > limit: + return False + self._used[key] = used + amount + return True + + def remaining(self, tenant: str, kind: str) -> int | None: + limit = self.limits.get(kind) + if limit is None: + return None + return max(0, limit - self._used.get((tenant, kind, self.period()), 0)) diff --git a/src/nilscript/mcp/auth.py b/src/nilscript/mcp/auth.py new file mode 100644 index 0000000..420fa1d --- /dev/null +++ b/src/nilscript/mcp/auth.py @@ -0,0 +1,124 @@ +"""Production `claim_resolver` for SaaS mode — derive the tenant from a VERIFIED JWT, never a header. + +`tenant.resolve_tenant(saas=True, claim_resolver=...)` takes a callable that returns the authenticated +workspace for a connection. This module builds that callable by validating the connection's bearer JWT +(signature + expiry + optional issuer/audience) and reading the `workspace` claim (configurable). A token +that is missing, expired, wrongly-signed, or carries no workspace claim resolves to None → the resolver +default-denies. Header values are never trusted for identity. + +Build it from env (`NIL_JWT_PUBLIC_KEY` / `NIL_JWT_HS_SECRET`, `NIL_JWT_ISSUER`, `NIL_JWT_AUDIENCE`, +`NIL_JWT_WORKSPACE_CLAIM`) or inject keys directly (keycloak JWKS wiring layers on top of this). +""" + +from __future__ import annotations + +import os +from collections.abc import Callable +from typing import Any + +import jwt + +_BEARER = "authorization" + + +def _bearer_token(ctx: Any) -> str | None: + rc = getattr(ctx, "request_context", None) + req = getattr(rc, "request", None) if rc is not None else None + headers = getattr(req, "headers", None) + raw = headers.get(_BEARER) if headers is not None and hasattr(headers, "get") else None + if not raw or not raw.lower().startswith("bearer "): + return None + return raw.split(" ", 1)[1].strip() or None + + +def make_jwt_claim_resolver( + *, + public_key: str | None = None, + hs_secret: str | None = None, + algorithms: list[str] | None = None, + issuer: str | None = None, + audience: str | None = None, + workspace_claim: str = "workspace", +) -> Callable[[Any], str | None]: + """Return a `claim_resolver(ctx) -> workspace|None` that VERIFIES the bearer JWT and extracts the + workspace claim. RS/ES keys via `public_key`; HS via `hs_secret`. Verification failure (bad sig, + expired, wrong issuer/audience) → None (default-deny upstream), never an exception that leaks through. + """ + key = public_key or hs_secret + if not key: + raise ValueError("make_jwt_claim_resolver needs public_key (RS/ES) or hs_secret (HS)") + algs = algorithms or (["RS256"] if public_key else ["HS256"]) + + def resolve(ctx: Any) -> str | None: + token = _bearer_token(ctx) + if not token: + return None + try: + claims = jwt.decode( + token, key, algorithms=algs, + issuer=issuer, audience=audience, + options={"require": ["exp"], "verify_aud": audience is not None, + "verify_iss": issuer is not None}, + ) + except jwt.PyJWTError: + return None # invalid/expired/forged → no identity → default-deny + ws = claims.get(workspace_claim) + return ws if isinstance(ws, str) and ws else None + + return resolve + + +def make_jwks_claim_resolver( + jwks_url: str, + *, + algorithms: list[str] | None = None, + issuer: str | None = None, + audience: str | None = None, + workspace_claim: str = "workspace", + jwk_client: Any = None, +) -> Callable[[Any], str | None]: + """Resolver backed by a JWKS endpoint (keycloak `…/protocol/openid-connect/certs`) — the production + path. PyJWKClient fetches + CACHES signing keys and selects by the token's `kid`, so key ROTATION is + handled automatically. Same fail-closed contract: any verification failure → None. `jwk_client` is + injectable for tests; the real one is built from `jwks_url`.""" + client = jwk_client if jwk_client is not None else jwt.PyJWKClient(jwks_url) + algs = algorithms or ["RS256"] + + def resolve(ctx: Any) -> str | None: + token = _bearer_token(ctx) + if not token: + return None + try: + key = client.get_signing_key_from_jwt(token).key + claims = jwt.decode( + token, key, algorithms=algs, issuer=issuer, audience=audience, + options={"require": ["exp"], "verify_aud": audience is not None, + "verify_iss": issuer is not None}, + ) + except Exception: # noqa: BLE001 — JWKS fetch / verify failure → default-deny, never leaks through + return None + ws = claims.get(workspace_claim) + return ws if isinstance(ws, str) and ws else None + + return resolve + + +def jwt_claim_resolver_from_env() -> Callable[[Any], str | None] | None: + """Build the resolver from env, or None if SaaS JWT auth is not configured (caller fails closed). + + Precedence: NIL_JWT_JWKS_URL (keycloak, production) > NIL_JWT_PUBLIC_KEY (static RS/ES) > + NIL_JWT_HS_SECRET (HS, dev). Issuer/audience/workspace-claim are shared env knobs. + """ + issuer = os.environ.get("NIL_JWT_ISSUER") or None + audience = os.environ.get("NIL_JWT_AUDIENCE") or None + claim = os.environ.get("NIL_JWT_WORKSPACE_CLAIM", "workspace") + jwks = os.environ.get("NIL_JWT_JWKS_URL") or None + if jwks: + return make_jwks_claim_resolver(jwks, issuer=issuer, audience=audience, workspace_claim=claim) + pub = os.environ.get("NIL_JWT_PUBLIC_KEY") or None + hs = os.environ.get("NIL_JWT_HS_SECRET") or None + if not pub and not hs: + return None + return make_jwt_claim_resolver( + public_key=pub, hs_secret=hs, issuer=issuer, audience=audience, workspace_claim=claim, + ) diff --git a/src/nilscript/mcp/server.py b/src/nilscript/mcp/server.py index d5db888..f99a65c 100644 --- a/src/nilscript/mcp/server.py +++ b/src/nilscript/mcp/server.py @@ -108,11 +108,15 @@ def __init__( allow_insecure: bool = False, gate: str = "two-step", registry: Any = None, + saas: bool = False, + claim_resolver: Any = None, ) -> None: self._default = default self._allow_insecure = allow_insecure self._gate = gate self._registry = registry + self._saas = saas + self._claim_resolver = claim_resolver self._cache: dict[str, NilTools] = {} def get(self, ctx: Any) -> NilTools: @@ -123,6 +127,7 @@ def get(self, ctx: Any) -> NilTools: tenant = resolve_tenant( ctx, default=self._default, multi_tenant=True, allow_insecure=self._allow_insecure, registry=self._registry, + saas=self._saas, claim_resolver=self._claim_resolver, ) tools = build_tools( adapter_url=tenant.adapter_url, @@ -586,9 +591,17 @@ def build_asgi_app( auth_token: str | None = None, multi_tenant: bool = False, allow_insecure: bool = False, + saas: bool | None = None, + claim_resolver: Any = None, ): # type: ignore[no-untyped-def] """Return a streamable-HTTP ASGI app for production hosting (uvicorn/gunicorn behind nilscript.org). + `saas=True` (or env `NIL_MCP_SAAS=1`) turns on full tenant ISOLATION: the tenant is the + authenticated identity (a `claim_resolver` reads the verified JWT `workspace` claim), no header can + override it, and routing goes to that tenant's registered active adapter. SaaS requires a + `claim_resolver` — without one it fails closed (a deployment must inject a JWT-verifying resolver), + so SaaS can never silently fall back to trusting a free header. + Single-tenant (default): the skeleton is discovered once at build time so per-verb tools reflect the mounted adapter, and all connections share that backend. @@ -618,8 +631,15 @@ def build_asgi_app( adapter_url=adapter_url, grant_id=grant_id, workspace=workspace, bearer=bearer, scopes=scopes, gate=gate, brain=brain, automation=automation, ) + if saas is None: + saas = os.environ.get("NIL_MCP_SAAS", "") in ("1", "true", "True") + if saas and claim_resolver is None: + raise ValueError( + "SaaS mode requires a claim_resolver (a JWT workspace-claim verifier) — refusing to start " + "without authenticated tenant identity (would otherwise trust a free header)" + ) provider: ToolsProvider | None = None - if multi_tenant: + if multi_tenant or saas: default = Tenant( adapter_url=adapter_url, bearer=bearer, grant_id=grant_id, workspace=workspace, scopes=scopes, @@ -628,7 +648,7 @@ def build_asgi_app( provider = TenantToolsProvider( default=default, allow_insecure=allow_insecure, gate=gate, - registry=make_registry_lookup(), + registry=make_registry_lookup(), saas=saas, claim_resolver=claim_resolver, ) server = build_server( tools, dynamic_verbs=verbs, tools_provider=provider, diff --git a/src/nilscript/mcp/tenant.py b/src/nilscript/mcp/tenant.py index 2b3565c..d2b3566 100644 --- a/src/nilscript/mcp/tenant.py +++ b/src/nilscript/mcp/tenant.py @@ -66,17 +66,45 @@ def resolve_tenant( multi_tenant: bool = False, allow_insecure: bool = False, registry: Callable[[str], Tenant | None] | None = None, + saas: bool = False, + claim_resolver: Callable[[Any], str | None] | None = None, ) -> Tenant: """Resolve the backend for this connection. Single-tenant (default): always return `default` (back-compat — the env-configured backend). - Multi-tenant: read the `X-NIL-*` headers; require an https adapter URL. Resolution precedence for - a multi-tenant connection: - 1. `X-NIL-Adapter-Url` header → true per-connection BYO (always wins); - 2. else, if the connection names a workspace (`X-NIL-Workspace`) and a `registry` lookup is - given, route to that workspace's *active* adapter (the control-plane registry); - 3. else `default` if one exists, else `TenantError`. + + SaaS (`saas=True`): the tenant is the AUTHENTICATED identity, never a free header. The + `claim_resolver` returns the workspace from the verified credential (JWT `workspace` claim / + keycloak realm); the `X-NIL-Workspace` header may NOT override it, a BYO `X-NIL-Adapter-Url` is + rejected (identity routes to the tenant's *registered active* adapter), and a missing claim is + default-deny. This is the isolation spine: a token for tenant A can only ever reach A's backend. + + Multi-tenant (self-hosted / dev, `multi_tenant=True` without `saas`): the connection brings its own + backend via the `X-NIL-*` headers (BYO), or routes by the `X-NIL-Workspace` header via the registry. + Header-trust is acceptable here because the deployment is single-owner; it is NOT in SaaS. """ + if saas: + if claim_resolver is None: + raise TenantError("SaaS mode requires an authenticated-claim resolver") + claimed = claim_resolver(ctx) + if not claimed: + raise TenantError("no authenticated workspace claim — default-deny") + headers = _headers(ctx) + header_ws = _get(headers, WORKSPACE_HEADER) + if header_ws and header_ws != claimed: + raise TenantError("the workspace header cannot override the authenticated tenant") + if _get(headers, ADAPTER_URL_HEADER): + raise TenantError( + "a BYO adapter-url is not allowed in SaaS mode; identity routes to the tenant's " + "registered active adapter" + ) + if registry is None: + raise TenantError("SaaS mode requires the control-plane registry") + resolved = registry(claimed) + if resolved is None: + raise TenantError(f"workspace '{claimed}' has no active adapter") + return resolved + if not multi_tenant: if default is None: raise TenantError("single-tenant mode requires a default backend (set NIL_ADAPTER_URL)") diff --git a/src/nilscript/secrets/__init__.py b/src/nilscript/secrets/__init__.py new file mode 100644 index 0000000..e51f003 --- /dev/null +++ b/src/nilscript/secrets/__init__.py @@ -0,0 +1,5 @@ +"""Per-tenant encrypted secret storage for SaaS multi-tenancy.""" + +from nilscript.secrets.vault import SecretVault, VaultError + +__all__ = ["SecretVault", "VaultError"] diff --git a/src/nilscript/secrets/vault.py b/src/nilscript/secrets/vault.py new file mode 100644 index 0000000..7386ab7 --- /dev/null +++ b/src/nilscript/secrets/vault.py @@ -0,0 +1,82 @@ +"""Per-tenant encrypted secret vault — the place a company's adapter creds + LLM key are saved ONCE. + +SaaS multi-tenancy needs each tenant's secrets (Odoo/Daftra creds, LLM API key) stored encrypted at +rest and readable only as that tenant. This module is the keystone: + + • Secrets are encrypted with a master key (Fernet / AES-128-CBC + HMAC) before they touch the store, + so a leaked store row is ciphertext, not credentials. + • Access is BY TENANT: `get(tenant)` only ever returns that tenant's blob — no cross-tenant read. + • The backing `store` is an injectable MutableMapping (a dict in tests, a DB/Redis/file in prod), so + the crypto + isolation logic is storage-agnostic and unit-testable with no infrastructure. + +The vault never logs or returns secrets except to the caller that asked for its own tenant; the master +key comes from the environment / a KMS, never from code. +""" + +from __future__ import annotations + +import json +import os +from collections.abc import MutableMapping +from typing import Any + +from cryptography.fernet import Fernet, InvalidToken + + +class VaultError(RuntimeError): + """The vault could not store/read a secret (missing master key, corrupt/forged ciphertext).""" + + +class SecretVault: + """Encrypt-at-rest per-tenant secrets. One process holds the master key; the store holds only + ciphertext keyed by tenant.""" + + def __init__(self, key: str | bytes, store: MutableMapping[str, bytes] | None = None) -> None: + try: + self._fernet = Fernet(key if isinstance(key, bytes) else key.encode("utf-8")) + except (ValueError, TypeError) as exc: + raise VaultError(f"invalid vault master key (expect a Fernet key): {exc}") from exc + self._store: MutableMapping[str, bytes] = store if store is not None else {} + + @classmethod + def from_env(cls, var: str = "NIL_VAULT_KEY", + store: MutableMapping[str, bytes] | None = None) -> SecretVault: + key = os.environ.get(var) + if not key: + raise VaultError(f"{var} is not set — refusing to run the secret vault without a master key") + return cls(key, store) + + @staticmethod + def generate_key() -> str: + """A fresh master key the operator stores in their secret manager (never committed).""" + return Fernet.generate_key().decode("utf-8") + + def put(self, tenant: str, secrets: dict[str, Any]) -> None: + """Save (replace) a tenant's secret bundle, encrypted. Called once at onboarding / when rotated.""" + if not tenant: + raise VaultError("a tenant is required to store secrets") + blob = self._fernet.encrypt(json.dumps(secrets, separators=(",", ":")).encode("utf-8")) + self._store[tenant] = blob + + def get(self, tenant: str) -> dict[str, Any] | None: + """Decrypt and return a tenant's secret bundle, or None if the tenant has none. Only ever the + caller-named tenant's blob — isolation is enforced by the key, not by hoping the caller is honest.""" + blob = self._store.get(tenant) + if blob is None: + return None + try: + return json.loads(self._fernet.decrypt(blob).decode("utf-8")) + except (InvalidToken, ValueError) as exc: # forged / wrong-key / corrupt ciphertext + raise VaultError(f"could not decrypt secrets for tenant '{tenant}' (wrong key or tampered)") from exc + + def get_secret(self, tenant: str, name: str) -> Any | None: + """One named secret for a tenant (e.g. 'llm_api_key', 'adapter_bearer'), or None.""" + bundle = self.get(tenant) + return bundle.get(name) if bundle else None + + def has(self, tenant: str) -> bool: + return tenant in self._store + + def delete(self, tenant: str) -> None: + """Off-board: drop a tenant's secrets entirely.""" + self._store.pop(tenant, None) diff --git a/tests/test_cp_provisioning.py b/tests/test_cp_provisioning.py new file mode 100644 index 0000000..a49a36b --- /dev/null +++ b/tests/test_cp_provisioning.py @@ -0,0 +1,106 @@ +"""Control-plane tenant onboarding: one-call provision (encrypted secrets + adapter) + secret read.""" + +from __future__ import annotations + +import os + +import pytest +from fastapi.testclient import TestClient + +from nilscript.secrets import SecretVault + +TOKEN = "reg-tok" + + +@pytest.fixture() +def client(tmp_path, monkeypatch): + monkeypatch.setenv("NIL_VAULT_KEY", SecretVault.generate_key()) + from nilscript.controlplane.app import create_app + from nilscript.controlplane.store import EventStore + + store = EventStore(str(tmp_path / "cp.db")) + app = create_app(store, registry_token=TOKEN) + return TestClient(app, raise_server_exceptions=False), store + + +_AUTH = {"Authorization": f"Bearer {TOKEN}"} + + +def _provision(c, ws, **body): + return c.post("/tenants/provision", json={"workspace": ws, **body}, headers=_AUTH) + + +def test_one_call_provision_stores_secrets_and_activates_adapter(client) -> None: + c, store = client + r = _provision( + c, "ws_acme", + secrets={"adapter_bearer": "sek", "llm_api_key": "sk-acme"}, + adapter={"adapter_id": "odoo", "url": "https://acme.odoo", "system": "odoo_crm"}, + ) + assert r.status_code == 200, r.text + body = r.json() + assert body["ok"] and "llm_api_key" in body["provisioned"]["secrets"] + assert "odoo" in body["provisioned"]["adapter"] + active = store.active_adapter("ws_acme") + assert active and active["adapter_id"] == "odoo" + + +def test_secret_read_is_token_gated_and_returns_value(client) -> None: + c, _ = client + _provision(c, "ws_acme", secrets={"llm_api_key": "sk-acme"}) + assert c.get("/tenants/ws_acme/secret/llm_api_key").status_code == 401 # no token + r = c.get("/tenants/ws_acme/secret/llm_api_key", headers=_AUTH) + assert r.status_code == 200 and r.json()["value"] == "sk-acme" + + +def test_secrets_are_encrypted_at_rest(client) -> None: + c, store = client + _provision(c, "ws_acme", secrets={"llm_api_key": "sk-PLAINTEXT-LEAK"}) + row = store._conn.execute( + "SELECT ciphertext FROM tenant_secrets WHERE workspace='ws_acme'" + ).fetchone() + assert b"sk-PLAINTEXT-LEAK" not in row["ciphertext"] # ciphertext on disk, not the key + + +def test_tenants_are_isolated(client) -> None: + c, store = client + _provision(c, "ws_a", secrets={"llm_api_key": "key-A"}) + _provision(c, "ws_b", secrets={"llm_api_key": "key-B"}) + assert store.get_secret("ws_a", "llm_api_key") == "key-A" + assert store.get_secret("ws_b", "llm_api_key") == "key-B" + assert store.get_secrets("ws_ghost") is None + + +def test_provision_requires_auth_and_workspace(client) -> None: + c, _ = client + assert c.post("/tenants/provision", json={"workspace": "x"}).status_code == 401 + assert c.post("/tenants/provision", json={}, headers=_AUTH).status_code == 400 + + +# ── surface-scoping: tenant A can never see tenant B's events / pending ─────────────────────────── +def _event(ws, seq, proposal, event="executed"): + return {"nil": "0.1", "id": f"{ws}-{seq}", "workspace": ws, "performative": "EVENT", + "body": {"event": event, "proposal": proposal, "verb": "crm.delete_contact", "tier": "HIGH"}} + + +def test_events_are_workspace_scoped(client) -> None: + c, store = client + store.ingest(_event("ws_a", 1, "pA"), 1) + store.ingest(_event("ws_b", 1, "pB"), 1) + a = c.get("/api/events", params={"workspace": "ws_a"}).json()["events"] + assert a and all(e["workspace"] == "ws_a" for e in a) # only A's + assert not any(e["workspace"] == "ws_b" for e in a) # never B's + glob = c.get("/api/events").json()["events"] # operator view sees both + assert {e["workspace"] for e in glob} >= {"ws_a", "ws_b"} + + +def test_pending_is_workspace_scoped(client) -> None: + c, store = client + # a held proposal for each tenant, linked by its proposed event's workspace + store.ingest({**_event("ws_a", 2, "pA2", event="proposed")}, 2) + store.ingest({**_event("ws_b", 2, "pB2", event="proposed")}, 2) + store.await_approval("pA2", verb="crm.delete_contact", tier="HIGH", preview="del A") + store.await_approval("pB2", verb="crm.delete_contact", tier="HIGH", preview="del B") + a = c.get("/api/pending", params={"workspace": "ws_a"}).json()["pending"] + ids = {p["proposal_id"] for p in a} + assert "pA2" in ids and "pB2" not in ids # A sees only its own held proposal diff --git a/tests/test_durable_temporal.py b/tests/test_durable_temporal.py new file mode 100644 index 0000000..972590e --- /dev/null +++ b/tests/test_durable_temporal.py @@ -0,0 +1,53 @@ +"""Temporal worker integration (Phase 6) — verified end-to-end with temporalio's in-process +time-skipping test server (no external Temporal needed). Skips cleanly if temporalio is absent.""" + +from __future__ import annotations + +import pytest + +temporalio = pytest.importorskip("temporalio") + +from temporalio.testing import WorkflowEnvironment # noqa: E402 +from temporalio.worker import Worker # noqa: E402 + +from nilscript.durable import tenant_workflow_id # noqa: E402 +from nilscript.durable_temporal import ( # noqa: E402 + GovernedWriteInput, + TenantGovernedWriteWorkflow, + nil_governed_commit, + register_executor, + task_queue_for, +) + + +@pytest.mark.asyncio +async def test_durable_governed_write_retries_then_commits() -> None: + """A flaky backend (throttled twice) is retried DURABLY by Temporal, then commits — proving the + 429-fairness/durability contract, scoped to the tenant's task queue + deterministic workflow id.""" + attempts = {"n": 0} + + async def flaky_executor(inp: GovernedWriteInput) -> dict: + attempts["n"] += 1 + if attempts["n"] < 3: + raise RuntimeError("429 throttled by backend") + return {"ok": True, "attempt": attempts["n"], "tenant": inp.tenant, "verb": inp.verb} + + register_executor(flaky_executor) + inp = GovernedWriteInput(tenant="ws_a", verb="account.create_invoice", idempotency_key="k1") + + async with await WorkflowEnvironment.start_time_skipping() as env: + async with Worker(env.client, task_queue=task_queue_for("ws_a"), + workflows=[TenantGovernedWriteWorkflow], activities=[nil_governed_commit]): + result = await env.client.execute_workflow( + TenantGovernedWriteWorkflow.run, inp, + id=tenant_workflow_id("ws_a", "governed_write", "k1"), + task_queue=task_queue_for("ws_a"), + ) + assert result == {"ok": True, "attempt": 3, "tenant": "ws_a", "verb": "account.create_invoice"} + + +@pytest.mark.asyncio +async def test_workflow_id_is_tenant_scoped_and_idempotent() -> None: + a = tenant_workflow_id("ws_a", "governed_write", "k1") + b = tenant_workflow_id("ws_b", "governed_write", "k1") + assert a != b and a == tenant_workflow_id("ws_a", "governed_write", "k1") diff --git a/tests/test_mcp_tenant.py b/tests/test_mcp_tenant.py index 932ba12..5d020a8 100644 --- a/tests/test_mcp_tenant.py +++ b/tests/test_mcp_tenant.py @@ -139,3 +139,59 @@ def reg(ws): ctx = _FakeCtx({}) # no workspace, no adapter url assert resolve_tenant(ctx, default=DEFAULT, multi_tenant=True, registry=reg) is DEFAULT assert calls == [] # no workspace → registry never queried + + +# ── SaaS identity spine: tenant from authenticated claim, header cannot override, default-deny ────── +_ADAPTERS = { + "ws_a": Tenant(adapter_url="https://a-adapter", bearer="a-sec", workspace="ws_a"), + "ws_b": Tenant(adapter_url="https://b-adapter", bearer="b-sec", workspace="ws_b"), +} + + +def _saas_reg(ws: str): + return _ADAPTERS.get(ws) + + +def _claim(ws: str | None): + return lambda ctx: ws + + +def test_saas_routes_to_the_authenticated_tenants_adapter() -> None: + t = resolve_tenant(_FakeCtx({}), saas=True, claim_resolver=_claim("ws_a"), registry=_saas_reg) + assert t.adapter_url == "https://a-adapter" and t.workspace == "ws_a" + + +def test_saas_two_tenants_are_isolated() -> None: + a = resolve_tenant(_FakeCtx({}), saas=True, claim_resolver=_claim("ws_a"), registry=_saas_reg) + b = resolve_tenant(_FakeCtx({}), saas=True, claim_resolver=_claim("ws_b"), registry=_saas_reg) + assert a.adapter_url != b.adapter_url # A can never reach B's backend + + +def test_saas_header_cannot_override_authenticated_tenant() -> None: + # token says ws_a; attacker sends X-NIL-Workspace: ws_b → refused, never routed to B + with pytest.raises(TenantError): + resolve_tenant(_FakeCtx({"x-nil-workspace": "ws_b"}), saas=True, + claim_resolver=_claim("ws_a"), registry=_saas_reg) + + +def test_saas_byo_adapter_url_is_rejected() -> None: + with pytest.raises(TenantError): + resolve_tenant(_FakeCtx({ADAPTER_URL_HEADER: "https://attacker"}), saas=True, + claim_resolver=_claim("ws_a"), registry=_saas_reg) + + +def test_saas_missing_claim_is_default_deny() -> None: + with pytest.raises(TenantError): + resolve_tenant(_FakeCtx({}), saas=True, claim_resolver=_claim(None), registry=_saas_reg) + + +def test_saas_unknown_workspace_has_no_adapter() -> None: + with pytest.raises(TenantError): + resolve_tenant(_FakeCtx({}), saas=True, claim_resolver=_claim("ws_ghost"), registry=_saas_reg) + + +def test_saas_matching_header_is_allowed() -> None: + # header equal to the claim is fine (clients may echo it); only a MISMATCH is refused + t = resolve_tenant(_FakeCtx({"x-nil-workspace": "ws_a"}), saas=True, + claim_resolver=_claim("ws_a"), registry=_saas_reg) + assert t.workspace == "ws_a" diff --git a/tests/test_saas_tenant_management.py b/tests/test_saas_tenant_management.py new file mode 100644 index 0000000..c1ca822 --- /dev/null +++ b/tests/test_saas_tenant_management.py @@ -0,0 +1,163 @@ +"""SaaS tenant management: encrypted secret vault, JWT claim verifier, per-tenant quotas/limits.""" + +from __future__ import annotations + +import jwt +import pytest + +from nilscript.governance_quota import TenantQuota, TenantRateLimiter +from nilscript.mcp.auth import make_jwt_claim_resolver +from nilscript.secrets.vault import SecretVault, VaultError + + +class _FakeHeaders: + def __init__(self, m): self._m = {k.lower(): v for k, v in m.items()} + def get(self, n): return self._m.get(n.lower()) + + +class _FakeCtx: + def __init__(self, headers=None): + req = type("Req", (), {"headers": _FakeHeaders(headers or {})})() + self.request_context = type("RC", (), {"request": req})() + + +# ── secret vault ─────────────────────────────────────────────────────────────────────────────── +def _vault(store=None): + return SecretVault(SecretVault.generate_key(), store) + + +def test_vault_put_get_roundtrip() -> None: + v = _vault() + v.put("ws_a", {"adapter_bearer": "sek", "llm_api_key": "sk-123"}) + assert v.get("ws_a") == {"adapter_bearer": "sek", "llm_api_key": "sk-123"} + assert v.get_secret("ws_a", "llm_api_key") == "sk-123" + + +def test_vault_is_encrypted_at_rest() -> None: + store: dict[str, bytes] = {} + v = SecretVault(SecretVault.generate_key(), store) + v.put("ws_a", {"llm_api_key": "sk-SECRET-VALUE"}) + raw = store["ws_a"] + assert b"sk-SECRET-VALUE" not in raw and b"llm_api_key" not in raw # ciphertext, not plaintext + + +def test_vault_tenants_are_isolated() -> None: + v = _vault() + v.put("ws_a", {"k": "a"}); v.put("ws_b", {"k": "b"}) + assert v.get("ws_a") == {"k": "a"} and v.get("ws_b") == {"k": "b"} + assert v.get("ws_other") is None + + +def test_vault_wrong_key_cannot_decrypt() -> None: + store: dict[str, bytes] = {} + SecretVault(SecretVault.generate_key(), store).put("ws_a", {"k": "v"}) + other = SecretVault(SecretVault.generate_key(), store) # different master key, same store + with pytest.raises(VaultError): + other.get("ws_a") + + +def test_vault_delete_offboards() -> None: + v = _vault(); v.put("ws_a", {"k": "v"}) + v.delete("ws_a") + assert v.get("ws_a") is None and not v.has("ws_a") + + +# ── JWT claim verifier (production claim_resolver) ─────────────────────────────────────────────── +_SECRET = "test-hs-secret" + + +def _token(claims, secret=_SECRET): + return jwt.encode({"exp": 9999999999, **claims}, secret, algorithm="HS256") + + +def _ctx_with(token): + return _FakeCtx({"authorization": f"Bearer {token}"}) + + +def test_jwt_resolver_reads_verified_workspace_claim() -> None: + r = make_jwt_claim_resolver(hs_secret=_SECRET) + assert r(_ctx_with(_token({"workspace": "ws_a"}))) == "ws_a" + + +def test_jwt_resolver_rejects_forged_signature() -> None: + r = make_jwt_claim_resolver(hs_secret=_SECRET) + assert r(_ctx_with(_token({"workspace": "ws_a"}, secret="attacker-secret"))) is None + + +def test_jwt_resolver_rejects_expired_token() -> None: + r = make_jwt_claim_resolver(hs_secret=_SECRET) + expired = jwt.encode({"exp": 1, "workspace": "ws_a"}, _SECRET, algorithm="HS256") + assert r(_ctx_with(expired)) is None + + +def test_jwt_resolver_no_token_is_none() -> None: + r = make_jwt_claim_resolver(hs_secret=_SECRET) + assert r(_FakeCtx({})) is None + + +def test_jwt_resolver_no_workspace_claim_is_none() -> None: + r = make_jwt_claim_resolver(hs_secret=_SECRET) + assert r(_ctx_with(_token({"sub": "u1"}))) is None + + +# ── per-tenant rate limit + quota ──────────────────────────────────────────────────────────────── +def test_rate_limiter_throttles_a_tenant_without_starving_others() -> None: + clock = {"t": 0.0} + rl = TenantRateLimiter(rate=1.0, burst=3.0, now=lambda: clock["t"]) + assert [rl.allow("ws_a") for _ in range(3)] == [True, True, True] + assert rl.allow("ws_a") is False # A spent its burst + assert rl.allow("ws_b") is True # B is unaffected (isolation) + clock["t"] = 2.0 # 2s → +2 tokens + assert rl.allow("ws_a") is True + + +def test_quota_caps_volume_per_tenant() -> None: + q = TenantQuota(limits={"export": 2}, period=lambda: "2026-06-27") + assert q.charge("ws_a", "export") and q.charge("ws_a", "export") + assert q.charge("ws_a", "export") is False # A hit its export cap + assert q.charge("ws_b", "export") is True # B still has quota + assert q.charge("ws_a", "write") is True # unmetered kind passes + assert q.remaining("ws_b", "export") == 1 + + +# ── JWKS resolver (production keycloak path) ────────────────────────────────────────────────────── +def test_jwks_resolver_verifies_with_rotating_keys() -> None: + from cryptography.hazmat.primitives.asymmetric import rsa + from nilscript.mcp.auth import make_jwks_claim_resolver + + priv = rsa.generate_private_key(public_exponent=65537, key_size=2048) + pub = priv.public_key() + + class _FakeJWK: + key = pub + + class _FakeClient: + def get_signing_key_from_jwt(self, token): return _FakeJWK() + + token = jwt.encode({"exp": 9999999999, "workspace": "ws_a"}, priv, algorithm="RS256") + r = make_jwks_claim_resolver("https://kc/certs", jwk_client=_FakeClient()) + assert r(_ctx_with(token)) == "ws_a" + # a token signed by a DIFFERENT key (not in the JWKS) → verification fails → None + other = rsa.generate_private_key(public_exponent=65537, key_size=2048) + forged = jwt.encode({"exp": 9999999999, "workspace": "ws_a"}, other, algorithm="RS256") + assert r(_ctx_with(forged)) is None + + +# ── tenant-scoped durable execution (Temporal-ready) ───────────────────────────────────────────── +def test_durable_ids_and_namespace_are_tenant_isolated() -> None: + from nilscript.durable import tenant_namespace, tenant_workflow_id + + a = tenant_workflow_id("ws_a", "bulk_delete", "job1") + b = tenant_workflow_id("ws_b", "bulk_delete", "job1") + assert a != b and a == tenant_workflow_id("ws_a", "bulk_delete", "job1") # isolated + deterministic + assert tenant_namespace("ws_a") != tenant_namespace("ws_b") + + +def test_durable_policy_throttles_per_tenant() -> None: + from nilscript.durable import TenantDurablePolicy + + clock = {"t": 0.0} + pol = TenantDurablePolicy(TenantRateLimiter(rate=1.0, burst=2.0, now=lambda: clock["t"])) + assert pol.admit("ws_a") and pol.admit("ws_a") + assert pol.admit("ws_a") is False # A over budget + assert pol.admit("ws_b") is True # B unaffected