From 7e52abc85878b30bf7efb4199f6426cb870b69ec Mon Sep 17 00:00:00 2001 From: AI Bot Date: Wed, 24 Jun 2026 16:27:23 +0300 Subject: [PATCH 1/3] feat(automation): conversation-authored automation registry (SSOT) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dynamic automation by talking to the agent: the agent drafts a plan, the deterministic validator (V1–V6, against the live adapter skeleton) lowers-or-rejects it, it is content-hashed and versioned in an SSOT, a human approves it, it is armed, and it fires on manual / event / cron|interval triggers — and can span multiple backends via author-declared cross-system composition. Additive layer in src/nilscript/automation/ (models, authoring, skeleton, triggers, dispatch, scheduler, compose) + automations/automation_runs tables and ~16 control-plane endpoints; the kernel/executor is untouched. Includes agent-facing MCP tools (nil_automation_draft/register/approve/run/list), a read+control Automations dashboard panel (control token-gated, browser-supplied operator token), and a live smoke-test script. ~57 new tests; full suite green. Honestly out of scope: single-stage dual-backend authority (atomic 2PC) and inferred ontology mapping (handoffs are author-declared); full cron via croniter when the durable Temporal runtime lands. 
--- scripts/smoke_automation.py | 105 +++++ src/nilscript/automation/__init__.py | 62 +++ src/nilscript/automation/authoring.py | 126 ++++++ src/nilscript/automation/compose.py | 183 +++++++++ src/nilscript/automation/dispatch.py | 152 ++++++++ src/nilscript/automation/models.py | 142 +++++++ src/nilscript/automation/scheduler.py | 73 ++++ src/nilscript/automation/skeleton.py | 39 ++ src/nilscript/automation/triggers.py | 109 ++++++ src/nilscript/controlplane/app.py | 530 +++++++++++++++++++++++++- src/nilscript/controlplane/store.py | 253 ++++++++++++ src/nilscript/mcp/automation_tools.py | 85 +++++ src/nilscript/mcp/server.py | 62 ++- tests/test_automation.py | 243 ++++++++++++ tests/test_automation_api.py | 162 ++++++++ tests/test_automation_compose.py | 182 +++++++++ tests/test_automation_compose_api.py | 108 ++++++ tests/test_automation_dispatch.py | 187 +++++++++ tests/test_automation_scheduler.py | 209 ++++++++++ tests/test_automation_ui.py | 61 +++ tests/test_mcp_automation_tools.py | 80 ++++ 21 files changed, 3151 insertions(+), 2 deletions(-) create mode 100644 scripts/smoke_automation.py create mode 100644 src/nilscript/automation/__init__.py create mode 100644 src/nilscript/automation/authoring.py create mode 100644 src/nilscript/automation/compose.py create mode 100644 src/nilscript/automation/dispatch.py create mode 100644 src/nilscript/automation/models.py create mode 100644 src/nilscript/automation/scheduler.py create mode 100644 src/nilscript/automation/skeleton.py create mode 100644 src/nilscript/automation/triggers.py create mode 100644 src/nilscript/mcp/automation_tools.py create mode 100644 tests/test_automation.py create mode 100644 tests/test_automation_api.py create mode 100644 tests/test_automation_compose.py create mode 100644 tests/test_automation_compose_api.py create mode 100644 tests/test_automation_dispatch.py create mode 100644 tests/test_automation_scheduler.py create mode 100644 tests/test_automation_ui.py create mode 100644 
tests/test_mcp_automation_tools.py diff --git a/scripts/smoke_automation.py b/scripts/smoke_automation.py new file mode 100644 index 0000000..68ef866 --- /dev/null +++ b/scripts/smoke_automation.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +"""Live smoke test for the Automation Registry — exercises the production paths the unit suite mocks. + +The test suite injects fake skeleton providers and runners, so the *live default* glue is NOT covered: +the real `describe()` handshake to a live adapter, and the control-plane grant minting +(`GrantRef.from_secret(grant_id="control-plane", secret=...)`) that drives `LocalExecutor` against +a real backend. This script drives that glue end to end against a running control plane + adapter, so +an operator can confirm the demo path before relying on it. + +It does NOT depend on the package — stdlib only — so it can run anywhere the control plane is reachable. + + NIL_REGISTRY_URL=https://cp.example.com \ + NIL_REGISTRY_TOKEN=... \ + NIL_SMOKE_WORKSPACE=ws_demo \ + python scripts/smoke_automation.py + +The workspace must already have an ACTIVE adapter. The plan defaults to a single +`commerce.create_product` (PocketBase demo); override with NIL_SMOKE_PLAN (a JSON Wosool program) to +match your live backend's verbs. Exit code 0 = the automation ran and reached a terminal state. 
+""" + +from __future__ import annotations + +import json +import os +import sys +import time +import urllib.error +import urllib.request + +BASE = os.environ.get("NIL_REGISTRY_URL", "").rstrip("/") +TOKEN = os.environ.get("NIL_REGISTRY_TOKEN", "") +WORKSPACE = os.environ.get("NIL_SMOKE_WORKSPACE", "ws_demo") +AUTOMATION_ID = os.environ.get("NIL_SMOKE_ID", "smoke-product") + +_DEFAULT_PLAN = { + "wosool": "0.1", "workspace": WORKSPACE, "entry": "step_1", + "pipeline": [{"id": "step_1", "type": "action", "skill": "commerce", + "verb": "commerce.create_product", "args": {"name": "Smoke Test Product"}}], +} +PLAN = json.loads(os.environ["NIL_SMOKE_PLAN"]) if os.environ.get("NIL_SMOKE_PLAN") else _DEFAULT_PLAN +NAME = {"ar": "اختبار دخان", "en": "Smoke test"} + + +def _call(method: str, path: str, body: dict | None = None) -> tuple[int, dict]: + data = json.dumps(body).encode() if body is not None else None + req = urllib.request.Request(f"{BASE}{path}", data=data, method=method) # noqa: S310 + req.add_header("Content-Type", "application/json") + if TOKEN: + req.add_header("Authorization", f"Bearer {TOKEN}") + try: + with urllib.request.urlopen(req, timeout=20) as resp: # noqa: S310 + return resp.status, json.loads(resp.read().decode() or "{}") + except urllib.error.HTTPError as exc: + try: + return exc.code, json.loads(exc.read().decode() or "{}") + except ValueError: + return exc.code, {} + except (urllib.error.URLError, TimeoutError, OSError) as exc: + return 0, {"error": str(exc)} + + +def _step(label: str, status: int, payload: dict, *, ok_when=lambda s, p: s == 200) -> dict: + ok = ok_when(status, payload) + mark = "✓" if ok else "✗" + print(f" {mark} {label}: HTTP {status} {json.dumps(payload, ensure_ascii=False)[:240]}") + if not ok: + print(f"\nFAILED at: {label}") + sys.exit(1) + return payload + + +def main() -> None: + if not BASE: + print("set NIL_REGISTRY_URL (and NIL_REGISTRY_TOKEN) to the control plane", file=sys.stderr) + sys.exit(2) + print(f"Live 
smoke test → {BASE} workspace={WORKSPACE} id={AUTOMATION_ID}\n") + + print("1. draft (validates against the LIVE adapter skeleton)") + s, p = _call("POST", "/automations/draft", + {"automation_id": AUTOMATION_ID, "name": NAME, "plan": PLAN, "trigger": {"type": "manual"}}) + _step("draft", s, p, ok_when=lambda st, pl: st == 200 and pl.get("ok") is True) + + print("2. register (lands pending_approval in the SSOT)") + s, p = _call("POST", "/automations/register", + {"automation_id": AUTOMATION_ID, "name": NAME, "plan": PLAN, "trigger": {"type": "manual"}}) + d = _step("register", s, p, ok_when=lambda st, pl: st == 200 and pl.get("ok"))["definition"] + version = d["version"] + + print("3. approve (arm it)") + s, p = _call("POST", f"/automations/{WORKSPACE}/{AUTOMATION_ID}/{version}/state", + {"state": "active", "approved_by": "smoke"}) + _step("approve", s, p) + + print("4. run (the REAL execution path: grant minting + LocalExecutor against the live adapter)") + s, p = _call("POST", f"/automations/{WORKSPACE}/{AUTOMATION_ID}/run", + {"idempotency_key": f"smoke-{int(time.time())}"}) + run = _step("run", s, p, ok_when=lambda st, pl: st == 200 and pl.get("ok"))["run"] + print(f"\n✅ run state: {run.get('state')} (run_id={run.get('run_id')})") + if run.get("state") != "completed": + print(" note: run did not 'complete' — inspect the trace; the live execution path was reached.") + + +if __name__ == "__main__": + main() diff --git a/src/nilscript/automation/__init__.py b/src/nilscript/automation/__init__.py new file mode 100644 index 0000000..a89b117 --- /dev/null +++ b/src/nilscript/automation/__init__.py @@ -0,0 +1,62 @@ +"""Automation Registry: conversation-authored, deterministically-lowered, SSOT-stored workflows. + +An *automation* is a named, versioned, content-hashed `WosoolProgram` plus a trigger and a +lifecycle state. 
The agent drafts it (untrusted); the kernel validator lowers-or-rejects it +(deterministic); a human approves it; it lives in the control-plane store with its own version lock. + +See `docs/PLAN-dynamic-automation-ssot.md`. P1 ships the SSOT spine (models, content hash, draft +gate, persistence, lifecycle); schedule/event triggers and the dispatcher are P2. +""" + +from __future__ import annotations + +from nilscript.automation.authoring import DraftResult, draft_automation, register +from nilscript.automation.compose import ( + ComposedPlan, + ComposedResult, + Stage, + composed_hash, + parse_composed, + run_composed, + validate_composed, +) +from nilscript.automation.dispatch import Runner, fire_composed, fire_manual +from nilscript.automation.scheduler import dispatch_event, run_due_schedules +from nilscript.automation.skeleton import context_from_skeleton +from nilscript.automation.models import ( + AutomationDefinition, + AutomationState, + EventTrigger, + ManualTrigger, + ScheduleTrigger, + TriggerSpec, + content_hash, + parse_trigger, +) + +__all__ = [ + "AutomationDefinition", + "AutomationState", + "ComposedPlan", + "ComposedResult", + "DraftResult", + "Stage", + "EventTrigger", + "ManualTrigger", + "Runner", + "ScheduleTrigger", + "TriggerSpec", + "composed_hash", + "content_hash", + "context_from_skeleton", + "dispatch_event", + "draft_automation", + "fire_composed", + "fire_manual", + "parse_composed", + "parse_trigger", + "register", + "run_composed", + "run_due_schedules", + "validate_composed", +] diff --git a/src/nilscript/automation/authoring.py b/src/nilscript/automation/authoring.py new file mode 100644 index 0000000..997d0ed --- /dev/null +++ b/src/nilscript/automation/authoring.py @@ -0,0 +1,126 @@ +"""The authoring loop: draft (validate + hash, no effect) → register (persist to SSOT). + +`draft_automation` is the deterministic boundary. 
The agent supplies a raw plan + trigger +(untrusted); the kernel validator (V1-V6) lowers-or-rejects it against the live skeleton. Only a +plan that passes becomes a registrable `AutomationDefinition` carrying its content-hash. The agent +cannot talk past a refusal — a hallucinated verb has nothing to bind to (V4). +""" + +from __future__ import annotations + +import datetime +from dataclasses import dataclass +from typing import Any, Protocol + +from nilscript.automation.models import ( + AutomationDefinition, + AutomationState, + BilingualText, + content_hash, + parse_trigger, +) +from nilscript.kernel.context import ValidationContext +from nilscript.kernel.diagnostics import ValidationResult +from nilscript.kernel.models import WosoolProgram +from nilscript.kernel.validator import validate + + +class _Store(Protocol): + """The slice of the control-plane store the registry needs (keeps this module store-agnostic).""" + + def register_automation( + self, + *, + workspace: str, + automation_id: str, + content_hash: str, + name: dict[str, Any], + plan: dict[str, Any], + trigger: dict[str, Any], + state: str = ..., + authored_by: str = ..., + description: dict[str, Any] | None = ..., + approved_by: str | None = ..., + ) -> dict[str, Any]: ... + + +@dataclass(frozen=True) +class DraftResult: + """Outcome of a draft attempt. 
`ok` mirrors the validator verdict; on failure `definition` is + None and `diagnostics` carries the structured refusal (which node, which verb, why).""" + + ok: bool + diagnostics: ValidationResult + definition: AutomationDefinition | None = None + content_hash: str | None = None + + +def _now() -> str: + return datetime.datetime.now(datetime.UTC).isoformat() + + +def draft_automation( + *, + automation_id: str, + name: Any, + raw_plan: dict[str, Any], + trigger: Any, + ctx: ValidationContext, + authored_by: str = "", + description: Any | None = None, +) -> DraftResult: + """Validate the agent's candidate plan and, if admitted, build a version-1 draft definition. + + No side effect. The workspace is taken from the validated plan so the two cannot drift. + """ + result = validate(raw_plan, ctx) + if not result.ok: + return DraftResult(ok=False, diagnostics=result) + + program = WosoolProgram.model_validate(raw_plan) + digest = content_hash(program) + definition = AutomationDefinition( + automation_id=automation_id, + workspace=program.workspace, + version=1, + content_hash=digest, + name=name if isinstance(name, BilingualText) else BilingualText.model_validate(name), + description=( + None + if description is None + else description + if isinstance(description, BilingualText) + else BilingualText.model_validate(description) + ), + plan=program, + trigger=parse_trigger(trigger), + state="draft", + authored_by=authored_by, + created_at=_now(), + ) + return DraftResult(ok=True, diagnostics=result, definition=definition, content_hash=digest) + + +def register( + store: _Store, + definition: AutomationDefinition, + *, + state: AutomationState = "pending_approval", +) -> AutomationDefinition: + """Persist a drafted definition to the SSOT and return the canonical stored version. + + Registering a recurring automation is a governed act, so it lands in `pending_approval` by + default — never auto-armed. Re-registering an identical plan (same hash) is an idempotent no-op. 
+ """ + row = store.register_automation( + workspace=definition.workspace, + automation_id=definition.automation_id, + content_hash=definition.content_hash, + name=definition.name.model_dump(), + description=definition.description.model_dump() if definition.description else None, + plan=definition.plan.model_dump(by_alias=True, mode="json"), + trigger=definition.trigger.model_dump(), + state=state, + authored_by=definition.authored_by, + ) + return AutomationDefinition.from_row(row) diff --git a/src/nilscript/automation/compose.py b/src/nilscript/automation/compose.py new file mode 100644 index 0000000..1b63d88 --- /dev/null +++ b/src/nilscript/automation/compose.py @@ -0,0 +1,183 @@ +"""Cross-system composition (P3): one automation spanning N adapters, with explicit data handoff. + +This is the sales→marketing→accounting shape. A composed plan is an ordered list of **stages**, each +a normal single-adapter `WosoolProgram` validated against *its own* adapter's skeleton and run by the +existing `LocalExecutor`. Between stages, named outputs are threaded forward by an **explicit** +`input_from` mapping (e.g. `{"lead_ref": "$.stage_1.step_2.output.id"}`) — surfaced as `$.input.*` +in the next stage. + +Two honesty boundaries make this safe rather than magic: +- **Semantic mapping is author-declared, not inferred.** We do not pretend system A's "customer" is + system B's "account"; the composer states the handoff, and B's own Choice Gate still resolves the + handed value to a verified id on B's side. No silent ontology guessing. +- **Authority is per-stage, not composed.** Each stage runs against its own adapter with that + adapter's own credentials (the `run_stage` the caller supplies binds them). The handoff carries + *data values*, never authority — so a composed plan cannot escalate across a boundary + (confused-deputy-safe by construction). 
What it CANNOT yet express — a single stage that itself + needs two backends' authority at once — is deliberately out of scope. +""" + +from __future__ import annotations + +import hashlib +import json +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from typing import Any + +from nilscript.automation.skeleton import context_from_skeleton +from nilscript.kernel.diagnostics import Diagnostic +from nilscript.kernel.executor import RunResult +from nilscript.kernel.validator import validate + + +def _handoff_source(ref: str) -> str | None: + """The stage a handoff ref reads from — `$.stage_1.step_2.output.id` → `stage_1`. None if not a + reference. (The kernel's parser whitelists `step_N`/`input`/`item` sources; cross-stage refs use + stage names, so composition resolves them itself.)""" + if isinstance(ref, str) and ref.startswith("$."): + return ref[2:].split(".", 1)[0] + return None + + +def _resolve_handoff(ref: Any, ctx: dict[str, Any]) -> Any: + """Walk a `$.stage.path.to.value` reference through the accumulated cross-stage context. A + non-reference is returned as-is; an unresolvable path yields None (never a fabricated value).""" + if not (isinstance(ref, str) and ref.startswith("$.")): + return ref + cursor: Any = ctx + for segment in ref[2:].split("."): + if isinstance(cursor, dict) and segment in cursor: + cursor = cursor[segment] + else: + return None + return cursor + +# Runs one stage's plan against `adapter` and returns its RunResult. +# Called as: run_stage(adapter, plan, run_id=..., input=...). +StageRunner = Callable[..., Awaitable[RunResult]] + + +@dataclass(frozen=True) +class Stage: + """One system's slice of a composed automation.""" + + name: str # routing label, must match STAGE_NAME_PATTERN (e.g. 
"stage_1") + adapter: str # adapter_id in the registry — which backend this stage runs against + plan: dict[str, Any] # a WosoolProgram dict, validated against `adapter`'s skeleton + input_from: dict[str, str] = field(default_factory=dict) # {input_key: "$.stage_k...output.f"} + + +@dataclass(frozen=True) +class ComposedPlan: + workspace: str + stages: tuple[Stage, ...] + + +@dataclass +class ComposedResult: + """Outcome of a composed run. `completed` is True only if every stage completed; a stage that + blocks stops the chain honestly (downstream stages do not run).""" + + completed: bool + stages: list[dict[str, Any]] = field(default_factory=list) + context: dict[str, Any] = field(default_factory=dict) + blocked_at: str | None = None + + +def composed_hash(raw: dict[str, Any]) -> str: + """Version lock for a composed plan — SHA256 over its canonical JSON (same discipline as a single + plan's `content_hash`).""" + canonical = json.dumps(raw, sort_keys=True, separators=(",", ":"), ensure_ascii=False) + return hashlib.sha256(canonical.encode("utf-8")).hexdigest() + + +def parse_composed(raw: dict[str, Any]) -> ComposedPlan: + """Build a ComposedPlan from raw JSON ({workspace, stages:[{name, adapter, plan, input_from}]}).""" + stages = tuple( + Stage( + name=s["name"], adapter=s["adapter"], plan=s["plan"], + input_from=dict(s.get("input_from") or {}), + ) + for s in raw.get("stages", []) + ) + return ComposedPlan(workspace=raw["workspace"], stages=stages) + + +def validate_composed( + composed: ComposedPlan, skeleton_for: dict[str, dict[str, Any]] +) -> dict[str, Any]: + """Validate every stage's plan against ITS adapter's skeleton, plus handoff well-formedness. + + `skeleton_for` maps adapter_id → that adapter's discovery skeleton. Returns + `{"ok": bool, "stages": [{name, ok, diagnostics}], "errors": [...]}`. A handoff `input_from` that + references a stage not declared *before* the consuming stage is a composition error. 
+ """ + seen: set[str] = set() + stage_reports: list[dict[str, Any]] = [] + errors: list[str] = [] + if len({s.name for s in composed.stages}) != len(composed.stages): + errors.append("duplicate stage names") + + for stage in composed.stages: + skeleton = skeleton_for.get(stage.adapter) + if skeleton is None: + errors.append(f"stage {stage.name!r}: no skeleton for adapter {stage.adapter!r}") + stage_reports.append({"name": stage.name, "ok": False, "diagnostics": []}) + seen.add(stage.name) + continue + ctx = context_from_skeleton(composed.workspace, skeleton) + result = validate(stage.plan, ctx) + diags = [_diag(d) for d in result.diagnostics] + # handoff: every `$.stage_k...` source must be a stage declared earlier in the chain. + for ref_str in (stage.input_from or {}).values(): + source = _handoff_source(ref_str) + if source is not None and source not in seen and source not in ("input", "item"): + errors.append( + f"stage {stage.name!r}: input_from references {source!r} which is not a prior stage" + ) + stage_reports.append({"name": stage.name, "ok": result.ok, "diagnostics": diags}) + seen.add(stage.name) + + ok = not errors and all(r["ok"] for r in stage_reports) + return {"ok": ok, "stages": stage_reports, "errors": errors} + + +def _diag(d: Diagnostic) -> dict[str, Any]: + return {"code": d.code, "severity": d.severity, "message": d.message, "node": d.node} + + +def _stage_input(stage: Stage, ctx: dict[str, Any]) -> dict[str, Any]: + """Resolve a stage's handoff mapping against the accumulated cross-stage context.""" + return {key: _resolve_handoff(ref, ctx) for key, ref in (stage.input_from or {}).items()} + + +async def run_composed( + composed: ComposedPlan, *, run_stage: StageRunner, run_id: str +) -> ComposedResult: + """Run each stage in order against its adapter, threading declared outputs into the next stage. + + The cross-stage context is `{stage_name: }`, so a handoff ref like + `$.stage_1.step_2.output.id` resolves to stage_1's step_2 output. 
A non-completing stage halts the + chain (honest partial) — never fabricates a downstream write on a missing handoff. + """ + ctx: dict[str, Any] = {} + out_stages: list[dict[str, Any]] = [] + for stage in composed.stages: + stage_input = _stage_input(stage, ctx) + result = await run_stage( + stage.adapter, stage.plan, run_id=f"{run_id}:{stage.name}", input=stage_input + ) + ctx[stage.name] = result.context + out_stages.append({ + "name": stage.name, + "adapter": stage.adapter, + "completed": result.completed, + "blocked_at": result.blocked_at, + "refusal": result.refusal, + }) + if not result.completed: + return ComposedResult( + completed=False, stages=out_stages, context=ctx, blocked_at=stage.name + ) + return ComposedResult(completed=True, stages=out_stages, context=ctx) diff --git a/src/nilscript/automation/dispatch.py b/src/nilscript/automation/dispatch.py new file mode 100644 index 0000000..6e38636 --- /dev/null +++ b/src/nilscript/automation/dispatch.py @@ -0,0 +1,152 @@ +"""The dispatcher: fire one run of an armed automation through the executor, recorded in the SSOT. + +P2. `fire_manual` is the only trigger P1/P2 execute end to end. It enforces the governance gate +(only an `active` automation runs), pins the exact stored version, derives a deterministic `run_id` +so a re-delivered fire replays rather than double-executes, and records the executor trace as a +first-class run row. The `runner` (what actually walks the plan) is injected so the orchestration is +testable without a live backend; the control-plane app supplies a `LocalExecutor`-backed default. +""" + +from __future__ import annotations + +from collections.abc import Awaitable, Callable +from typing import Any, Protocol + +from nilscript.automation.compose import ( + ComposedResult, + StageRunner, + parse_composed, + run_composed, +) +from nilscript.kernel.executor import RunResult + +# Walks a plan and returns its RunResult. (plan_dict, run_id) -> RunResult. 
+Runner = Callable[..., Awaitable[RunResult]] + + +class _Store(Protocol): + def get_automation( + self, workspace: str, automation_id: str, version: int | None = ... + ) -> dict[str, Any] | None: ... + def start_run(self, run_id: str, **kw: Any) -> bool: ... + def finish_run(self, run_id: str, state: str, trace: dict[str, Any] | None) -> bool: ... + def get_run(self, run_id: str) -> dict[str, Any] | None: ... + + +def _classify(result: RunResult) -> str: + """Map an executor RunResult onto a terminal run state. `completed` is the only success; a saga + unwind is `compensated`; a halt at a node is `blocked`; anything else partial is `partial`.""" + if result.completed: + return "completed" + if result.compensated: + return "compensated" + if result.blocked_at: + return "blocked" + return "partial" + + +def _trace(result: RunResult) -> dict[str, Any]: + return { + "completed": result.completed, + "partial": result.partial, + "blocked_at": result.blocked_at, + "refusal": result.refusal, + "compensated": result.compensated, + "notifications": result.notifications, + "context": result.context, + } + + +async def fire_manual( + store: _Store, + *, + workspace: str, + automation_id: str, + idempotency_key: str, + runner: Runner, + fired_by: str = "manual", +) -> dict[str, Any]: + """Fire the latest version of an automation now. Returns a result envelope: + + - `{"ok": False, "error": ..., "status": 404|409}` when it cannot run (unknown / not armed), + - `{"ok": True, "replayed": True, "run": ...}` for an idempotent re-fire (no re-execution), + - `{"ok": True, "run": ...}` after a fresh run (run row carries the terminal state + trace). 
+ """ + auto = store.get_automation(workspace, automation_id) + if auto is None: + return {"ok": False, "error": "no such automation", "status": 404} + if auto["state"] != "active": + return { + "ok": False, + "error": f"automation is {auto['state']!r}, not active — approve/arm it first", + "status": 409, + } + + version, content_hash = auto["version"], auto["content_hash"] + run_id = f"{automation_id}:v{version}:{idempotency_key}" + + if not store.start_run( + run_id, workspace=workspace, automation_id=automation_id, version=version, + content_hash=content_hash, fired_by=fired_by, + ): + return {"ok": True, "replayed": True, "run": store.get_run(run_id)} + + try: + result = await runner(auto["plan"], run_id=run_id) + except Exception as exc: # noqa: BLE001 — a runner blow-up is a failed run, recorded honestly + store.finish_run(run_id, "failed", {"error": str(exc)}) + return {"ok": False, "error": str(exc), "status": 500, "run": store.get_run(run_id)} + + store.finish_run(run_id, _classify(result), _trace(result)) + return {"ok": True, "run": store.get_run(run_id)} + + +def _classify_composed(result: ComposedResult) -> str: + if result.completed: + return "completed" + return "blocked" if result.blocked_at else "partial" + + +async def fire_composed( + store: _Store, + *, + workspace: str, + automation_id: str, + idempotency_key: str, + stage_runner: StageRunner, + fired_by: str = "manual", +) -> dict[str, Any]: + """Fire a *composed* automation now — run each stage against its adapter, threading the declared + handoffs. 
Same gate/idempotency/recording as `fire_manual`; the run trace carries per-stage status.""" + auto = store.get_automation(workspace, automation_id) + if auto is None: + return {"ok": False, "error": "no such automation", "status": 404} + if auto.get("kind") != "composed": + return {"ok": False, "error": "automation is not composed", "status": 400} + if auto["state"] != "active": + return { + "ok": False, + "error": f"automation is {auto['state']!r}, not active — approve/arm it first", + "status": 409, + } + + version, content_hash = auto["version"], auto["content_hash"] + run_id = f"{automation_id}:v{version}:{idempotency_key}" + if not store.start_run( + run_id, workspace=workspace, automation_id=automation_id, version=version, + content_hash=content_hash, fired_by=fired_by, + ): + return {"ok": True, "replayed": True, "run": store.get_run(run_id)} + + try: + composed = parse_composed(auto["plan"]) + result = await run_composed(composed, run_stage=stage_runner, run_id=run_id) + except Exception as exc: # noqa: BLE001 — a stage-runner blow-up is a failed run, recorded honestly + store.finish_run(run_id, "failed", {"error": str(exc)}) + return {"ok": False, "error": str(exc), "status": 500, "run": store.get_run(run_id)} + + store.finish_run(run_id, _classify_composed(result), { + "completed": result.completed, "blocked_at": result.blocked_at, + "stages": result.stages, "context": result.context, + }) + return {"ok": True, "run": store.get_run(run_id)} diff --git a/src/nilscript/automation/models.py b/src/nilscript/automation/models.py new file mode 100644 index 0000000..0f52567 --- /dev/null +++ b/src/nilscript/automation/models.py @@ -0,0 +1,142 @@ +"""The SSOT record for an automation, its trigger union, and the version-lock content hash. + +Mirrors the kernel's DSL discipline (frozen, unknown members rejected). 
The `content_hash` over the +canonical-JSON of the validated program IS the "lock on a version": a run months later executes +exactly the bytes that were approved, and any out-of-band edit makes the hash mismatch. +""" + +from __future__ import annotations + +import hashlib +import json +from typing import Annotated, Any, Literal + +from pydantic import Field, TypeAdapter, model_validator + +from nilscript.kernel.models import ( + VERB_PATTERN, + BilingualText, + DslModel, + WosoolProgram, +) + +# Workspace-scoped slug: lowercase, digits, dashes/underscores. Stable across versions. +AUTOMATION_ID_PATTERN = r"^[a-z][a-z0-9_-]*$" + +AutomationState = Literal["draft", "pending_approval", "active", "paused", "archived"] + + +class ScheduleTrigger(DslModel): + """Fire on a clock. Both `interval_seconds` and `cron` fire locally via `POST /automations/tick` + (cron uses a self-contained subset matcher — see `triggers.cron_matches`). Temporal Schedules are + the durable cloud upgrade for the same TriggerSpec.""" + + type: Literal["schedule"] + cron: str | None = None + interval_seconds: int | None = Field(default=None, ge=1) + timezone: str = "Asia/Riyadh" + + @model_validator(mode="after") + def _one_of_cron_or_interval(self) -> ScheduleTrigger: + if (self.cron is None) == (self.interval_seconds is None): + raise ValueError("schedule trigger needs exactly one of `cron` or `interval_seconds`") + return self + + +class EventTrigger(DslModel): + """Fire when a matching NIL event lands in the control-plane ledger (P2 dispatcher).""" + + type: Literal["event"] + on_verb: str = Field(pattern=VERB_PATTERN) + on_event: Literal["executed", "refused", "rolled_back"] = "executed" + match: dict[str, Any] = Field(default_factory=dict) + source_adapter: str | None = None + + +class ManualTrigger(DslModel): + """Fire only on an explicit run request. 
The one trigger P1 supports end to end.""" + + type: Literal["manual"] + + +TriggerType = ScheduleTrigger | EventTrigger | ManualTrigger +TriggerSpec = Annotated[TriggerType, Field(discriminator="type")] + +_TRIGGER_ADAPTER: TypeAdapter[TriggerType] = TypeAdapter(TriggerSpec) + + +def parse_trigger(raw: Any) -> TriggerType: + """Parse a raw trigger (dict from the agent, or an already-built model) into the closed union. + + An unknown `type` is structurally unrepresentable — the discriminated union rejects it. + """ + if isinstance(raw, ScheduleTrigger | EventTrigger | ManualTrigger): + return raw + return _TRIGGER_ADAPTER.validate_python(raw) + + +def content_hash(plan: WosoolProgram) -> str: + """SHA256 over the canonical JSON of the validated program — the version lock. + + `by_alias=True` so `ForeachNode.as_` serialises as `as`; `sort_keys` + tight separators make the + encoding canonical so identical plans hash identically (idempotent registration). + """ + canonical = json.dumps( + plan.model_dump(by_alias=True, mode="json"), + sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + ) + return hashlib.sha256(canonical.encode("utf-8")).hexdigest() + + +class AutomationDefinition(DslModel): + """One version of one automation — the row that lives in the SSOT. + + `version` and `created_at` are authoritative once persisted; a freshly drafted (unstored) + definition carries version 1 and a provisional `created_at`. The plan's `workspace` is the + automation's workspace (enforced below) so the two can never drift. 
+ """ + + automation_id: str = Field(pattern=AUTOMATION_ID_PATTERN) + workspace: str = Field(min_length=1) + version: int = Field(ge=1) + content_hash: str = Field(min_length=64, max_length=64) + name: BilingualText + description: BilingualText | None = None + plan: WosoolProgram + trigger: TriggerSpec + state: AutomationState + authored_by: str = "" + approved_by: str | None = None + created_at: str + superseded_by: int | None = None + + @model_validator(mode="after") + def _workspace_matches_plan(self) -> AutomationDefinition: + if self.plan.workspace != self.workspace: + raise ValueError( + f"automation workspace {self.workspace!r} != plan workspace {self.plan.workspace!r}" + ) + return self + + @classmethod + def from_row(cls, row: dict[str, Any]) -> AutomationDefinition: + """Rebuild from a deserialized store row (name/description/plan/trigger already JSON-parsed).""" + return cls( + automation_id=row["automation_id"], + workspace=row["workspace"], + version=row["version"], + content_hash=row["content_hash"], + name=BilingualText.model_validate(row["name"]), + description=( + BilingualText.model_validate(row["description"]) if row.get("description") else None + ), + plan=WosoolProgram.model_validate(row["plan"]), + trigger=parse_trigger(row["trigger"]), + state=row["state"], + authored_by=row.get("authored_by") or "", + approved_by=row.get("approved_by"), + created_at=row["created_at"], + superseded_by=row.get("superseded_by"), + ) diff --git a/src/nilscript/automation/scheduler.py b/src/nilscript/automation/scheduler.py new file mode 100644 index 0000000..ff09233 --- /dev/null +++ b/src/nilscript/automation/scheduler.py @@ -0,0 +1,73 @@ +"""The scheduler: turn ledger events and clock ticks into automation fires. + +Two seams, both single-instance and Temporal-free (the durable cloud sibling is the upgrade): +- `dispatch_event` — called when an event lands in the ledger; fires active EventTrigger automations + whose filter matches. 
# Grant id carried by events that a control-plane-fired run emitted; such events never re-trigger.
_CP_GRANT = "control-plane"


