From c27077ab9ed19b6084781226369215949b9c5ed1 Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 24 Jun 2026 17:46:08 +0530 Subject: [PATCH 1/2] feat(sdk): add cache primitive --- .changeset/polite-caches-pull.md | 5 + .../core/api/src/server/scoped-executor.ts | 6 ++ packages/core/sdk/src/executor.ts | 58 ++++++++++++ packages/core/sdk/src/test-config.ts | 3 + packages/hosts/cloudflare/package.json | 4 + .../cloudflare/src/key-value-store.test.ts | 93 +++++++++++++++++++ .../hosts/cloudflare/src/key-value-store.ts | 66 +++++++++++++ 7 files changed, 235 insertions(+) create mode 100644 .changeset/polite-caches-pull.md create mode 100644 packages/hosts/cloudflare/src/key-value-store.test.ts create mode 100644 packages/hosts/cloudflare/src/key-value-store.ts diff --git a/.changeset/polite-caches-pull.md b/.changeset/polite-caches-pull.md new file mode 100644 index 000000000..0b0f1c4e9 --- /dev/null +++ b/.changeset/polite-caches-pull.md @@ -0,0 +1,5 @@ +--- +"executor": patch +--- + +Add a host-provided cache primitive to the SDK executor surface. Hosts can now pass an Effect KeyValueStore to `createExecutor`, while executors without one use an in-memory fallback. diff --git a/packages/core/api/src/server/scoped-executor.ts b/packages/core/api/src/server/scoped-executor.ts index cd48cf0c8..4e0c675c3 100644 --- a/packages/core/api/src/server/scoped-executor.ts +++ b/packages/core/api/src/server/scoped-executor.ts @@ -32,6 +32,7 @@ // --------------------------------------------------------------------------- import { Context, Effect, Option } from "effect"; +import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore"; import { createExecutor, @@ -205,6 +206,7 @@ export const makeScopedExecutor = < const { db, blobs } = yield* DbProvider; const { plugins: pluginsFactory } = yield* PluginsProvider; const config = yield* HostConfig; + const cache = yield* Effect.serviceOption(KeyValueStore.KeyValueStore); // Explicit config wins; otherwise fall back to the request origin if a host // provided one (HTTP middleware / MCP session DO). Stays `undefined` for // non-request callers — `coreTools.webBaseUrl` is optional and only the @@ -255,6 +257,10 @@ export const makeScopedExecutor = < subject: Subject.make(accountId), db, blobs, + ...Option.match(cache, { + onNone: () => ({}), + onSome: (store) => ({ cache: store }), + }), plugins, httpClientLayer, fetch: hostedFetch, diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 38e12a3c6..d6969c75e 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -1,4 +1,5 @@ import { Effect, Inspectable, Layer, Option, Predicate, Schema } from "effect"; +import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore"; import { FetchHttpClient, type HttpClient } from "effect/unstable/http"; import { fumadb } from "@executor-js/fumadb"; import { memoryAdapter } from "@executor-js/fumadb/adapters/memory"; @@ -319,6 +320,8 @@ export type Executor = { ) => Effect.Effect; readonly close: () => Effect.Effect; + + readonly cache: KeyValueStore.KeyValueStore; } & PluginExtensions; export interface ExecutorDb { @@ -346,6 +349,7 @@ export interface ExecutorConfig isStorageFailure(cause) ? cause : new StorageError({ message, cause }); +const MEMORY_CACHE_CAPACITY = 2_048; +const MEMORY_CACHE_TTL_MS = 10 * 60 * 1000; + +const makeMemoryCacheStore = (): KeyValueStore.KeyValueStore => { + const rows = new Map(); + const evictExpired = (now: number): void => { + for (const [key, entry] of rows) { + if (entry.expiresAt <= now) rows.delete(key); + } + }; + const evictCapacity = (): void => { + while (rows.size > MEMORY_CACHE_CAPACITY) { + const oldest = rows.keys().next().value; + if (oldest === undefined) break; + rows.delete(oldest); + } + }; + return KeyValueStore.makeStringOnly({ + get: (key) => + Effect.sync(() => { + const now = Date.now(); + const entry = rows.get(key); + if (entry === undefined) return undefined; + if (entry.expiresAt <= now) { + rows.delete(key); + return undefined; + } + rows.delete(key); + rows.set(key, entry); + return entry.value; + }), + set: (key, value) => + Effect.sync(() => { + const now = Date.now(); + evictExpired(now); + rows.set(key, { value, expiresAt: now + MEMORY_CACHE_TTL_MS }); + evictCapacity(); + }), + remove: (key) => + Effect.sync(() => { + rows.delete(key); + }), + clear: Effect.sync(() => { + rows.clear(); + }), + size: Effect.sync(() => { + evictExpired(Date.now()); + return rows.size; + }), + }); +}; + const pluginStorageFailure = (pluginId: string, hook: string, cause: unknown): StorageFailure => storageFailureFromUnknown(`${hook} failed for plugin ${pluginId}`, cause); @@ -1277,6 +1333,7 @@ export const createExecutor = (effect: Effect.Effect) => fuma.transaction(effect); // Populated once, never mutated after startup. @@ -3299,6 +3356,7 @@ export const createExecutor = => value as Executor; diff --git a/packages/core/sdk/src/test-config.ts b/packages/core/sdk/src/test-config.ts index 7aba38ff0..8f8f5989f 100644 --- a/packages/core/sdk/src/test-config.ts +++ b/packages/core/sdk/src/test-config.ts @@ -1,4 +1,5 @@ import { Context, Effect, Layer } from "effect"; +import type * as KeyValueStore from "effect/unstable/persistence/KeyValueStore"; import { withQueryContext } from "@executor-js/fumadb/query"; import { collectTables, createExecutor, type Executor, type ExecutorConfig } from "./executor"; import type { FumaDb } from "./fuma-runtime"; @@ -118,6 +119,7 @@ export type TestConfigOptions["coreTools"]; + readonly cache?: KeyValueStore.KeyValueStore; /** Override the OAuth callback URL. Pass `null` to construct an executor with * no OAuth callback (exercises the fail-loud redirect path). */ readonly redirectUri?: string | null; @@ -155,6 +157,7 @@ export const makeTestConfig = ; + readonly maxConcurrentDeletes: () => number; +} => { + const values = new Map(); + let activeDeletes = 0; + let maxActiveDeletes = 0; + + // oxlint-disable-next-line executor/no-double-cast -- test double: only the KV slice the adapter calls is implemented + const kv = { + get: async (key: string) => values.get(key) ?? null, + put: async (key: string, value: string) => { + values.set(key, value); + }, + delete: async (key: string) => { + activeDeletes += 1; + maxActiveDeletes = Math.max(maxActiveDeletes, activeDeletes); + await new Promise((resolve) => setTimeout(resolve, 0)); + values.delete(key); + activeDeletes -= 1; + }, + list: async (options?: { readonly cursor?: string }) => { + const offset = options?.cursor === undefined ? 0 : Number(options.cursor); + const keys = [...values.keys()].sort().slice(offset, offset + pageSize); + const nextOffset = offset + pageSize; + const listComplete = nextOffset >= values.size; + + return { + keys: keys.map((name) => ({ name })), + list_complete: listComplete, + cursor: listComplete ? "" : String(nextOffset), + }; + }, + } as unknown as KVNamespace; + + return { + kv, + values, + maxConcurrentDeletes: () => maxActiveDeletes, + }; +}; + +describe("makeCloudflareKeyValueStore", () => { + it.effect("round-trips string values", () => + Effect.gen(function* () { + const { kv } = makeFakeKv(10); + const store = makeCloudflareKeyValueStore(kv); + + yield* store.set("a", "value"); + expect(yield* store.get("a")).toBe("value"); + + yield* store.remove("a"); + expect(yield* store.get("a")).toBeUndefined(); + }), + ); + + it.effect("counts paginated keys", () => + Effect.gen(function* () { + const { values, kv } = makeFakeKv(2); + values.set("a", "1"); + values.set("b", "2"); + values.set("c", "3"); + + const store = makeCloudflareKeyValueStore(kv); + expect(yield* store.size).toBe(3); + }), + ); + + it.effect("clears paginated keys in bounded parallel batches", () => + Effect.gen(function* () { + const { values, kv, maxConcurrentDeletes } = makeFakeKv(25); + for (let index = 0; index < 75; index += 1) { + values.set(`key-${index.toString().padStart(2, "0")}`, String(index)); + } + + const store = makeCloudflareKeyValueStore(kv); + yield* store.clear; + + expect(values.size).toBe(0); + expect(maxConcurrentDeletes()).toBeGreaterThan(1); + expect(maxConcurrentDeletes()).toBeLessThanOrEqual(50); + }), + ); +}); diff --git a/packages/hosts/cloudflare/src/key-value-store.ts b/packages/hosts/cloudflare/src/key-value-store.ts new file mode 100644 index 000000000..9737d8273 --- /dev/null +++ b/packages/hosts/cloudflare/src/key-value-store.ts @@ -0,0 +1,66 @@ +import { Effect, Layer } from "effect"; +import * as KeyValueStore from "effect/unstable/persistence/KeyValueStore"; +import type { KVNamespace } from "@cloudflare/workers-types"; + +const KV_DELETE_CONCURRENCY = 50; + +const storeError = (method: string, key: string | undefined, cause: unknown) => + new KeyValueStore.KeyValueStoreError({ + method, + key, + message: `Cloudflare KV ${method} failed`, + cause, + }); + +const listKeys = async (kv: KVNamespace): Promise> => { + const keys: Array = []; + let cursor: string | undefined; + + do { + const page = await kv.list({ cursor }); + keys.push(...page.keys.map((key) => key.name)); + cursor = page.list_complete ? undefined : page.cursor; + } while (cursor !== undefined); + + return keys; +}; + +const deleteKeys = async (kv: KVNamespace, keys: ReadonlyArray): Promise => { + for (let index = 0; index < keys.length; index += KV_DELETE_CONCURRENCY) { + await Promise.all( + keys.slice(index, index + KV_DELETE_CONCURRENCY).map((key) => kv.delete(key)), + ); + } +}; + +export const makeCloudflareKeyValueStore = (kv: KVNamespace): KeyValueStore.KeyValueStore => + KeyValueStore.makeStringOnly({ + get: (key) => + Effect.tryPromise({ + try: async () => (await kv.get(key)) ?? undefined, + catch: (cause) => storeError("get", key, cause), + }), + set: (key, value) => + Effect.tryPromise({ + try: () => kv.put(key, value), + catch: (cause) => storeError("set", key, cause), + }), + remove: (key) => + Effect.tryPromise({ + try: () => kv.delete(key), + catch: (cause) => storeError("remove", key, cause), + }), + clear: Effect.tryPromise({ + try: async () => deleteKeys(kv, await listKeys(kv)), + catch: (cause) => storeError("clear", undefined, cause), + }), + size: Effect.tryPromise({ + try: async () => (await listKeys(kv)).length, + catch: (cause) => storeError("size", undefined, cause), + }), + }); + +export const layerCloudflareKeyValueStore = ( + kv: KVNamespace, +): Layer.Layer => + Layer.succeed(KeyValueStore.KeyValueStore, makeCloudflareKeyValueStore(kv)); From a71371c8ceeb97db8c6c342bf8cf092174e621fe Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 24 Jun 2026 17:59:31 +0530 Subject: [PATCH 2/2] fix(sdk): refresh cache writes in LRU order (greptile) --- packages/core/sdk/src/executor-cache.test.ts | 92 ++++++++++++++++++++ packages/core/sdk/src/executor.ts | 1 + 2 files changed, 93 insertions(+) create mode 100644 packages/core/sdk/src/executor-cache.test.ts diff --git a/packages/core/sdk/src/executor-cache.test.ts b/packages/core/sdk/src/executor-cache.test.ts new file mode 100644 index 000000000..1acaf8318 --- /dev/null +++ b/packages/core/sdk/src/executor-cache.test.ts @@ -0,0 +1,92 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect } from "effect"; + +import { createExecutor, type Executor } from "./executor"; +import { Tenant } from "./ids"; + +const MEMORY_CACHE_CAPACITY = 2_048; +const MEMORY_CACHE_TTL_MS = 10 * 60 * 1000; + +const makeExecutor = Effect.acquireRelease( + createExecutor({ + tenant: Tenant.make("test-tenant"), + onElicitation: "accept-all", + }), + (executor) => executor.close().pipe(Effect.ignore), +); + +const withFakeNow = ( + initialNow: number, + run: (clock: { readonly advance: (ms: number) => void }) => Effect.Effect, +): Effect.Effect => + Effect.acquireUseRelease( + Effect.sync(() => { + const originalNow = Date.now; + let now = initialNow; + Date.now = () => now; + return { + advance: (ms: number) => { + now += ms; + }, + restore: () => { + Date.now = originalNow; + }, + }; + }), + (clock) => run({ advance: clock.advance }), + (clock) => Effect.sync(clock.restore), + ); + +describe("executor cache", () => { + it.effect("uses an in-memory fallback when no cache is configured", () => + Effect.scoped( + Effect.gen(function* () { + const executor = yield* makeExecutor; + + yield* executor.cache.set("a", "value"); + expect(yield* executor.cache.get("a")).toBe("value"); + + yield* executor.cache.remove("a"); + expect(yield* executor.cache.get("a")).toBeUndefined(); + }), + ), + ); + + it.effect("expires fallback entries by TTL on get and size", () => + withFakeNow(1_000, ({ advance }) => + Effect.scoped( + Effect.gen(function* () { + const executor = yield* makeExecutor; + + yield* executor.cache.set("a", "value"); + expect(yield* executor.cache.size).toBe(1); + + advance(MEMORY_CACHE_TTL_MS); + + expect(yield* executor.cache.get("a")).toBeUndefined(); + expect(yield* executor.cache.size).toBe(0); + }), + ), + ), + ); + + it.effect("refreshes fallback LRU position when an existing key is written", () => + Effect.scoped( + Effect.gen(function* () { + const executor: Executor = yield* makeExecutor; + + yield* executor.cache.set("a", "old"); + for (let index = 0; index < MEMORY_CACHE_CAPACITY - 1; index += 1) { + yield* executor.cache.set(`key-${index}`, String(index)); + } + + yield* executor.cache.set("a", "new"); + yield* executor.cache.set("overflow", "value"); + + expect(yield* executor.cache.get("a")).toBe("new"); + expect(yield* executor.cache.get("key-0")).toBeUndefined(); + expect(yield* executor.cache.get("key-1")).toBe("1"); + }), + ), + ); +}); diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index d6969c75e..d4af81eef 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -457,6 +457,7 @@ const makeMemoryCacheStore = (): KeyValueStore.KeyValueStore => { Effect.sync(() => { const now = Date.now(); evictExpired(now); + rows.delete(key); rows.set(key, { value, expiresAt: now + MEMORY_CACHE_TTL_MS }); evictCapacity(); }),