Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/polite-caches-pull.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"executor": patch
---

Add a host-provided cache primitive to the SDK executor surface. Hosts can now pass an Effect KeyValueStore to `createExecutor`, while executors without one use an in-memory fallback.
6 changes: 6 additions & 0 deletions packages/core/api/src/server/scoped-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
// ---------------------------------------------------------------------------

import { Context, Effect, Option } from "effect";
import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore";

import {
createExecutor,
Expand Down Expand Up @@ -205,6 +206,7 @@ export const makeScopedExecutor = <
const { db, blobs } = yield* DbProvider;
const { plugins: pluginsFactory } = yield* PluginsProvider;
const config = yield* HostConfig;
const cache = yield* Effect.serviceOption(KeyValueStore.KeyValueStore);
// Explicit config wins; otherwise fall back to the request origin if a host
// provided one (HTTP middleware / MCP session DO). Stays `undefined` for
// non-request callers — `coreTools.webBaseUrl` is optional and only the
Expand Down Expand Up @@ -255,6 +257,10 @@ export const makeScopedExecutor = <
subject: Subject.make(accountId),
db,
blobs,
...Option.match(cache, {
onNone: () => ({}),
onSome: (store) => ({ cache: store }),
}),
plugins,
httpClientLayer,
fetch: hostedFetch,
Expand Down
92 changes: 92 additions & 0 deletions packages/core/sdk/src/executor-cache.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { describe, expect, it } from "@effect/vitest";
import { Effect } from "effect";

import { createExecutor, type Executor } from "./executor";
import { Tenant } from "./ids";

const MEMORY_CACHE_CAPACITY = 2_048;
const MEMORY_CACHE_TTL_MS = 10 * 60 * 1000;

const makeExecutor = Effect.acquireRelease(
createExecutor({
tenant: Tenant.make("test-tenant"),
onElicitation: "accept-all",
}),
(executor) => executor.close().pipe(Effect.ignore),
);

const withFakeNow = <A, E, R>(
initialNow: number,
run: (clock: { readonly advance: (ms: number) => void }) => Effect.Effect<A, E, R>,
): Effect.Effect<A, E, R> =>
Effect.acquireUseRelease(
Effect.sync(() => {
const originalNow = Date.now;
let now = initialNow;
Date.now = () => now;
return {
advance: (ms: number) => {
now += ms;
},
restore: () => {
Date.now = originalNow;
},
};
}),
(clock) => run({ advance: clock.advance }),
(clock) => Effect.sync(clock.restore),
);

describe("executor cache", () => {
it.effect("uses an in-memory fallback when no cache is configured", () =>
Effect.scoped(
Effect.gen(function* () {
const executor = yield* makeExecutor;

yield* executor.cache.set("a", "value");
expect(yield* executor.cache.get("a")).toBe("value");

yield* executor.cache.remove("a");
expect(yield* executor.cache.get("a")).toBeUndefined();
}),
),
);

it.effect("expires fallback entries by TTL on get and size", () =>
withFakeNow(1_000, ({ advance }) =>
Effect.scoped(
Effect.gen(function* () {
const executor = yield* makeExecutor;

yield* executor.cache.set("a", "value");
expect(yield* executor.cache.size).toBe(1);

advance(MEMORY_CACHE_TTL_MS);

expect(yield* executor.cache.get("a")).toBeUndefined();
expect(yield* executor.cache.size).toBe(0);
}),
),
),
);

it.effect("refreshes fallback LRU position when an existing key is written", () =>
Effect.scoped(
Effect.gen(function* () {
const executor: Executor = yield* makeExecutor;

yield* executor.cache.set("a", "old");
for (let index = 0; index < MEMORY_CACHE_CAPACITY - 1; index += 1) {
yield* executor.cache.set(`key-${index}`, String(index));
}

yield* executor.cache.set("a", "new");
yield* executor.cache.set("overflow", "value");

expect(yield* executor.cache.get("a")).toBe("new");
expect(yield* executor.cache.get("key-0")).toBeUndefined();
expect(yield* executor.cache.get("key-1")).toBe("1");
}),
),
);
});
59 changes: 59 additions & 0 deletions packages/core/sdk/src/executor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Effect, Inspectable, Layer, Option, Predicate, Schema } from "effect";
import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore";
import { FetchHttpClient, type HttpClient } from "effect/unstable/http";
import { fumadb } from "@executor-js/fumadb";
import { memoryAdapter } from "@executor-js/fumadb/adapters/memory";
Expand Down Expand Up @@ -319,6 +320,8 @@ export type Executor<TPlugins extends readonly AnyPlugin[] = readonly []> = {
) => Effect.Effect<unknown, ExecuteError>;