async def dispatch_event(
    store: Any, envelope: dict[str, Any], *, runner: Runner, fired_by: str = "event"
) -> list[dict[str, Any]]:
    """Fire each active event-triggered automation in the envelope's workspace that matches it.

    Returns one `fire_manual` result per automation fired, in store order. An envelope whose
    `grant` is the control-plane grant returns `[]` immediately (loop guard).

    NOTE(review): every match is fired through `fire_manual` with the single-plan `runner`,
    whatever the automation's kind — confirm that is intended for composed automations.
    """
    origin_grant = envelope.get("grant") or ""
    if origin_grant == _CP_GRANT:
        return []  # emitted by a triggered run — do not let it trigger again

    workspace = envelope.get("workspace", "") or ""
    outcomes: list[dict[str, Any]] = []
    for record in store.active_automations():
        if record["workspace"] != workspace:
            continue
        trigger = parse_trigger(record["trigger"])
        if isinstance(trigger, EventTrigger) and event_matches(trigger, envelope):
            outcome = await fire_manual(
                store,
                workspace=workspace,
                automation_id=record["automation_id"],
                idempotency_key=event_fire_key(envelope),
                runner=runner,
                fired_by=fired_by,
            )
            outcomes.append(outcome)
    return outcomes


async def run_due_schedules(
    store: Any, *, runner: Runner, now: datetime, fired_by: str = "schedule"
) -> list[dict[str, Any]]:
    """Fire each active schedule-triggered automation that `schedule_due` reports due at `now`.

    Dueness is judged against the `started_at` of the newest recorded run (`list_runs(..., limit=1)`),
    or against "never run" when there is none. Returns one `fire_manual` result per automation fired.
    """
    outcomes: list[dict[str, Any]] = []
    for record in store.active_automations():
        trigger = parse_trigger(record["trigger"])
        if not isinstance(trigger, ScheduleTrigger):
            continue
        history = store.list_runs(record["workspace"], record["automation_id"], limit=1)
        last_started = history[0]["started_at"] if history else None
        if schedule_due(trigger, last_started, now):
            outcome = await fire_manual(
                store,
                workspace=record["workspace"],
                automation_id=record["automation_id"],
                idempotency_key=schedule_fire_key(trigger, now),
                runner=runner,
                fired_by=fired_by,
            )
            outcomes.append(outcome)
    return outcomes
# Discovery publishes no per-verb argument schema, so every skill accepts arbitrary args; the verb
# whitelist built below is what gates a plan.
_PERMISSIVE_HINT: dict[str, Any] = {"additionalProperties": True}


def context_from_skeleton(workspace: str, skeleton: dict[str, Any]) -> ValidationContext:
    """Map a `handshake` report ({verbs, targets, ...}) into a single-workspace ValidationContext.

    Verbs are grouped by the text before their first dot (an un-dotted verb is its own group); each
    group becomes one `SkillSpec`. The same verb set is passed as `read_verbs` and as the grant of
    `workspace`.

    Args:
        workspace: The only workspace the resulting context knows about.
        skeleton: Discovery report; only its `verbs` entry is read. Non-string entries are ignored.

    Returns:
        A context whose skills, read verbs and workspace grant all derive from the declared verbs.
        A missing, null or non-list `verbs` yields an empty context instead of raising.
    """
    declared = skeleton.get("verbs")
    # `"verbs": null` makes `.get("verbs", [])` return None (TypeError on iteration), and a bare
    # string would be walked character by character — neither is a verb list.
    if not isinstance(declared, (list, tuple, set, frozenset)):
        declared = ()
    verbs: list[str] = [v for v in declared if isinstance(v, str)]

    by_skill: dict[str, set[str]] = {}
    for verb in verbs:
        # `split` returns the whole verb when it contains no dot.
        by_skill.setdefault(verb.split(".", 1)[0], set()).add(verb)

    skills = {
        name: SkillSpec(required_verbs=frozenset(group), hint_schema=_PERMISSIVE_HINT)
        for name, group in by_skill.items()
    }
    return ValidationContext(
        skills=skills,
        read_verbs=frozenset(verbs),
        workspaces={workspace: frozenset(verbs)},
    )
def event_matches(trigger: EventTrigger, envelope: dict[str, Any]) -> bool:
    """True when a ledger event envelope satisfies an EventTrigger (event kind, verb, field filter).

    `body.event` must equal `trigger.on_event` and `body.verb` must equal `trigger.on_verb`. Every
    `trigger.match` entry must then equal the same-named value in `body.args` or in `body` itself.
    A `body` or `args` that is not a dict is treated as empty, so a malformed envelope yields a
    verdict instead of an AttributeError.
    """
    raw_body = envelope.get("body")
    body = raw_body if isinstance(raw_body, dict) else {}
    if body.get("event") != trigger.on_event:
        return False
    if body.get("verb") != trigger.on_verb:
        return False
    raw_args = body.get("args")
    args = raw_args if isinstance(raw_args, dict) else {}
    for key, value in trigger.match.items():
        if args.get(key) != value and body.get(key) != value:
            return False
    return True


def _matches_field(spec: str, value: int, lo: int, hi: int) -> bool:
    """Match one cron field against a value. Supports `*`, `*/n`, `a-b`, `a-b/n`, `a,b`, and exact.

    `lo`/`hi` are only used to expand `*`. A malformed list element, or a step that is not a
    positive integer, never matches; it does not raise.
    """
    for part in spec.split(","):
        body, slash, step = part.partition("/")
        if not slash:
            step = "1"
        try:
            step_n = int(step)
            if body == "*":
                start, end = lo, hi
            elif "-" in body:
                a, b = body.split("-", 1)
                start, end = int(a), int(b)
            else:
                start = end = int(body)
        except ValueError:
            continue  # malformed field part — skip it rather than crash a tick
        if start <= value <= end and step_n > 0 and (value - start) % step_n == 0:
            return True
    return False


def cron_matches(expr: str, dt: datetime) -> bool:
    """Whether a 5-field cron expression (min hour day-of-month month day-of-week) matches `dt`.

    Supported subset: `*`, `*/n`, ranges, lists, and exact values. Day-of-week is 0–6 (Sun=0); 7
    also means Sunday. An expression without exactly five fields never matches.

    NOTE(review): all five fields are ANDed. POSIX cron instead ORs day-of-month and day-of-week
    when both are restricted — keep that in mind before swapping in a full cron library.
    """
    fields = expr.split()
    if len(fields) != 5:
        return False
    minute, hour, dom, month, dow = fields
    dow_val = (dt.weekday() + 1) % 7  # Python Mon=0..Sun=6 → cron Sun=0..Sat=6
    # Sunday may be written 0 or 7, so on Sundays the field is tested against both.
    dow_ok = _matches_field(dow, dow_val, 0, 7) or (dow_val == 0 and _matches_field(dow, 7, 0, 7))
    return (
        _matches_field(minute, dt.minute, 0, 59)
        and _matches_field(hour, dt.hour, 0, 23)
        and _matches_field(dom, dt.day, 1, 31)
        and _matches_field(month, dt.month, 1, 12)
        and dow_ok
    )


