Skip to content

feat(fumadb): add aggregate and keyset query support#1119

Open
aryasaatvik wants to merge 4 commits into
RhysSullivan:mainfrom
aryasaatvik:contrib/fumadb-aggregate-keyset
Open

feat(fumadb): add aggregate and keyset query support#1119
aryasaatvik wants to merge 4 commits into
RhysSullivan:mainfrom
aryasaatvik:contrib/fumadb-aggregate-keyset

Conversation

@aryasaatvik

Copy link
Copy Markdown
Contributor

Summary

  • Add reusable FumaDB JSON-document aggregate primitives and keyset pagination over JSON document columns.
  • Implement the primitives for memory and Drizzle SQLite/Postgres adapters, with shared in-memory semantics for parity and SQLite percentile fallback.
  • Expose the capability through plugin storage as collection.aggregate.{count,groupCount,timeBuckets,stats} and collection.queryKeyset(...).

Changes

  • Added public FumaDB query types for JSON paths, filters, stats, keyset order, and keyset cursors.
  • Added AbstractQuery methods: jsonCount, jsonGroupCount, jsonTimeBuckets, jsonStats, and jsonPage.
  • Added optional ORMAdapter hooks and toORM forwarding that applies table read policies before adapter calls and fails loudly for unsupported adapters.
  • Added memory adapter implementations using shared extraction, coercion, filtering, percentile, and nullable compare helpers.
  • Added Drizzle pushdown for SQLite and Postgres, including JSON path quoting, literal LIKE escaping, SQL NULL-aware filters, continuous Postgres percentiles, SQLite percentile fallback, and null-aware keyset cursor predicates.
  • Added SDK runtime plumbing so plugin storage can call the new FumaDB methods through the safe Fuma query layer.
  • Added plugin storage aggregate and keyset facade types and implementations over declared indexed fields.
  • Added a changeset for the public @executor-js/fumadb and @executor-js/sdk surface.

AST-level Outline

JsonPath = readonly [string, ...string[]]
JsonFilter =
  | { kind: "compare"; path; valueType; operator; value }
  | { kind: "array"; path; valueType; operator; values }
  | { kind: "and"; items }
  | { kind: "or"; items }

AbstractQuery<S> += {
  jsonCount(table, options): Promise<number>
  jsonGroupCount(table, options): Promise<JsonGroupCountRow[]>
  jsonTimeBuckets(table, options): Promise<JsonTimeBucketRow[]>
  jsonStats(table, options): Promise<JsonStats>
  jsonPage(table, options): Promise<Row[]>
}

ORMAdapter += optional {
  jsonCount(table, adapterOptions)
  jsonGroupCount(table, adapterOptions)
  jsonTimeBuckets(table, adapterOptions)
  jsonStats(table, adapterOptions)
  jsonPage(table, adapterOptions)
}

PluginStorageCollectionFacade<T> += {
  aggregate: {
    count(input?)
    groupCount(input)
    timeBuckets(input)
    stats(input)
  }
  queryKeyset(input)
}

Call-stack Trace

Direct FumaDB query

client.orm("1.0.0").jsonStats("events", options)
  -> toORM.jsonStats(...)
     -> build public where condition
     -> apply table onRead policies
     -> require adapter jsonStats hook
     -> adapter.jsonStats(table, { column, where, filter, path, percentiles })
        -> memory: shared JS evaluator
        -> drizzle: SQL JSON extraction, filters, aggregate, percentile handling

Plugin storage aggregate

ctx.storage.runs.aggregate.groupCount({ field: "status", where })
  -> validate field and where keys are declared indexes
  -> pluginStorageWhereToJsonFilter(where)
  -> core.jsonGroupCount("plugin_storage", {
       column: "data",
       where: plugin id + collection + key prefix,
       filter,
       path: [field]
     })
  -> SafeFumaQuery forwards to FumaDB query
  -> table policies add owner and tenant scoping
  -> adapter pushdown returns aggregate rows

Keyset page

