Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions docs/PLAN-adapter-data-plane.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# NIL Read Data Plane — Architecture (revised, ground-up)

Grounded in a real failure: `crm.list_contacts` returned **590 KB for 41 contacts** (full Odoo
`res.partner` dumps) → context flood → agent fell back to `read_file`/`execute_code` (zero business
data). The fix is **not** an Odoo patch. The read plane was never given a contract; this gives it one.

## Hard constraint (the no-shared-import reality)

Adapters are **standalone HTTP shims** — the Odoo adapter depends only on `fastapi`/`pydantic`, it does
**not** import the `nilscript` package. So "universal" cannot mean a shared library. The universal
guarantee lives in three seams that already exist:

1. **Contract** — read-verb request/answer schemas + the `/nil/v0.1/describe` shape, in
`nilscript/nil/schemas`. The SSOT every adapter conforms to.
2. **Conformance proof** — the suite each adapter's `conformance/` dir must pass: projection, byte cap,
refuse-not-truncate, cursor stability, capability negotiation.
3. **Relay backstop** — the MCP relay in `nilscript`, the one chokepoint that sees every adapter's
output and re-enforces the byte cap regardless of adapter behavior.

Contract + conformance **force** all adapters to comply; the relay backstop **catches** any that don't.

## The invariant (one sentence)

> Business data reaches the agent only as **(a)** a bounded, projected, paginated page that fits a hard
> byte cap, **(b)** a server-side aggregate, or **(c)** an opaque **handle** to an artifact the agent
> processes with code — and any read that cannot be made to fit is **REFUSED, never truncated**.

