Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/nilscript/controlplane/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,8 @@ def index() -> str:
const r=await fetch('/api/automations');const {automations}=await r.json();
const wrap=document.getElementById('autoWrap'),box=document.getElementById('automations');
document.getElementById('autocount').textContent=automations.length;
wrap.style.display=automations.length?'block':'none';
wrap.style.display='block'; // always visible — the compose form + token live here, even with 0 automations
if(!automations.length){box.innerHTML='<div class=empty style=padding:22px><div class=big>No automations yet</div><div>Click “+ New cross-system automation” above to build one between two systems — or ask the agent via MCP.</div></div>';return;}
box.innerHTML=automations.map(a=>{
const nm=(a.name&&(a.name.en||a.name.ar))||a.automation_id;
const ps=a.plan_summary||{};
Expand Down
120 changes: 120 additions & 0 deletions src/nilscript/mcp/brain_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Agent-facing MCP tools for READING the NIL Business Graph (the brain).

The kernel governs *actions* against a connected backend (`nil_describe`/`propose`/`commit`). But a
business also has a *graph* — cycles, entities, roles, policies, flows, and the live instances flowing
through them (invoices, payments, the overdue list). That graph lives in the NIL **brain**, a separate
read-model service. Without a dedicated tool, an agent asked "show my policies" improvises (curl, file
search) and answers inconsistently — sometimes hitting the brain, sometimes asking the kernel "is there
a policy verb?" and wrongly concluding "no policies".

These tools remove the guesswork: each is a thin, read-only HTTP relay to a stable brain GET endpoint,
so "show my policies / cycles / what changed / what's overdue" is a deterministic tool call with one
answer. The kernel stays decoupled from the brain's internals — it relays HTTP, it never imports the
brain. Env-gated: present only when `NIL_BRAIN_URL` is configured.
"""

from __future__ import annotations

import os
from typing import Any

import httpx

# Graph node kinds an operator asks about by name. "policy" is the one that was failing — policies are
# graph nodes, never kernel verbs.
_GRAPH_KINDS = ("entity", "role", "policy", "flow", "cycle")


class BrainTools:
"""Read-only HTTP relay to the brain's `/api/graph/*` endpoints. Inject a client for tests."""

def __init__(
self, brain_url: str, *, token: str = "", tenant: str = "",
client: httpx.AsyncClient | None = None, timeout: float = 10.0,
) -> None:
self._base = brain_url.rstrip("/")
self._headers = {"Authorization": f"Bearer {token}"} if token else {}
self._tenant = tenant
self._client = client
self._timeout = timeout

@classmethod
def from_env(cls) -> BrainTools | None:
"""Build from `NIL_BRAIN_URL` (+ optional `NIL_BRAIN_TOKEN`/`NIL_BRAIN_TENANT`), else None."""
url = os.environ.get("NIL_BRAIN_URL", "")
if not url:
return None
return cls(
url,
token=os.environ.get("NIL_BRAIN_TOKEN", ""),
tenant=os.environ.get("NIL_BRAIN_TENANT", ""),
)

def _tenant_for(self, tenant: str | None) -> str:
return tenant or self._tenant

async def _get(self, path: str, *, params: dict[str, Any]) -> Any:
client = self._client or httpx.AsyncClient(base_url=self._base, timeout=self._timeout)
try:
resp = await client.request("GET", path, params=params, headers=self._headers)
if resp.status_code >= 400:
return {"error": f"brain returned {resp.status_code}", "path": path}
try:
return resp.json()
except ValueError:
return {"error": "non-json response from brain", "status": resp.status_code}
except httpx.HTTPError as exc:
return {"error": f"brain unreachable: {exc}"}
finally:
if self._client is None:
await client.aclose()

# ── read tools ──────────────────────────────────────────────────────────────────────────────

async def graph(self, kind: str | None = None, tenant: str | None = None) -> dict[str, Any]:
"""Business-graph nodes, optionally filtered to one `kind` (entity/role/policy/flow/cycle).

`graph(kind="policy")` is the deterministic answer to "show my policies"."""
ws = self._tenant_for(tenant)
if not ws:
return {"error": "no tenant configured (set NIL_BRAIN_TENANT or pass tenant)"}
if kind is not None and kind not in _GRAPH_KINDS:
return {"error": f"unknown kind {kind!r}; expected one of {list(_GRAPH_KINDS)}"}
data = await self._get("/api/graph/nodes", params={"tenant": ws})
if isinstance(data, dict) and data.get("error"):
return data
nodes = data.get("nodes", data) if isinstance(data, dict) else data
if not isinstance(nodes, list):
return {"error": "unexpected node payload from brain", "tenant": ws}
if kind is not None:
nodes = [n for n in nodes if isinstance(n, dict) and n.get("kind") == kind]
return {"tenant": ws, "kind": kind, "count": len(nodes), "nodes": nodes}

async def cycles(self, tenant: str | None = None) -> dict[str, Any]:
"""Business cycles (Sales, Finance, …) with their goal, metrics, and members."""
ws = self._tenant_for(tenant)
if not ws:
return {"error": "no tenant configured (set NIL_BRAIN_TENANT or pass tenant)"}
return await self._get("/api/graph/cycles", params={"tenant": ws})

async def overview(self, tenant: str | None = None) -> dict[str, Any]:
"""Graph summary: how many entities, roles, policies, flows, cycles exist."""
ws = self._tenant_for(tenant)
if not ws:
return {"error": "no tenant configured (set NIL_BRAIN_TENANT or pass tenant)"}
return await self._get("/api/graph/summary", params={"tenant": ws})

async def instances(self, tenant: str | None = None) -> dict[str, Any]:
"""Live instance tallies per entity type — totals + derived-state counts (e.g. overdue,
awaiting_approval). The deterministic answer to "how many invoices are overdue"."""
ws = self._tenant_for(tenant)
if not ws:
return {"error": "no tenant configured (set NIL_BRAIN_TENANT or pass tenant)"}
return await self._get("/api/graph/instances/summary", params={"tenant": ws})

async def activity(self, tenant: str | None = None) -> dict[str, Any]:
"""What recently changed in the business graph (the latest version diff / additions)."""
ws = self._tenant_for(tenant)
if not ws:
return {"error": "no tenant configured (set NIL_BRAIN_TENANT or pass tenant)"}
return await self._get("/api/graph/activity", params={"tenant": ws})
94 changes: 93 additions & 1 deletion src/nilscript/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from __future__ import annotations

import json
import os
from typing import Any

# Imported at module scope (not lazily) so the wrapped tool functions' stringized annotations
Expand Down Expand Up @@ -139,6 +140,8 @@ def build_server(
port: int = 8765,
tools_provider: ToolsProvider | None = None,
automation_tools: Any = None,
brain_tools: Any = None,
allowed_hosts: list[str] | None = None,
): # type: ignore[no-untyped-def]
"""Bind the NilTools surface onto a FastMCP server. Imports `mcp` lazily.

Expand All @@ -149,8 +152,25 @@ def build_server(
skill/skeleton resources and any `dynamic_verbs` always reflect the `tools` backend (the default).
`automation_tools` (optional `AutomationTools`) adds the registry tools so an agent can author
governed automations by talking — bound only when a control-plane registry is configured.
`allowed_hosts` (optional) widens the streamable-HTTP DNS-rebinding guard: FastMCP only reads
`transport_security` as a constructor kwarg and otherwise auto-enables a localhost-only allowlist,
so a server reachable by its container/service or public host (e.g. another in-cluster agent
dialing `nilscript-mcp:8765`) is 421-rejected unless those hosts are listed here. Entries may use
the SDK's ``host:*`` wildcard-port form. The `/mcp` front door stays bearer-gated regardless.
"""
server = FastMCP(name, instructions=_INSTRUCTIONS, host=host, port=port)
transport_security = None
if allowed_hosts:
from mcp.server.transport_security import TransportSecuritySettings

transport_security = TransportSecuritySettings(
enable_dns_rebinding_protection=True,
allowed_hosts=list(allowed_hosts),
allowed_origins=["*"],
)
server = FastMCP(
name, instructions=_INSTRUCTIONS, host=host, port=port,
transport_security=transport_security,
)

provider = tools_provider if tools_provider is not None else SingletonToolsProvider(tools)
_register_tools(server, provider)
Expand All @@ -161,6 +181,8 @@ def build_server(
_register_skill(server, tools)
if automation_tools is not None:
_register_automation_tools(server, automation_tools)
if brain_tools is not None:
_register_brain_tools(server, brain_tools)
return server


Expand Down Expand Up @@ -226,6 +248,55 @@ async def nil_automation_list(workspace: str) -> dict[str, Any]:
)


def _register_brain_tools(server: Any, brain: Any) -> None:
"""Bind the read-only Business Graph tools. These answer "show my policies / cycles / what changed /
what's overdue" deterministically from the brain read-model — so the agent never improvises (curl,
file search) or mistakes a graph question for a missing kernel verb. All read-only, no side effect."""

async def nil_graph(kind: str | None = None, tenant: str | None = None) -> dict[str, Any]:
return await brain.graph(kind, tenant)

async def nil_cycles(tenant: str | None = None) -> dict[str, Any]:
return await brain.cycles(tenant)

async def nil_overview(tenant: str | None = None) -> dict[str, Any]:
return await brain.overview(tenant)

async def nil_instances(tenant: str | None = None) -> dict[str, Any]:
return await brain.instances(tenant)

async def nil_activity(tenant: str | None = None) -> dict[str, Any]:
return await brain.activity(tenant)

server.add_tool(
nil_graph, name="nil_graph",
description="READ the Business Graph nodes from the brain. Pass kind to filter: 'policy' (the "
"right way to answer 'show my policies' — policies are graph nodes, NOT kernel verbs), 'entity', "
"'role', 'flow', or 'cycle'. No side effect. Use this for any 'show me my X' structure question.",
)
server.add_tool(
nil_cycles, name="nil_cycles",
description="READ the business cycles (e.g. Sales, Finance) with each cycle's goal, metrics, and "
"members. The deterministic answer to 'show me the cycles'. No side effect.",
)
server.add_tool(
nil_overview, name="nil_overview",
description="READ a one-glance graph summary: counts of entities, roles, policies, flows, and "
"cycles for the workspace. No side effect.",
)
server.add_tool(
nil_instances, name="nil_instances",
description="READ live instance tallies per entity type — totals plus derived-state counts such "
"as overdue and awaiting_approval. The deterministic answer to 'how many invoices are overdue'. "
"No side effect.",
)
server.add_tool(
nil_activity, name="nil_activity",
description="READ what recently changed in the Business Graph (latest version additions/diff). "
"The deterministic answer to 'what changed this week'. No side effect.",
)


def _register_tools(server: Any, provider: ToolsProvider) -> None:
"""Wrap each primitive with the MCP Context so the backend + per-connection session resolve from
the connection: `provider.get(ctx)` picks the backend (one shared, or per-tenant from headers)
Expand Down Expand Up @@ -411,6 +482,24 @@ def serve(
uvicorn.run(app, host=host, port=port)


def _allowed_hosts_from_env() -> list[str] | None:
"""Parse ``NIL_MCP_ALLOWED_HOSTS`` (JSON list or comma-separated) for the DNS-rebinding allowlist.

Set by the deploy when the server is reached by a name other than localhost (its container/service
name, or a public ``mcp.*`` host behind a reverse proxy). ``None`` (unset/blank) keeps FastMCP's
localhost-only default. Entries may use the SDK's ``host:*`` wildcard-port form.
"""
raw = os.environ.get("NIL_MCP_ALLOWED_HOSTS", "").strip()
if not raw:
return None
if raw.startswith("["):
hosts = [str(h).strip() for h in json.loads(raw)]
else:
hosts = [h.strip() for h in raw.split(",")]
hosts = [h for h in hosts if h]
return hosts or None


def build_asgi_app(
*,
adapter_url: str,
Expand Down Expand Up @@ -460,10 +549,13 @@ def build_asgi_app(
registry=make_registry_lookup(),
)
from nilscript.mcp.automation_tools import AutomationTools
from nilscript.mcp.brain_tools import BrainTools

server = build_server(
tools, dynamic_verbs=verbs, tools_provider=provider,
automation_tools=AutomationTools.from_env(),
brain_tools=BrainTools.from_env(),
allowed_hosts=_allowed_hosts_from_env(),
)
app = server.streamable_http_app() # MCP mounted at /mcp

Expand Down
119 changes: 119 additions & 0 deletions tests/test_mcp_brain_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""The agent-facing Business Graph (brain) READ tools.

BrainTools is a thin read-only HTTP relay to the brain's `/api/graph/*` endpoints. These tests back it
with an httpx MockTransport (no network) to prove: the right endpoint is hit, the tenant is applied,
`graph(kind=…)` filters correctly, and failures degrade to a structured `{"error": …}`. A registration
test proves the five tools land on a FastMCP server. This is the fix for the agent improvising on
"show my policies" — policies are graph nodes, surfaced deterministically here.
"""

from __future__ import annotations

import httpx
import pytest

from nilscript.mcp.brain_tools import BrainTools

_NODES = [
{"id": "entity:invoice", "kind": "entity", "label": {"en": "Invoice"}},
{"id": "policy:payment-approval", "kind": "policy", "label": {"en": "Payment Approval"}},
{"id": "role:cfo", "kind": "role", "label": {"en": "CFO"}},
]


def _brain(handler) -> BrainTools:
client = httpx.AsyncClient(transport=httpx.MockTransport(handler), base_url="http://brain")
return BrainTools("http://brain", tenant="ws_acme", client=client)


def _ok(handler):
"""Wrap a path→json map into a MockTransport handler that records the last request."""
seen: dict = {}

def _h(request: httpx.Request) -> httpx.Response:
seen["url"] = str(request.url)
seen["path"] = request.url.path
seen["params"] = dict(request.url.params)
return handler(request)

return _h, seen


async def test_graph_filters_to_policies() -> None:
handler, seen = _ok(lambda r: httpx.Response(200, json={"nodes": _NODES}))
out = await _brain(handler).graph(kind="policy")
assert seen["path"] == "/api/graph/nodes"
assert seen["params"]["tenant"] == "ws_acme"
assert out["count"] == 1
assert out["nodes"][0]["id"] == "policy:payment-approval"


async def test_graph_no_kind_returns_all() -> None:
handler, _ = _ok(lambda r: httpx.Response(200, json={"nodes": _NODES}))
out = await _brain(handler).graph()
assert out["count"] == 3


async def test_graph_rejects_unknown_kind() -> None:
handler, _ = _ok(lambda r: httpx.Response(200, json={"nodes": _NODES}))
out = await _brain(handler).graph(kind="invoiceish")
assert "error" in out and "unknown kind" in out["error"]


async def test_cycles_hits_cycles_endpoint() -> None:
handler, seen = _ok(lambda r: httpx.Response(200, json={"tenant": "ws_acme", "cycles": []}))
out = await _brain(handler).cycles()
assert seen["path"] == "/api/graph/cycles"
assert out["tenant"] == "ws_acme"


async def test_instances_hits_instances_summary() -> None:
handler, seen = _ok(lambda r: httpx.Response(200, json={"entity_types": {"invoice": {"overdue": 12}}}))
out = await _brain(handler).instances()
assert seen["path"] == "/api/graph/instances/summary"
assert out["entity_types"]["invoice"]["overdue"] == 12


async def test_activity_hits_activity_endpoint() -> None:
handler, seen = _ok(lambda r: httpx.Response(200, json={"added": []}))
await _brain(handler).activity()
assert seen["path"] == "/api/graph/activity"


async def test_brain_error_degrades_to_structured_error() -> None:
handler, _ = _ok(lambda r: httpx.Response(503, text="upstream down"))
out = await _brain(handler).cycles()
assert "error" in out and "503" in out["error"]


async def test_missing_tenant_is_a_clear_error() -> None:
handler, _ = _ok(lambda r: httpx.Response(200, json={"nodes": []}))
client = httpx.AsyncClient(transport=httpx.MockTransport(handler), base_url="http://brain")
notenant = BrainTools("http://brain", tenant="", client=client)
out = await notenant.graph(kind="policy")
assert "error" in out and "tenant" in out["error"]


def test_from_env_none_without_url(monkeypatch) -> None:
monkeypatch.delenv("NIL_BRAIN_URL", raising=False)
assert BrainTools.from_env() is None


def test_from_env_builds_with_url(monkeypatch) -> None:
monkeypatch.setenv("NIL_BRAIN_URL", "https://brain.example")
monkeypatch.setenv("NIL_BRAIN_TENANT", "ws_acme")
bt = BrainTools.from_env()
assert bt is not None
assert bt._tenant == "ws_acme"


@pytest.mark.skipif(
pytest.importorskip("mcp", reason="needs the [mcp] extra") is None, reason="needs mcp"
)
async def test_brain_tools_register_on_server() -> None:
from nilscript.mcp.server import build_server, build_tools

handler, _ = _ok(lambda r: httpx.Response(200, json={"nodes": []}))
server = build_server(build_tools(adapter_url="http://127.0.0.1:9", bearer=""), brain_tools=_brain(handler))
names = {t.name for t in await server.list_tools()}
assert {"nil_graph", "nil_cycles", "nil_overview", "nil_instances", "nil_activity"} <= names
Loading
Loading