ctx.storage.runs.queryKeyset({ orderBy, cursor, limit })
  -> validate indexed order fields and limit
  -> core.jsonPage("plugin_storage", {
       column: "data",
       orderBy: orderBy.map(field -> JSON path order),
       keyColumn: "key",
       keyDirection: first order direction,
       cursor,
       limit
     })
  -> adapter applies JSON filter and real-column scope
  -> adapter orders by JSON paths plus key tiebreak
  -> adapter applies null-aware cursor boundary
  -> facade builds nextCursor from the last returned entry

Usage Pseudocode

const runs = definePluginStorageCollection("runs", RunSchema, {
  indexes: ["status", "startedAt", "durationMs"],
});

const byStatus = yield* ctx.storage.runs.aggregate.groupCount({
  field: "status",
});

const latency = yield* ctx.storage.runs.aggregate.stats({
  field: "durationMs",
  percentiles: [0.5, 0.95],
});

const page = yield* ctx.storage.runs.queryKeyset({
  where: { status: "completed" },
  orderBy: [{ field: "startedAt", direction: "desc", valueType: "number" }],
  limit: 50,
});

Tests

  • bun run bootstrap
  • bun run --cwd packages/core/fumadb test -- src/query/aggregate.test.ts src/query/table-policy.test.ts
  • bun run --cwd packages/core/sdk test -- src/plugin-storage-aggregate.test.ts src/plugin-storage.test.ts
  • bun run --cwd packages/core/fumadb typecheck
  • bun run --cwd packages/core/sdk typecheck
  • bunx oxfmt --check .changeset/fumadb-json-pushdown.md packages/core/fumadb/src/adapters/drizzle/query.ts packages/core/fumadb/src/adapters/memory/index.ts packages/core/fumadb/src/query/aggregate-eval.ts packages/core/fumadb/src/query/aggregate.test.ts packages/core/fumadb/src/query/aggregate.ts packages/core/fumadb/src/query/index.ts packages/core/fumadb/src/query/orm/index.ts packages/core/fumadb/src/query/table-policy.test.ts packages/core/sdk/src/executor.ts packages/core/sdk/src/fuma-runtime.ts packages/core/sdk/src/plugin-storage-aggregate.test.ts packages/core/sdk/src/plugin-storage.ts packages/core/sdk/src/test-config.ts
  • bunx oxlint -c .oxlintrc.jsonc --deny-warnings .changeset/fumadb-json-pushdown.md packages/core/fumadb/src/adapters/drizzle/query.ts packages/core/fumadb/src/adapters/memory/index.ts packages/core/fumadb/src/query/aggregate-eval.ts packages/core/fumadb/src/query/aggregate.test.ts packages/core/fumadb/src/query/aggregate.ts packages/core/fumadb/src/query/index.ts packages/core/fumadb/src/query/orm/index.ts packages/core/fumadb/src/query/table-policy.test.ts packages/core/sdk/src/executor.ts packages/core/sdk/src/fuma-runtime.ts packages/core/sdk/src/plugin-storage-aggregate.test.ts packages/core/sdk/src/plugin-storage.ts packages/core/sdk/src/test-config.ts
  • bun run lint:changelog-stubs
  • git diff HEAD --check

Root bun run format:check and bun run lint currently fail on preexisting .scratchpad files outside this PR. The touched-file format and lint checks above pass.

Deferred Scope

This PR intentionally does not include execution-history UI, runs API, semantic search, tool manifest, cache primitive work, or bulk plugin-storage operations.

Add reusable JSON-document aggregate and keyset pagination primitives across FumaDB memory and Drizzle adapters.

Expose the pushdown through plugin storage aggregate and queryKeyset facades with focused coverage for SQLite parity, null handling, path escaping, policy scoping, and unsupported adapters.
@greptile-apps

greptile-apps Bot commented Jun 24, 2026

Copy link
Copy Markdown

Greptile Summary

