Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions scripts/smoke_automation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""Live smoke test for the Automation Registry — exercises the production paths the unit suite mocks.

The test suite injects fake skeleton providers and runners, so the *live default* glue is NOT covered:
the real `describe()` handshake to a live adapter, and the control-plane grant minting
(`GrantRef.from_secret(grant_id="control-plane", secret=<bearer>)`) that drives `LocalExecutor` against
a real backend. This script drives that glue end to end against a running control plane + adapter, so
an operator can confirm the demo path before relying on it.

It does NOT depend on the package — stdlib only — so it can run anywhere the control plane is reachable.

NIL_REGISTRY_URL=https://cp.example.com \
NIL_REGISTRY_TOKEN=... \
NIL_SMOKE_WORKSPACE=ws_demo \
python scripts/smoke_automation.py

The workspace must already have an ACTIVE adapter. The plan defaults to a single
`commerce.create_product` (PocketBase demo); override with NIL_SMOKE_PLAN (a JSON Wosool program) to
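match your live backend's verbs. Exit code 0 = every step (draft, register, approve, run) was accepted
by the control plane; the final run state is printed but not asserted, so check it if it is not
"completed". Exit code 1 = a step failed; exit code 2 = NIL_REGISTRY_URL is not set.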
"""

from __future__ import annotations

import json
import os
import sys
import time
import urllib.error
import urllib.request

# Connection and target settings, read from the environment once at import time.
BASE = os.getenv("NIL_REGISTRY_URL", "").rstrip("/")  # control-plane root without a trailing "/"
TOKEN = os.getenv("NIL_REGISTRY_TOKEN", "")  # bearer token; may be empty
WORKSPACE = os.getenv("NIL_SMOKE_WORKSPACE", "ws_demo")
AUTOMATION_ID = os.getenv("NIL_SMOKE_ID", "smoke-product")

# Fallback program used when NIL_SMOKE_PLAN is unset or empty: a single action step.
_DEFAULT_PLAN = {
    "wosool": "0.1",
    "workspace": WORKSPACE,
    "entry": "step_1",
    "pipeline": [
        {
            "id": "step_1",
            "type": "action",
            "skill": "commerce",
            "verb": "commerce.create_product",
            "args": {"name": "Smoke Test Product"},
        },
    ],
}

# An operator-supplied JSON plan replaces the default; an empty value counts as "not set".
if os.getenv("NIL_SMOKE_PLAN"):
    PLAN = json.loads(os.environ["NIL_SMOKE_PLAN"])
else:
    PLAN = _DEFAULT_PLAN

# Bilingual display name sent with the draft and register requests.
NAME = {"ar": "اختبار دخان", "en": "Smoke test"}


def _call(method: str, path: str, body: dict | None = None) -> tuple[int, dict]:
    """Send one JSON request to the control plane and return ``(status, payload)``.

    Args:
        method: HTTP method, e.g. ``"POST"``.
        path: Path appended verbatim to ``BASE`` (expected to start with ``/``).
        body: JSON-serialisable request body, or ``None`` to send no body.

    Returns:
        ``(status, payload)`` where ``payload`` is always a dict:

        * the decoded JSON object of the response (2xx or HTTP error alike);
        * ``{}`` for an empty body;
        * ``{"error": ...}`` when the body is not valid JSON or is not a JSON object;
        * ``(0, {"error": ...})`` when the request never produced an HTTP response
          (connection refused, DNS failure, timeout, ...).

    The function never raises for an unparseable body, so the caller can always report
    the failing step instead of dying with a traceback.
    """
    data = json.dumps(body).encode() if body is not None else None
    req = urllib.request.Request(f"{BASE}{path}", data=data, method=method)  # noqa: S310
    req.add_header("Content-Type", "application/json")
    if TOKEN:
        req.add_header("Authorization", f"Bearer {TOKEN}")

    try:
        with urllib.request.urlopen(req, timeout=20) as resp:  # noqa: S310
            status, raw = resp.status, resp.read()
    except urllib.error.HTTPError as exc:
        # 4xx/5xx responses still carry a body worth showing to the operator.
        status, raw = exc.code, exc.read()
    except OSError as exc:
        # URLError and TimeoutError are OSError subclasses: no HTTP response at all.
        return 0, {"error": str(exc)}

    try:
        payload = json.loads(raw.decode() or "{}")
    except ValueError:
        # Covers both invalid JSON and undecodable bytes (UnicodeDecodeError is a ValueError),
        # e.g. an HTML error page served by a proxy with status 200.
        return status, {"error": "response body is not valid JSON"}

    if not isinstance(payload, dict):
        # Callers use payload.get(...); keep the declared dict contract for list/scalar bodies.
        return status, {"error": "response JSON is not an object", "body": payload}
    return status, payload


def _step(label: str, status: int, payload: dict, *, ok_when=lambda s, p: s == 200) -> dict:
    """Report the outcome of one smoke-test step and abort the script if it failed.

    Args:
        label: Step name shown in the output.
        status: HTTP status returned for the step.
        payload: Decoded response body; at most 240 characters of its JSON form are printed.
        ok_when: Predicate ``(status, payload) -> truthy`` deciding success; by default
            the step passes only on HTTP 200.

    Returns:
        ``payload`` unchanged when the step passed.

    Raises:
        SystemExit: with code 1 when ``ok_when`` returns a falsy value.
    """
    passed = ok_when(status, payload)
    # ensure_ascii=False keeps non-ASCII text (e.g. the Arabic name) readable in the log.
    snippet = json.dumps(payload, ensure_ascii=False)[:240]
    if passed:
        print(f" ✓ {label}: HTTP {status} {snippet}")
        return payload
    print(f" ✗ {label}: HTTP {status} {snippet}")
    print(f"\nFAILED at: {label}")
    sys.exit(1)


def main() -> None:
    """Drive draft -> register -> approve -> run against the live control plane.

    Each step is reported through ``_step``, which exits with code 1 on the first failure.
    The script exits with code 2 before any request when ``NIL_REGISTRY_URL`` is not set.
    The final run state is printed but not asserted: a run that did not reach
    ``"completed"`` only produces a note.

    The register and run steps also require the fields this function reads next
    (``definition.version`` and ``run``) to be present, so a malformed success response is
    reported as a failed step instead of surfacing as a ``KeyError`` traceback.
    """
    if not BASE:
        print("set NIL_REGISTRY_URL (and NIL_REGISTRY_TOKEN) to the control plane", file=sys.stderr)
        sys.exit(2)
    print(f"Live smoke test → {BASE} workspace={WORKSPACE} id={AUTOMATION_ID}\n")

    def _registered(st: int, pl: dict) -> bool:
        # Success must include the stored definition and its version: step 3 builds its URL from it.
        definition = pl.get("definition")
        return st == 200 and bool(pl.get("ok")) and isinstance(definition, dict) and "version" in definition

    def _ran(st: int, pl: dict) -> bool:
        # Success must include the run record that is summarised below.
        return st == 200 and bool(pl.get("ok")) and isinstance(pl.get("run"), dict)

    print("1. draft (validates against the LIVE adapter skeleton)")
    s, p = _call("POST", "/automations/draft",
                 {"automation_id": AUTOMATION_ID, "name": NAME, "plan": PLAN, "trigger": {"type": "manual"}})
    _step("draft", s, p, ok_when=lambda st, pl: st == 200 and pl.get("ok") is True)

    print("2. register (lands pending_approval in the SSOT)")
    s, p = _call("POST", "/automations/register",
                 {"automation_id": AUTOMATION_ID, "name": NAME, "plan": PLAN, "trigger": {"type": "manual"}})
    d = _step("register", s, p, ok_when=_registered)["definition"]
    version = d["version"]

    print("3. approve (arm it)")
    s, p = _call("POST", f"/automations/{WORKSPACE}/{AUTOMATION_ID}/{version}/state",
                 {"state": "active", "approved_by": "smoke"})
    _step("approve", s, p)

    print("4. run (the REAL execution path: grant minting + LocalExecutor against the live adapter)")
    # A per-second timestamp gives each invocation of the script its own idempotency key.
    s, p = _call("POST", f"/automations/{WORKSPACE}/{AUTOMATION_ID}/run",
                 {"idempotency_key": f"smoke-{int(time.time())}"})
    run = _step("run", s, p, ok_when=_ran)["run"]
    print(f"\n✅ run state: {run.get('state')} (run_id={run.get('run_id')})")
    if run.get("state") != "completed":
        print(" note: run did not 'complete' — inspect the trace; the live execution path was reached.")


# Script entry point: run the smoke test only when executed directly, not when imported.
if __name__ == "__main__":
    main()
62 changes: 62 additions & 0 deletions src/nilscript/automation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
"""Automation Registry: conversation-authored, deterministically-lowered, SSOT-stored workflows.

An *automation* is a named, versioned, content-hashed `WosoolProgram` plus a trigger and a
lifecycle state. The agent drafts it (untrusted); the kernel validator lowers-or-rejects it
(deterministic); a human approves it; it lives in the control-plane store with its own version lock.

See `docs/PLAN-dynamic-automation-ssot.md`. P1 ships the SSOT spine (models, content hash, draft
gate, persistence, lifecycle); schedule/event triggers and the dispatcher are P2.
"""

from __future__ import annotations

from nilscript.automation.authoring import DraftResult, draft_automation, register
from nilscript.automation.compose import (
ComposedPlan,
ComposedResult,
Stage,
composed_hash,
parse_composed,
run_composed,
validate_composed,
)
from nilscript.automation.dispatch import Runner, fire_composed, fire_manual
from nilscript.automation.scheduler import dispatch_event, run_due_schedules
from nilscript.automation.skeleton import context_from_skeleton
from nilscript.automation.models import (
AutomationDefinition,
AutomationState,
EventTrigger,
ManualTrigger,
ScheduleTrigger,
TriggerSpec,
content_hash,
parse_trigger,
)

# Public API of the automation package. Kept sorted (classes first, then functions) so that
# additions are easy to place and diffs stay minimal.
__all__ = [
    "AutomationDefinition",
    "AutomationState",
    "ComposedPlan",
    "ComposedResult",
    "DraftResult",
    "EventTrigger",
    "ManualTrigger",
    "Runner",
    "ScheduleTrigger",
    "Stage",
    "TriggerSpec",
    "composed_hash",
    "content_hash",
    "context_from_skeleton",
    "dispatch_event",
    "draft_automation",
    "fire_composed",
    "fire_manual",
    "parse_composed",
    "parse_trigger",
    "register",
    "run_composed",
    "run_due_schedules",
    "validate_composed",
]
126 changes: 126 additions & 0 deletions src/nilscript/automation/authoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""The authoring loop: draft (validate + hash, no effect) → register (persist to SSOT).

`draft_automation` is the deterministic boundary. The agent supplies a raw plan + trigger
(untrusted); the kernel validator (V1-V6) lowers-or-rejects it against the live skeleton. Only a
plan that passes becomes a registrable `AutomationDefinition` carrying its content-hash. The agent
cannot talk past a refusal — a hallucinated verb has nothing to bind to (V4).
"""

from __future__ import annotations

import datetime
from dataclasses import dataclass
from typing import Any, Protocol

from nilscript.automation.models import (
AutomationDefinition,
AutomationState,
BilingualText,
content_hash,
parse_trigger,
)
from nilscript.kernel.context import ValidationContext
from nilscript.kernel.diagnostics import ValidationResult
from nilscript.kernel.models import WosoolProgram
from nilscript.kernel.validator import validate


class _Store(Protocol):
    """The slice of the control-plane store the registry needs (keeps this module store-agnostic).

    Structural type: any object exposing a compatible `register_automation` satisfies it, so
    `register` does not depend on a concrete store class.
    """

    # All arguments are keyword-only. The `= ...` defaults mark parameters the caller may omit;
    # the actual default values are chosen by the concrete store, not by this protocol.
    # The returned dict is the stored row; `register` hands it to `AutomationDefinition.from_row`.
    def register_automation(
        self,
        *,
        workspace: str,
        automation_id: str,
        content_hash: str,
        name: dict[str, Any],
        plan: dict[str, Any],
        trigger: dict[str, Any],
        state: str = ...,
        authored_by: str = ...,
        description: dict[str, Any] | None = ...,
        approved_by: str | None = ...,
    ) -> dict[str, Any]: ...


@dataclass(frozen=True)
class DraftResult:
    """Outcome of a draft attempt. `ok` mirrors the validator verdict; on failure `definition` is
    None and `diagnostics` carries the structured refusal (which node, which verb, why).

    Instances are immutable (frozen dataclass).
    """

    # True when the validator admitted the plan.
    ok: bool
    # The validator's result, returned on both success and failure.
    diagnostics: ValidationResult
    # The drafted definition; set only when `ok` is True.
    definition: AutomationDefinition | None = None
    # Content hash of the validated plan (same value as `definition.content_hash`); set only
    # when `ok` is True.
    content_hash: str | None = None


def _now() -> str:
return datetime.datetime.now(datetime.UTC).isoformat()


def draft_automation(
    *,
    automation_id: str,
    name: Any,
    raw_plan: dict[str, Any],
    trigger: Any,
    ctx: ValidationContext,
    authored_by: str = "",
    description: Any | None = None,
) -> DraftResult:
    """Validate a candidate plan and, when admitted, build its version-1 draft definition.

    Nothing is persisted here. The definition's workspace is read from the validated program
    rather than passed in, so the plan and the definition cannot disagree about it.

    Args:
        automation_id: Identifier for the automation being drafted.
        name: A `BilingualText`, or data that `BilingualText.model_validate` accepts.
        raw_plan: The untrusted plan, validated against `ctx`.
        trigger: Raw trigger spec, parsed with `parse_trigger`.
        ctx: Validation context the plan is checked against.
        authored_by: Author recorded on the definition.
        description: Optional `BilingualText` (or data for one); `None` leaves it unset.

    Returns:
        A `DraftResult`. When the validator refuses the plan, `ok` is False and only
        `diagnostics` is populated; otherwise it carries the definition and its content hash.
    """

    def _bilingual(value: Any) -> BilingualText:
        # An existing model passes through untouched; anything else goes through validation.
        if isinstance(value, BilingualText):
            return value
        return BilingualText.model_validate(value)

    verdict = validate(raw_plan, ctx)
    if not verdict.ok:
        return DraftResult(ok=False, diagnostics=verdict)

    program = WosoolProgram.model_validate(raw_plan)
    digest = content_hash(program)

    display_name = _bilingual(name)
    summary = None if description is None else _bilingual(description)

    definition = AutomationDefinition(
        automation_id=automation_id,
        workspace=program.workspace,
        version=1,
        content_hash=digest,
        name=display_name,
        description=summary,
        plan=program,
        trigger=parse_trigger(trigger),
        state="draft",
        authored_by=authored_by,
        created_at=_now(),
    )
    return DraftResult(ok=True, diagnostics=verdict, definition=definition, content_hash=digest)


def register(
    store: _Store,
    definition: AutomationDefinition,
    *,
    state: AutomationState = "pending_approval",
) -> AutomationDefinition:
    """Write a drafted definition to the store and return the version the store holds.

    The definition lands in `pending_approval` unless the caller passes another state, so
    registering never arms an automation by itself. The returned object is rebuilt from the
    row the store hands back, not from the input definition.

    Args:
        store: Any object satisfying the `_Store` protocol.
        definition: The drafted definition to persist.
        state: Lifecycle state to store the definition under.

    Returns:
        The `AutomationDefinition` built from the stored row.
    """
    summary = definition.description
    payload = {
        "workspace": definition.workspace,
        "automation_id": definition.automation_id,
        "content_hash": definition.content_hash,
        "name": definition.name.model_dump(),
        "description": summary.model_dump() if summary else None,
        # by_alias + mode="json" serialise the program with its aliases and JSON-safe values.
        "plan": definition.plan.model_dump(by_alias=True, mode="json"),
        "trigger": definition.trigger.model_dump(),
        "state": state,
        "authored_by": definition.authored_by,
    }
    stored_row = store.register_automation(**payload)
    return AutomationDefinition.from_row(stored_row)
Loading
Loading