def _assume_utc(moment: datetime) -> datetime:
    """Return `moment` unchanged if timezone-aware, else the same wall-clock time tagged as UTC."""
    from datetime import timezone

    if moment.tzinfo is not None and moment.utcoffset() is not None:
        return moment
    return moment.replace(tzinfo=timezone.utc)


def schedule_due(
    trigger: ScheduleTrigger, last_started_at: str | None, now: datetime
) -> bool:
    """True when a schedule is due as of `now` — for both interval and (local subset) cron.

    Args:
        trigger: Schedule carrying either `cron` or `interval_seconds`; `cron` wins when set.
        last_started_at: ISO-8601 start time of the previous run, or None/empty if never run. An
            unparseable value is treated as never run.
        now: The tick time.

    Returns:
        Cron: True when `now` matches the expression and the previous run started in an earlier
        minute (or never). Interval: True when at least `interval_seconds` have elapsed (or never
        run). False when the trigger has neither.

    A naive timestamp on either side is read as UTC before comparing, so a stored timestamp
    without an offset no longer raises TypeError against a timezone-aware `now`.
    NOTE(review): assumes stored naive timestamps are UTC — confirm against the store.
    """
    last: datetime | None = None
    if last_started_at:
        try:
            last = datetime.fromisoformat(last_started_at)
        except ValueError:
            last = None  # unparseable prior timestamp — treat as never-run

    if trigger.cron is not None:
        if not cron_matches(trigger.cron, now):
            return False
        if last is None:
            return True
        # fire at most once per matching minute
        last_minute = _assume_utc(last).replace(second=0, microsecond=0)
        this_minute = _assume_utc(now).replace(second=0, microsecond=0)
        return last_minute < this_minute

    if trigger.interval_seconds is None:
        return False
    if last is None:
        return True
    elapsed = (_assume_utc(now) - _assume_utc(last)).total_seconds()
    return elapsed >= trigger.interval_seconds


def event_fire_key(envelope: dict[str, Any]) -> str:
    """Idempotency key for an event-triggered fire, so a re-delivered event replays the same run.

    Uses the event's `id` when it has one. An id-less event is keyed by a digest of its canonical
    JSON, so identical re-deliveries still share a key while distinct id-less events no longer
    collapse into one shared `event:noid` key.
    """
    event_id = envelope.get("id")
    if event_id:
        return f"event:{event_id}"

    import hashlib
    import json

    # `default=str` keeps this total if a non-JSON value sneaks into the envelope.
    canonical = json.dumps(envelope, sort_keys=True, separators=(",", ":"), default=str)
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"event:noid:{digest[:32]}"
def _plan_scopes(plan: dict[str, Any]) -> frozenset[str]:
    """Grant scopes for a control-plane-fired run: each verb plus its `skill.*` wildcard.

    Only the dict nodes directly under `plan["pipeline"]` are inspected; a node contributes when
    its `verb` is a non-empty string. A plan that is not a dict, or whose `pipeline` is missing,
    null or not a list, contributes nothing (a null pipeline used to raise TypeError).

    Returns:
        The collected scopes, or `{"*"}` when none were found.
        NOTE(review): the `*` fallback grants every verb to a plan with no top-level verbs —
        confirm that is the intended default.
    """
    pipeline = plan.get("pipeline") if isinstance(plan, dict) else None
    if not isinstance(pipeline, (list, tuple)):
        pipeline = ()

    scopes: set[str] = set()
    for node in pipeline:
        verb = node.get("verb") if isinstance(node, dict) else None
        if isinstance(verb, str) and verb:
            # "crm.create" → "crm.create" and "crm.*"; an un-dotted "ping" → "ping" and "ping.*".
            scopes.update((verb, verb.split(".", 1)[0] + ".*"))
    return frozenset(scopes) or frozenset({"*"})
def _diag_list(result: ValidationResult) -> list[dict[str, Any]]:
    """Flatten `result.diagnostics` into JSON-ready dicts (code, severity, message, node)."""
    rows: list[dict[str, Any]] = []
    for diag in result.diagnostics:
        rows.append(
            {
                "code": diag.code,
                "severity": diag.severity,
                "message": diag.message,
                "node": diag.node,
            }
        )
    return rows