This PR adds SQL-pushed JSON-document aggregate primitives and keyset pagination to FumaDB, exposing them through the plugin storage SDK facade. All previous review concerns (empty or filter semantics, explicit operator mapping, SQLite percentile documentation) have been addressed in this revision.

  • FumaDB core: New jsonCount, jsonGroupCount, jsonTimeBuckets, jsonStats, and jsonPage methods on AbstractQuery, backed by shared in-memory evaluation helpers and Drizzle SQLite/Postgres pushdown with null-aware keyset cursor predicates and dialect-specific JSON path quoting/LIKE escaping.
  • Plugin storage facade: New aggregate.{count,groupCount,timeBuckets,stats} and queryKeyset(...) methods on collection facades, translating indexed-field where conditions into JsonFilter predicates while table-read policies enforce owner/tenant scoping before every adapter call.
  • Test coverage: Memory and SQLite harnesses exercising count, group count, time buckets, stats, keyset pagination with null sort values, LIKE-wildcard parity, path-quoting edge cases, and policy enforcement for the new operations.

Confidence Score: 5/5

Safe to merge — all five new query operations apply table read policies before reaching the adapter, and the empty-or / explicit-operator issues from the previous review are resolved.

The implementation is correct and well-tested across memory, SQLite, and policy harnesses. The null-aware keyset cursor logic is exercised end-to-end (including the nullable-sort-column truncation regression). JSON path quoting, LIKE escaping, and empty composite filter semantics all have explicit parity tests between the memory and Drizzle adapters.

No files require special attention. The most complex file (drizzle/query.ts) is fully covered by the aggregate.test.ts harness for SQLite, and the Postgres-specific paths (percentile_cont, nulls last/first) are intentionally deferred to a future Postgres test harness as noted in the PR description.

Important Files Changed

Filename Overview
packages/core/fumadb/src/adapters/drizzle/query.ts Adds jsonCount/jsonGroupCount/jsonTimeBuckets/jsonStats/jsonPage adapter hooks with dialect-aware JSON path quoting, LIKE wildcard escaping with correct ESCAPE clause, null-aware keyset cursor predicates, and Postgres percentile_cont pushdown. Empty or filter correctly returns 1 = 0.
packages/core/fumadb/src/query/orm/index.ts Adds compileScopedWhere helper that applies read policies before every new JSON operation, requireJsonOp that throws clearly for unsupported adapters. The scopedWhere === false short-circuit correctly returns zero/empty results when a policy statically denies access.
packages/core/sdk/src/executor.ts Adds CoreDb plumbing for the five JSON operations, pluginStorageWhereToJsonFilter translation, pluginStorageInvalidLimitError guard, and the queryKeyset/aggregate facade implementations. nextCursor is correctly set only when entries.length >= limit. pluginStorageJsonCompareOperators is now an explicit mapping.
packages/core/fumadb/src/query/aggregate-eval.ts New file with pure in-memory evaluation helpers: extractJsonPath, coerceJsonValue, matchesJsonFilter, bucketFloor, computePercentiles (Postgres percentile_cont linear interpolation), and compareNullableAscending.
packages/core/fumadb/src/adapters/memory/index.ts Adds all five JSON operations using the shared aggregate-eval helpers. The jsonPage implementation correctly applies null-aware compareNullableAscending sort and cursor filter, consistent with the Drizzle adapter's SQL null ordering semantics.
packages/core/fumadb/src/query/aggregate.ts New public type definitions for all JSON aggregate and keyset pagination primitives, split into adapter-facing options and public options. Clean separation of concerns.
packages/core/fumadb/src/query/aggregate.test.ts New test file running the full aggregate + keyset suite against memory and SQLite harnesses, covering null SQL three-valued logic, empty composite filters, nullable sort column keyset pagination, LIKE wildcard parity, and path-quoting edge cases.
packages/core/sdk/src/plugin-storage-aggregate.test.ts New integration test running the full plugin storage aggregate + keyset suite against the SQLite pushdown path, including multi-page keyset cursor pagination with a JSON filter.
packages/core/fumadb/src/query/table-policy.test.ts Extended with a test confirming read policies are applied before all JSON aggregate and keyset operations, and that a denied context correctly returns zero count and empty page results.
packages/core/sdk/src/plugin-storage.ts Adds public facade types: PluginStorageAggregateFilter, PluginStorageQueryKeysetInput, PluginStorageKeysetPage, and PluginStorageAggregateFacade. queryKeyset and aggregate added to PluginStorageCollectionFacade.
packages/core/sdk/src/fuma-runtime.ts Adds the five new JSON operation forwards to makeSafeFumaQuery, consistent with the existing operation forwarding pattern.

Sequence Diagram