readonly close: () => Effect.Effect<void, StorageFailure>;

readonly cache: KeyValueStore.KeyValueStore;
} & PluginExtensions<TPlugins>;

export interface ExecutorDb {
Expand Down Expand Up @@ -346,6 +349,7 @@ export interface ExecutorConfig<TPlugins extends readonly AnyPlugin[] = readonly
* values stay out of the relational tier.
*/
readonly blobs?: BlobStore;
readonly cache?: KeyValueStore.KeyValueStore;
readonly plugins?: TPlugins;
/** Config-level credential providers, merged with every
* `plugin.credentialProviders`. Config providers register first, so the
Expand Down Expand Up @@ -418,6 +422,59 @@ const validateExecutorDbTables = (required: FumaTables, actual: FumaTables): voi
const storageFailureFromUnknown = (message: string, cause: unknown): StorageFailure =>
isStorageFailure(cause) ? cause : new StorageError({ message, cause });

const MEMORY_CACHE_CAPACITY = 2_048;
const MEMORY_CACHE_TTL_MS = 10 * 60 * 1000;

const makeMemoryCacheStore = (): KeyValueStore.KeyValueStore => {
const rows = new Map<string, { readonly value: string; readonly expiresAt: number }>();
const evictExpired = (now: number): void => {
for (const [key, entry] of rows) {
if (entry.expiresAt <= now) rows.delete(key);
}
};
const evictCapacity = (): void => {
while (rows.size > MEMORY_CACHE_CAPACITY) {
const oldest = rows.keys().next().value;
if (oldest === undefined) break;
rows.delete(oldest);
}
};
return KeyValueStore.makeStringOnly({
get: (key) =>
Effect.sync(() => {
const now = Date.now();
const entry = rows.get(key);
if (entry === undefined) return undefined;
if (entry.expiresAt <= now) {
rows.delete(key);
return undefined;
}
rows.delete(key);
rows.set(key, entry);
return entry.value;
}),
set: (key, value) =>
Effect.sync(() => {
const now = Date.now();
evictExpired(now);
rows.delete(key);
rows.set(key, { value, expiresAt: now + MEMORY_CACHE_TTL_MS });
evictCapacity();
}),
Comment thread
aryasaatvik marked this conversation as resolved.
remove: (key) =>
Effect.sync(() => {
rows.delete(key);
}),
clear: Effect.sync(() => {
rows.clear();
}),
size: Effect.sync(() => {
evictExpired(Date.now());
return rows.size;
}),
});
Comment thread
aryasaatvik marked this conversation as resolved.
};

const pluginStorageFailure = (pluginId: string, hook: string, cause: unknown): StorageFailure =>
storageFailureFromUnknown(`${hook} failed for plugin ${pluginId}`, cause);

Expand Down Expand Up @@ -1277,6 +1334,7 @@ export const createExecutor = <const TPlugins extends readonly AnyPlugin[] = rea
const fuma = makeFumaClient(rootDb);
const core = makeCoreDb(fuma);
const blobs = config.blobs ?? makeFumaBlobStore(fuma);
const cacheStore = config.cache ?? makeMemoryCacheStore();
const transaction = <A, E>(effect: Effect.Effect<A, E>) => fuma.transaction(effect);

// Populated once, never mutated after startup.
Expand Down Expand Up @@ -3299,6 +3357,7 @@ export const createExecutor = <const TPlugins extends readonly AnyPlugin[] = rea
},
execute,
close,
cache: cacheStore,
};

const toExecutor = (value: unknown): Executor<TPlugins> => value as Executor<TPlugins>;
Expand Down
3 changes: 3 additions & 0 deletions packages/core/sdk/src/test-config.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Context, Effect, Layer } from "effect";
import type * as KeyValueStore from "effect/unstable/persistence/KeyValueStore";
import { withQueryContext } from "@executor-js/fumadb/query";
import { collectTables, createExecutor, type Executor, type ExecutorConfig } from "./executor";
import type { FumaDb } from "./fuma-runtime";
Expand Down Expand Up @@ -118,6 +119,7 @@ export type TestConfigOptions<TPlugins extends readonly AnyPlugin[] = readonly [
readonly backend?: TestDatabaseBackend;
readonly dataDir?: string;
readonly coreTools?: ExecutorConfig<TPlugins>["coreTools"];
readonly cache?: KeyValueStore.KeyValueStore;
/** Override the OAuth callback URL. Pass `null` to construct an executor with
* no OAuth callback (exercises the fail-loud redirect path). */
readonly redirectUri?: string | null;
Expand Down Expand Up @@ -155,6 +157,7 @@ export const makeTestConfig = <const TPlugins extends readonly AnyPlugin[] = rea
db,
plugins: options?.plugins,
coreTools: options?.coreTools,
cache: options?.cache,
testDb,
// Tests default to auto-accepting elicitation prompts.
onElicitation: "accept-all",
Expand Down
4 changes: 4 additions & 0 deletions packages/hosts/cloudflare/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
"types": "./src/blob-store.ts",
"default": "./src/blob-store.ts"
},
"./key-value-store": {
"types": "./src/key-value-store.ts",
"default": "./src/key-value-store.ts"
},
"./mcp/worker-transport": {
"types": "./src/mcp/worker-transport.ts",
"default": "./src/mcp/worker-transport.ts"
Expand Down
93 changes: 93 additions & 0 deletions packages/hosts/cloudflare/src/key-value-store.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { describe, expect, it } from "@effect/vitest";
import { Effect } from "effect";
import type { KVNamespace } from "@cloudflare/workers-types";

import { makeCloudflareKeyValueStore } from "./key-value-store";

const makeFakeKv = (
pageSize: number,
): {
readonly kv: KVNamespace;
readonly values: Map<string, string>;
readonly maxConcurrentDeletes: () => number;
} => {
const values = new Map<string, string>();
let activeDeletes = 0;
let maxActiveDeletes = 0;

// oxlint-disable-next-line executor/no-double-cast -- test double: only the KV slice the adapter calls is implemented
const kv = {
get: async (key: string) => values.get(key) ?? null,
put: async (key: string, value: string) => {
values.set(key, value);
},
delete: async (key: string) => {
activeDeletes += 1;
maxActiveDeletes = Math.max(maxActiveDeletes, activeDeletes);
await new Promise((resolve) => setTimeout(resolve, 0));
values.delete(key);
activeDeletes -= 1;
},
list: async (options?: { readonly cursor?: string }) => {
const offset = options?.cursor === undefined ? 0 : Number(options.cursor);
const keys = [...values.keys()].sort().slice(offset, offset + pageSize);
const nextOffset = offset + pageSize;
const listComplete = nextOffset >= values.size;

return {
keys: keys.map((name) => ({ name })),
list_complete: listComplete,
cursor: listComplete ? "" : String(nextOffset),
};
},
} as unknown as KVNamespace;

return {
kv,
values,
maxConcurrentDeletes: () => maxActiveDeletes,
};
};

describe("makeCloudflareKeyValueStore", () => {
it.effect("round-trips string values", () =>
Effect.gen(function* () {
const { kv } = makeFakeKv(10);
const store = makeCloudflareKeyValueStore(kv);

yield* store.set("a", "value");
expect(yield* store.get("a")).toBe("value");

yield* store.remove("a");
expect(yield* store.get("a")).toBeUndefined();
}),
);

it.effect("counts paginated keys", () =>
Effect.gen(function* () {
const { values, kv } = makeFakeKv(2);
values.set("a", "1");
values.set("b", "2");
values.set("c", "3");

const store = makeCloudflareKeyValueStore(kv);
expect(yield* store.size).toBe(3);
}),
);

it.effect("clears paginated keys in bounded parallel batches", () =>
Effect.gen(function* () {
const { values, kv, maxConcurrentDeletes } = makeFakeKv(25);
for (let index = 0; index < 75; index += 1) {
values.set(`key-${index.toString().padStart(2, "0")}`, String(index));
}

const store = makeCloudflareKeyValueStore(kv);
yield* store.clear;

expect(values.size).toBe(0);
expect(maxConcurrentDeletes()).toBeGreaterThan(1);
expect(maxConcurrentDeletes()).toBeLessThanOrEqual(50);
}),
);
});
Loading
Loading