A naive cap that truncates is a *bug*: it drops the row the agent needed (رغد at record #500,000) and
returns a confident wrong "not found". The cap therefore **refuses**; correctness comes from selection
(filter/count/aggregate) happening server-side, where the rows live.

## The verb network (every adapter implements; `<ns>.*`)

| Verb | Returns | Role |
|------|---------|------|
| `schema(target)` | fields `{name,type,filterable,sortable,returnable,is_key,sensitivity}` + cardinality `small\|large\|huge` + default projection + **capability profile** `{server_filter,server_sort,server_paginate,server_aggregate}` | how to query + how the edge degrades |
| `count(target,filter)` | `{count}` or `{count,approximate:true}` | first call for "how many / exists" |
| `search(target,filter,fields,sort,limit,cursor)` | lean page `{items:[{id,…proj}],total?,next_cursor}` | a few by criteria; keyset cursor stable for 1M; **refuses** over cap |
| `get(target,id,fields)` | one lean record | exact lookup by key |
| `aggregate(target,filter,group_by,metrics)` | `{groups:[{key,metrics}]}` | server-side rollup ("revenue by country") |
| `export(target,filter,fields,format)` | **handle** `{handle,format,rows,bytes,schema,expires_at}` | bulk read → artifact; **governed + audited** |

Filter is a typed predicate list `[{field,op,value}]`, ops: `eq,ne,gt,gte,lt,lte,in,contains,ilike,between`.
Writes unchanged: `create/update/delete` via `propose→commit→gate`, plus the bulk-write spine below.

## Capability negotiation + enforced fallback (the universal part)

`schema(target).capabilities` declares what the backend does server-side. The edge has a **defined
fallback per missing capability — never a silent fetch-all**:

| Capability absent | Edge behavior |
|---|---|
| `server_filter` (a field) | bounded pull (≤ cap rows) + edge-side filter; **refuse** if the unfiltered set exceeds the bound |
| `server_sort` | sort the bounded page only; declare "sort is page-local" or route to export |
| `server_paginate` | export-only for that target (no cursor promise) |
| `server_aggregate` | `aggregate` transparently does export→edge-side rollup, bounded; refuse if unbounded |

A weak backend degrades **honestly** (refuse / export), never by re-introducing the flood.

## Bulk read = governed action (closes the exfiltration hole)

"Reads are free" holds for `count/get/search/aggregate` (bounded, projected). It does **not** hold for
`export`:
- export above `BULK_THRESHOLD` (rows/bytes) or touching `sensitive` fields → requires
**propose→approve** (a read proposal, tier by size+sensitivity) and is **always audited**
(who, target, filter, rows, bytes).
- handles are **tenant/session-scoped**, access-controlled on fetch, **PII-at-rest**: sandbox-local,
TTL-deleted, never logged.

Bulk extraction becomes a deliberate, attributable act — not a silent side effect of a "free" read.

## Read-side authorization

`effective_fields = requested ∩ returnable ∩ grant_visible(target)`. Sensitive fields (salary, PII)
require an explicit grant; absent it they are dropped from the projection and the response **notes the
redaction** (never silently implies completeness). Row-level policy where backend/grant defines it.

## "All the data" — the completed decision tree

```
intent
├─ exists / how many → count (bounded, may be approximate)
├─ one by key → get
├─ a few by criteria → search(filter,fields,sort) + cursor
├─ rollup / group → aggregate(group_by,metrics) ← server-side
├─ deliver the dataset → export → handle → download/artifact (agent never reads rows)
├─ analyze row-level → export → sandbox → pandas/DuckDB (only small result in context)
└─ act on all of them → export id set → BATCHED propose→commit→gate (resumable, stoppable)
every branch: fits-or-REFUSED, never truncated; bulk export gated + audited + tenant-scoped
```

## Bulk-write spine (heavy ops: delete-many / update-many / email-all)

`export(filter)` → id-set handle → walk in batches of N:
- each batch: `propose→commit→gate` (existing governance, per batch),
- checkpoint after each batch (**resumable**; **idempotent** — committed batches never re-apply),
- **STOP honored between batches**,
- partial-failure policy `skip+report | stop | compensate` (declared per run; default skip+report + audit),
- bulk-delete reversibility: per-batch compensation tokens where the verb is COMPENSABLE.

Same spine powers "fetch big data", "delete many", "update many" — smart batches, multi-step, reliable.

## Failure taxonomy (refusals are answers, never filesystem fallbacks)

`RESULT_TOO_LARGE` (narrow filter / use export) · `FIELD_NOT_FILTERABLE` (which fields are) ·
`FIELD_NOT_SORTABLE` · `CAPABILITY_UNSUPPORTED` (export instead) · `HANDLE_EXPIRED` ·
`NOT_AUTHORIZED_FIELD` (grant needed) · `BULK_APPROVAL_REQUIRED`. Each is structured + actionable.

## Agent skill (the discipline)

count/schema first → get by key → search tight+projected → aggregate for rollups → export→code for
row-level over many → export→batch for actions. **ABSOLUTE:** business data lives only behind these
verbs; a large/awkward result is a `RESULT_TOO_LARGE` refusal to narrow or export — **never** a reason
to `read_file`/`execute_code` over the agent's own tree; **0 rows = "none found", never invented**.

## Build order (TDD, failing-test-first)

1. **Contract primitives** (`nilscript`, pure, no I/O): typed filter, projection, **byte-cap
enforcement that refuses-not-truncates**, data-handle type, capability profile. ← start here
2. **Read-verb schemas + `/describe` extension** + conformance assertions.
3. **Generic edge read plane** over the `SystemClient` protocol (projection, cap-refuse, capability
fallback, read authz, bulk-export gate) — proven against `FakeSystem`.
4. **Odoo mapping**: `search/get/count/aggregate/export` onto `search_read(fields=)` / `read_group` /
keyset cursor; verify name-search on 41 AND synthetic 1M stays under cap.
5. **Relay byte-cap backstop** in the MCP + `nil_search/count/get/aggregate/export`.
6. **Bulk-write spine** + agent skill replaces the stopgap guardrail.

## Acceptance

- "Find رغد عبدالله" works on 41 AND 1,000,000 — via `search(name ilike …)` or `export`+Python — no
flood, precise, deterministic. A cap hit → **refusal to narrow**, never a truncated wrong answer.
- "How many overdue?" → one `count`. "Revenue by country across all" → one `aggregate` (or export→code).
- Bulk export of all customers is **gated + audited + tenant-scoped**, not a free read.
- The agent NEVER falls back to `read_file`/`execute_code` on business data; every read bounded,
projected, paginated; same inputs → same outputs.
55 changes: 49 additions & 6 deletions src/nilscript/controlplane/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from nilscript.kernel.diagnostics import ValidationResult
from nilscript.kernel.executor import LocalExecutor
from nilscript.sdk.client import NilClient
from nilscript.sdk.idempotency import commit_idempotency_key
from nilscript.sdk.connect import handshake
from nilscript.sdk.grants import GrantRef
from nilscript.sdk.transport import NilTransport
Expand Down Expand Up @@ -236,18 +237,56 @@ def event_detail(event_id: int) -> Any:

# ── human-approval gate (Phase 2) ────────────────────────────────────────────────────────
@app.post("/proposals/{proposal_id}/await")
def await_approval(proposal_id: str) -> dict[str, Any]:
"""Called by the gate when it holds a proposal for owner approval."""
return store.await_approval(proposal_id)
async def await_approval(proposal_id: str, request: Request) -> dict[str, Any]:
"""Called by the gate when it holds a proposal for owner approval. The gate passes the verb +
human preview (a held proposal has no ledger event to enrich from) so the Decisions screen
shows WHAT is being approved."""
body: dict[str, Any] = {}
try:
body = await request.json()
except (ValueError, TypeError):
body = {}
return store.await_approval(
proposal_id, verb=body.get("verb"), tier=body.get("tier"), preview=body.get("preview"),
)

@app.get("/proposals/{proposal_id}/decision")
def get_decision(proposal_id: str) -> dict[str, Any]:
"""Polled by the gate before it commits a held proposal."""
return {"proposal_id": proposal_id, "status": store.decision(proposal_id)}

async def _execute_approved(proposal_id: str) -> dict[str, Any]:
"""The owner approved a HELD proposal → the CONTROL PLANE commits it against the active adapter.
This is the SSOT keystone: approval DRIVES execution (the agent never re-commits), so an approve
click actually performs the deletion/effect. Reuses the `_live_runner` client pattern; the
proposal detail (verb) rides on the approval row (threaded at hold-time), so no MCP memory is
needed — survives MCP restarts. Honest on failure (expired / already committed / unreachable)."""
appr = store.approval(proposal_id) or {}
active = store.any_active_adapter()
if not active or not active.get("url"):
return {"executed": False, "error": "no active adapter to commit against"}
ws = active.get("workspace", "") or ""
verb = appr.get("verb")
bearer = active.get("bearer", "") or ""
transport = NilTransport(base_url=active["url"], bearer_secret=bearer)
grant = GrantRef.from_secret(
grant_id="control-plane-approval", workspace=ws, secret=bearer or "cp",
scopes=frozenset({verb}) if verb else frozenset(),
)
client = NilClient(transport=transport, grant=grant)
try:
key = commit_idempotency_key(f"cp-approve:{proposal_id}", proposal_id)
outcome = await client.commit(proposal_id, idempotency_key=key)
return {"executed": True, "outcome": outcome.model_dump(mode="json", exclude_none=True)}
except Exception as exc: # noqa: BLE001 — adapter unreachable / proposal expired / already done
return {"executed": False, "error": f"{type(exc).__name__}: {exc}"}
finally:
await transport.aclose()

@app.post("/proposals/{proposal_id}/decision")
async def post_decision(proposal_id: str, request: Request) -> Any:
"""Owner approves/rejects from the UI."""
"""Owner approves/rejects from the UI. On APPROVE the control plane immediately executes the
held proposal against the active adapter (the approval drives the effect)."""
body = {}
try:
body = await request.json()
Expand All @@ -257,7 +296,10 @@ async def post_decision(proposal_id: str, request: Request) -> Any:
if status not in ("approved", "rejected"):
return JSONResponse({"error": "status must be 'approved' or 'rejected'"}, status_code=400)
ok = store.decide(proposal_id, status, actor=body.get("actor", "owner"), reason=body.get("reason", ""))
return {"ok": ok, "proposal_id": proposal_id, "status": store.decision(proposal_id)}
result: dict[str, Any] = {"ok": ok, "proposal_id": proposal_id, "status": store.decision(proposal_id)}
if ok and status == "approved":
result["execution"] = await _execute_approved(proposal_id)
return result

@app.get("/api/pending")
def pending() -> dict[str, Any]:
Expand Down Expand Up @@ -1192,7 +1234,8 @@ def index() -> str:
const r=await fetch('/api/automations');const {automations}=await r.json();
const wrap=document.getElementById('autoWrap'),box=document.getElementById('automations');
document.getElementById('autocount').textContent=automations.length;
wrap.style.display=automations.length?'block':'none';
wrap.style.display='block'; // always visible — the compose form + token live here, even with 0 automations
if(!automations.length){box.innerHTML='<div class=empty style=padding:22px><div class=big>No automations yet</div><div>Click “+ New cross-system automation” above to build one between two systems — or ask the agent via MCP.</div></div>';return;}
box.innerHTML=automations.map(a=>{
const nm=(a.name&&(a.name.en||a.name.ar))||a.automation_id;
const ps=a.plan_summary||{};
Expand Down
37 changes: 34 additions & 3 deletions src/nilscript/controlplane/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,19 +426,30 @@ def _enrich(self, proposal_id: str) -> dict[str, Any]:
preview = None
return {"verb": row["verb"], "tier": row["tier"], "preview": preview}

def await_approval(self, proposal_id: str) -> dict[str, Any]:
"""Register a proposal as awaiting human approval (idempotent — keeps an existing decision)."""
def await_approval(
self, proposal_id: str, *, verb: str | None = None, tier: str | None = None,
preview: Any = None,
) -> dict[str, Any]:
"""Register a proposal as awaiting human approval (idempotent — keeps an existing decision).

`verb`/`tier`/`preview` are passed by the gate at hold-time (a held proposal has no ledger
event yet, so `_enrich` finds nothing). They win over enrichment; `preview` (a dict) is stored
as JSON so the owner's Decisions screen can show exactly what the proposal does."""
with self._lock:
existing = self._conn.execute(
"SELECT status FROM approvals WHERE proposal_id = ?", (proposal_id,)
).fetchone()
if existing is not None:
return {"proposal_id": proposal_id, "status": existing["status"]}
meta = self._enrich(proposal_id)
preview_str = (
json.dumps(preview) if isinstance(preview, (dict, list))
else (preview if preview is not None else meta["preview"])
)
self._conn.execute(
"INSERT INTO approvals (proposal_id, status, verb, tier, preview, created_at) "
"VALUES (?, 'pending', ?, ?, ?, ?)",
(proposal_id, meta["verb"], meta["tier"], meta["preview"], _now()),
(proposal_id, verb or meta["verb"], tier or meta["tier"], preview_str, _now()),
)
self._conn.commit()
return {"proposal_id": proposal_id, "status": "pending"}
Expand Down Expand Up @@ -533,6 +544,26 @@ def active_adapter(self, workspace: str) -> dict[str, Any] | None:
).fetchone()
return dict(row) if row is not None else None

def any_active_adapter(self) -> dict[str, Any] | None:
"""The single most-recently-active adapter across the whole registry (WITH bearer). Used by the
approval executor: in a single-workspace deployment the held proposal was proposed on whatever
backend is active, so committing the approved proposal there is correct."""
with self._lock:
row = self._conn.execute(
f"SELECT {_ADAPTER_COLS} FROM adapters WHERE active = 1 ORDER BY updated_at DESC LIMIT 1"
).fetchone()
return dict(row) if row is not None else None

def approval(self, proposal_id: str) -> dict[str, Any] | None:
"""The full approval row (verb/tier/preview/status) — the executor reads the verb to scope the
control-plane grant when it commits the approved proposal."""
with self._lock:
row = self._conn.execute(
"SELECT proposal_id, status, verb, tier, preview FROM approvals WHERE proposal_id = ?",
(proposal_id,),
).fetchone()
return dict(row) if row is not None else None

def list_adapters(self, workspace: str) -> list[dict[str, Any]]:
"""All registered adapters for a workspace (active first, then most-recent). Carries the
bearer — the API layer redacts it for the public list endpoint."""
Expand Down
Loading
Loading