%%{init: {'theme': 'neutral'}}%%
sequenceDiagram
    participant Plugin as Plugin Code
    participant Facade as PluginStorageFacade
    participant CoreDb as CoreDb
    participant ORM as toORM (orm/index.ts)
    participant Adapter as Drizzle/Memory Adapter

    Plugin->>Facade: "storage.runs.aggregate.groupCount({ field, where })"
    Facade->>Facade: validate indexed fields
    Facade->>Facade: pluginStorageWhereToJsonFilter(where)
    Facade->>CoreDb: "jsonGroupCount(plugin_storage, { column, where, filter })"
    CoreDb->>ORM: jsonGroupCount(name, options)
    ORM->>ORM: requireJsonOp + compileScopedWhere
    ORM->>ORM: applyReadPolicies
    ORM->>Adapter: "jsonGroupCount(table, { column, where, filter, path })"
    Adapter-->>ORM: JsonGroupCountRow[]
    ORM-->>Facade: JsonGroupCountRow[]
    Facade-->>Plugin: PluginStorageGroupCount[]

    Plugin->>Facade: "storage.runs.queryKeyset({ orderBy, cursor, limit })"
    Facade->>Facade: validate indexed fields + limit
    Facade->>CoreDb: "jsonPage(plugin_storage, { orderBy, keyColumn, cursor, limit })"
    CoreDb->>ORM: jsonPage(name, options)
    ORM->>ORM: requireJsonOp + compileScopedWhere
    ORM->>ORM: applyReadPolicies
    ORM->>Adapter: "jsonPage(table, { orderBy, keyColumn, keyDirection, cursor, limit })"
    Note over Adapter: null-aware cursor predicate OR-terms
    Adapter-->>ORM: Row[]
    ORM-->>Facade: Row[]
    Facade->>Facade: build nextCursor from last entry
    Facade-->>Plugin: "{ entries, nextCursor }"
Loading
%%{init: {'theme': 'base', 'themeVariables': {"darkMode": true, "background": "#0d1117", "primaryColor": "#21262d", "primaryTextColor": "#e6edf3", "primaryBorderColor": "#8b949e", "lineColor": "#8b949e", "textColor": "#e6edf3", "edgeLabelBackground": "#161b22", "actorBkg": "#21262d", "actorBorder": "#8b949e", "actorTextColor": "#e6edf3", "actorLineColor": "#8b949e", "signalColor": "#8b949e", "signalTextColor": "#e6edf3", "noteBkgColor": "#373320", "noteBorderColor": "#d4a72c", "noteTextColor": "#f0e6c0", "labelBoxBkgColor": "#21262d", "labelBoxBorderColor": "#8b949e", "labelTextColor": "#e6edf3", "loopTextColor": "#e6edf3", "activationBkgColor": "#30363d", "activationBorderColor": "#8b949e"}}}%%
sequenceDiagram
    participant Plugin as Plugin Code
    participant Facade as PluginStorageFacade
    participant CoreDb as CoreDb
    participant ORM as toORM (orm/index.ts)
    participant Adapter as Drizzle/Memory Adapter

    Plugin->>Facade: "storage.runs.aggregate.groupCount({ field, where })"
    Facade->>Facade: validate indexed fields
    Facade->>Facade: pluginStorageWhereToJsonFilter(where)
    Facade->>CoreDb: "jsonGroupCount(plugin_storage, { column, where, filter })"
    CoreDb->>ORM: jsonGroupCount(name, options)
    ORM->>ORM: requireJsonOp + compileScopedWhere
    ORM->>ORM: applyReadPolicies
    ORM->>Adapter: "jsonGroupCount(table, { column, where, filter, path })"
    Adapter-->>ORM: JsonGroupCountRow[]
    ORM-->>Facade: JsonGroupCountRow[]
    Facade-->>Plugin: PluginStorageGroupCount[]

    Plugin->>Facade: "storage.runs.queryKeyset({ orderBy, cursor, limit })"
    Facade->>Facade: validate indexed fields + limit
    Facade->>CoreDb: "jsonPage(plugin_storage, { orderBy, keyColumn, cursor, limit })"
    CoreDb->>ORM: jsonPage(name, options)
    ORM->>ORM: requireJsonOp + compileScopedWhere
    ORM->>ORM: applyReadPolicies
    ORM->>Adapter: "jsonPage(table, { orderBy, keyColumn, keyDirection, cursor, limit })"
    Note over Adapter: null-aware cursor predicate OR-terms
    Adapter-->>ORM: Row[]
    ORM-->>Facade: Row[]
    Facade->>Facade: build nextCursor from last entry
    Facade-->>Plugin: "{ entries, nextCursor }"