None when there + is no active adapter, it's unreachable, or it doesn't answer with a conformant describe.""" + active = store.active_adapter(workspace) + if not active or not active.get("url"): + return None + transport = NilTransport(base_url=active["url"], bearer_secret=active.get("bearer", "") or "") + try: + report = await handshake(transport) + finally: + await transport.aclose() + if not report.get("reachable") or not report.get("conformant"): + return None + return report + + provider: SkeletonProvider = skeleton_provider or _live_skeleton + + async def _live_runner(plan: dict[str, Any], *, run_id: str) -> Any: + """Default runner: walk the pinned plan against the workspace's active adapter via a headless + LocalExecutor. The adapter bearer is the transport auth; the grant scopes are the plan's own + verbs. (Production grant minting is the one knob to revisit when CP-initiated runs need a + distinct identity from the adapter bearer.)""" + ws = plan.get("workspace", "") if isinstance(plan, dict) else "" + active = store.active_adapter(ws) + if not active or not active.get("url"): + raise RuntimeError(f"no active adapter for workspace {ws!r}") + bearer = active.get("bearer", "") or "" + transport = NilTransport(base_url=active["url"], bearer_secret=bearer) + grant = GrantRef.from_secret( + grant_id="control-plane", workspace=ws, secret=bearer or "cp", + scopes=_plan_scopes(plan), + ) + client = NilClient(transport=transport, grant=grant) + try: + executor = LocalExecutor( + client, run_id=run_id, session_id=run_id, locale=plan.get("locale", "ar") + ) + return await executor.execute(plan) + finally: + await transport.aclose() + + run_exec: Runner = runner or _live_runner + + async def _live_adapter_skeleton(workspace: str, adapter_id: str) -> dict[str, Any] | None: + """Discover a SPECIFIC registered adapter (by id) over NIL — for composed-plan validation, + where each stage names its own backend (which may not be the workspace's active one).""" + match = 
next( + (a for a in store.list_adapters(workspace) + if a.get("adapter_id") == adapter_id and a.get("url")), + None, + ) + if match is None: + return None + transport = NilTransport(base_url=match["url"], bearer_secret=match.get("bearer", "") or "") + try: + report = await handshake(transport) + finally: + await transport.aclose() + return report if report.get("reachable") and report.get("conformant") else None + + adapter_skeletons: AdapterSkeletonProvider = adapter_skeleton_provider or _live_adapter_skeleton + + async def _live_stage_runner(adapter: str, plan: dict[str, Any], *, run_id: str, input: dict[str, Any]) -> Any: + """Run one composed stage against the named adapter (by id) via a headless LocalExecutor.""" + ws = plan.get("workspace", "") if isinstance(plan, dict) else "" + match = next( + (a for a in store.list_adapters(ws) if a.get("adapter_id") == adapter and a.get("url")), + None, + ) + if match is None: + raise RuntimeError(f"no registered adapter {adapter!r} in workspace {ws!r}") + bearer = match.get("bearer", "") or "" + transport = NilTransport(base_url=match["url"], bearer_secret=bearer) + grant = GrantRef.from_secret( + grant_id="control-plane", workspace=ws, secret=bearer or "cp", scopes=_plan_scopes(plan), + ) + client = NilClient(transport=transport, grant=grant) + try: + return await LocalExecutor( + client, run_id=run_id, session_id=run_id, locale=plan.get("locale", "ar") + ).execute(plan, input=input or None) + finally: + await transport.aclose() + + stage_exec: StageRunner = stage_runner or _live_stage_runner + _bg_tasks: set[asyncio.Task[Any]] = set() # keep fire-and-forget dispatch tasks from being GC'd + @app.get("/healthz") def healthz() -> dict[str, Any]: return {"status": "ok", "events": store.count()} @@ -71,6 +212,13 @@ async def ingest( return JSONResponse({"error": "bad json"}, status_code=400) seq = int(x_nil_sequence) if (x_nil_sequence and x_nil_sequence.lstrip("-").isdigit()) else None new = store.ingest(envelope, seq, 
source=x_nil_source or "mcp") + if new: + # Fire event-triggered automations off the request path — ingest must stay fast and must + # not block on (or fail because of) a downstream run. The loop guard in dispatch_event + # skips events that triggered runs themselves produced. + task = asyncio.create_task(dispatch_event(store, envelope, runner=run_exec)) + _bg_tasks.add(task) + task.add_done_callback(_bg_tasks.discard) return {"ok": True, "new": new} @app.get("/api/events") @@ -119,6 +267,31 @@ def pending() -> dict[str, Any]: def adapters() -> dict[str, Any]: return {"adapters": store.adapters()} + @app.get("/api/automations") + def api_automations() -> dict[str, Any]: + """Dashboard view of every automation (latest version, all workspaces). Public read — no + secrets in the record; the heavy plan is summarised, not shipped whole.""" + out: list[dict[str, Any]] = [] + for a in store.all_automations(): + plan = a.get("plan") or {} + if a.get("kind") == "composed": + stages = plan.get("stages") or [] + summary = { + "stages": len(stages), + "adapters": sorted({s.get("adapter") for s in stages if isinstance(s, dict)}), + } + else: + summary = {"nodes": len(plan.get("pipeline") or [])} + out.append({ + "workspace": a["workspace"], "automation_id": a["automation_id"], + "version": a["version"], "content_hash": a["content_hash"], + "kind": a.get("kind", "single"), "name": a.get("name") or {}, + "state": a["state"], "trigger": a.get("trigger") or {}, + "approved_by": a.get("approved_by"), "authored_by": a.get("authored_by"), + "created_at": a.get("created_at"), "plan_summary": summary, + }) + return {"automations": out} + # ── active-adapter registry (multi-tenant routing) ─────────────────────────────────────── @app.post("/adapters/register") async def register_adapter(request: Request, authorization: str | None = Header(default=None)) -> Any: @@ -170,6 +343,246 @@ def get_active_adapter(workspace: str = "", authorization: str | None = Header(d return 
JSONResponse({"error": "no active adapter"}, status_code=404) return {"adapter": active} + # ── automation registry (conversation-authored, deterministically lowered, SSOT-stored) ───── + async def _draft_from_body(body: dict[str, Any]) -> tuple[Any, Any]: + """Validate a draft request against the workspace's live skeleton. Returns (DraftResult, None) + or (None, JSONResponse-error). The plan's own `workspace` selects the adapter to validate + against, so the lowered plan is bounded by the backend that will actually run it.""" + plan = body.get("plan") + aid, name, trigger = body.get("automation_id"), body.get("name"), body.get("trigger") + if not isinstance(plan, dict) or not aid or name is None or trigger is None: + return None, JSONResponse( + {"error": "automation_id, name, plan, trigger are required"}, status_code=400 + ) + ws = plan.get("workspace") + if not ws: + return None, JSONResponse({"error": "plan.workspace is required"}, status_code=400) + skeleton = await provider(ws) + if skeleton is None: + return None, JSONResponse( + {"error": "no reachable active adapter for this workspace"}, status_code=503 + ) + ctx = context_from_skeleton(ws, skeleton) + try: + res = draft_automation( + automation_id=aid, name=name, raw_plan=plan, trigger=trigger, ctx=ctx, + authored_by=body.get("authored_by", "") or "", description=body.get("description"), + ) + except (ValidationError, ValueError) as exc: + return None, JSONResponse({"error": f"malformed request: {exc}"}, status_code=400) + return res, None + + @app.post("/automations/draft") + async def automation_draft(request: Request, authorization: str | None = Header(default=None)) -> Any: + """Preview: lower the agent's candidate plan against the live skeleton. No side effect. 
+ Returns the validator verdict + content-hash, or a structured refusal.""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + try: + body = await request.json() + except (ValueError, TypeError): + return JSONResponse({"error": "bad json"}, status_code=400) + res, err = await _draft_from_body(body) + if err is not None: + return err + if not res.ok: + return {"ok": False, "refusal": _diag_list(res.diagnostics)} + return { + "ok": True, + "content_hash": res.content_hash, + "definition": res.definition.model_dump(by_alias=True, mode="json"), + } + + @app.post("/automations/register") + async def automation_register(request: Request, authorization: str | None = Header(default=None)) -> Any: + """Persist a passing draft to the SSOT as `pending_approval` (never auto-armed). Re-registering + an identical plan is an idempotent no-op. A failing plan is refused — never stored.""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + try: + body = await request.json() + except (ValueError, TypeError): + return JSONResponse({"error": "bad json"}, status_code=400) + res, err = await _draft_from_body(body) + if err is not None: + return err + if not res.ok: + return JSONResponse({"ok": False, "refusal": _diag_list(res.diagnostics)}, status_code=400) + stored = register(store, res.definition) # lands in pending_approval + return {"ok": True, "definition": stored.model_dump(by_alias=True, mode="json")} + + # ── cross-system composed automations (P3) ────────────────────────────────────────────── + async def _validate_composed_body(body: dict[str, Any]) -> tuple[Any, Any]: + """Validate a composed-plan request: each stage against ITS adapter's live skeleton + handoff + well-formedness + a valid trigger. 
Returns ((composed_raw, report), None) or (None, error).""" + composed = body.get("composed") + aid, name, trigger = body.get("automation_id"), body.get("name"), body.get("trigger") + if not isinstance(composed, dict) or not aid or name is None or trigger is None: + return None, JSONResponse( + {"error": "automation_id, name, composed, trigger are required"}, status_code=400 + ) + ws = composed.get("workspace") + if not ws: + return None, JSONResponse({"error": "composed.workspace is required"}, status_code=400) + try: + parse_trigger(trigger) + except (ValidationError, ValueError, TypeError) as exc: + return None, JSONResponse({"error": f"bad trigger: {exc}"}, status_code=400) + stages = composed.get("stages") or [] + adapter_ids = {s.get("adapter") for s in stages if isinstance(s, dict)} + skeletons: dict[str, Any] = {} + for adapter_id in adapter_ids: + skeleton = await adapter_skeletons(ws, adapter_id) + if skeleton is None: + return None, JSONResponse( + {"error": f"no reachable adapter {adapter_id!r} in workspace {ws!r}"}, + status_code=503, + ) + skeletons[adapter_id] = skeleton + try: + parsed = parse_composed(composed) + except (KeyError, TypeError) as exc: + return None, JSONResponse({"error": f"malformed composed plan: {exc}"}, status_code=400) + return (composed, validate_composed(parsed, skeletons)), None + + @app.post("/automations/compose/draft") + async def compose_draft(request: Request, authorization: str | None = Header(default=None)) -> Any: + """Preview: validate a cross-system composed plan, each stage against its adapter. 
No effect.""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + try: + body = await request.json() + except (ValueError, TypeError): + return JSONResponse({"error": "bad json"}, status_code=400) + res, err = await _validate_composed_body(body) + if err is not None: + return err + composed, report = res + if not report["ok"]: + return {"ok": False, "report": report} + return {"ok": True, "content_hash": composed_hash(composed), "report": report} + + @app.post("/automations/compose/register") + async def compose_register(request: Request, authorization: str | None = Header(default=None)) -> Any: + """Persist a passing composed plan as `pending_approval` (kind='composed').""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + try: + body = await request.json() + except (ValueError, TypeError): + return JSONResponse({"error": "bad json"}, status_code=400) + res, err = await _validate_composed_body(body) + if err is not None: + return err + composed, report = res + if not report["ok"]: + return JSONResponse({"ok": False, "report": report}, status_code=400) + stored = store.register_automation( + workspace=composed["workspace"], automation_id=body["automation_id"], + content_hash=composed_hash(composed), name=body["name"], plan=composed, + trigger=body["trigger"], state="pending_approval", kind="composed", + authored_by=body.get("authored_by", "") or "", description=body.get("description"), + ) + return {"ok": True, "definition": stored} + + @app.get("/automations") + def automations_list(workspace: str = "") -> dict[str, Any]: + """Latest version of every automation in a workspace (public read — no secrets in the record).""" + return {"automations": store.list_automations(workspace)} + + @app.get("/automations/{workspace}/{automation_id}") + def automation_get(workspace: str, automation_id: str, version: int | None = None) -> Any: + a = 
store.get_automation(workspace, automation_id, version) + if a is None: + return JSONResponse({"error": "no such automation"}, status_code=404) + return a + + @app.post("/automations/{workspace}/{automation_id}/{version}/state") + async def automation_set_state( + workspace: str, automation_id: str, version: int, request: Request, + authorization: str | None = Header(default=None), + ) -> Any: + """Arm/disarm/approve an automation (operator-gated). Approving (→ active) records the owner. + Arming a recurring automation is a governance act, so it sits behind the registry token.""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + try: + body = await request.json() + except (ValueError, TypeError): + return JSONResponse({"error": "bad json"}, status_code=400) + state = body.get("state") + if state not in ("pending_approval", "active", "paused", "archived"): + return JSONResponse( + {"error": "state must be pending_approval|active|paused|archived"}, status_code=400 + ) + ok = store.set_automation_state( + workspace, automation_id, version, state, approved_by=body.get("approved_by") + ) + if not ok: + return JSONResponse({"error": "no such automation version"}, status_code=404) + return {"ok": True, "automation": store.get_automation(workspace, automation_id, version)} + + @app.post("/automations/{workspace}/{automation_id}/run") + async def automation_run( + workspace: str, automation_id: str, request: Request, + authorization: str | None = Header(default=None), + ) -> Any: + """Fire the active automation now (manual trigger). Requires an `idempotency_key` so a + re-delivered fire replays the same run rather than executing twice. 
Operator-gated.""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + try: + body = await request.json() + except (ValueError, TypeError): + body = {} + idem = body.get("idempotency_key") + if not idem or len(str(idem)) < 6: + return JSONResponse( + {"error": "idempotency_key (>= 6 chars) is required"}, status_code=400 + ) + fired_by = body.get("fired_by", "manual") or "manual" + auto = store.get_automation(workspace, automation_id) + if auto is not None and auto.get("kind") == "composed": + out = await fire_composed( + store, workspace=workspace, automation_id=automation_id, + idempotency_key=str(idem), stage_runner=stage_exec, fired_by=fired_by, + ) + else: + out = await fire_manual( + store, workspace=workspace, automation_id=automation_id, + idempotency_key=str(idem), runner=run_exec, fired_by=fired_by, + ) + if not out.get("ok"): + return JSONResponse(out, status_code=out.pop("status", 400)) + return out + + @app.post("/automations/tick") + async def automations_tick(authorization: str | None = Header(default=None)) -> Any: + """Fire interval-scheduled automations that are due. Called by an external clock (cron / + Temporal Schedule) — the control plane decides *which* are due; the caller owns the tick. 
+ Operator-gated.""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + now = _dt.datetime.now(_dt.UTC) + fired = await run_due_schedules(store, runner=run_exec, now=now) + return { + "ok": True, + "fired": len(fired), + "runs": [f["run"] for f in fired if f.get("ok") and f.get("run")], + } + + @app.get("/automations/{workspace}/{automation_id}/runs") + def automation_runs(workspace: str, automation_id: str, limit: int = 50) -> dict[str, Any]: + """Newest-first run history for one automation (trace omitted — fetch via /runs/{run_id}).""" + return {"runs": store.list_runs(workspace, automation_id, limit)} + + @app.get("/runs/{run_id}") + def run_detail(run_id: str) -> Any: + run = store.get_run(run_id) + if run is None: + return JSONResponse({"error": "no such run"}, status_code=404) + return run + @app.get("/", response_class=HTMLResponse) def index() -> str: return _INDEX_HTML @@ -359,6 +772,43 @@ def index() -> str: .acard .st b{color:var(--fg)} .acard .ch{color:var(--faint);font-size:11px;margin-top:6px} + /* ── automations ── */ + #autoWrap{margin-bottom:26px;display:none} + .auth-row{display:flex;align-items:center;gap:8px;margin:0 2px 12px;flex-wrap:wrap} + .auth-row input{background:var(--elev);border:1px solid var(--line2);border-radius:8px;color:var(--fg); + font:12px var(--mono);padding:6px 10px;width:230px;max-width:60vw} + .auth-row .hint{color:var(--faint);font-size:11px} + .auth-row .ok{color:var(--green);font-size:11px} + #automations{display:grid;grid-template-columns:repeat(auto-fill,minmax(320px,1fr));gap:12px} + .autocard{border:1px solid var(--line);border-radius:var(--radius);background:var(--panel); + padding:14px 16px;position:relative;overflow:hidden} + .autocard::before{content:"";position:absolute;left:0;top:0;bottom:0;width:3px;background:var(--faint)} + .autocard.s-active::before{background:var(--green)} + .autocard.s-pending_approval::before{background:var(--amber)} + 
.autocard.s-paused::before{background:var(--blue)} + .autocard.s-archived::before{background:var(--line2)} + .autocard .top{display:flex;align-items:center;gap:8px;flex-wrap:wrap} + .autocard .nm{font-weight:600;color:var(--verb);font-size:14.5px;flex:1 1 auto;min-width:0;word-break:break-word} + .sbadge{font-size:11px;padding:2px 9px;border-radius:999px;border:1px solid var(--line2);color:var(--mut);white-space:nowrap} + .sbadge.s-active{color:var(--green);border-color:rgba(70,194,102,.45);background:rgba(70,194,102,.08)} + .sbadge.s-pending_approval{color:#f0c674;border-color:rgba(224,166,41,.45);background:rgba(224,166,41,.08)} + .sbadge.s-paused{color:var(--blue);border-color:rgba(91,140,255,.35);background:rgba(91,140,255,.08)} + .sbadge.s-draft,.sbadge.s-archived{color:var(--faint)} + .autocard .meta{display:flex;flex-wrap:wrap;gap:6px;margin:9px 0 4px} + .autocard .meta span{font-size:11px;color:var(--mut);border:1px solid var(--line2);border-radius:6px;padding:1px 7px} + .autocard .meta .kind{color:var(--violet);border-color:rgba(168,119,247,.3)} + .autocard .meta .trig{color:var(--blue);border-color:rgba(91,140,255,.3)} + .autocard .sub{color:var(--faint);font-size:11px;margin-top:5px;word-break:break-all} + .autocard .acts{display:flex;gap:7px;margin-top:11px;flex-wrap:wrap} + .autocard .runs{margin-top:10px;border-top:1px solid var(--line);padding-top:9px;display:none} + .autocard .runs.open{display:block} + .runrow{display:flex;align-items:center;gap:8px;font-size:11.5px;padding:3px 0;color:var(--mut)} + .runrow .rst{padding:1px 7px;border-radius:5px;border:1px solid var(--line2);font-size:10.5px} + .runrow .rst.completed{color:var(--green);border-color:rgba(70,194,102,.4)} + .runrow .rst.failed,.runrow .rst.blocked{color:#ff9a90;border-color:rgba(251,90,78,.4)} + .runrow .rst.partial,.runrow .rst.compensated{color:#f0c674;border-color:rgba(224,166,41,.45)} + .runrow .rst.running{color:var(--blue);border-color:rgba(91,140,255,.35)} + /* toast */ 
#toast{position:fixed;left:50%;bottom:24px;transform:translateX(-50%) translateY(20px); background:var(--elev);border:1px solid var(--line2);color:var(--fg);padding:11px 16px; @@ -400,6 +850,16 @@ def index() -> str:
+
+
Automations 0 + — conversation-authored workflows: state, trigger, version & run history
+
+ + controls (approve / pause / run) need the registry token — view is open +
+
+
+
Activity — every agent action, one pane
@@ -634,7 +1094,75 @@ def index() -> str: } function applyThemeGlyph(){var b=document.getElementById('themeBtn');if(b)b.textContent=document.documentElement.getAttribute('data-theme')==='light'?'☀':'☾';} function toggleTheme(){var next=document.documentElement.getAttribute('data-theme')==='light'?'dark':'light';document.documentElement.setAttribute('data-theme',next);try{localStorage.setItem('cp-theme',next);}catch(e){}applyThemeGlyph();} -tick();pend();loadAdapters();loadRouting();applyThemeGlyph();setInterval(()=>{tick();pend();loadAdapters();loadRouting();},2000); + +// ── automations: view + (token-gated) control ─────────────────────────────────────────────────── +function tokenVal(){try{return localStorage.getItem('cp-optoken')||'';}catch(e){return '';}} +function saveToken(){var v=document.getElementById('optoken').value;try{localStorage.setItem('cp-optoken',v);}catch(e){}paintTokHint(v);} +function paintTokHint(v){var h=document.getElementById('tokhint');if(!h)return;h.className=v?'ok':'hint'; + h.textContent=v?'token set — controls enabled in this browser only':'controls (approve / pause / run) need the registry token — view is open';} +function initToken(){var i=document.getElementById('optoken');if(i){i.value=tokenVal();paintTokHint(tokenVal());}} +function authHeaders(){var t=tokenVal();var h={'Content-Type':'application/json'};if(t)h['Authorization']='Bearer '+t;return h;} +function trigSummary(t){if(!t)return '—';if(t.type==='manual')return 'manual'; + if(t.type==='schedule')return t.cron?('cron '+t.cron):('every '+t.interval_seconds+'s'); + if(t.type==='event')return 'on '+(t.on_verb||'?')+' → '+(t.on_event||'executed');return t.type;} +function abtn(label,cls,onclick){return ``;} + +async function loadAutomations(){ + try{ + const r=await fetch('/api/automations');const {automations}=await r.json(); + const wrap=document.getElementById('autoWrap'),box=document.getElementById('automations'); + 
document.getElementById('autocount').textContent=automations.length; + wrap.style.display=automations.length?'block':'none'; + box.innerHTML=automations.map(a=>{ + const nm=(a.name&&(a.name.en||a.name.ar))||a.automation_id; + const ps=a.plan_summary||{}; + const shape=a.kind==='composed'?((ps.stages||0)+' stages · '+((ps.adapters||[]).join(', ')||'—')):((ps.nodes||0)+' steps'); + const st=a.state,rid=esc(a.workspace)+'-'+esc(a.automation_id); + const acts=[]; + if(st==='pending_approval')acts.push(abtn('✓ Approve','ok',`autoState('${a.workspace}','${a.automation_id}',${a.version},'active')`)); + if(st==='active'){acts.push(abtn('⏸ Pause','ghost',`autoState('${a.workspace}','${a.automation_id}',${a.version},'paused')`)); + acts.push(abtn('▶ Run now','',`autoRun('${a.workspace}','${a.automation_id}')`));} + if(st==='paused')acts.push(abtn('▶ Resume','ok',`autoState('${a.workspace}','${a.automation_id}',${a.version},'active')`)); + acts.push(abtn('↻ Runs','ghost',`toggleRuns('${a.workspace}','${a.automation_id}')`)); + return `
+
${esc(nm)}${esc(st)}
+
${esc(a.kind)}${esc(trigSummary(a.trigger))} + v${a.version}${esc(a.workspace)}${esc(shape)}
+
${esc((a.content_hash||'').slice(0,12))}…${a.approved_by?(' · approved by '+esc(a.approved_by)):''}
+
${acts.join('')}
+
+
`; + }).join(''); + }catch(_){} +} +async function autoState(ws,id,ver,state){ + if(!tokenVal())return toast('Enter the operator token above to control automations.'); + try{const r=await fetch(`/automations/${ws}/${id}/${ver}/state`,{method:'POST',headers:authHeaders(),body:JSON.stringify({state,approved_by:'operator'})}); + if(r.status===401)return toast('Operator token rejected.'); + toast(state==='active'?'Armed — the automation is now active.':state==='paused'?'Paused.':'Updated.');loadAutomations(); + }catch(_){toast('Could not update the automation.');} +} +async function autoRun(ws,id){ + if(!tokenVal())return toast('Enter the operator token above to run automations.'); + try{const r=await fetch(`/automations/${ws}/${id}/run`,{method:'POST',headers:authHeaders(),body:JSON.stringify({idempotency_key:'ui-'+Date.now()})}); + if(r.status===401)return toast('Operator token rejected.');const d=await r.json(); + toast(d.run?('Run '+(d.run.state||'started')+'.'):'Fired.');loadAutomations();openRuns(ws,id); + }catch(_){toast('Could not run the automation.');} +} +function toggleRuns(ws,id){const el=document.getElementById('runs-'+ws+'-'+id);if(!el)return; + if(el.classList.contains('open')){el.classList.remove('open');el.innerHTML='';}else openRuns(ws,id);} +async function openRuns(ws,id){ + const el=document.getElementById('runs-'+ws+'-'+id);if(!el)return; + el.classList.add('open');el.innerHTML='
loading…
'; + try{const r=await fetch(`/automations/${ws}/${id}/runs?limit=8`);const {runs}=await r.json(); + el.innerHTML=runs.length?runs.map(x=>`
${esc(x.state)} + ${esc((x.fired_by||'').slice(0,20))}${hhmmss(x.started_at)}
`).join('') + :'
no runs yet
'; + }catch(_){el.innerHTML='
could not load runs
';} +} + +initToken();tick();pend();loadAdapters();loadRouting();loadAutomations();applyThemeGlyph(); +setInterval(()=>{tick();pend();loadAdapters();loadRouting();loadAutomations();},2000); """ diff --git a/src/nilscript/controlplane/store.py b/src/nilscript/controlplane/store.py index 3de78ce..1a959a5 100644 --- a/src/nilscript/controlplane/store.py +++ b/src/nilscript/controlplane/store.py @@ -58,8 +58,51 @@ updated_at TEXT NOT NULL, PRIMARY KEY (workspace, adapter_id) ); + +-- Automation Registry (SSOT): one row per VERSION of one automation. Append-only — "editing" an +-- automation writes a new version and archives the prior one (superseded_by). `content_hash` is the +-- version lock (sha256 over the validated plan). See docs/PLAN-dynamic-automation-ssot.md. +CREATE TABLE IF NOT EXISTS automations ( + workspace TEXT NOT NULL DEFAULT '', + automation_id TEXT NOT NULL, + version INTEGER NOT NULL, + content_hash TEXT NOT NULL, + kind TEXT NOT NULL DEFAULT 'single', + name TEXT NOT NULL DEFAULT '{}', + description TEXT, + plan TEXT NOT NULL, + trigger TEXT NOT NULL, + state TEXT NOT NULL DEFAULT 'draft', + authored_by TEXT NOT NULL DEFAULT '', + approved_by TEXT, + created_at TEXT NOT NULL, + superseded_by INTEGER, + PRIMARY KEY (workspace, automation_id, version) +); + +-- Automation runs: one execution of one (pinned) automation version. `run_id` is deterministic +-- (automation:version:fire) so a re-delivered fire replays the same row, never double-executes. 
+CREATE TABLE IF NOT EXISTS automation_runs ( + run_id TEXT PRIMARY KEY, + workspace TEXT NOT NULL DEFAULT '', + automation_id TEXT NOT NULL, + version INTEGER NOT NULL, + content_hash TEXT NOT NULL, + fired_by TEXT NOT NULL DEFAULT '', + state TEXT NOT NULL DEFAULT 'running', + trace TEXT, + started_at TEXT NOT NULL, + ended_at TEXT +); +CREATE INDEX IF NOT EXISTS ix_runs_auto ON automation_runs(workspace, automation_id, started_at DESC); """ +# Columns surfaced by the automation registry reads (JSON columns parsed back by `_automation_row`). +_AUTOMATION_COLS = ( + "workspace, automation_id, version, content_hash, kind, name, description, plan, trigger, " + "state, authored_by, approved_by, created_at, superseded_by" +) + # Columns surfaced by the registry read methods (bearer included — the API layer redacts for the # public list endpoint; `active_adapter` keeps it because the MCP needs it to reach the adapter). _ADAPTER_COLS = "workspace, adapter_id, label, url, bearer, system, active, updated_at" @@ -119,6 +162,12 @@ def __init__(self, path: str | None = None) -> None: self._conn.execute("ALTER TABLE events ADD COLUMN event_id TEXT") except sqlite3.OperationalError: pass # column already present + try: + self._conn.execute( + "ALTER TABLE automations ADD COLUMN kind TEXT NOT NULL DEFAULT 'single'" + ) + except sqlite3.OperationalError: + pass # column already present (or table created fresh with it) self._conn.commit() def ingest(self, envelope: dict[str, Any], sequence: int | None, *, source: str = "") -> bool: @@ -486,3 +535,207 @@ def _adapter(self, workspace: str, adapter_id: str) -> dict[str, Any] | None: (workspace, adapter_id), ).fetchone() return dict(row) if row is not None else None + + # ── automation registry (SSOT, append-only versions) ───────────────────────────────────── + @staticmethod + def _automation_row(row: sqlite3.Row) -> dict[str, Any]: + """Deserialize the JSON columns (name/description/plan/trigger) back into dicts.""" + rec = 
dict(row) + rec["name"] = _loads(rec.get("name")) + rec["plan"] = _loads(rec.get("plan")) + rec["trigger"] = _loads(rec.get("trigger")) + rec["description"] = _loads(rec["description"]) if rec.get("description") else None + return rec + + def register_automation( + self, + *, + workspace: str, + automation_id: str, + content_hash: str, + name: dict[str, Any], + plan: dict[str, Any], + trigger: dict[str, Any], + state: str = "draft", + kind: str = "single", + authored_by: str = "", + description: dict[str, Any] | None = None, + approved_by: str | None = None, + ) -> dict[str, Any]: + """Append a new version. Re-registering an identical plan (same `content_hash` as the latest + version) is an idempotent no-op — returns the existing row, no new version. Otherwise the + prior latest version is archived and marked `superseded_by` the new version.""" + with self._lock: + latest = self._conn.execute( + "SELECT version, content_hash FROM automations " + "WHERE workspace = ? AND automation_id = ? ORDER BY version DESC LIMIT 1", + (workspace, automation_id), + ).fetchone() + if latest is not None and latest["content_hash"] == content_hash: + row = self._conn.execute( + f"SELECT {_AUTOMATION_COLS} FROM automations " + "WHERE workspace = ? AND automation_id = ? AND version = ?", + (workspace, automation_id, latest["version"]), + ).fetchone() + return self._automation_row(row) + version = (latest["version"] + 1) if latest is not None else 1 + if latest is not None: + self._conn.execute( + "UPDATE automations SET superseded_by = ?, state = 'archived' " + "WHERE workspace = ? AND automation_id = ? 
AND version = ?", + (version, workspace, automation_id, latest["version"]), + ) + self._conn.execute( + "INSERT INTO automations (workspace, automation_id, version, content_hash, kind, name, " + "description, plan, trigger, state, authored_by, approved_by, created_at, superseded_by) " + "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,NULL)", + ( + workspace, automation_id, version, content_hash, kind, + json.dumps(name, ensure_ascii=False), + json.dumps(description, ensure_ascii=False) if description is not None else None, + json.dumps(plan, ensure_ascii=False), + json.dumps(trigger, ensure_ascii=False), + state, authored_by, approved_by, _now(), + ), + ) + self._conn.commit() + row = self._conn.execute( + f"SELECT {_AUTOMATION_COLS} FROM automations " + "WHERE workspace = ? AND automation_id = ? AND version = ?", + (workspace, automation_id, version), + ).fetchone() + return self._automation_row(row) + + def get_automation( + self, workspace: str, automation_id: str, version: int | None = None + ) -> dict[str, Any] | None: + """A specific version, or the latest (highest version) when `version` is None.""" + with self._lock: + if version is None: + row = self._conn.execute( + f"SELECT {_AUTOMATION_COLS} FROM automations " + "WHERE workspace = ? AND automation_id = ? ORDER BY version DESC LIMIT 1", + (workspace, automation_id), + ).fetchone() + else: + row = self._conn.execute( + f"SELECT {_AUTOMATION_COLS} FROM automations " + "WHERE workspace = ? AND automation_id = ? AND version = ?", + (workspace, automation_id, version), + ).fetchone() + return self._automation_row(row) if row is not None else None + + def list_automations(self, workspace: str) -> list[dict[str, Any]]: + """The latest version of every automation in the workspace, by automation_id.""" + with self._lock: + rows = self._conn.execute( + f"SELECT {_AUTOMATION_COLS} FROM automations a WHERE workspace = ? 
AND version = " + "(SELECT MAX(version) FROM automations b " + " WHERE b.workspace = a.workspace AND b.automation_id = a.automation_id) " + "ORDER BY automation_id", + (workspace,), + ).fetchall() + return [self._automation_row(r) for r in rows] + + def all_automations(self) -> list[dict[str, Any]]: + """Latest version of every automation across ALL workspaces — for the dashboard.""" + with self._lock: + rows = self._conn.execute( + f"SELECT {_AUTOMATION_COLS} FROM automations a WHERE version = " + "(SELECT MAX(version) FROM automations b " + " WHERE b.workspace = a.workspace AND b.automation_id = a.automation_id) " + "ORDER BY workspace, automation_id" + ).fetchall() + return [self._automation_row(r) for r in rows] + + def active_automations(self) -> list[dict[str, Any]]: + """Every armed automation across all workspaces (state='active'). The scheduler scans these. + The supersede-on-edit invariant means the active version is always the latest one.""" + with self._lock: + rows = self._conn.execute( + f"SELECT {_AUTOMATION_COLS} FROM automations WHERE state = 'active' " + "ORDER BY workspace, automation_id" + ).fetchall() + return [self._automation_row(r) for r in rows] + + def set_automation_state( + self, + workspace: str, + automation_id: str, + version: int, + state: str, + *, + approved_by: str | None = None, + ) -> bool: + """Transition one version's lifecycle state (draft→pending_approval→active⇄paused→archived). + Records `approved_by` when supplied. Returns False if no such version exists.""" + with self._lock: + if approved_by is not None: + cur = self._conn.execute( + "UPDATE automations SET state = ?, approved_by = ? " + "WHERE workspace = ? AND automation_id = ? AND version = ?", + (state, approved_by, workspace, automation_id, version), + ) + else: + cur = self._conn.execute( + "UPDATE automations SET state = ? " + "WHERE workspace = ? AND automation_id = ? 
AND version = ?", + (state, workspace, automation_id, version), + ) + self._conn.commit() + return cur.rowcount > 0 + + # ── automation runs (P2 dispatcher) ────────────────────────────────────────────────────── + def start_run( + self, run_id: str, *, workspace: str, automation_id: str, version: int, content_hash: str, + fired_by: str = "", + ) -> bool: + """Open a run row (state=running). Returns False if `run_id` already exists — the caller must + then treat the fire as an idempotent replay and NOT re-execute (a re-delivered trigger).""" + with self._lock: + if self._conn.execute( + "SELECT 1 FROM automation_runs WHERE run_id = ?", (run_id,) + ).fetchone(): + return False + self._conn.execute( + "INSERT INTO automation_runs (run_id, workspace, automation_id, version, " + "content_hash, fired_by, state, started_at) VALUES (?,?,?,?,?,?, 'running', ?)", + (run_id, workspace, automation_id, version, content_hash, fired_by, _now()), + ) + self._conn.commit() + return True + + def finish_run(self, run_id: str, state: str, trace: dict[str, Any] | None) -> bool: + """Close a run with its terminal state and the executor trace. Returns False if unknown.""" + with self._lock: + cur = self._conn.execute( + "UPDATE automation_runs SET state = ?, trace = ?, ended_at = ? 
WHERE run_id = ?", + (state, json.dumps(trace, ensure_ascii=False) if trace is not None else None, + _now(), run_id), + ) + self._conn.commit() + return cur.rowcount > 0 + + def get_run(self, run_id: str) -> dict[str, Any] | None: + with self._lock: + row = self._conn.execute( + "SELECT run_id, workspace, automation_id, version, content_hash, fired_by, state, " + "trace, started_at, ended_at FROM automation_runs WHERE run_id = ?", + (run_id,), + ).fetchone() + if row is None: + return None + rec = dict(row) + rec["trace"] = _loads(rec.get("trace")) if rec.get("trace") else None + return rec + + def list_runs(self, workspace: str, automation_id: str, limit: int = 50) -> list[dict[str, Any]]: + """Newest-first run history for one automation (trace omitted — fetch via get_run).""" + with self._lock: + rows = self._conn.execute( + "SELECT run_id, workspace, automation_id, version, content_hash, fired_by, state, " + "started_at, ended_at FROM automation_runs " + "WHERE workspace = ? AND automation_id = ? ORDER BY started_at DESC LIMIT ?", + (workspace, automation_id, max(1, min(limit, 500))), + ).fetchall() + return [dict(r) for r in rows] diff --git a/src/nilscript/mcp/automation_tools.py b/src/nilscript/mcp/automation_tools.py new file mode 100644 index 0000000..4916056 --- /dev/null +++ b/src/nilscript/mcp/automation_tools.py @@ -0,0 +1,85 @@ +"""Agent-facing MCP tools over the control-plane Automation Registry. + +These let an agent author governed automations *by talking*: each tool is a thin, authenticated +request to the control plane (which owns the SSOT, the validator, and the approval gate). The MCP +never re-implements the registry — it relays. Auth is the registry bearer the MCP already holds to +resolve adapters (`NIL_REGISTRY_URL` / `NIL_REGISTRY_TOKEN`). + +Discipline preserved end to end: `draft` is preview-only (returns the validator verdict, no write); +`register` lands a `pending_approval` row a human approves; `run` fires only an `active` automation. 
class AutomationTools:
    """HTTP relay to the control-plane `/automations/*` endpoints. Inject a client for tests.

    Every method returns the decoded JSON body of the control-plane response, or an
    `{"error": ...}` dict when the response is not JSON or the request itself fails.
    """

    def __init__(
        self, registry_url: str, token: str = "", *, client: httpx.AsyncClient | None = None,
        timeout: float = 10.0,
    ) -> None:
        self._base = registry_url.rstrip("/")
        # No Authorization header at all when no token is configured.
        self._headers = {"Authorization": f"Bearer {token}"} if token else {}
        self._client = client
        self._timeout = timeout

    @classmethod
    def from_env(cls) -> AutomationTools | None:
        """Build from `NIL_REGISTRY_URL`/`NIL_REGISTRY_TOKEN`, or None if no registry is configured."""
        url = os.environ.get("NIL_REGISTRY_URL", "")
        if not url:
            return None
        return cls(url, os.environ.get("NIL_REGISTRY_TOKEN", ""))

    @staticmethod
    def _automation_path(*segments: object) -> str:
        """Join `segments` under `/automations/`, percent-encoding each one.

        The identifiers come from the calling agent; encoding with `safe=""` keeps a `/`, `?` or
        `#` inside an identifier from changing which endpoint (or query string) is requested.
        """
        from urllib.parse import quote  # local import: keeps this block's dependency self-contained

        return "/automations/" + "/".join(quote(str(seg), safe="") for seg in segments)

    async def _request(
        self, method: str, path: str, *, json: Any = None, params: Any = None
    ) -> dict[str, Any]:
        """Send one request and return its JSON body (or an error dict — never raises on HTTP errors)."""
        client = self._client or httpx.AsyncClient(base_url=self._base, timeout=self._timeout)
        try:
            resp = await client.request(method, path, json=json, params=params, headers=self._headers)
            try:
                return resp.json()
            except ValueError:
                return {"error": "non-json response", "status": resp.status_code}
        except httpx.HTTPError as exc:
            return {"error": f"control plane unreachable: {exc}"}
        finally:
            # Only close a client this call created; an injected client belongs to the caller.
            if self._client is None:
                await client.aclose()

    async def draft(
        self, automation_id: str, name: dict[str, Any], plan: dict[str, Any], trigger: dict[str, Any],
    ) -> dict[str, Any]:
        """POST the candidate automation to `/automations/draft`."""
        return await self._request(
            "POST", "/automations/draft",
            json={"automation_id": automation_id, "name": name, "plan": plan, "trigger": trigger},
        )

    async def register(
        self, automation_id: str, name: dict[str, Any], plan: dict[str, Any], trigger: dict[str, Any],
    ) -> dict[str, Any]:
        """POST the automation to `/automations/register`."""
        return await self._request(
            "POST", "/automations/register",
            json={"automation_id": automation_id, "name": name, "plan": plan, "trigger": trigger},
        )

    async def approve(self, workspace: str, automation_id: str, version: int) -> dict[str, Any]:
        """Request state `active` for one version, recording `approved_by` as "agent"."""
        return await self._request(
            "POST", self._automation_path(workspace, automation_id, version, "state"),
            json={"state": "active", "approved_by": "agent"},
        )

    async def run(self, workspace: str, automation_id: str, idempotency_key: str) -> dict[str, Any]:
        """POST a manual fire for the automation, carrying the caller's `idempotency_key`."""
        return await self._request(
            "POST", self._automation_path(workspace, automation_id, "run"),
            json={"idempotency_key": idempotency_key},
        )

    async def list(self, workspace: str) -> dict[str, Any]:
        """GET `/automations` filtered by `workspace`."""
        return await self._request("GET", "/automations", params={"workspace": workspace})
""" server = FastMCP(name, instructions=_INSTRUCTIONS, host=host, port=port) @@ -156,9 +159,61 @@ def build_server( register_dynamic_tools(server, tools, dynamic_verbs) _register_skill(server, tools) + if automation_tools is not None: + _register_automation_tools(server, automation_tools) return server +def _register_automation_tools(server: Any, auto: Any) -> None: + """Bind the Automation Registry tools: draft → register → approve → run, plus list. Each relays to + the control plane, preserving the gate (draft = preview-only; register lands pending_approval; + run fires only an active automation).""" + + async def nil_automation_draft( + automation_id: str, name: dict[str, Any], plan: dict[str, Any], trigger: dict[str, Any], + ) -> dict[str, Any]: + return await auto.draft(automation_id, name, plan, trigger) + + async def nil_automation_register( + automation_id: str, name: dict[str, Any], plan: dict[str, Any], trigger: dict[str, Any], + ) -> dict[str, Any]: + return await auto.register(automation_id, name, plan, trigger) + + async def nil_automation_approve(workspace: str, automation_id: str, version: int) -> dict[str, Any]: + return await auto.approve(workspace, automation_id, version) + + async def nil_automation_run(workspace: str, automation_id: str, idempotency_key: str) -> dict[str, Any]: + return await auto.run(workspace, automation_id, idempotency_key) + + async def nil_automation_list(workspace: str) -> dict[str, Any]: + return await auto.list(workspace) + + server.add_tool( + nil_automation_draft, name="nil_automation_draft", + description="Preview a governed automation: validate a plan (a Wosool DSL program) + trigger " + "against the live backend. NO side effect — returns the validator verdict + content hash, or a " + "structured refusal. 
The deterministic code decides admission, not the agent.", + ) + server.add_tool( + nil_automation_register, name="nil_automation_register", + description="Register a validated automation into the registry as pending_approval (NOT armed). " + "An owner approves it before it can run. Re-registering an identical plan is idempotent.", + ) + server.add_tool( + nil_automation_approve, name="nil_automation_approve", + description="Arm a registered automation (set it active) so it can fire. Operator-grade.", + ) + server.add_tool( + nil_automation_run, name="nil_automation_run", + description="Fire an ACTIVE automation now (manual trigger). Requires an idempotency_key so a " + "re-fire replays rather than executing twice. Returns the run with its terminal state + trace.", + ) + server.add_tool( + nil_automation_list, name="nil_automation_list", + description="List a workspace's registered automations (latest version of each). No side effect.", + ) + + def _register_tools(server: Any, provider: ToolsProvider) -> None: """Wrap each primitive with the MCP Context so the backend + per-connection session resolve from the connection: `provider.get(ctx)` picks the backend (one shared, or per-tenant from headers) @@ -392,7 +447,12 @@ def build_asgi_app( default=default, allow_insecure=allow_insecure, gate=gate, registry=make_registry_lookup(), ) - server = build_server(tools, dynamic_verbs=verbs, tools_provider=provider) + from nilscript.mcp.automation_tools import AutomationTools + + server = build_server( + tools, dynamic_verbs=verbs, tools_provider=provider, + automation_tools=AutomationTools.from_env(), + ) app = server.streamable_http_app() # MCP mounted at /mcp # A plain health route for load balancers / readiness probes (not part of MCP). 
diff --git a/tests/test_automation.py b/tests/test_automation.py new file mode 100644 index 0000000..46ab31b --- /dev/null +++ b/tests/test_automation.py @@ -0,0 +1,243 @@ +"""Automation Registry (P1): draft gate, content-hash version lock, append-only SSOT, lifecycle.""" + +from __future__ import annotations + +import pytest + +from nilscript.automation import ( + AutomationDefinition, + DraftResult, + content_hash, + draft_automation, + parse_trigger, + register, +) +from nilscript.automation.models import EventTrigger, ManualTrigger, ScheduleTrigger +from nilscript.controlplane.store import EventStore +from nilscript.kernel.context import SkillSpec, ValidationContext +from nilscript.kernel.models import WosoolProgram + + +# --- fixtures --------------------------------------------------------------------------------- + + +def _ctx() -> ValidationContext: + """A workspace `acme` that may create contacts via the `crm` skill — nothing else.""" + return ValidationContext( + skills={ + "crm": SkillSpec( + required_verbs=frozenset({"crm.create_contact"}), + hint_schema={ + "properties": {"name": {"type": "string"}}, + "required": ["name"], + "additionalProperties": True, + }, + ) + }, + read_verbs=frozenset(), + workspaces={"acme": frozenset({"crm.create_contact"})}, + ) + + +def _valid_plan(name: str = "Ada") -> dict: + return { + "wosool": "0.1", + "workspace": "acme", + "entry": "step_1", + "pipeline": [ + { + "id": "step_1", + "type": "action", + "skill": "crm", + "verb": "crm.create_contact", + "args": {"name": name}, + } + ], + } + + +def _name() -> dict: + return {"ar": "متابعة العملاء", "en": "Follow up leads"} + + +@pytest.fixture +def store(tmp_path) -> EventStore: + return EventStore(path=str(tmp_path / "cp.db")) + + +# --- content hash (the version lock) ---------------------------------------------------------- + + +def test_content_hash_is_deterministic_and_sensitive(): + a = WosoolProgram.model_validate(_valid_plan()) + a2 = 
WosoolProgram.model_validate(_valid_plan()) + b = WosoolProgram.model_validate(_valid_plan(name="Grace")) + assert content_hash(a) == content_hash(a2) # identical bytes → identical hash + assert content_hash(a) != content_hash(b) # a changed arg → different lock + assert len(content_hash(a)) == 64 + + +# --- the draft gate (deterministic lower-or-reject) ------------------------------------------- + + +def test_draft_admits_a_valid_plan(): + res = draft_automation( + automation_id="follow-up", + name=_name(), + raw_plan=_valid_plan(), + trigger={"type": "manual"}, + ctx=_ctx(), + authored_by="agent-1", + ) + assert isinstance(res, DraftResult) + assert res.ok + assert res.definition is not None + assert res.definition.state == "draft" + assert res.definition.workspace == "acme" + assert res.definition.content_hash == res.content_hash + assert isinstance(res.definition.trigger, ManualTrigger) + + +def test_draft_rejects_a_hallucinated_verb(): + plan = _valid_plan() + plan["pipeline"][0]["verb"] = "crm.fly_to_moon" # not in the skeleton + res = draft_automation( + automation_id="bad", + name=_name(), + raw_plan=plan, + trigger={"type": "manual"}, + ctx=_ctx(), + ) + assert not res.ok + assert res.definition is None + codes = {d.code for d in res.diagnostics.diagnostics} + assert "V4_UNKNOWN_SKILL" in codes + + +def test_draft_rejects_a_forward_reference(): + # step_1 consumes an output of step_2, which runs after it — a forward reference (V6). 
+ plan = { + "wosool": "0.1", + "workspace": "acme", + "entry": "step_1", + "pipeline": [ + { + "id": "step_1", "type": "action", "skill": "crm", + "verb": "crm.create_contact", "args": {"name": "$.step_2.id"}, "next": "step_2", + }, + { + "id": "step_2", "type": "action", "skill": "crm", + "verb": "crm.create_contact", "args": {"name": "Grace"}, + }, + ], + } + res = draft_automation( + automation_id="bad-ref", + name=_name(), + raw_plan=plan, + trigger={"type": "manual"}, + ctx=_ctx(), + ) + assert not res.ok + assert "V6_FORWARD_REF" in {d.code for d in res.diagnostics.diagnostics} + + +# --- trigger union ---------------------------------------------------------------------------- + + +def test_parse_trigger_closed_union(): + assert isinstance(parse_trigger({"type": "manual"}), ManualTrigger) + assert isinstance(parse_trigger({"type": "schedule", "cron": "0 9 * * *"}), ScheduleTrigger) + assert isinstance(parse_trigger({"type": "event", "on_verb": "crm.create_lead"}), EventTrigger) + with pytest.raises(Exception): + parse_trigger({"type": "telepathy"}) + + +def test_schedule_trigger_needs_exactly_one_timing(): + with pytest.raises(Exception): + parse_trigger({"type": "schedule"}) # neither cron nor interval + with pytest.raises(Exception): + parse_trigger({"type": "schedule", "cron": "0 9 * * *", "interval_seconds": 60}) # both + + +# --- registration → SSOT (append-only versions) ---------------------------------------------- + + +def test_register_persists_and_reads_back(store): + res = draft_automation( + automation_id="follow-up", + name=_name(), + raw_plan=_valid_plan(), + trigger={"type": "manual"}, + ctx=_ctx(), + authored_by="agent-1", + ) + stored = register(store, res.definition) + assert stored.version == 1 + assert stored.state == "pending_approval" # registered, not auto-armed + assert stored.created_at # store stamped it + + fetched = store.get_automation("acme", "follow-up") + rebuilt = AutomationDefinition.from_row(fetched) + assert 
rebuilt.content_hash == res.content_hash + assert rebuilt.plan.workspace == "acme" + + +def test_re_registering_identical_plan_is_idempotent(store): + res = draft_automation( + automation_id="follow-up", name=_name(), raw_plan=_valid_plan(), + trigger={"type": "manual"}, ctx=_ctx(), + ) + first = register(store, res.definition) + second = register(store, res.definition) # same hash → no new version + assert first.version == second.version == 1 + assert len(store.list_automations("acme")) == 1 + + +def test_editing_creates_a_new_version_and_archives_the_old(store): + v1 = draft_automation( + automation_id="follow-up", name=_name(), raw_plan=_valid_plan(), + trigger={"type": "manual"}, ctx=_ctx(), + ) + register(store, v1.definition) + v2 = draft_automation( + automation_id="follow-up", name=_name(), raw_plan=_valid_plan(name="Grace"), + trigger={"type": "manual"}, ctx=_ctx(), + ) + stored2 = register(store, v2.definition) + assert stored2.version == 2 + + old = store.get_automation("acme", "follow-up", version=1) + assert old["state"] == "archived" + assert old["superseded_by"] == 2 + # list returns only the latest version per automation_id + listed = store.list_automations("acme") + assert len(listed) == 1 + assert listed[0]["version"] == 2 + + +# --- lifecycle -------------------------------------------------------------------------------- + + +def test_lifecycle_approve_then_pause(store): + res = draft_automation( + automation_id="follow-up", name=_name(), raw_plan=_valid_plan(), + trigger={"type": "manual"}, ctx=_ctx(), + ) + register(store, res.definition) + assert store.set_automation_state("acme", "follow-up", 1, "active", approved_by="owner@acme") + after = store.get_automation("acme", "follow-up") + assert after["state"] == "active" + assert after["approved_by"] == "owner@acme" + + assert store.set_automation_state("acme", "follow-up", 1, "paused") + assert store.get_automation("acme", "follow-up")["state"] == "paused" + + +def 
test_set_state_unknown_version_returns_false(store): + assert store.set_automation_state("acme", "nope", 1, "active") is False + + +def test_get_unknown_automation_returns_none(store): + assert store.get_automation("acme", "ghost") is None + assert store.list_automations("acme") == [] diff --git a/tests/test_automation_api.py b/tests/test_automation_api.py new file mode 100644 index 0000000..410fa11 --- /dev/null +++ b/tests/test_automation_api.py @@ -0,0 +1,162 @@ +"""Control-plane HTTP surface for the Automation Registry: draft → register → lifecycle. + +The skeleton provider is faked so the draft gate runs against a known verb surface with no live +adapter — the same `context_from_skeleton` path the production `_live_skeleton` feeds. +""" + +from __future__ import annotations + +import pytest +from fastapi.testclient import TestClient + +from nilscript.controlplane.app import create_app +from nilscript.controlplane.store import EventStore + +# The "live" backend surface the draft gate validates against. 
+_SKELETON = {"reachable": True, "conformant": True, "verbs": ["crm.create_contact"], "targets": {}}
+
+
+def _plan(name: str = "Ada") -> dict:  # one-step plan: a single crm.create_contact action
+    return {
+        "wosool": "0.1",
+        "workspace": "acme",
+        "entry": "step_1",
+        "pipeline": [
+            {
+                "id": "step_1", "type": "action", "skill": "crm",
+                "verb": "crm.create_contact", "args": {"name": name},
+            }
+        ],
+    }
+
+
+def _body(plan: dict | None = None) -> dict:  # request payload; falls back to _plan()
+    return {
+        "automation_id": "follow-up",
+        "name": {"ar": "متابعة", "en": "Follow up"},
+        "plan": plan if plan is not None else _plan(),
+        "trigger": {"type": "manual"},
+        "authored_by": "agent-1",
+    }
+
+
+def _client(tmp_path, *, skeleton=_SKELETON, registry_token=None) -> TestClient:
+    store = EventStore(path=str(tmp_path / "cp.db"))  # per-test DB file under tmp_path
+
+    async def provider(workspace: str):
+        return skeleton  # same skeleton for every workspace (the argument is ignored)
+
+    return TestClient(create_app(store, skeleton_provider=provider, registry_token=registry_token))
+
+
+# --- draft (preview, no side effect) ----------------------------------------------------------
+
+
+def test_draft_admits_and_returns_hash(tmp_path):
+    r = _client(tmp_path).post("/automations/draft", json=_body())
+    assert r.status_code == 200
+    out = r.json()
+    assert out["ok"] is True
+    assert len(out["content_hash"]) == 64  # presumably a SHA-256 hex digest — TODO confirm
+    assert out["definition"]["state"] == "draft"
+
+
+def test_draft_refuses_hallucinated_verb(tmp_path):
+    plan = _plan()
+    plan["pipeline"][0]["verb"] = "crm.fly_to_moon"  # not listed in _SKELETON["verbs"]
+    r = _client(tmp_path).post("/automations/draft", json=_body(plan))
+    assert r.status_code == 200  # a refused draft is still a 200; the refusal is in the body
+    out = r.json()
+    assert out["ok"] is False
+    assert any(d["code"] == "V4_UNKNOWN_SKILL" for d in out["refusal"])
+
+
+def test_draft_503_when_no_active_adapter(tmp_path):
+    r = _client(tmp_path, skeleton=None).post("/automations/draft", json=_body())
+    assert r.status_code == 503  # the provider returned None instead of a skeleton
+
+
+def test_draft_400_on_missing_fields(tmp_path):
+    r = _client(tmp_path).post("/automations/draft", json={"automation_id": "x"})
+    assert r.status_code == 400  # the body carries only automation_id
+
+
+# --- register (persist to SSOT) 
---------------------------------------------------------------
+
+
+def test_register_persists_then_get_and_list(tmp_path):
+    c = _client(tmp_path)
+    r = c.post("/automations/register", json=_body())
+    assert r.status_code == 200
+    d = r.json()["definition"]
+    assert d["state"] == "pending_approval" and d["version"] == 1
+
+    got = c.get("/automations/acme/follow-up")
+    assert got.status_code == 200
+    assert got.json()["content_hash"] == d["content_hash"]
+
+    listed = c.get("/automations", params={"workspace": "acme"}).json()["automations"]
+    assert len(listed) == 1 and listed[0]["automation_id"] == "follow-up"
+
+
+def test_register_refuses_invalid_plan_with_400(tmp_path):
+    plan = _plan()
+    plan["pipeline"][0]["verb"] = "crm.fly_to_moon"
+    r = _client(tmp_path).post("/automations/register", json=_body(plan))
+    assert r.status_code == 400  # unlike /draft, a refused registration is an HTTP error
+    assert r.json()["ok"] is False
+
+
+def test_register_identical_plan_is_idempotent(tmp_path):
+    c = _client(tmp_path)
+    c.post("/automations/register", json=_body())
+    c.post("/automations/register", json=_body())
+    assert len(c.get("/automations", params={"workspace": "acme"}).json()["automations"]) == 1
+
+
+# --- lifecycle --------------------------------------------------------------------------------
+
+
+def test_approve_then_pause(tmp_path):
+    c = _client(tmp_path)
+    c.post("/automations/register", json=_body())
+    r = c.post(
+        "/automations/acme/follow-up/1/state",
+        json={"state": "active", "approved_by": "owner@acme"},
+    )
+    assert r.status_code == 200
+    assert r.json()["automation"]["state"] == "active"
+    assert r.json()["automation"]["approved_by"] == "owner@acme"
+
+    c.post("/automations/acme/follow-up/1/state", json={"state": "paused"})
+    assert c.get("/automations/acme/follow-up").json()["state"] == "paused"
+
+
+def test_state_unknown_version_404(tmp_path):
+    r = _client(tmp_path).post("/automations/acme/ghost/1/state", json={"state": "active"})
+    assert r.status_code == 404
+
+
+def test_get_unknown_404(tmp_path):
+    assert _client(tmp_path).get("/automations/acme/ghost").status_code == 404
+
+
+# --- auth -------------------------------------------------------------------------------------
+
+
+def test_write_endpoints_require_token_when_configured(tmp_path):
+    c = _client(tmp_path, registry_token="s3cret")
+    assert c.post("/automations/draft", json=_body()).status_code == 401
+    assert c.post("/automations/register", json=_body()).status_code == 401
+    auth = {"Authorization": "Bearer s3cret"}
+    ok = c.post("/automations/register", json=_body(), headers=auth)
+    assert ok.status_code == 200
+    # reads stay public
+    assert c.get("/automations", params={"workspace": "acme"}).status_code == 200
+
+
+@pytest.mark.parametrize("verb", ["crm.create_contact"])
+def test_smoke_known_verb_admits(tmp_path, verb):
+    plan = _plan()
+    plan["pipeline"][0]["verb"] = verb  # draft the parametrized verb, not just the default plan's
+    assert _client(tmp_path).post("/automations/draft", json=_body(plan)).json()["ok"] is True
diff --git a/tests/test_automation_compose.py b/tests/test_automation_compose.py
new file mode 100644
index 0000000..262c70d
--- /dev/null
+++ b/tests/test_automation_compose.py
@@ -0,0 +1,182 @@
+"""Cross-system composition (P3): per-stage validation, explicit handoff, real two-adapter run."""
+
+from __future__ import annotations
+
+import httpx
+import pytest
+
+from nilscript.automation.compose import (
+    ComposedPlan,
+    Stage,
+    parse_composed,
+    run_composed,
+    validate_composed,
+)
+from nilscript.kernel.executor import LocalExecutor, RunResult
+
+
+def _plan(verb: str, skill: str, args: dict, ws: str = "acme") -> dict:
+    return {
+        "wosool": "0.1", "workspace": ws, "entry": "step_1",
+        "pipeline": [{"id": "step_1", "type": "action", "skill": skill, "verb": verb, "args": args}],
+    }
+
+
+# --- validation: each stage against its OWN adapter's skeleton ---------------------------------
+
+
+def test_validate_passes_when_each_stage_matches_its_adapter():
+    composed = ComposedPlan(
+        workspace="acme",
+        stages=(
+            Stage("stage_1", "odoo", _plan("crm.create_lead", "crm", {"name": "Acme"})),
+            Stage("stage_2", "books", _plan("acc.create_invoice", "acc", {"ref": "$.input.lead"}),
+                  input_from={"lead": "$.stage_1.step_1.output.state"}),
+        ),
+    )
+    report = validate_composed(composed, {
+        "odoo": {"verbs": ["crm.create_lead"]},
+        "books": {"verbs": ["acc.create_invoice"]},
+    })
+    assert report["ok"] is True
+
+
+def test_validate_refuses_verb_not_on_its_adapter():
+    composed = ComposedPlan(
+        workspace="acme",
+        stages=(Stage("stage_1", "odoo", _plan("crm.create_lead", "crm", {"name": "Acme"})),),
+    )
+    # the adapter only declares accounting verbs — the crm verb has nothing to bind to
+    report = validate_composed(composed, {"odoo": {"verbs": ["acc.create_invoice"]}})
+    assert report["ok"] is False
+    assert any(d["code"].startswith("V4") for d in report["stages"][0]["diagnostics"])
+
+
+def test_validate_refuses_forward_stage_handoff():
+    composed = ComposedPlan(
+        workspace="acme",
+        stages=(
+            Stage("stage_1", "odoo", _plan("crm.create_lead", "crm", {"name": "$.input.x"}),
+                  input_from={"x": "$.stage_2.step_1.output.id"}),  # references a LATER stage
+            Stage("stage_2", "books", _plan("acc.create_invoice", "acc", {"ref": "r"})),
+        ),
+    )
+    report = validate_composed(composed, {
+        "odoo": {"verbs": ["crm.create_lead"]}, "books": {"verbs": ["acc.create_invoice"]},
+    })
+    assert report["ok"] is False
+    assert any("not a prior stage" in e for e in report["errors"])
+
+
+# --- orchestration: handoff + honest stop (fake stage runner) ---------------------------------
+
+
+async def test_handoff_threads_output_into_next_stage_input():
+    seen_inputs = {}
+
+    async def run_stage(adapter, plan, *, run_id, input):
+        seen_inputs[adapter] = input
+        # stage_1 produces an output the next stage references
+        return RunResult(completed=True, context={"step_1": {"output": {"id": "L-7"}}})
+
+    composed = ComposedPlan(
+        workspace="acme",
+        stages=(
+            Stage("stage_1", "odoo", _plan("crm.create_lead", "crm", {"name": "Acme"})),
+            Stage("stage_2", "books", _plan("acc.create_invoice", "acc", {"ref": "$.input.lead"}),
+                  input_from={"lead": "$.stage_1.step_1.output.id"}),
+        ),
+    )
+    result = await run_composed(composed, run_stage=run_stage, run_id="r1")
+    assert result.completed
+    assert seen_inputs["books"] == {"lead": "L-7"}  # stage_1's id flowed into stage_2's input
+
+
+async def test_failed_stage_halts_chain_honestly():
+    ran = []
+
+    async def run_stage(adapter, plan, *, run_id, input):
+        ran.append(adapter)
+        if adapter == "odoo":
+            return RunResult(completed=False, blocked_at="step_1", refusal={"code": "SCOPE_DENIED"})
+        return RunResult(completed=True)
+
+    composed = ComposedPlan(
+        workspace="acme",
+        stages=(
+            Stage("stage_1", "odoo", _plan("crm.create_lead", "crm", {"name": "Acme"})),
+            Stage("stage_2", "books", _plan("acc.create_invoice", "acc", {"ref": "x"})),
+        ),
+    )
+    result = await run_composed(composed, run_stage=run_stage, run_id="r1")
+    assert result.completed is False
+    assert result.blocked_at == "stage_1"
+    assert ran == ["odoo"]  # the downstream stage never ran
+
+
+def test_parse_composed_roundtrip():
+    raw = {
+        "workspace": "acme",
+        "stages": [
+            {"name": "stage_1", "adapter": "odoo", "plan": _plan("crm.create_lead", "crm", {"name": "A"})},
+            {"name": "stage_2", "adapter": "books",
+             "plan": _plan("acc.create_invoice", "acc", {"ref": "$.input.l"}),
+             "input_from": {"l": "$.stage_1.step_1.output.id"}},
+        ],
+    }
+    composed = parse_composed(raw)
+    assert len(composed.stages) == 2
+    assert composed.stages[1].input_from == {"l": "$.stage_1.step_1.output.id"}
+
+
+# --- real two-adapter run: a value produced on adapter A writes on adapter B -------------------
+
+
+def _two_adapter_runner():
+    """Two independent in-memory PocketBase backends as adapters 'a' and 'b'."""
+    # importorskip here, not at module level, so the adapter-free tests above still run
+    pocketbase_edge = pytest.importorskip("pocketbase_nil_adapter.edge")
+    pocketbase_system = pytest.importorskip("pocketbase_nil_adapter.system")
+    from nilscript.sdk.client import NilClient
+    from nilscript.sdk.grants import GrantRef
+    from nilscript.sdk.transport import NilTransport
+
+    apps = {
+        name: pocketbase_edge.create_app(
+            pocketbase_system.FakeSystem(), pocketbase_edge.CapturingEmitter(), bearer=None
+        )
+        for name in ("a", "b")
+    }
+
+    async def run_stage(adapter, plan, *, run_id, input):
+        async with httpx.AsyncClient(
+            transport=httpx.ASGITransport(app=apps[adapter]), base_url="http://shim"
+        ) as http:  # release the client once the stage has finished
+            transport = NilTransport(base_url="http://shim", bearer_secret="x", client=http)
+            grant = GrantRef.from_secret(
+                grant_id="g", workspace=plan["workspace"], secret="s", scopes=frozenset({"commerce.*"})
+            )
+            nil = NilClient(transport=transport, grant=grant)
+            return await LocalExecutor(nil, run_id=run_id, session_id=run_id).execute(plan, input=input)
+
+    return run_stage
+
+
+async def test_real_cross_adapter_handoff_executes_on_both():
+    # stage_1 creates a product on adapter A; its committed `state` is handed to stage_2, which
+    # writes a product on adapter B named after that handed value. Proves a value produced on one
+    # backend genuinely drives a write on a DIFFERENT backend.
+    composed = ComposedPlan(
+        workspace="ws_demo",
+        stages=(
+            Stage("stage_1", "a",
+                  _plan("commerce.create_product", "commerce", {"name": "قميص"}, ws="ws_demo")),
+            Stage("stage_2", "b",
+                  _plan("commerce.create_product", "commerce", {"name": "$.input.from_a"}, ws="ws_demo"),
+                  input_from={"from_a": "$.stage_1.step_1.output.state"}),
+        ),
+    )
+    result = await run_composed(composed, run_stage=_two_adapter_runner(), run_id="compose-run-1")
+    assert result.completed is True
+    assert result.context["stage_1"]["step_1"]["output"]["state"] == "executed"
+    assert result.context["stage_2"]["step_1"]["output"]["state"] == "executed"
diff --git a/tests/test_automation_compose_api.py b/tests/test_automation_compose_api.py
new file mode 100644
index 0000000..b3e84a6
--- /dev/null
+++ b/tests/test_automation_compose_api.py
@@ -0,0 +1,108 @@
+"""Control-plane HTTP surface for cross-system composed automations: draft → register → run."""
+
+from __future__ import annotations
+
+from fastapi.testclient import TestClient
+
+from nilscript.controlplane.app import create_app
+from nilscript.controlplane.store import EventStore
+from nilscript.kernel.executor import RunResult
+
+# Per-adapter skeletons: each backend declares its own verbs.
+_SKELETONS = { + "odoo": {"reachable": True, "conformant": True, "verbs": ["crm.create_lead"], "targets": {}}, + "books": {"reachable": True, "conformant": True, "verbs": ["acc.create_invoice"], "targets": {}}, +} + + +def _stage(name: str, adapter: str, verb: str, skill: str, args: dict, input_from: dict | None = None) -> dict: + s = { + "name": name, "adapter": adapter, + "plan": {"wosool": "0.1", "workspace": "acme", "entry": "step_1", + "pipeline": [{"id": "step_1", "type": "action", "skill": skill, "verb": verb, "args": args}]}, + } + if input_from: + s["input_from"] = input_from + return s + + +def _body() -> dict: + return { + "automation_id": "lead-to-invoice", + "name": {"ar": "من عميل إلى فاتورة", "en": "Lead to invoice"}, + "trigger": {"type": "manual"}, + "composed": { + "workspace": "acme", + "stages": [ + _stage("stage_1", "odoo", "crm.create_lead", "crm", {"name": "Acme Co"}), + _stage("stage_2", "books", "acc.create_invoice", "acc", {"ref": "$.input.lead"}, + input_from={"lead": "$.stage_1.step_1.output.state"}), + ], + }, + } + + +def _client(tmp_path, *, stage_runner=None): + store = EventStore(path=str(tmp_path / "cp.db")) + + async def adapter_skeletons(workspace: str, adapter_id: str): + return _SKELETONS.get(adapter_id) + + async def default_runner(adapter, plan, *, run_id, input): + return RunResult(completed=True, context={"step_1": {"output": {"state": "executed"}}}) + + app = create_app( + store, adapter_skeleton_provider=adapter_skeletons, + stage_runner=stage_runner or default_runner, + ) + return store, TestClient(app) + + +def test_compose_draft_admits_valid_cross_system_plan(tmp_path): + _, c = _client(tmp_path) + r = c.post("/automations/compose/draft", json=_body()) + assert r.status_code == 200 + out = r.json() + assert out["ok"] is True + assert len(out["content_hash"]) == 64 + assert out["report"]["stages"][0]["ok"] and out["report"]["stages"][1]["ok"] + + +def test_compose_draft_refuses_verb_not_on_its_adapter(tmp_path): + _, c 
= _client(tmp_path) + body = _body() + body["composed"]["stages"][0]["plan"]["pipeline"][0]["verb"] = "crm.fly_to_moon" + r = c.post("/automations/compose/draft", json=body) + assert r.status_code == 200 + assert r.json()["ok"] is False + + +def test_compose_register_then_run(tmp_path): + _, c = _client(tmp_path) + reg = c.post("/automations/compose/register", json=_body()) + assert reg.status_code == 200 + d = reg.json()["definition"] + assert d["kind"] == "composed" and d["state"] == "pending_approval" + + # arm it, then fire + c.post("/automations/acme/lead-to-invoice/1/state", json={"state": "active", "approved_by": "owner"}) + run = c.post("/automations/acme/lead-to-invoice/run", json={"idempotency_key": "fire-comp-1"}) + assert run.status_code == 200 + rec = run.json()["run"] + assert rec["state"] == "completed" + assert rec["trace"]["stages"][1]["name"] == "stage_2" + + +def test_compose_run_requires_active(tmp_path): + _, c = _client(tmp_path) + c.post("/automations/compose/register", json=_body()) # pending_approval, not active + r = c.post("/automations/acme/lead-to-invoice/run", json={"idempotency_key": "fire-comp-2"}) + assert r.status_code == 409 + + +def test_single_and_composed_coexist_in_registry(tmp_path): + """A composed registration must not disturb the single-plan path (kind defaults to 'single').""" + store, c = _client(tmp_path) + c.post("/automations/compose/register", json=_body()) + listed = c.get("/automations", params={"workspace": "acme"}).json()["automations"] + assert len(listed) == 1 and listed[0]["kind"] == "composed" diff --git a/tests/test_automation_dispatch.py b/tests/test_automation_dispatch.py new file mode 100644 index 0000000..b6e57dd --- /dev/null +++ b/tests/test_automation_dispatch.py @@ -0,0 +1,187 @@ +"""P2 dispatcher: fire a manual run of an armed automation through the executor, recorded in the SSOT. 
+ +Two layers: the HTTP/orchestration path with a fake runner (gate, idempotency, run lifecycle), and a +real end-to-end run driving a LocalExecutor against the in-memory PocketBase FakeSystem. +""" + +from __future__ import annotations + +import httpx +import pytest +from fastapi.testclient import TestClient + +from nilscript.controlplane.app import create_app +from nilscript.controlplane.store import EventStore +from nilscript.kernel.executor import LocalExecutor, RunResult + + +def _plan(verb: str = "crm.create_contact", skill: str = "crm", name: str = "Ada", ws: str = "acme") -> dict: + return { + "wosool": "0.1", "workspace": ws, "entry": "step_1", + "pipeline": [{"id": "step_1", "type": "action", "skill": skill, "verb": verb, "args": {"name": name}}], + } + + +def _body(plan: dict | None = None) -> dict: + return { + "automation_id": "follow-up", + "name": {"ar": "متابعة", "en": "Follow up"}, + "plan": plan if plan is not None else _plan(), + "trigger": {"type": "manual"}, + } + + +def _make(tmp_path, *, runner, verbs=("crm.create_contact",)): + store = EventStore(path=str(tmp_path / "cp.db")) + + async def provider(workspace: str): + return {"reachable": True, "conformant": True, "verbs": list(verbs), "targets": {}} + + client = TestClient(create_app(store, skeleton_provider=provider, runner=runner)) + return store, client + + +def _arm(client) -> None: + """register → approve (active) so the automation is fireable.""" + client.post("/automations/register", json=_body()) + client.post("/automations/acme/follow-up/1/state", json={"state": "active", "approved_by": "owner"}) + + +# --- gate + idempotency (fake runner) --------------------------------------------------------- + + +def test_run_refused_when_not_active(tmp_path): + calls = [] + + async def runner(plan, *, run_id): + calls.append(run_id) + return RunResult(completed=True) + + _, c = _make(tmp_path, runner=runner) + c.post("/automations/register", json=_body()) # lands pending_approval, NOT active + 
r = c.post("/automations/acme/follow-up/run", json={"idempotency_key": "fire-001"}) + assert r.status_code == 409 + assert calls == [] # the executor was never invoked + + +def test_run_executes_and_records_trace(tmp_path): + async def runner(plan, *, run_id): + return RunResult(completed=True, context={"step_1": {"output": {"state": "executed"}}}) + + _, c = _make(tmp_path, runner=runner) + _arm(c) + r = c.post("/automations/acme/follow-up/run", json={"idempotency_key": "fire-001"}) + assert r.status_code == 200 + run = r.json()["run"] + assert run["state"] == "completed" + assert run["run_id"] == "follow-up:v1:fire-001" + assert run["trace"]["completed"] is True + + +def test_refire_same_key_replays_without_re_executing(tmp_path): + calls = [] + + async def runner(plan, *, run_id): + calls.append(run_id) + return RunResult(completed=True) + + _, c = _make(tmp_path, runner=runner) + _arm(c) + c.post("/automations/acme/follow-up/run", json={"idempotency_key": "fire-001"}) + again = c.post("/automations/acme/follow-up/run", json={"idempotency_key": "fire-001"}) + assert again.json().get("replayed") is True + assert len(calls) == 1 # executed exactly once despite two fires + + +def test_run_requires_idempotency_key(tmp_path): + async def runner(plan, *, run_id): + return RunResult(completed=True) + + _, c = _make(tmp_path, runner=runner) + _arm(c) + assert c.post("/automations/acme/follow-up/run", json={}).status_code == 400 + + +def test_run_unknown_automation_404(tmp_path): + async def runner(plan, *, run_id): + return RunResult(completed=True) + + _, c = _make(tmp_path, runner=runner) + r = c.post("/automations/acme/ghost/run", json={"idempotency_key": "fire-001"}) + assert r.status_code == 404 + + +def test_runner_failure_is_recorded_as_failed(tmp_path): + async def runner(plan, *, run_id): + raise RuntimeError("adapter exploded") + + _, c = _make(tmp_path, runner=runner) + _arm(c) + r = c.post("/automations/acme/follow-up/run", json={"idempotency_key": 
"fire-x"}) + assert r.status_code == 500 + assert r.json()["run"]["state"] == "failed" + + +def test_run_history_and_detail(tmp_path): + async def runner(plan, *, run_id): + return RunResult(completed=True, context={"k": "v"}) + + _, c = _make(tmp_path, runner=runner) + _arm(c) + c.post("/automations/acme/follow-up/run", json={"idempotency_key": "fire-001"}) + runs = c.get("/automations/acme/follow-up/runs").json()["runs"] + assert len(runs) == 1 and runs[0]["state"] == "completed" + detail = c.get("/runs/follow-up:v1:fire-001") + assert detail.status_code == 200 + assert detail.json()["trace"]["context"] == {"k": "v"} + + +# --- real end-to-end run against the in-memory PocketBase shim --------------------------------- + +pocketbase_edge = pytest.importorskip("pocketbase_nil_adapter.edge") +pocketbase_system = pytest.importorskip("pocketbase_nil_adapter.system") + + +def _shim_runner(): + from nilscript.sdk.client import NilClient + from nilscript.sdk.grants import GrantRef + from nilscript.sdk.transport import NilTransport + + async def runner(plan, *, run_id): + app = pocketbase_edge.create_app( + pocketbase_system.FakeSystem(), pocketbase_edge.CapturingEmitter(), bearer=None + ) + http = httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url="http://shim") + transport = NilTransport(base_url="http://shim", bearer_secret="x", client=http) + grant = GrantRef.from_secret( + grant_id="g", workspace=plan["workspace"], secret="s", scopes=frozenset({"commerce.*"}) + ) + nil = NilClient(transport=transport, grant=grant) + return await LocalExecutor(nil, run_id=run_id, session_id=run_id).execute(plan) + + return runner + + +def test_end_to_end_real_executor_reaches_executed(tmp_path): + plan = _plan(verb="commerce.create_product", skill="commerce", name="قميص", ws="ws_demo") + store = EventStore(path=str(tmp_path / "cp.db")) + + async def provider(workspace: str): + return {"reachable": True, "conformant": True, "verbs": ["commerce.create_product"], 
"targets": {}} + + c = TestClient(create_app(store, skeleton_provider=provider, runner=_shim_runner())) + body = { + "automation_id": "new-product", + "name": {"ar": "منتج", "en": "Product"}, + "plan": plan, + "trigger": {"type": "manual"}, + } + assert c.post("/automations/register", json=body).status_code == 200 + c.post("/automations/ws_demo/new-product/1/state", json={"state": "active", "approved_by": "owner"}) + + r = c.post("/automations/ws_demo/new-product/run", json={"idempotency_key": "fire-real-1"}) + assert r.status_code == 200 + run = r.json()["run"] + assert run["state"] == "completed" + # the executor genuinely walked the plan and the action reached `executed` in the SSOT + assert run["trace"]["context"]["step_1"]["output"]["state"] == "executed" diff --git a/tests/test_automation_scheduler.py b/tests/test_automation_scheduler.py new file mode 100644 index 0000000..a821fa7 --- /dev/null +++ b/tests/test_automation_scheduler.py @@ -0,0 +1,209 @@ +"""Triggers + scheduler (P2): event-driven and interval-scheduled fires, with the loop guard.""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +import pytest +from fastapi.testclient import TestClient + +from nilscript.automation import ( + dispatch_event, + draft_automation, + register, + run_due_schedules, +) +from nilscript.automation.models import ScheduleTrigger +from nilscript.automation.triggers import cron_matches, event_matches, schedule_due +from nilscript.controlplane.app import create_app +from nilscript.controlplane.store import EventStore +from nilscript.kernel.context import SkillSpec, ValidationContext +from nilscript.kernel.executor import RunResult + + +def _ctx() -> ValidationContext: + return ValidationContext( + skills={"crm": SkillSpec(frozenset({"crm.create_contact"}), + {"properties": {"name": {"type": "string"}}, "required": ["name"]})}, + read_verbs=frozenset(), workspaces={"acme": frozenset({"crm.create_contact"})}, + ) + + +def _plan(ws: 
str = "acme") -> dict: + return { + "wosool": "0.1", "workspace": ws, "entry": "step_1", + "pipeline": [{"id": "step_1", "type": "action", "skill": "crm", + "verb": "crm.create_contact", "args": {"name": "Ada"}}], + } + + +def _arm(store, trigger: dict, *, automation_id: str = "auto-1", ws: str = "acme") -> None: + res = draft_automation(automation_id=automation_id, name={"ar": "x", "en": "x"}, + raw_plan=_plan(ws), trigger=trigger, ctx=_ctx()) + d = register(store, res.definition) + store.set_automation_state(ws, automation_id, d.version, "active", approved_by="owner") + + +def _store(tmp_path) -> EventStore: + return EventStore(path=str(tmp_path / "cp.db")) + + +def _event(verb="crm.create_lead", event="executed", grant="g", ws="acme", eid="ev1", args=None): + return {"id": eid, "workspace": ws, "grant": grant, + "body": {"event": event, "verb": verb, "args": args or {}}} + + +def _counting_runner(): + calls = [] + + async def runner(plan, *, run_id): + calls.append(run_id) + return RunResult(completed=True) + + return runner, calls + + +# --- pure trigger logic ----------------------------------------------------------------------- + + +def test_event_matches(): + from nilscript.automation.models import parse_trigger + t = parse_trigger({"type": "event", "on_verb": "crm.create_lead", "match": {"stage": "new"}}) + assert event_matches(t, _event(args={"stage": "new"})) + assert not event_matches(t, _event(verb="crm.create_contact", args={"stage": "new"})) + assert not event_matches(t, _event(event="refused", args={"stage": "new"})) + assert not event_matches(t, _event(args={"stage": "won"})) + + +def test_schedule_due_interval_and_cron(): + interval = ScheduleTrigger(type="schedule", interval_seconds=60) + now = datetime(2026, 1, 1, 12, 0, 0, tzinfo=UTC) + assert schedule_due(interval, None, now) is True # never run → due + assert schedule_due(interval, now.isoformat(), now) is False # just ran → not due + assert schedule_due(interval, (now - 
timedelta(seconds=90)).isoformat(), now) is True + cron = ScheduleTrigger(type="schedule", cron="0 9 * * *") + assert schedule_due(cron, None, now) is False # 12:00 ≠ 09:00 → not due + + +def test_cron_matches_subset(): + nine_am = datetime(2026, 1, 5, 9, 0, 0, tzinfo=UTC) # 2026-01-05 is a Monday + assert cron_matches("0 9 * * *", nine_am) is True + assert cron_matches("0 9 * * *", nine_am.replace(minute=1)) is False + assert cron_matches("0 9 * * *", nine_am.replace(hour=10)) is False + assert cron_matches("*/5 * * * *", nine_am.replace(minute=10)) is True + assert cron_matches("*/5 * * * *", nine_am.replace(minute=12)) is False + assert cron_matches("0 0 * * 1", nine_am.replace(hour=0, minute=0)) is True # Monday=1 + assert cron_matches("0 0 * * 2", nine_am.replace(hour=0, minute=0)) is False # not Tuesday + + +def test_schedule_due_cron_fires_once_per_minute(): + at_nine = datetime(2026, 1, 5, 9, 0, 30, tzinfo=UTC) + cron = ScheduleTrigger(type="schedule", cron="0 9 * * *") + assert schedule_due(cron, None, at_nine) is True # matching minute, never run + assert schedule_due(cron, at_nine.isoformat(), at_nine) is False # already ran this minute + earlier = at_nine.replace(hour=8).isoformat() + assert schedule_due(cron, earlier, at_nine) is True # last run an earlier minute + off_minute = at_nine.replace(minute=1) + assert schedule_due(cron, None, off_minute) is False # 09:01 ≠ cron + + +# --- event dispatch --------------------------------------------------------------------------- + + +async def test_dispatch_fires_matching_active_automation(tmp_path): + store = _store(tmp_path) + runner, calls = _counting_runner() + _arm(store, {"type": "event", "on_verb": "crm.create_lead"}) + fired = await dispatch_event(store, _event(), runner=runner) + assert len(fired) == 1 and fired[0]["ok"] + assert len(calls) == 1 + + +async def test_dispatch_skips_non_matching(tmp_path): + store = _store(tmp_path) + runner, calls = _counting_runner() + _arm(store, {"type": "event", 
"on_verb": "crm.create_lead"}) + await dispatch_event(store, _event(verb="crm.delete_lead"), runner=runner) + assert calls == [] + + +async def test_dispatch_loop_guard_skips_control_plane_events(tmp_path): + store = _store(tmp_path) + runner, calls = _counting_runner() + _arm(store, {"type": "event", "on_verb": "crm.create_lead"}) + await dispatch_event(store, _event(grant="control-plane"), runner=runner) + assert calls == [] # a triggered run's own events never re-trigger + + +async def test_dispatch_ignores_non_event_triggers(tmp_path): + store = _store(tmp_path) + runner, calls = _counting_runner() + _arm(store, {"type": "manual"}) + await dispatch_event(store, _event(), runner=runner) + assert calls == [] + + +async def test_dispatch_is_idempotent_on_event_id(tmp_path): + store = _store(tmp_path) + runner, calls = _counting_runner() + _arm(store, {"type": "event", "on_verb": "crm.create_lead"}) + await dispatch_event(store, _event(eid="ev-42"), runner=runner) + again = await dispatch_event(store, _event(eid="ev-42"), runner=runner) + assert again[0].get("replayed") is True + assert len(calls) == 1 # same event id → one execution + + +# --- scheduled dispatch ----------------------------------------------------------------------- + + +async def test_run_due_schedules_fires_then_not_due(tmp_path): + store = _store(tmp_path) + runner, calls = _counting_runner() + _arm(store, {"type": "schedule", "interval_seconds": 1}) + fired = await run_due_schedules(store, runner=runner, now=datetime.now(UTC)) + assert len(fired) == 1 and len(calls) == 1 + # immediately again: the just-created run makes it not yet due + again = await run_due_schedules(store, runner=runner, now=datetime.now(UTC)) + assert again == [] and len(calls) == 1 + + +async def test_run_due_schedules_fires_again_after_interval(tmp_path): + store = _store(tmp_path) + runner, calls = _counting_runner() + _arm(store, {"type": "schedule", "interval_seconds": 1}) + await run_due_schedules(store, 
runner=runner, now=datetime.now(UTC)) + later = datetime.now(UTC) + timedelta(seconds=5) + await run_due_schedules(store, runner=runner, now=later) + assert len(calls) == 2 + + +# --- tick endpoint ---------------------------------------------------------------------------- + + +def test_tick_endpoint_fires_due_schedules(tmp_path): + store = _store(tmp_path) + runner, _ = _counting_runner() + + async def provider(workspace: str): + return {"reachable": True, "conformant": True, "verbs": ["crm.create_contact"], "targets": {}} + + _arm(store, {"type": "schedule", "interval_seconds": 1}) + c = TestClient(create_app(store, skeleton_provider=provider, runner=runner)) + r = c.post("/automations/tick") + assert r.status_code == 200 + assert r.json()["fired"] == 1 + + +def test_tick_endpoint_fires_cron_due_now(tmp_path): + store = _store(tmp_path) + runner, calls = _counting_runner() + + async def provider(workspace: str): + return {"reachable": True, "conformant": True, "verbs": ["crm.create_contact"], "targets": {}} + + _arm(store, {"type": "schedule", "cron": "* * * * *"}) # every minute → always due now + c = TestClient(create_app(store, skeleton_provider=provider, runner=runner)) + assert c.post("/automations/tick").json()["fired"] == 1 + # a second tick in the same minute replays (idempotent) — no second execution + c.post("/automations/tick") + assert len(calls) == 1 diff --git a/tests/test_automation_ui.py b/tests/test_automation_ui.py new file mode 100644 index 0000000..f687145 --- /dev/null +++ b/tests/test_automation_ui.py @@ -0,0 +1,61 @@ +"""The control-plane dashboard surfaces automations: the /api/automations feed + the rendered panel.""" + +from __future__ import annotations + +from fastapi.testclient import TestClient + +from nilscript.controlplane.app import create_app +from nilscript.controlplane.store import EventStore + + +def _plan() -> dict: + return { + "wosool": "0.1", "workspace": "acme", "entry": "step_1", + "pipeline": [{"id": "step_1", "type": 
"action", "skill": "crm", + "verb": "crm.create_contact", "args": {"name": "Ada"}}], + } + + +def _client(tmp_path): + store = EventStore(path=str(tmp_path / "cp.db")) + + async def provider(workspace: str): + return {"reachable": True, "conformant": True, "verbs": ["crm.create_contact"], "targets": {}} + + return TestClient(create_app(store, skeleton_provider=provider)) + + +def test_api_automations_feed(tmp_path): + c = _client(tmp_path) + assert c.get("/api/automations").json()["automations"] == [] # empty before any registration + + c.post("/automations/register", json={ + "automation_id": "follow-up", "name": {"ar": "x", "en": "Follow up"}, + "plan": _plan(), "trigger": {"type": "schedule", "cron": "0 9 * * *"}, + }) + feed = c.get("/api/automations").json()["automations"] + assert len(feed) == 1 + a = feed[0] + assert a["state"] == "pending_approval" + assert a["kind"] == "single" + assert a["trigger"]["cron"] == "0 9 * * *" + assert a["plan_summary"] == {"nodes": 1} + assert "plan" not in a # heavy plan summarised, not shipped + + +def test_dashboard_html_includes_automations_panel(tmp_path): + html = _client(tmp_path).get("/").text + assert "id=automations" in html + assert "Automations" in html + assert "loadAutomations" in html + assert "operator token" in html # the token-gated control affordance + + +def test_api_automations_lists_across_workspaces(tmp_path): + c = _client(tmp_path) + c.post("/automations/register", json={ + "automation_id": "a1", "name": {"ar": "x", "en": "x"}, + "plan": _plan(), "trigger": {"type": "manual"}, + }) + feed = c.get("/api/automations").json()["automations"] + assert [a["automation_id"] for a in feed] == ["a1"] diff --git a/tests/test_mcp_automation_tools.py b/tests/test_mcp_automation_tools.py new file mode 100644 index 0000000..af50453 --- /dev/null +++ b/tests/test_mcp_automation_tools.py @@ -0,0 +1,80 @@ +"""The agent-facing MCP automation tools drive the real control-plane registry end to end. 
+ +The AutomationTools HTTP client is backed by the actual control-plane ASGI app (no network), with an +injected skeleton provider + fake runner — so this proves: MCP tool → CP endpoint → validator → SSOT. +""" + +from __future__ import annotations + +import httpx +import pytest + +from nilscript.controlplane.app import create_app +from nilscript.controlplane.store import EventStore +from nilscript.kernel.executor import RunResult +from nilscript.mcp.automation_tools import AutomationTools + + +def _plan(name: str = "Ada") -> dict: + return { + "wosool": "0.1", "workspace": "acme", "entry": "step_1", + "pipeline": [{"id": "step_1", "type": "action", "skill": "crm", + "verb": "crm.create_contact", "args": {"name": name}}], + } + + +@pytest.fixture +def tools(tmp_path) -> AutomationTools: + store = EventStore(path=str(tmp_path / "cp.db")) + + async def provider(workspace: str): + return {"reachable": True, "conformant": True, "verbs": ["crm.create_contact"], "targets": {}} + + async def runner(plan, *, run_id): + return RunResult(completed=True, context={"step_1": {"output": {"state": "executed"}}}) + + app = create_app(store, skeleton_provider=provider, runner=runner) + client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url="http://cp") + return AutomationTools("http://cp", "", client=client) + + +_NAME = {"ar": "متابعة", "en": "Follow up"} +_TRIGGER = {"type": "manual"} + + +async def test_draft_tool_previews(tools): + out = await tools.draft("follow-up", _NAME, _plan(), _TRIGGER) + assert out["ok"] is True + assert len(out["content_hash"]) == 64 + + +async def test_draft_tool_refuses_hallucinated_verb(tools): + plan = _plan() + plan["pipeline"][0]["verb"] = "crm.fly_to_moon" + out = await tools.draft("bad", _NAME, plan, _TRIGGER) + assert out["ok"] is False + assert any(d["code"] == "V4_UNKNOWN_SKILL" for d in out["refusal"]) + + +async def test_full_agent_flow_draft_register_approve_run(tools): + # the whole "by talking" path, through the 
agent tool surface + assert (await tools.draft("follow-up", _NAME, _plan(), _TRIGGER))["ok"] is True + + reg = await tools.register("follow-up", _NAME, _plan(), _TRIGGER) + d = reg["definition"] + assert d["state"] == "pending_approval" and d["version"] == 1 + + approved = await tools.approve("acme", "follow-up", 1) + assert approved["automation"]["state"] == "active" + + run = await tools.run("acme", "follow-up", "fire-tool-1") + assert run["run"]["state"] == "completed" + + listed = await tools.list("acme") + assert len(listed["automations"]) == 1 + + +async def test_run_before_approve_is_refused(tools): + await tools.register("follow-up", _NAME, _plan(), _TRIGGER) # pending_approval, not active + run = await tools.run("acme", "follow-up", "fire-tool-2") + assert run.get("ok") is False # 409 envelope from fire_manual From a6d483ac66cd7733cc0e1e2af96b4d307c763ce3 Mon Sep 17 00:00:00 2001 From: AI Bot Date: Wed, 24 Jun 2026 16:38:06 +0300 Subject: [PATCH 2/3] feat(automation): multi-active adapters + MCP cross-system compose tool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Enable several adapters in one workspace at once (e.g. PocketBase + Odoo) so a cross-system automation can run between two backends linked via NIL/MCP: - store.set_adapter_active: non-exclusive enable/disable (exclusive activate_adapter kept for single-backend default routing); active_adapter() returns the most-recent active as the default — composition addresses adapters by id, so multi-active never makes a plain propose/commit ambiguous. - control plane: POST /adapters/{ws}/{id}/enable|disable (token-gated) + per-adapter Enable/Disable toggles in the MCP-routing panel (operator-token gated). - MCP: nil_automation_compose_register so an agent can author a two-system automation by talking (stages validated against each adapter's own live skeleton). +5 tests; full suite green (372). 
--- src/nilscript/controlplane/app.py | 32 ++++++++- src/nilscript/controlplane/store.py | 19 +++++- src/nilscript/mcp/automation_tools.py | 16 +++++ src/nilscript/mcp/server.py | 12 ++++ tests/test_multi_adapter.py | 95 +++++++++++++++++++++++++++ 5 files changed, 171 insertions(+), 3 deletions(-) create mode 100644 tests/test_multi_adapter.py diff --git a/src/nilscript/controlplane/app.py b/src/nilscript/controlplane/app.py index 7766c2c..03981e6 100644 --- a/src/nilscript/controlplane/app.py +++ b/src/nilscript/controlplane/app.py @@ -320,6 +320,25 @@ def activate_adapter(workspace: str, adapter_id: str, authorization: str | None return JSONResponse({"error": "no such adapter"}, status_code=404) return {"ok": True, "workspace": workspace, "adapter_id": adapter_id} + @app.post("/adapters/{workspace}/{adapter_id}/enable") + def enable_adapter(workspace: str, adapter_id: str, authorization: str | None = Header(default=None)) -> Any: + """Enable an adapter WITHOUT deactivating siblings — several can be active at once (e.g. + PocketBase + Odoo for a cross-system automation). Operator-gated.""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + if not store.set_adapter_active(workspace, adapter_id, True): + return JSONResponse({"error": "no such adapter"}, status_code=404) + return {"ok": True, "workspace": workspace, "adapter_id": adapter_id, "active": True} + + @app.post("/adapters/{workspace}/{adapter_id}/disable") + def disable_adapter(workspace: str, adapter_id: str, authorization: str | None = Header(default=None)) -> Any: + """Disable one adapter (leaves siblings untouched). 
Operator-gated.""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + if not store.set_adapter_active(workspace, adapter_id, False): + return JSONResponse({"error": "no such adapter"}, status_code=404) + return {"ok": True, "workspace": workspace, "adapter_id": adapter_id, "active": False} + @app.get("/adapters") def list_adapters(workspace: str = "") -> dict[str, Any]: """List a workspace's registered adapters for the UI — bearer REDACTED (public read).""" @@ -1076,22 +1095,33 @@ def index() -> str: function host(u){try{return new URL(u).host;}catch(e){return u||'';}} async function loadRouting(){ try{ - const r=await fetch('/api/registry');const {adapters}=await r.json(); + const r=await fetch('/api/registry');const data=await r.json();const adapters=data.adapters||[];const ws=data.workspace||''; const wrap=document.getElementById('routingWrap'),box=document.getElementById('routing'); document.getElementById('rcount').textContent=adapters.length; wrap.style.display=adapters.length?'block':'none'; box.innerHTML=adapters.map(a=>{ const on=!!a.active; + const tgl=on + ? abtn('⏻ Disable','ghost',`adapterToggle('${esc(ws)}','${esc(a.adapter_id)}',false)`) + : abtn('⏼ Enable','ok',`adapterToggle('${esc(ws)}','${esc(a.adapter_id)}',true)`); return `
${esc(a.label||a.adapter_id)} ${esc(a.system||a.adapter_id)} ${esc(host(a.url))} ${on?'● active':'idle'} + ${tgl}
`; }).join(''); }catch(_){} } +async function adapterToggle(ws,id,enable){ + if(!tokenVal())return toast('Enter the operator token (Automations section) to toggle adapters.'); + try{const r=await fetch(`/adapters/${ws}/${id}/${enable?'enable':'disable'}`,{method:'POST',headers:authHeaders()}); + if(r.status===401)return toast('Operator token rejected.'); + toast(enable?'Adapter enabled — several can be active at once.':'Adapter disabled.');loadRouting(); + }catch(_){toast('Could not toggle the adapter.');} +} function applyThemeGlyph(){var b=document.getElementById('themeBtn');if(b)b.textContent=document.documentElement.getAttribute('data-theme')==='light'?'☀':'☾';} function toggleTheme(){var next=document.documentElement.getAttribute('data-theme')==='light'?'dark':'light';document.documentElement.setAttribute('data-theme',next);try{localStorage.setItem('cp-theme',next);}catch(e){}applyThemeGlyph();} diff --git a/src/nilscript/controlplane/store.py b/src/nilscript/controlplane/store.py index 1a959a5..35148fa 100644 --- a/src/nilscript/controlplane/store.py +++ b/src/nilscript/controlplane/store.py @@ -509,11 +509,26 @@ def activate_adapter(self, workspace: str, adapter_id: str) -> bool: self._conn.commit() return True + def set_adapter_active(self, workspace: str, adapter_id: str, enabled: bool) -> bool: + """Enable/disable ONE adapter without touching its siblings (non-exclusive). This is what lets + a workspace have several adapters active at once — e.g. PocketBase + Odoo for a cross-system + automation. Returns False if no such adapter is registered.""" + with self._lock: + cur = self._conn.execute( + "UPDATE adapters SET active = ?, updated_at = ? WHERE workspace = ? 
AND adapter_id = ?", + (1 if enabled else 0, _now(), workspace, adapter_id), + ) + self._conn.commit() + return cur.rowcount > 0 + def active_adapter(self, workspace: str) -> dict[str, Any] | None: - """The workspace's active adapter (WITH bearer — the MCP needs it), or None.""" + """The workspace's default active adapter for single-backend MCP routing (WITH bearer), or + None. With several active, the most-recently-updated wins — composition addresses adapters by + id, so multi-active never makes a plain propose/commit ambiguous.""" with self._lock: row = self._conn.execute( - f"SELECT {_ADAPTER_COLS} FROM adapters WHERE workspace = ? AND active = 1 LIMIT 1", + f"SELECT {_ADAPTER_COLS} FROM adapters WHERE workspace = ? AND active = 1 " + "ORDER BY updated_at DESC LIMIT 1", (workspace,), ).fetchone() return dict(row) if row is not None else None diff --git a/src/nilscript/mcp/automation_tools.py b/src/nilscript/mcp/automation_tools.py index 4916056..bbb9ac4 100644 --- a/src/nilscript/mcp/automation_tools.py +++ b/src/nilscript/mcp/automation_tools.py @@ -69,6 +69,22 @@ async def register( json={"automation_id": automation_id, "name": name, "plan": plan, "trigger": trigger}, ) + async def compose_draft( + self, automation_id: str, name: dict[str, Any], composed: dict[str, Any], trigger: dict[str, Any], + ) -> dict[str, Any]: + return await self._request( + "POST", "/automations/compose/draft", + json={"automation_id": automation_id, "name": name, "composed": composed, "trigger": trigger}, + ) + + async def compose_register( + self, automation_id: str, name: dict[str, Any], composed: dict[str, Any], trigger: dict[str, Any], + ) -> dict[str, Any]: + return await self._request( + "POST", "/automations/compose/register", + json={"automation_id": automation_id, "name": name, "composed": composed, "trigger": trigger}, + ) + async def approve(self, workspace: str, automation_id: str, version: int) -> dict[str, Any]: return await self._request( "POST", 
f"/automations/{workspace}/{automation_id}/{version}/state", diff --git a/src/nilscript/mcp/server.py b/src/nilscript/mcp/server.py index ad791f7..d774a46 100644 --- a/src/nilscript/mcp/server.py +++ b/src/nilscript/mcp/server.py @@ -179,6 +179,11 @@ async def nil_automation_register( ) -> dict[str, Any]: return await auto.register(automation_id, name, plan, trigger) + async def nil_automation_compose_register( + automation_id: str, name: dict[str, Any], composed: dict[str, Any], trigger: dict[str, Any], + ) -> dict[str, Any]: + return await auto.compose_register(automation_id, name, composed, trigger) + async def nil_automation_approve(workspace: str, automation_id: str, version: int) -> dict[str, Any]: return await auto.approve(workspace, automation_id, version) @@ -199,6 +204,13 @@ async def nil_automation_list(workspace: str) -> dict[str, Any]: description="Register a validated automation into the registry as pending_approval (NOT armed). " "An owner approves it before it can run. Re-registering an identical plan is idempotent.", ) + server.add_tool( + nil_automation_compose_register, name="nil_automation_compose_register", + description="Register a CROSS-SYSTEM automation: `composed` = {workspace, stages:[{name, " + "adapter, plan, input_from}]}, each stage validated against ITS adapter's live skeleton. " + "Handoffs between stages are explicit ($.stage_1.step_2.output.id → next stage's $.input.*). " + "Lands pending_approval. Use to wire two backends (e.g. PocketBase → Odoo) into one workflow.", + ) server.add_tool( nil_automation_approve, name="nil_automation_approve", description="Arm a registered automation (set it active) so it can fire. 
Operator-grade.", diff --git a/tests/test_multi_adapter.py b/tests/test_multi_adapter.py new file mode 100644 index 0000000..4019dc9 --- /dev/null +++ b/tests/test_multi_adapter.py @@ -0,0 +1,95 @@ +"""Multiple adapters active at once (PocketBase + Odoo) and authoring a cross-system automation +through the MCP compose tool.""" + +from __future__ import annotations + +import httpx + +from nilscript.controlplane.app import create_app +from nilscript.controlplane.store import EventStore +from nilscript.mcp.automation_tools import AutomationTools + + +def _store(tmp_path, *, two=True) -> EventStore: + s = EventStore(path=str(tmp_path / "cp.db")) + s.register_adapter("acme", "pocket", url="http://pocket", system="pocketbase") + if two: + s.register_adapter("acme", "odoo", url="http://odoo", system="odoo") + return s + + +def _stage(name: str, adapter: str, verb: str, skill: str, args: dict, input_from: dict | None = None) -> dict: + s = {"name": name, "adapter": adapter, + "plan": {"wosool": "0.1", "workspace": "acme", "entry": "step_1", + "pipeline": [{"id": "step_1", "type": "action", "skill": skill, "verb": verb, "args": args}]}} + if input_from: + s["input_from"] = input_from + return s + + +# --- multiple adapters active simultaneously -------------------------------------------------- + + +def test_two_adapters_active_at_once(tmp_path): + s = _store(tmp_path) + assert s.set_adapter_active("acme", "pocket", True) + assert s.set_adapter_active("acme", "odoo", True) + active = [a for a in s.list_adapters("acme") if a["active"]] + assert {a["adapter_id"] for a in active} == {"pocket", "odoo"} # BOTH on + assert s.active_adapter("acme") is not None # singular default still resolves + + +def test_disable_one_leaves_the_other(tmp_path): + s = _store(tmp_path) + s.set_adapter_active("acme", "pocket", True) + s.set_adapter_active("acme", "odoo", True) + s.set_adapter_active("acme", "pocket", False) + active = [a for a in s.list_adapters("acme") if a["active"]] + assert 
[a["adapter_id"] for a in active] == ["odoo"] + + +def test_set_active_unknown_adapter(tmp_path): + assert _store(tmp_path).set_adapter_active("acme", "ghost", True) is False + + +def test_enable_disable_endpoints_are_token_gated(tmp_path): + c = __import__("fastapi.testclient", fromlist=["TestClient"]).TestClient( + create_app(_store(tmp_path), registry_token="t") + ) + assert c.post("/adapters/acme/pocket/enable").status_code == 401 # no token + ok = c.post("/adapters/acme/pocket/enable", headers={"Authorization": "Bearer t"}) + assert ok.status_code == 200 and ok.json()["active"] is True + off = c.post("/adapters/acme/pocket/disable", headers={"Authorization": "Bearer t"}) + assert off.json()["active"] is False + assert c.post("/adapters/acme/ghost/enable", headers={"Authorization": "Bearer t"}).status_code == 404 + + +# --- author a cross-system automation through the MCP compose tool ----------------------------- + + +async def test_mcp_compose_tool_registers_two_system_automation(tmp_path): + store = _store(tmp_path) + skeletons = { + "pocket": {"reachable": True, "conformant": True, "verbs": ["crm.create_lead"], "targets": {}}, + "odoo": {"reachable": True, "conformant": True, "verbs": ["acc.create_invoice"], "targets": {}}, + } + + async def adapter_skeletons(workspace: str, adapter_id: str): + return skeletons.get(adapter_id) + + app = create_app(store, adapter_skeleton_provider=adapter_skeletons) + client = httpx.AsyncClient(transport=httpx.ASGITransport(app=app), base_url="http://cp") + tools = AutomationTools("http://cp", "", client=client) + + composed = {"workspace": "acme", "stages": [ + _stage("stage_1", "pocket", "crm.create_lead", "crm", {"name": "Acme"}), + _stage("stage_2", "odoo", "acc.create_invoice", "acc", {"ref": "$.input.lead"}, + input_from={"lead": "$.stage_1.step_1.output.state"}), + ]} + out = await tools.compose_register("lead-to-invoice", {"ar": "x", "en": "Lead→Invoice"}, + composed, {"type": "manual"}) + assert out["ok"] is True + 
assert out["definition"]["kind"] == "composed" + # both backends are referenced in the one registered automation + listed = (await tools.list("acme"))["automations"] + assert len(listed) == 1 and listed[0]["kind"] == "composed" From b96a73117dd885c5397e81e89a0c69209be8f47c Mon Sep 17 00:00:00 2001 From: AI Bot Date: Wed, 24 Jun 2026 16:42:12 +0300 Subject: [PATCH 3/3] feat(automation): in-UI compose form for cross-system automations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A one-click builder in the control-plane dashboard: pick adapter A + verb, adapter B + verb, declare the handoff, and register a two-system automation — no agent round-trip needed. Verb dropdowns load live from each adapter's own skeleton (new token-gated GET /api/adapter-skeleton). Submit is token-gated and posts to /automations/compose/register, surfacing per-stage refusal codes. +2 tests; full suite green (374). --- src/nilscript/controlplane/app.py | 95 +++++++++++++++++++++++++++++++ tests/test_automation_ui.py | 23 ++++++++ 2 files changed, 118 insertions(+) diff --git a/src/nilscript/controlplane/app.py b/src/nilscript/controlplane/app.py index 03981e6..634909c 100644 --- a/src/nilscript/controlplane/app.py +++ b/src/nilscript/controlplane/app.py @@ -267,6 +267,22 @@ def pending() -> dict[str, Any]: def adapters() -> dict[str, Any]: return {"adapters": store.adapters()} + @app.get("/api/adapter-skeleton") + async def api_adapter_skeleton( + workspace: str = "", adapter_id: str = "", authorization: str | None = Header(default=None), + ) -> Any: + """The verbs (and target names) a specific adapter declares — feeds the UI compose form's verb + dropdowns. 
Token-gated: it triggers a live handshake using the adapter's bearer.""" + if not _registry_authed(authorization): + return JSONResponse({"error": "unauthorized"}, status_code=401) + skeleton = await adapter_skeletons(workspace, adapter_id) + if skeleton is None: + return JSONResponse({"error": "adapter not reachable/conformant"}, status_code=503) + return { + "verbs": skeleton.get("verbs", []), + "targets": sorted((skeleton.get("targets") or {}).keys()), + } + @app.get("/api/automations") def api_automations() -> dict[str, Any]: """Dashboard view of every automation (latest version, all workspaces). Public read — no @@ -827,6 +843,20 @@ def index() -> str: .runrow .rst.failed,.runrow .rst.blocked{color:#ff9a90;border-color:rgba(251,90,78,.4)} .runrow .rst.partial,.runrow .rst.compensated{color:#f0c674;border-color:rgba(224,166,41,.45)} .runrow .rst.running{color:var(--blue);border-color:rgba(91,140,255,.35)} + /* compose form */ + .cform{border:1px solid var(--line2);border-radius:var(--radius);background:var(--panel); + padding:14px 16px;display:grid;gap:10px;margin-bottom:14px} + .cform input,.cform select,.cform textarea{background:var(--elev);border:1px solid var(--line2); + border-radius:8px;color:var(--fg);font:12px var(--mono);padding:7px 10px;width:100%} + .cform textarea{min-height:46px;resize:vertical} + .cform .row2{display:grid;grid-template-columns:1fr 1fr;gap:8px} + .cform .ids{display:grid;grid-template-columns:1fr 1fr;gap:8px} + .stageblk{border:1px solid var(--line);border-radius:10px;padding:11px 12px;display:grid;gap:8px} + .stageblk .stitle{color:var(--verb);font-size:12px;font-weight:600} + .stageblk.b2{border-color:rgba(168,119,247,.3)} + .cform .handoff{display:flex;align-items:center;gap:8px;color:var(--mut);font-size:12px;flex-wrap:wrap} + .cform .handoff input{width:auto;flex:1 1 120px} + .arrow{color:var(--violet);font-weight:600} /* toast */ #toast{position:fixed;left:50%;bottom:24px;transform:translateX(-50%) translateY(20px); @@ 
-875,6 +905,26 @@ def index() -> str:
controls (approve / pause / run) need the registry token — view is open + +
+
@@ -1191,6 +1241,51 @@ def index() -> str: }catch(_){el.innerHTML='
could not load runs
';} } +// ── compose form: build a two-system automation in one click ──────────────────────────────────── +let cfWs=''; +function val(id){var e=document.getElementById(id);return e?e.value.trim():'';} +function toggleCompose(){const f=document.getElementById('composeForm');if(!f)return; + if(f.style.display==='none'){f.style.display='block';populateCompose();}else f.style.display='none';} +async function populateCompose(){ + try{const d=await(await fetch('/api/registry')).json();cfWs=d.workspace||'';const ads=d.adapters||[]; + const opts=''+ads.map(a=>``).join(''); + ['cf_a1','cf_a2'].forEach(id=>{const s=document.getElementById(id);if(s)s.innerHTML=opts;}); + ['cf_v1','cf_v2'].forEach(id=>{const s=document.getElementById(id);if(s)s.innerHTML='';}); + }catch(_){} +} +async function loadVerbs(vsel,adapter){ + const s=document.getElementById(vsel);if(!s)return; + if(!adapter){s.innerHTML='';return;} + if(!tokenVal()){s.innerHTML='';return;} + s.innerHTML=''; + try{const r=await fetch(`/api/adapter-skeleton?workspace=${encodeURIComponent(cfWs)}&adapter_id=${encodeURIComponent(adapter)}`,{headers:authHeaders()}); + if(r.status===401){s.innerHTML='';return;} + if(!r.ok){s.innerHTML='';return;} + const d=await r.json();const vs=d.verbs||[]; + s.innerHTML=vs.length?(''+vs.map(v=>``).join('')):''; + }catch(_){s.innerHTML='';} +} +async function submitCompose(){ + if(!tokenVal())return toast('Enter the operator token above first.'); + const id=val('cf_id'),nm=val('cf_name')||id,a1=val('cf_a1'),v1=val('cf_v1'),a2=val('cf_a2'),v2=val('cf_v2'); + if(!id||!a1||!v1||!a2||!v2)return toast('Need id + both adapters + both verbs.'); + let ar1={},ar2={}; + try{ar1=val('cf_args1')?JSON.parse(val('cf_args1')):{};ar2=val('cf_args2')?JSON.parse(val('cf_args2')):{};} + catch(e){return toast('Args must be valid JSON.');} + const stage=(n,ad,vb,ar)=>({name:n,adapter:ad,plan:{wosool:"0.1",workspace:cfWs,entry:"step_1", + 
pipeline:[{id:"step_1",type:"action",skill:vb.split('.')[0],verb:vb,args:ar}]}}); + const s2=stage('stage_2',a2,v2,ar2);const hk=val('cf_hk'),hr=val('cf_hr');if(hk&&hr)s2.input_from={[hk]:hr}; + const body={automation_id:id,name:{en:nm,ar:nm},trigger:{type:"manual"}, + composed:{workspace:cfWs,stages:[stage('stage_1',a1,v1,ar1),s2]}}; + try{const r=await fetch('/automations/compose/register',{method:'POST',headers:authHeaders(),body:JSON.stringify(body)}); + if(r.status===401)return toast('Operator token rejected.'); + const d=await r.json(); + if(!d.ok){const why=(d.report&&d.report.stages||[]).flatMap(s=>(s.diagnostics||[]).map(x=>x.code)).join(', ')||(d.report&&d.report.errors||[]).join(', ')||d.error||'refused'; + return toast('Refused: '+esc(why));} + toast('Cross-system automation registered — pending approval.');toggleCompose();loadAutomations(); + }catch(_){toast('Could not create the automation.');} +} + initToken();tick();pend();loadAdapters();loadRouting();loadAutomations();applyThemeGlyph(); setInterval(()=>{tick();pend();loadAdapters();loadRouting();loadAutomations();},2000); """ diff --git a/tests/test_automation_ui.py b/tests/test_automation_ui.py index f687145..8567bcd 100644 --- a/tests/test_automation_ui.py +++ b/tests/test_automation_ui.py @@ -51,6 +51,29 @@ def test_dashboard_html_includes_automations_panel(tmp_path): assert "operator token" in html # the token-gated control affordance +def test_adapter_skeleton_endpoint_feeds_the_form(tmp_path): + store = EventStore(path=str(tmp_path / "cp.db")) + + async def adapter_skeletons(workspace: str, adapter_id: str): + return {"reachable": True, "conformant": True, + "verbs": ["crm.create_lead", "crm.create_contact"], "targets": {"crm.lead": {}}} + + c = TestClient(create_app(store, adapter_skeleton_provider=adapter_skeletons, registry_token="t")) + assert c.get("/api/adapter-skeleton", params={"workspace": "acme", "adapter_id": "odoo"}).status_code == 401 + ok = c.get("/api/adapter-skeleton", 
params={"workspace": "acme", "adapter_id": "odoo"}, + headers={"Authorization": "Bearer t"}) + assert ok.status_code == 200 + assert ok.json()["verbs"] == ["crm.create_lead", "crm.create_contact"] + assert ok.json()["targets"] == ["crm.lead"] + + +def test_compose_form_present_in_html(tmp_path): + html = _client(tmp_path).get("/").text + assert "id=composeForm" in html + assert "submitCompose" in html and "loadVerbs" in html + assert "New cross-system automation" in html + + def test_api_automations_lists_across_workspaces(tmp_path): c = _client(tmp_path) c.post("/automations/register", json={