Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions packages/plugins/mcp/src/sdk/plugin.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import { mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { describe, expect, it } from "@effect/vitest";
import { Effect, Layer, Option, Predicate, Schema } from "effect";
import {
Expand Down Expand Up @@ -38,6 +41,61 @@ import { makeAnnotationsMcpServer, serveMcpServer } from "../testing";

const TEMPLATE = AuthTemplateSlug.make("none");

const makeStdioMcpFixture = () => {
const dir = mkdtempSync(join(tmpdir(), "executor-mcp-stdio-"));
const script = join(dir, "server.mjs");
writeFileSync(
script,
`let buffer = "";
const send = (message) => process.stdout.write(JSON.stringify(message) + "\\n");
process.stdin.setEncoding("utf8");
process.stdin.on("data", (chunk) => {
buffer += chunk;
let index;
while ((index = buffer.indexOf("\\n")) >= 0) {
const line = buffer.slice(0, index).replace(/\\r$/, "");
buffer = buffer.slice(index + 1);
if (!line.trim()) continue;
const message = JSON.parse(line);
if (!("id" in message)) continue;
if (message.method === "initialize") {
send({
jsonrpc: "2.0",
id: message.id,
result: {
protocolVersion: "2025-06-18",
capabilities: { tools: {} },
serverInfo: { name: "stdio-fixture", version: "1.0.0" },
},
});
} else if (message.method === "tools/list") {
send({
jsonrpc: "2.0",
id: message.id,
result: {
tools: [
{
name: "hello",
description: "Greets over stdio",
inputSchema: { type: "object", properties: { name: { type: "string" } } },
},
],
},
});
} else {
send({
jsonrpc: "2.0",
id: message.id,
error: { code: -32601, message: "Method not found" },
});
}
}
});
`,
);
return { command: process.execPath, args: [script], dir };
};

const JsonRpcId = Schema.Union([Schema.String, Schema.Number, Schema.Null]);
const JsonRpcRequest = Schema.Struct({
id: Schema.optional(JsonRpcId),
Expand Down Expand Up @@ -345,6 +403,127 @@ describe("mcpPlugin", () => {
}),
);

it.effect("creates a default org connection when adding a stdio MCP server", () =>
Effect.gen(function* () {
const executor = yield* createExecutor(
makeTestConfig({
plugins: [
memoryCredentialsPlugin(),
mcpPlugin({ dangerouslyAllowStdioMCP: true }),
] as const,
}),
);

yield* executor.mcp.addServer({
transport: "stdio",
name: "Stdio MCP",
command: "",
slug: "stdio_mcp",
});

const connections = yield* executor.connections.list({
integration: IntegrationSlug.make("stdio_mcp"),
});

expect(connections).toHaveLength(1);
expect(connections[0]).toMatchObject({
owner: "org",
integration: IntegrationSlug.make("stdio_mcp"),
name: ConnectionName.make("default"),
template: TEMPLATE,
address: "tools.stdio_mcp.org.default",
});
}),
);

it.effect("produces stdio MCP tools under the default org connection", () =>
Effect.scoped(
Effect.gen(function* () {
const fixture = yield* Effect.acquireRelease(Effect.sync(makeStdioMcpFixture), ({ dir }) =>
Effect.sync(() => rmSync(dir, { recursive: true, force: true })),
);
const executor = yield* createExecutor(
makeTestConfig({
plugins: [
memoryCredentialsPlugin(),
mcpPlugin({ dangerouslyAllowStdioMCP: true }),
] as const,
}),
);

yield* executor.mcp.addServer({
transport: "stdio",
name: "Stdio MCP Tools",
command: fixture.command,
args: fixture.args,
slug: "stdio_tools",
});

const tools = yield* executor.tools.list();
const stdioTools = tools.filter((tool) => String(tool.integration) === "stdio_tools");

expect(stdioTools).toHaveLength(1);
expect(stdioTools[0]).toMatchObject({
address: ToolAddress.make("tools.stdio_tools.org.default.hello"),
owner: "org",
integration: IntegrationSlug.make("stdio_tools"),
connection: ConnectionName.make("default"),
name: "hello",
description: "Greets over stdio",
});
}),
),
);

it.effect(
"backfills a default connection for existing stdio MCP servers with no connections",
() =>
Effect.scoped(
Effect.gen(function* () {
const fixture = yield* Effect.acquireRelease(
Effect.sync(makeStdioMcpFixture),
({ dir }) => Effect.sync(() => rmSync(dir, { recursive: true, force: true })),
);
const executor = yield* createExecutor(
makeTestConfig({
plugins: [
memoryCredentialsPlugin(),
mcpPlugin({ dangerouslyAllowStdioMCP: true }),
] as const,
}),
);
const integration = IntegrationSlug.make("stdio_backfill");

yield* executor.mcp.addServer({
transport: "stdio",
name: "Stdio MCP Backfill",
command: fixture.command,
args: fixture.args,
slug: String(integration),
});
yield* executor.connections.remove({
owner: "org",
integration,
name: ConnectionName.make("default"),
});

expect(yield* executor.connections.list({ integration })).toHaveLength(0);
yield* executor.mcp.getServer(String(integration));

const connections = yield* executor.connections.list({ integration });
const tools = (yield* executor.tools.list()).filter(
(tool) => String(tool.integration) === String(integration),
);

expect(connections).toHaveLength(1);
expect(connections[0]?.address).toBe("tools.stdio_backfill.org.default");
expect(tools.map((tool) => String(tool.address))).toEqual([
"tools.stdio_backfill.org.default.hello",
]);
}),
),
);

it.effect("removing an MCP server removes the OAuth client used by its connection", () =>
Effect.scoped(
Effect.gen(function* () {
Expand Down
84 changes: 80 additions & 4 deletions packages/plugins/mcp/src/sdk/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import * as z from "zod/v4";
import {
authToolFailure,
definePlugin,
AuthTemplateSlug,
ConnectionName,
IntegrationAlreadyExistsError,
IntegrationSlug,
mergeAuthTemplates,
Expand Down Expand Up @@ -63,6 +65,8 @@ import {
} from "./types";

const MCP_PLUGIN_ID = "mcp" as const;
const DEFAULT_STDIO_CONNECTION_NAME = ConnectionName.make("default");
const NO_AUTH_TEMPLATE = AuthTemplateSlug.make("none");

const legacyOAuthClientSlugCandidate = (value: string): string | null => {
const slug = value
Expand Down Expand Up @@ -748,6 +752,57 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
}),
);

const defaultStdioConnectionRef = (integration: IntegrationSlug) => ({
owner: "org" as const,
name: DEFAULT_STDIO_CONNECTION_NAME,
integration,
});

const defaultStdioConnectionFailure = (message: string) =>
new McpConnectionError({
transport: "stdio",
message: `Failed creating the default stdio MCP connection: ${message}`,
});

const refreshDefaultStdioConnection = (integration: IntegrationSlug) =>
ctx.connections.refresh(defaultStdioConnectionRef(integration)).pipe(
Effect.asVoid,
Effect.catchTags({
ConnectionNotFoundError: (error) =>
Effect.fail(defaultStdioConnectionFailure(error.message)),
IntegrationNotFoundError: (error) =>
Effect.fail(defaultStdioConnectionFailure(error.message)),
}),
);

const ensureDefaultStdioConnection = (integration: IntegrationSlug, slug: string) =>
Effect.gen(function* () {
const connections = yield* ctx.connections.list({ integration });
if (connections.length > 0) return;

yield* ctx.connections
.create({
...defaultStdioConnectionRef(integration),
template: NO_AUTH_TEMPLATE,
values: {},
})
.pipe(
Effect.asVoid,
Effect.catchTags({
CredentialProviderNotRegisteredError: (error) =>
Effect.fail(defaultStdioConnectionFailure(error.message)),
IntegrationNotFoundError: (error) =>
Effect.fail(defaultStdioConnectionFailure(error.message)),
InvalidConnectionInputError: (error) =>
Effect.fail(defaultStdioConnectionFailure(error.message)),
UniqueViolationError: () => refreshDefaultStdioConnection(integration),
}),
Effect.withSpan("mcp.plugin.ensure_stdio_default_connection", {
attributes: { "mcp.integration.slug": slug },
}),
);
});

const addServer = (input: McpServerInput) =>
Effect.gen(function* () {
const slug = normalizeSlug(input);
Expand All @@ -763,9 +818,11 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
return yield* new IntegrationAlreadyExistsError({ slug: slugFrom(slug) });
}

const integration = slugFrom(slug);

yield* ctx.core.integrations
.register({
slug: slugFrom(slug),
slug: integration,
name: input.name,
description: input.description?.trim() || input.name,
config,
Expand All @@ -777,6 +834,11 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
attributes: { "mcp.integration.slug": slug },
}),
);

if (config.transport === "stdio") {
yield* ensureDefaultStdioConnection(integration, slug);
}

return { slug };
}).pipe(
Effect.withSpan("mcp.plugin.add_server", {
Expand Down Expand Up @@ -863,14 +925,28 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
);

const getServer = (slug: string) =>
ctx.core.integrations.get(slugFrom(slug)).pipe(
Effect.gen(function* () {
const integration = slugFrom(slug);
const record = yield* ctx.core.integrations.get(integration);
const config = record ? parseMcpIntegrationConfig(record.config) : null;
if (config?.transport === "stdio") {
yield* ensureDefaultStdioConnection(integration, slug);
}
return record;
}).pipe(
Effect.withSpan("mcp.plugin.get_server", {
attributes: { "mcp.integration.slug": slug },
}),
);

const configureServer = (slug: string, config: McpIntegrationConfigType) =>
ctx.core.integrations.update(slugFrom(slug), { config }).pipe(
Effect.gen(function* () {
const integration = slugFrom(slug);
yield* ctx.core.integrations.update(integration, { config });
if (config.transport === "stdio") {
yield* ensureDefaultStdioConnection(integration, slug);
}
}).pipe(
Effect.withSpan("mcp.plugin.configure_server", {
attributes: { "mcp.integration.slug": slug },
}),
Expand Down Expand Up @@ -1214,7 +1290,7 @@ export const mcpPlugin = definePlugin((options?: McpPluginOptions) => {
tool({
name: "addServer",
description:
"Register an MCP server in the catalog as an integration. Returns its `slug`. Then create a connection against it: for header/API-key auth call `connections.create` with the value; for OAuth-protected servers run `oauth.probe` + `oauth.start`. Tools are produced per-connection at connection create / refresh.",
"Register an MCP server in the catalog as an integration. Returns its `slug`. Stdio servers are connected automatically as `org/default` with template `none`. For remote header/API-key auth, call `connections.create` with the value; for OAuth-protected remote servers, run `oauth.probe` + `oauth.start`. Tools are produced per-connection at connection create / refresh.",
annotations: {
requiresApproval: true,
approvalDescription: "Add an MCP server",
Expand Down
Loading