Loading

Reviews (3): Last reviewed commit: "fix(fumadb): preserve empty or filter se..." | Re-trigger Greptile

Comment thread packages/core/sdk/src/executor.ts Outdated
Comment thread packages/core/fumadb/src/adapters/drizzle/query.ts
Comment thread packages/core/fumadb/src/adapters/drizzle/query.ts
Comment thread packages/core/fumadb/src/query/aggregate.test.ts
Comment thread packages/core/fumadb/src/adapters/drizzle/query.ts
@aryasaatvik

Copy link
Copy Markdown
Contributor Author

Additional downstream context from my fork: the FumaDB aggregate/keyset primitive is what lets an execution-history read model stay queryable without pulling whole plugin-storage collections into JS.

High-level shape:

plugin read model
  -> collection.aggregate.*
  -> collection.queryKeyset(...)
  -> plugin_storage JSON pushdown
  -> adapter aggregate/page query
  -> UI/API list response

AST outline of downstream usage:

executionHistoryStore.list(options)
  -> computeMeta(options)
     -> aggregate.count(...)
     -> aggregate.groupCount(...)
     -> aggregate.stats(...)
     -> aggregate.timeBuckets(...)
  -> runsC.queryKeyset(...)
  -> return { runs, nextCursor, meta }

Fork permalinks:

Call stack:

GET /execution-history/runs
  -> executionHistory.list(options)
  -> runsC.aggregate.count/groupCount/stats/timeBuckets
  -> runsC.queryKeyset({ where, orderBy, cursor, limit })
  -> FumaDB json* methods
  -> memory or SQL adapter pushdown

This PR stays focused on reusable query capability. If it lands, execution history, metrics dashboards, and other plugin-owned read models can build richer list and facet APIs without adding bespoke SQL per plugin.

aryasaatvik added a commit to aryasaatvik/executor that referenced this pull request Jun 26, 2026
## Summary

Mirrors the review-hardening deltas from upstream
[RhysSullivan#1119](RhysSullivan#1119)
onto `dev`.

`dev` already contains the broader FumaDB aggregate and keyset query
feature, so this PR only carries the remaining drift from the upstream
review loop.

## Changes

- Preserve memory and Drizzle parity for empty composite filters by
compiling empty JSON `or` filters to a constant false SQL predicate.
- Add regression coverage for empty `or` and empty `and` filters across
the aggregate harness.
- Replace nested plugin-storage operator selection with an explicit
typed JSON compare-operator map.
- Document SQLite percentile behavior on the public FumaDB and
plugin-storage stats inputs.

## Intentional Differences From Upstream RhysSullivan#1119

- This PR does not re-add the full aggregate and keyset implementation
because `dev` already has that feature surface.
- This PR does not touch the OpenAPI storage facade mock because `dev`
already has the mock shape needed by the expanded collection facade.
- This PR has no changeset because it only mirrors fixes to an existing
`dev` feature surface.

## Tests

- `bun run bootstrap`
- `bun run --cwd packages/core/fumadb test --
src/query/aggregate.test.ts src/query/table-policy.test.ts`
- `bun run --cwd packages/core/sdk test --
src/plugin-storage-aggregate.test.ts src/plugin-storage.test.ts`
- `bun run --cwd packages/core/fumadb typecheck`
- `bun run --cwd packages/core/sdk typecheck`
- `bun run typecheck`
- `./node_modules/.bin/oxfmt --check
packages/core/fumadb/src/adapters/drizzle/query.ts
packages/core/fumadb/src/query/aggregate.test.ts
packages/core/fumadb/src/query/aggregate.ts
packages/core/sdk/src/executor.ts
packages/core/sdk/src/plugin-storage.ts`
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant