From 6ff216c8c3d68c2e5bdb934377326e61a28b3aeb Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 24 Jun 2026 17:46:50 +0530 Subject: [PATCH 1/4] feat(fumadb): add aggregate and keyset query support Add reusable JSON-document aggregate and keyset pagination primitives across FumaDB memory and Drizzle adapters. Expose the pushdown through plugin storage aggregate and queryKeyset facades with focused coverage for SQLite parity, null handling, path escaping, policy scoping, and unsupported adapters. --- .changeset/fumadb-json-pushdown.md | 16 + .../core/fumadb/src/adapters/drizzle/query.ts | 296 +++++++++++++ .../core/fumadb/src/adapters/memory/index.ts | 103 +++++ .../core/fumadb/src/query/aggregate-eval.ts | 151 +++++++ .../core/fumadb/src/query/aggregate.test.ts | 388 ++++++++++++++++++ packages/core/fumadb/src/query/aggregate.ts | 168 ++++++++ packages/core/fumadb/src/query/index.ts | 46 +++ packages/core/fumadb/src/query/orm/index.ts | 119 ++++++ .../fumadb/src/query/table-policy.test.ts | 111 +++++ packages/core/sdk/src/executor.ts | 302 +++++++++++++- packages/core/sdk/src/fuma-runtime.ts | 5 + .../sdk/src/plugin-storage-aggregate.test.ts | 200 +++++++++ packages/core/sdk/src/plugin-storage.ts | 106 +++++ packages/core/sdk/src/test-config.ts | 5 + 14 files changed, 2015 insertions(+), 1 deletion(-) create mode 100644 .changeset/fumadb-json-pushdown.md create mode 100644 packages/core/fumadb/src/query/aggregate-eval.ts create mode 100644 packages/core/fumadb/src/query/aggregate.test.ts create mode 100644 packages/core/fumadb/src/query/aggregate.ts create mode 100644 packages/core/sdk/src/plugin-storage-aggregate.test.ts diff --git a/.changeset/fumadb-json-pushdown.md b/.changeset/fumadb-json-pushdown.md new file mode 100644 index 000000000..2f728d491 --- /dev/null +++ b/.changeset/fumadb-json-pushdown.md @@ -0,0 +1,16 @@ +--- +"@executor-js/fumadb": minor +"@executor-js/sdk": minor +--- + +Push JSON-document aggregation and keyset pagination down to SQL. FumaDB's query +layer gains `jsonCount`, `jsonGroupCount`, `jsonTimeBuckets`, `jsonStats` +(min/max + continuous percentiles), and `jsonPage` (cursor pagination) over a +JSON document column, implemented natively for the memory and Drizzle +(SQLite/Postgres) adapters and failing loudly on adapters that don't support +them. Plugin storage exposes this as `collection.aggregate.{count,groupCount, +timeBuckets,stats}` and `collection.queryKeyset(...)`, translating a +collection's indexed-field `where` into JSON-path predicates while owner/tenant +scoping stays enforced by the storage policy. Aggregates and pages are computed +in the database instead of by fetching the whole collection and reducing in +memory. diff --git a/packages/core/fumadb/src/adapters/drizzle/query.ts b/packages/core/fumadb/src/adapters/drizzle/query.ts index 9471ecda0..82f3a81e3 100644 --- a/packages/core/fumadb/src/adapters/drizzle/query.ts +++ b/packages/core/fumadb/src/adapters/drizzle/query.ts @@ -1,6 +1,13 @@ import * as Drizzle from "drizzle-orm"; import type * as PostgreSQL from "drizzle-orm/pg-core"; import type { AbstractQuery, FindManyOptions } from "../../query"; +import type { + JsonFilter, + JsonPath, + JsonScalar, + JsonValueType, +} from "../../query/aggregate"; +import { coerceJsonValue, computePercentiles } from "../../query/aggregate-eval"; import { type Condition, ConditionType } from "../../query/condition-builder"; import { type SimplifyFindOptions, toORM } from "../../query/orm"; import { @@ -263,6 +270,119 @@ export function fromDrizzle( return out; } + // --- JSON-document aggregation helpers ---------------------------------- + // Extract a JSON path as a typed SQL expression. SQLite `json_extract` is + // natively typed; Postgres `#>>` returns text and needs explicit casts. + function jsonExtractSql( + jsonColumn: ColumnType, + path: JsonPath, + valueType: JsonValueType, + ): Drizzle.SQL { + if (provider === "postgresql") { + // Build a `text[]` array literal with each segment double-quoted so a + // segment containing a comma (the array element separator) descends the + // intended key instead of splitting into two. Inside a quoted element a + // backslash and a double-quote must be backslash-escaped. + const pgPath = `{${path + .map((segment) => `"${segment.replace(/\\/g, "\\\\").replace(/"/g, '\\"')}"`) + .join(",")}}`; + const text = Drizzle.sql`(${jsonColumn} #>> ${pgPath})`; + if (valueType === "number") return Drizzle.sql`${text}::numeric`; + if (valueType === "boolean") return Drizzle.sql`${text}::boolean`; + return text; + } + // Build a quoted SQLite JSON path (`$."seg1"."seg2"`) so a segment + // containing a dot (the path separator) is treated as one key. Inside a + // quoted segment an embedded backslash or double-quote is backslash-escaped. + const jsonPath = `$${path + .map((segment) => `."${segment.replace(/\\/g, "\\\\").replace(/"/g, '\\"')}"`) + .join("")}`; + return Drizzle.sql`json_extract(${jsonColumn}, ${jsonPath})`; + } + + // Escape LIKE wildcards (`%`, `_`) and the escape character itself so the + // value matches literally, matching the memory adapter's + // includes/startsWith/endsWith. Paired with an explicit `escape '\'` clause. + function escapeLikePattern(value: string): string { + return value.replace(/[\\%_]/g, (char) => `\\${char}`); + } + + function jsonCompareSql( + operator: string, + expr: Drizzle.SQL, + value: JsonScalar, + ): Drizzle.SQL { + switch (operator) { + case "=": + return Drizzle.eq(expr, value); + case "!=": + return Drizzle.ne(expr, value); + case ">": + return Drizzle.gt(expr, value); + case ">=": + return Drizzle.gte(expr, value); + case "<": + return Drizzle.lt(expr, value); + case "<=": + return Drizzle.lte(expr, value); + case "contains": { + const escaped = escapeLikePattern(String(value)); + return Drizzle.sql`${expr} like ${`%${escaped}%`} escape '\\'`; + } + case "starts with": { + const escaped = escapeLikePattern(String(value)); + return Drizzle.sql`${expr} like ${`${escaped}%`} escape '\\'`; + } + case "ends with": { + const escaped = escapeLikePattern(String(value)); + return Drizzle.sql`${expr} like ${`%${escaped}`} escape '\\'`; + } + default: + throw new Error(`[FumaDB Drizzle] Unsupported JSON operator: ${operator}`); + } + } + + function buildJsonFilter( + jsonColumn: ColumnType, + filter: JsonFilter, + ): Drizzle.SQL | undefined { + if (filter.kind === "and") { + return Drizzle.and( + ...filter.items.map((item) => buildJsonFilter(jsonColumn, item)), + ); + } + if (filter.kind === "or") { + return Drizzle.or( + ...filter.items.map((item) => buildJsonFilter(jsonColumn, item)), + ); + } + const expr = jsonExtractSql(jsonColumn, filter.path, filter.valueType); + if (filter.kind === "array") { + return filter.operator === "in" + ? Drizzle.inArray(expr, filter.values as unknown[]) + : Drizzle.notInArray(expr, filter.values as unknown[]); + } + return jsonCompareSql(filter.operator, expr, filter.value); + } + + function buildScopedConditions( + jsonColumn: ColumnType, + where: Condition | undefined, + filter: JsonFilter | undefined, + ): Drizzle.SQL | undefined { + const parts: Drizzle.SQL[] = []; + if (where) { + const compiled = buildWhere(toDrizzleColumn, where); + if (compiled) parts.push(compiled); + } + if (filter) { + const compiled = buildJsonFilter(jsonColumn, filter); + if (compiled) parts.push(compiled); + } + if (parts.length === 0) return undefined; + return Drizzle.and(...parts); + } + return toORM({ tables: schema.tables, async count(table, v) { @@ -402,6 +522,182 @@ export function fromDrizzle( await query; }, + async jsonCount(table, { column, where, filter }) { + const conditions = buildScopedConditions(toDrizzleColumn(column), where, filter); + return await db.$count(toDrizzle(table), conditions); + }, + async jsonGroupCount(table, { column, where, filter, path, valueType }) { + const drizzleTable = toDrizzle(table); + const jsonColumn = toDrizzleColumn(column); + const groupExpr = jsonExtractSql(jsonColumn, path, valueType ?? "text"); + const conditions = buildScopedConditions(jsonColumn, where, filter); + const rows = await db + .select({ value: groupExpr, count: Drizzle.sql`count(*)` }) + .from(drizzleTable) + .where(conditions) + .groupBy(groupExpr); + return rows.map((row) => ({ + value: coerceJsonValue(row.value, valueType ?? "text"), + count: Number(row.count), + })); + }, + async jsonTimeBuckets(table, { column, where, filter, path, bucketMs }) { + const drizzleTable = toDrizzle(table); + const jsonColumn = toDrizzleColumn(column); + const valueExpr = jsonExtractSql(jsonColumn, path, "number"); + // `value - (value % bucket)` floors to the bucket start without relying + // on integer division (SQLite binds numeric params as REAL, so `/` would + // be float division). Matches `bucketFloor` for non-negative epochs. + const bucketExpr = Drizzle.sql`(${valueExpr} - (${valueExpr} % ${bucketMs}))`; + const conditions = buildScopedConditions(jsonColumn, where, filter); + const rows = await db + .select({ bucket: bucketExpr, count: Drizzle.sql`count(*)` }) + .from(drizzleTable) + .where(conditions) + .groupBy(bucketExpr) + .orderBy(bucketExpr); + return rows.map((row) => ({ bucket: Number(row.bucket), count: Number(row.count) })); + }, + async jsonStats(table, { column, where, filter, path, percentiles }) { + const drizzleTable = toDrizzle(table); + const jsonColumn = toDrizzleColumn(column); + const valueExpr = jsonExtractSql(jsonColumn, path, "number"); + const conditions = buildScopedConditions(jsonColumn, where, filter); + const aggregate = await db + .select({ + count: Drizzle.sql`count(${valueExpr})`, + min: Drizzle.sql`min(${valueExpr})`, + max: Drizzle.sql`max(${valueExpr})`, + }) + .from(drizzleTable) + .where(conditions); + const summary = aggregate[0]; + const count = Number(summary?.count ?? 0); + if (count === 0) return { count: 0, min: null, max: null, percentiles: [] }; + const min = summary?.min == null ? null : Number(summary.min); + const max = summary?.max == null ? null : Number(summary.max); + const fractions = percentiles ?? []; + if (fractions.length === 0) return { count, min, max, percentiles: [] }; + + if (provider === "postgresql") { + const pctExpr = Drizzle.sql`percentile_cont(array[${Drizzle.sql.join( + fractions.map((fraction) => Drizzle.sql`${fraction}`), + Drizzle.sql`, `, + )}]) within group (order by ${valueExpr})`; + const pctRows = await db + .select({ values: Drizzle.sql`${pctExpr}` }) + .from(drizzleTable) + .where(conditions); + const values = pctRows[0]?.values ?? []; + return { + count, + min, + max, + percentiles: fractions.map((fraction, index) => ({ + fraction, + value: Number(values[index]), + })), + }; + } + + // SQLite has no percentile_cont, so compute over the projected values. + const valueRows = await db + .select({ value: valueExpr }) + .from(drizzleTable) + .where(conditions) + .orderBy(valueExpr); + const sorted = valueRows + .map((row) => row.value) + .filter((value): value is number | string => value != null) + .map((value) => Number(value)) + .filter((value) => !Number.isNaN(value)) + .sort((a, b) => a - b); + return { count, min, max, percentiles: computePercentiles(sorted, fractions) }; + }, + async jsonPage(table, { column, where, filter, orderBy, keyColumn, keyDirection, cursor, limit }) { + const drizzleTable = toDrizzle(table); + const jsonColumn = toDrizzleColumn(column); + const keyDrizzle = toDrizzleColumn(keyColumn); + const scoped = buildScopedConditions(jsonColumn, where, filter); + + const orderExprs: Drizzle.SQL[] = orderBy.map((entry) => { + const expr = jsonExtractSql(jsonColumn, entry.path, entry.valueType); + // Mirror the memory adapter's null ordering (null first in asc, last in + // desc, see compareNullableAscending). SQLite defaults to this; Postgres + // defaults to the opposite, so make it explicit on both dialects. + return entry.direction === "desc" + ? Drizzle.sql`${expr} desc nulls last` + : Drizzle.sql`${expr} asc nulls first`; + }); + orderExprs.push(keyDirection === "desc" ? Drizzle.desc(keyDrizzle) : Drizzle.asc(keyDrizzle)); + + // Keyset boundary terms with SQL three-valued-logic null handling. A naive + // `expr > NULL` / `expr < NULL` is always unknown, which silently empties + // every page once the cursor row's sort value is null. These mirror + // compareNullableAscending so a nullable sort column paginates correctly. + const eqTerm = (expr: Drizzle.SQL, cv: unknown): Drizzle.SQL => + cv == null ? Drizzle.isNull(expr) : Drizzle.eq(expr, cv); + const strictTerm = ( + expr: Drizzle.SQL, + cv: unknown, + dir: "asc" | "desc", + ): Drizzle.SQL | null => { + if (dir === "asc") { + // null is first: everything non-null is after a null cursor. + return cv == null ? Drizzle.isNotNull(expr) : Drizzle.gt(expr, cv); + } + // desc, null last: nothing is strictly after a null cursor on this field; + // and null rows fall after any non-null cursor value. + if (cv == null) return null; + return Drizzle.or(Drizzle.lt(expr, cv), Drizzle.isNull(expr)) ?? null; + }; + + let conditions = scoped; + if (cursor) { + const orTerms: Drizzle.SQL[] = []; + for (let boundary = 0; boundary <= orderBy.length; boundary += 1) { + const andTerms: Drizzle.SQL[] = []; + for (let prior = 0; prior < boundary; prior += 1) { + const entry = orderBy[prior]!; + andTerms.push( + eqTerm( + jsonExtractSql(jsonColumn, entry.path, entry.valueType), + cursor.values[prior] as unknown, + ), + ); + } + if (boundary < orderBy.length) { + const entry = orderBy[boundary]!; + const expr = jsonExtractSql(jsonColumn, entry.path, entry.valueType); + const strict = strictTerm(expr, cursor.values[boundary] as unknown, entry.direction); + if (strict === null) continue; + andTerms.push(strict); + } else { + andTerms.push( + keyDirection === "asc" + ? Drizzle.gt(keyDrizzle, cursor.key) + : Drizzle.lt(keyDrizzle, cursor.key), + ); + } + const combined = Drizzle.and(...andTerms); + if (combined) orTerms.push(combined); + } + const afterCursor = Drizzle.or(...orTerms); + conditions = scoped && afterCursor ? Drizzle.and(scoped, afterCursor) : (afterCursor ?? scoped); + } + + const projection: Record = {}; + for (const tableColumn of Object.values(table.columns)) { + projection[tableColumn.ormName] = drizzleTable[tableColumn.names.drizzle]; + } + + return await db + .select(projection) + .from(drizzleTable) + .where(conditions) + .orderBy(...orderExprs) + .limit(limit); + }, async transaction(run) { // Some SQLite-compatible engines (Cloudflare D1) reject interactive // transactions — both raw BEGIN/COMMIT and the driver's `.transaction()`. diff --git a/packages/core/fumadb/src/adapters/memory/index.ts b/packages/core/fumadb/src/adapters/memory/index.ts index a596373f5..7b931407b 100644 --- a/packages/core/fumadb/src/adapters/memory/index.ts +++ b/packages/core/fumadb/src/adapters/memory/index.ts @@ -1,5 +1,14 @@ import type { FumaDBAdapter } from "../"; import type { AbstractQuery } from "../../query"; +import type { JsonScalar } from "../../query/aggregate"; +import { + bucketFloor, + coerceJsonValue, + compareNullableAscending, + computePercentiles, + extractJsonPath, + matchesJsonFilter, +} from "../../query/aggregate-eval"; import { ConditionType, type Condition } from "../../query/condition-builder"; import { toORM, type SimplifyFindOptions } from "../../query/orm"; import type { AnyColumn, AnySchema, AnyTable } from "../../schema"; @@ -189,6 +198,100 @@ export function memoryAdapter(options: MemoryAdapterOptions = {}): FumaDBAdapter const rows = tableRows(db, table); db[table.ormName] = rows.filter((row) => !matchesCondition(row, v.where)); }, + async jsonCount(table, { column, where, filter }) { + return tableRows(db, table).filter( + (row) => + matchesCondition(row, where) && + (!filter || matchesJsonFilter(row[column.ormName], filter)), + ).length; + }, + async jsonGroupCount(table, { column, where, filter, path, valueType }) { + const counts = new Map(); + for (const row of tableRows(db, table)) { + if (!matchesCondition(row, where)) continue; + if (filter && !matchesJsonFilter(row[column.ormName], filter)) continue; + const value = coerceJsonValue( + extractJsonPath(row[column.ormName], path), + valueType ?? "text", + ); + counts.set(value, (counts.get(value) ?? 0) + 1); + } + return [...counts.entries()].map(([value, count]) => ({ value, count })); + }, + async jsonTimeBuckets(table, { column, where, filter, path, bucketMs }) { + const counts = new Map(); + for (const row of tableRows(db, table)) { + if (!matchesCondition(row, where)) continue; + if (filter && !matchesJsonFilter(row[column.ormName], filter)) continue; + const raw = coerceJsonValue(extractJsonPath(row[column.ormName], path), "number"); + if (typeof raw !== "number") continue; + const bucket = bucketFloor(raw, bucketMs); + counts.set(bucket, (counts.get(bucket) ?? 0) + 1); + } + return [...counts.entries()] + .map(([bucket, count]) => ({ bucket, count })) + .sort((a, b) => a.bucket - b.bucket); + }, + async jsonStats(table, { column, where, filter, path, percentiles }) { + const values: number[] = []; + for (const row of tableRows(db, table)) { + if (!matchesCondition(row, where)) continue; + if (filter && !matchesJsonFilter(row[column.ormName], filter)) continue; + const raw = coerceJsonValue(extractJsonPath(row[column.ormName], path), "number"); + if (typeof raw === "number") values.push(raw); + } + if (values.length === 0) return { count: 0, min: null, max: null, percentiles: [] }; + values.sort((a, b) => a - b); + return { + count: values.length, + min: values[0]!, + max: values[values.length - 1]!, + percentiles: computePercentiles(values, percentiles ?? []), + }; + }, + async jsonPage(table, { column, where, filter, orderBy, keyColumn, keyDirection, cursor, limit }) { + const sortValue = (row: Record, index: number): JsonScalar => + coerceJsonValue( + extractJsonPath(row[column.ormName], orderBy[index]!.path), + orderBy[index]!.valueType, + ); + const keyOf = (row: Record): string => String(row[keyColumn.ormName]); + const keyDir = keyDirection === "desc" ? -1 : 1; + + let rows = tableRows(db, table).filter( + (row) => + matchesCondition(row, where) && + (!filter || matchesJsonFilter(row[column.ormName], filter)), + ); + + rows = [...rows].sort((a, b) => { + for (let index = 0; index < orderBy.length; index += 1) { + const direction = orderBy[index]!.direction === "desc" ? -1 : 1; + const compared = + compareNullableAscending(sortValue(a, index), sortValue(b, index)) * direction; + if (compared !== 0) return compared; + } + return compareNullableAscending(keyOf(a), keyOf(b)) * keyDir; + }); + + if (cursor) { + const cursorValues = cursor.values.map((value, index) => + coerceJsonValue(value, orderBy[index]!.valueType), + ); + rows = rows.filter((row) => { + for (let index = 0; index < orderBy.length; index += 1) { + const direction = orderBy[index]!.direction === "desc" ? -1 : 1; + const compared = + compareNullableAscending(sortValue(row, index), cursorValues[index] ?? null) * + direction; + if (compared !== 0) return compared > 0; + } + return compareNullableAscending(keyOf(row), cursor.key) * keyDir > 0; + }); + } + + return rows.slice(0, limit).map((row) => selectRow(table, row, true)); + }, async transaction(run: (transactionInstance: AbstractQuery) => Promise) { const snapshot = cloneValue(db); try { diff --git a/packages/core/fumadb/src/query/aggregate-eval.ts b/packages/core/fumadb/src/query/aggregate-eval.ts new file mode 100644 index 000000000..55f2c7f3c --- /dev/null +++ b/packages/core/fumadb/src/query/aggregate-eval.ts @@ -0,0 +1,151 @@ +import type { + JsonFilter, + JsonPath, + JsonPercentile, + JsonScalar, + JsonValueType, +} from "./aggregate"; + +// --------------------------------------------------------------------------- +// Pure, in-memory evaluation of the JSON-document aggregation primitives. +// Shared by the memory adapter and used as the per-dialect fallback where a +// database can't express an operation natively (e.g. SQLite percentiles). The +// percentile definition matches Postgres `percentile_cont` (continuous, linear +// interpolation) so every backend agrees. +// --------------------------------------------------------------------------- + +export const extractJsonPath = (doc: unknown, path: JsonPath): unknown => { + let current: unknown = doc; + for (const segment of path) { + if (current == null || typeof current !== "object") return undefined; + current = (current as Record)[segment]; + } + return current; +}; + +/** Coerce a raw value to a comparable scalar per its declared extraction type. */ +export const coerceJsonValue = ( + value: unknown, + valueType: JsonValueType, +): JsonScalar => { + if (value == null) return null; + if (valueType === "number") { + const numeric = typeof value === "number" ? value : Number(value); + return Number.isNaN(numeric) ? null : numeric; + } + if (valueType === "boolean") { + if (typeof value === "boolean") return value; + if (value === "true" || value === 1) return true; + if (value === "false" || value === 0) return false; + return null; + } + return typeof value === "string" ? value : String(value); +}; + +const scalarsEqual = (left: JsonScalar, right: JsonScalar): boolean => left === right; + +/** Three-way compare; `null` when either side is null (SQL-like incomparable). */ +const compareScalars = (left: JsonScalar, right: JsonScalar): number | null => { + if (left == null || right == null) return null; + if (left < right) return -1; + if (left > right) return 1; + return 0; +}; + +const matchesCompareOperator = ( + operator: string, + left: JsonScalar, + right: JsonScalar, +): boolean => { + switch (operator) { + // SQL three-valued logic: any comparison with NULL is unknown and excluded + // from a WHERE clause, so `= NULL` and `!= NULL` both match nothing. Mirror + // that here so the memory adapter agrees with the SQL adapters. + case "=": + return left != null && right != null && scalarsEqual(left, right); + case "!=": + return left != null && right != null && !scalarsEqual(left, right); + case ">": { + const compared = compareScalars(left, right); + return compared != null && compared > 0; + } + case ">=": { + const compared = compareScalars(left, right); + return compared != null && compared >= 0; + } + case "<": { + const compared = compareScalars(left, right); + return compared != null && compared < 0; + } + case "<=": { + const compared = compareScalars(left, right); + return compared != null && compared <= 0; + } + case "contains": + return typeof left === "string" && typeof right === "string" && left.includes(right); + case "starts with": + return typeof left === "string" && typeof right === "string" && left.startsWith(right); + case "ends with": + return typeof left === "string" && typeof right === "string" && left.endsWith(right); + default: + return false; + } +}; + +export const matchesJsonFilter = (doc: unknown, filter: JsonFilter): boolean => { + switch (filter.kind) { + case "and": + return filter.items.every((item) => matchesJsonFilter(doc, item)); + case "or": + return filter.items.some((item) => matchesJsonFilter(doc, item)); + case "compare": { + const left = coerceJsonValue(extractJsonPath(doc, filter.path), filter.valueType); + const right = coerceJsonValue(filter.value, filter.valueType); + return matchesCompareOperator(filter.operator, left, right); + } + case "array": { + const left = coerceJsonValue(extractJsonPath(doc, filter.path), filter.valueType); + // SQL: `NULL IN (...)` and `NULL NOT IN (...)` are both unknown, so the + // row is excluded either way. Mirror that (the Drizzle adapter does this + // implicitly via NULL semantics). + if (left == null) return false; + const found = filter.values.some((candidate) => + scalarsEqual(left, coerceJsonValue(candidate, filter.valueType)), + ); + return filter.operator === "in" ? found : !found; + } + } +}; + +export const bucketFloor = (value: number, bucketMs: number): number => + Math.floor(value / bucketMs) * bucketMs; + +/** Percentiles over an ascending-sorted numeric array (Postgres `percentile_cont`). */ +export const computePercentiles = ( + sortedAscending: readonly number[], + fractions: readonly number[], +): JsonPercentile[] => { + const length = sortedAscending.length; + if (length === 0) return []; + return fractions.map((fraction) => { + const clamped = Math.min(1, Math.max(0, fraction)); + const rank = clamped * (length - 1); + const lower = Math.floor(rank); + const upper = Math.ceil(rank); + const value = + lower === upper + ? sortedAscending[lower]! + : sortedAscending[lower]! + (sortedAscending[upper]! - sortedAscending[lower]!) * (rank - lower); + return { fraction, value }; + }); +}; + +/** Null-aware ascending compare (null sorts first). */ +export const compareNullableAscending = (left: JsonScalar, right: JsonScalar): number => { + if (left == null && right == null) return 0; + if (left == null) return -1; + if (right == null) return 1; + if (left < right) return -1; + if (left > right) return 1; + return 0; +}; diff --git a/packages/core/fumadb/src/query/aggregate.test.ts b/packages/core/fumadb/src/query/aggregate.test.ts new file mode 100644 index 000000000..ab7a5a3a3 --- /dev/null +++ b/packages/core/fumadb/src/query/aggregate.test.ts @@ -0,0 +1,388 @@ +import Database from "better-sqlite3"; +import { drizzle } from "drizzle-orm/better-sqlite3"; +import { afterEach, beforeEach, describe, expect, it } from "@effect/vitest"; +import { fumadb } from "@executor-js/fumadb"; +import { + createDrizzleRuntimeSchemaFromTables, + createDrizzleRuntimeSchemaSqlFromTables, + drizzleAdapter, +} from "@executor-js/fumadb/adapters/drizzle"; +import { memoryAdapter } from "@executor-js/fumadb/adapters/memory"; +import type { AbstractQuery, JsonFilter } from "@executor-js/fumadb/query"; +import { column, idColumn, schema, table } from "@executor-js/fumadb/schema"; +import { toORM, type ORMAdapter } from "./orm"; + +const events = table("events", { + id: idColumn("id", "varchar(255)"), + tenant: column("tenant", "varchar(255)"), + data: column("data", "json"), +}); + +const v1 = schema({ version: "1.0.0", tables: { events } }); + +type EventsQuery = AbstractQuery; + +interface SeedRow { + readonly id: string; + readonly tenant: string; + readonly data: { + readonly status: string; + readonly trigger: string; + readonly startedAt: number; + readonly durationMs: number | null; + }; +} + +const seedRows: readonly SeedRow[] = [ + { id: "e1", tenant: "t1", data: { status: "completed", trigger: "cli", startedAt: 1000, durationMs: 100 } }, + { id: "e2", tenant: "t1", data: { status: "completed", trigger: "cli", startedAt: 2000, durationMs: 200 } }, + { id: "e3", tenant: "t1", data: { status: "failed", trigger: "http", startedAt: 3000, durationMs: 300 } }, + { id: "e4", tenant: "t1", data: { status: "running", trigger: "http", startedAt: 4000, durationMs: null } }, + { id: "e5", tenant: "t2", data: { status: "completed", trigger: "cli", startedAt: 5000, durationMs: 500 } }, +]; + +// Rows whose status carries literal LIKE wildcards (`_`, `%`). Kept out of the +// shared seed (existing assertions count exact t1 rows) and seeded only by the +// wildcard-parity test under a dedicated tenant. `a_b%c` must be matched +// literally; `axbyc` is the lookalike an unescaped LIKE would wrongly catch. +const wildcardRows: readonly SeedRow[] = [ + { id: "w1", tenant: "tw", data: { status: "a_b%c", trigger: "cli", startedAt: 6000, durationMs: 600 } }, + { id: "w2", tenant: "tw", data: { status: "axbyc", trigger: "cli", startedAt: 7000, durationMs: 700 } }, +]; + +const inT1 = (eb: Parameters[1]["where"]>>[0]) => + eb("tenant", "=", "t1"); + +const statusIn = (values: readonly string[]): JsonFilter => ({ + kind: "array", + path: ["status"], + valueType: "text", + operator: "in", + values, +}); + +const statusCompare = ( + operator: "=" | "contains" | "starts with" | "ends with", + value: string, +): JsonFilter => ({ + kind: "compare", + path: ["status"], + valueType: "text", + operator, + value, +}); + +const seed = async (orm: EventsQuery) => { + await orm.createMany("events", seedRows.map((row) => ({ ...row }))); +}; + +interface Harness { + readonly orm: EventsQuery; + readonly close: () => Promise; +} + +const makeMemoryHarness = async (): Promise => { + const client = fumadb({ namespace: "aggregate_test", schemas: [v1] }).client(memoryAdapter()); + return { orm: client.orm("1.0.0") as EventsQuery, close: async () => {} }; +}; + +const makeSqliteHarness = async (): Promise => { + const sqlite = new Database(":memory:"); + const schemaArgs = { + tables: v1.tables, + namespace: "aggregate_test", + version: "1.0.0", + provider: "sqlite", + } as const; + const drizzleDb = drizzle(sqlite, { schema: createDrizzleRuntimeSchemaFromTables(schemaArgs) }); + for (const statement of createDrizzleRuntimeSchemaSqlFromTables(schemaArgs)) { + sqlite.exec(statement); + } + const client = fumadb({ namespace: "aggregate_test", schemas: [v1] }).client( + drizzleAdapter({ db: drizzleDb, provider: "sqlite" }), + ); + return { + orm: client.orm("1.0.0") as EventsQuery, + close: async () => { + sqlite.close(); + }, + }; +}; + +const runSuite = (name: string, makeHarness: () => Promise) => { + describe(`json aggregation (${name})`, () => { + let harness: Harness; + beforeEach(async () => { + harness = await makeHarness(); + await seed(harness.orm); + }); + afterEach(async () => { + await harness.close(); + }); + + it("jsonCount scopes by real columns and JSON filter", async () => { + const { orm } = harness; + expect(await orm.jsonCount("events", { column: "data", where: inT1 })).toBe(4); + expect(await orm.jsonCount("events", { column: "data", where: (eb) => eb("tenant", "=", "t2") })).toBe(1); + expect( + await orm.jsonCount("events", { column: "data", where: inT1, filter: statusIn(["completed"]) }), + ).toBe(2); + }); + + it("jsonGroupCount counts distinct values", async () => { + const rows = await harness.orm.jsonGroupCount("events", { + column: "data", + where: inT1, + path: ["status"], + }); + const byValue = Object.fromEntries(rows.map((row) => [row.value, row.count])); + expect(byValue).toEqual({ completed: 2, failed: 1, running: 1 }); + }); + + it("jsonTimeBuckets buckets a numeric path", async () => { + const rows = await harness.orm.jsonTimeBuckets("events", { + column: "data", + where: inT1, + path: ["startedAt"], + bucketMs: 2000, + }); + expect(rows).toEqual([ + { bucket: 0, count: 1 }, + { bucket: 2000, count: 2 }, + { bucket: 4000, count: 1 }, + ]); + }); + + it("jsonStats computes count/min/max and continuous percentiles", async () => { + const stats = await harness.orm.jsonStats("events", { + column: "data", + where: inT1, + path: ["durationMs"], + percentiles: [0, 0.5, 1], + }); + expect(stats.count).toBe(3); + expect(stats.min).toBe(100); + expect(stats.max).toBe(300); + expect(stats.percentiles).toEqual([ + { fraction: 0, value: 100 }, + { fraction: 0.5, value: 200 }, + { fraction: 1, value: 300 }, + ]); + }); + + it("jsonPage keyset-paginates by a JSON path with a real-column tiebreak", async () => { + const { orm } = harness; + const page1 = await orm.jsonPage("events", { + column: "data", + where: inT1, + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "desc" }], + keyColumn: "id", + keyDirection: "desc", + limit: 2, + }); + expect(page1.map((row) => row.id)).toEqual(["e4", "e3"]); + + const page2 = await orm.jsonPage("events", { + column: "data", + where: inT1, + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "desc" }], + keyColumn: "id", + keyDirection: "desc", + limit: 2, + cursor: { values: [3000], key: "e3" }, + }); + expect(page2.map((row) => row.id)).toEqual(["e2", "e1"]); + }); + + it("jsonPage applies the JSON filter", async () => { + const rows = await harness.orm.jsonPage("events", { + column: "data", + where: inT1, + filter: statusIn(["completed", "failed"]), + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "asc" }], + keyColumn: "id", + keyDirection: "asc", + limit: 10, + }); + expect(rows.map((row) => row.id)).toEqual(["e1", "e2", "e3"]); + }); + + it("excludes null-path rows from = / != filters (SQL three-valued logic)", async () => { + const { orm } = harness; + // e4 has durationMs: null. `!= 200` excludes e2 (it matches 200) AND e4 + // (NULL comparisons are unknown in SQL), leaving e1 + e3. + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { kind: "compare", path: ["durationMs"], valueType: "number", operator: "!=", value: 200 }, + }), + ).toBe(2); + // `= 100` matches only e1; the null row never matches `=`. + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { kind: "compare", path: ["durationMs"], valueType: "number", operator: "=", value: 100 }, + }), + ).toBe(1); + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { + kind: "array", + path: ["durationMs"], + valueType: "number", + operator: "in", + values: [100, 300], + }, + }), + ).toBe(2); + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { + kind: "array", + path: ["durationMs"], + valueType: "number", + operator: "not in", + values: [200], + }, + }), + ).toBe(2); + }); + + it("quotes JSON path segments for nested object keys", async () => { + const { orm } = harness; + await orm.create("events", { + id: "path-1", + tenant: "tp", + data: { + "a.b": { + 'c"d': 7, + }, + a: { + b: { + 'c"d': 99, + }, + }, + }, + }); + + await expect( + orm.jsonCount("events", { + column: "data", + where: (eb) => eb("tenant", "=", "tp"), + filter: { + kind: "compare", + path: ["a.b", 'c"d'], + valueType: "number", + operator: "=", + value: 7, + }, + }), + ).resolves.toBe(1); + }); + + it("keyset-paginates a nullable sort column without truncating", async () => { + const { orm } = harness; + const collected: string[] = []; + let cursor: { readonly values: readonly (number | null)[]; readonly key: string } | undefined; + for (let i = 0; i < 10; i += 1) { + const rows = await orm.jsonPage("events", { + column: "data", + where: inT1, + orderBy: [{ path: ["durationMs"], valueType: "number", direction: "asc" }], + keyColumn: "id", + keyDirection: "asc", + limit: 1, + cursor, + }); + if (rows.length === 0) break; + const last = rows[rows.length - 1]!; + const key = last.id as string; + collected.push(key); + cursor = { values: [(last.data as { durationMs: number | null }).durationMs], key }; + } + // asc, nulls first: e4(null), then e1(100), e2(200), e3(300). The page + // whose cursor is the null row must still return the non-null rows, a + // naive `durationMs > NULL` predicate would truncate here. + expect(collected).toEqual(["e4", "e1", "e2", "e3"]); + }); + }); +}; + +runSuite("memory", makeMemoryHarness); +runSuite("sqlite", makeSqliteHarness); + +// LIKE-wildcard parity: a value carrying literal `_`/`%` must match the same +// rows under the drizzle (sqlite) adapter as under the literal memory adapter. +// Without ESCAPE these wildcards would make sqlite over-match (`a_b%c` would +// also catch `axbyc`). +describe("json filter LIKE-wildcard parity (memory vs sqlite)", () => { + let memory: Harness; + let sqlite: Harness; + beforeEach(async () => { + memory = await makeMemoryHarness(); + sqlite = await makeSqliteHarness(); + await memory.orm.createMany("events", wildcardRows.map((row) => ({ ...row }))); + await sqlite.orm.createMany("events", wildcardRows.map((row) => ({ ...row }))); + }); + afterEach(async () => { + await memory.close(); + await sqlite.close(); + }); + + const idsFor = async (orm: EventsQuery, filter: JsonFilter): Promise => { + const rows = await orm.jsonPage("events", { + column: "data", + where: (eb) => eb("tenant", "=", "tw"), + filter, + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "asc" }], + keyColumn: "id", + keyDirection: "asc", + limit: 100, + }); + return rows.map((row) => row.id as string); + }; + + it.each([ + ["contains", statusCompare("contains", "_b%")], + ["eq", statusCompare("=", "a_b%c")], + ["starts with", statusCompare("starts with", "a_")], + ["ends with", statusCompare("ends with", "%c")], + ] as const)("matches identically for %s", async (_label, filter) => { + const fromMemory = await idsFor(memory.orm, filter); + const fromSqlite = await idsFor(sqlite.orm, filter); + expect(fromSqlite).toEqual(fromMemory); + // The wildcard row must be selected, the lookalike (`axbyc`) must not. + expect(fromMemory).toContain("w1"); + expect(fromMemory).not.toContain("w2"); + }); +}); + +describe("json aggregation unsupported adapters", () => { + it("fails loudly when the adapter has no JSON aggregate hooks", async () => { + let unsupported: EventsQuery; + const adapter: ORMAdapter = { + tables: v1.tables, + count: async () => 0, + findFirst: async () => null, + findMany: async () => [], + updateMany: async () => {}, + upsert: async () => {}, + create: async () => ({}), + createMany: async () => [], + deleteMany: async () => {}, + transaction: async (run: (transactionInstance: EventsQuery) => Promise): Promise => + run(unsupported), + }; + unsupported = toORM(adapter); + + await expect(unsupported.jsonCount("events", { column: "data" })).rejects.toThrow( + "[FumaDB] jsonCount is not supported by this adapter.", + ); + }); +}); diff --git a/packages/core/fumadb/src/query/aggregate.ts b/packages/core/fumadb/src/query/aggregate.ts new file mode 100644 index 000000000..6d6e3fcbf --- /dev/null +++ b/packages/core/fumadb/src/query/aggregate.ts @@ -0,0 +1,168 @@ +import type { AnyColumn } from "../schema/create"; +import type { Condition, ConditionBuilder } from "./condition-builder"; + +// --------------------------------------------------------------------------- +// JSON-document aggregation + keyset pagination. +// +// These operate over values addressed *inside* a JSON document column (e.g. +// `plugin_storage.data`), not over real typed columns. The addressing model is +// therefore path-based and carries an explicit extraction type so adapters can +// emit correct per-dialect SQL (Postgres `->>` returns text and needs casts; +// SQLite `json_extract` is natively typed). Only adapters that implement the +// matching `ORMAdapter` hooks support these; others throw loudly. +// --------------------------------------------------------------------------- + +export type JsonValueType = "text" | "number" | "boolean"; + +export type JsonScalar = string | number | boolean | null; + +/** A non-empty path into a JSON document column, e.g. `["status"]`. */ +export type JsonPath = readonly [string, ...string[]]; + +export type JsonCompareOperator = + | "=" + | "!=" + | ">" + | ">=" + | "<" + | "<=" + | "contains" + | "starts with" + | "ends with"; + +export type JsonArrayOperator = "in" | "not in"; + +/** A predicate tree over JSON-document paths, ANDed into the real-column where. */ +export type JsonFilter = + | { + readonly kind: "compare"; + readonly path: JsonPath; + readonly valueType: JsonValueType; + readonly operator: JsonCompareOperator; + readonly value: JsonScalar; + } + | { + readonly kind: "array"; + readonly path: JsonPath; + readonly valueType: JsonValueType; + readonly operator: JsonArrayOperator; + readonly values: readonly JsonScalar[]; + } + | { readonly kind: "and"; readonly items: readonly JsonFilter[] } + | { readonly kind: "or"; readonly items: readonly JsonFilter[] }; + +// --- Adapter-facing options (column/keyColumn resolved to AnyColumn, where +// compiled to a Condition with read policies already applied) --------------- + +export interface JsonAdapterBase { + /** The JSON document column the paths address. */ + readonly column: AnyColumn; + /** Real-column scoping condition (read policies already applied). */ + readonly where?: Condition; + /** JSON-path predicates, conjoined with `where`. */ + readonly filter?: JsonFilter; +} + +export type JsonCountAdapterOptions = JsonAdapterBase; + +export interface JsonGroupCountAdapterOptions extends JsonAdapterBase { + readonly path: JsonPath; + /** Extraction type for the group key; defaults to "text". */ + readonly valueType?: JsonValueType; +} + +export interface JsonGroupCountRow { + readonly value: JsonScalar; + readonly count: number; +} + +export interface JsonTimeBucketAdapterOptions extends JsonAdapterBase { + /** Numeric (epoch-ms) path the buckets are computed from. */ + readonly path: JsonPath; + readonly bucketMs: number; +} + +export interface JsonTimeBucketRow { + /** Bucket floor: `floor(value / bucketMs) * bucketMs`. */ + readonly bucket: number; + readonly count: number; +} + +export interface JsonStatsAdapterOptions extends JsonAdapterBase { + /** Numeric path the stats are computed over. */ + readonly path: JsonPath; + /** Percentile fractions in [0, 1] (e.g. 0.5, 0.95). */ + readonly percentiles?: readonly number[]; +} + +export interface JsonPercentile { + readonly fraction: number; + readonly value: number; +} + +export interface JsonStats { + readonly count: number; + readonly min: number | null; + readonly max: number | null; + readonly percentiles: readonly JsonPercentile[]; +} + +export interface JsonKeysetOrder { + readonly path: JsonPath; + readonly valueType: JsonValueType; + readonly direction: "asc" | "desc"; +} + +export interface JsonKeysetCursor { + /** One value per `orderBy` entry, in order. */ + readonly values: readonly JsonScalar[]; + /** Tiebreak value of the `keyColumn` for the cursor row. */ + readonly key: string; +} + +export interface JsonPageAdapterOptions extends JsonAdapterBase { + readonly orderBy: readonly JsonKeysetOrder[]; + /** Real column used as a stable tiebreak (e.g. the storage key). */ + readonly keyColumn: AnyColumn; + readonly keyDirection: "asc" | "desc"; + readonly cursor?: JsonKeysetCursor; + readonly limit: number; +} + +// --- Public (AbstractQuery) options: name strings + a where builder -------- + +export interface JsonPublicBase> { + readonly column: keyof TColumns & string; + readonly where?: (eb: ConditionBuilder) => Condition | boolean; + readonly filter?: JsonFilter; +} + +export type JsonCountOptions> = + JsonPublicBase; + +export interface JsonGroupCountOptions> + extends JsonPublicBase { + readonly path: JsonPath; + readonly valueType?: JsonValueType; +} + +export interface JsonTimeBucketOptions> + extends JsonPublicBase { + readonly path: JsonPath; + readonly bucketMs: number; +} + +export interface JsonStatsOptions> + extends JsonPublicBase { + readonly path: JsonPath; + readonly percentiles?: readonly number[]; +} + +export interface JsonPageOptions> + extends JsonPublicBase { + readonly orderBy: readonly JsonKeysetOrder[]; + readonly keyColumn: keyof TColumns & string; + readonly keyDirection?: "asc" | "desc"; + readonly cursor?: JsonKeysetCursor; + readonly limit: number; +} diff --git a/packages/core/fumadb/src/query/index.ts b/packages/core/fumadb/src/query/index.ts index 2128adb0e..a339b8b86 100644 --- a/packages/core/fumadb/src/query/index.ts +++ b/packages/core/fumadb/src/query/index.ts @@ -6,9 +6,20 @@ import type { TableInsertValues, TableUpdateValues, } from "../schema/create"; +import type { + JsonCountOptions, + JsonGroupCountOptions, + JsonGroupCountRow, + JsonPageOptions, + JsonStats, + JsonStatsOptions, + JsonTimeBucketOptions, + JsonTimeBucketRow, +} from "./aggregate"; import type { Condition, ConditionBuilder } from "./condition-builder"; import type { ORMAdapter } from "./orm"; +export type * from "./aggregate"; export type { Condition, ConditionBuilder } from "./condition-builder"; export { ConditionType } from "./condition-builder"; export { withQueryContext } from "./orm"; @@ -208,4 +219,39 @@ export interface AbstractQuery { eb: ConditionBuilder ) => Condition | boolean; }) => Promise; + + // --- JSON-document aggregation + keyset pagination ----------------------- + // Operate over values addressed inside a JSON document column. Only adapters + // that implement the matching `ORMAdapter` hooks support these; the rest + // throw `[FumaDB] is not supported by this adapter`. + + /** Count rows matching the real-column `where` and JSON-path `filter`. */ + jsonCount: ( + table: TableName, + options: JsonCountOptions + ) => Promise; + + /** Group by a JSON path and count rows per distinct value. */ + jsonGroupCount: ( + table: TableName, + options: JsonGroupCountOptions + ) => Promise; + + /** Bucket a numeric JSON path by `bucketMs` and count rows per bucket. */ + jsonTimeBuckets: ( + table: TableName, + options: JsonTimeBucketOptions + ) => Promise; + + /** Compute count/min/max and percentiles over a numeric JSON path. */ + jsonStats: ( + table: TableName, + options: JsonStatsOptions + ) => Promise; + + /** Keyset-paginate rows ordered by JSON paths with a real-column tiebreak. */ + jsonPage: ( + table: TableName, + options: JsonPageOptions + ) => Promise[]>; } diff --git a/packages/core/fumadb/src/query/orm/index.ts b/packages/core/fumadb/src/query/orm/index.ts index f0bac42e7..1d76a758f 100644 --- a/packages/core/fumadb/src/query/orm/index.ts +++ b/packages/core/fumadb/src/query/orm/index.ts @@ -12,6 +12,16 @@ import type { JoinBuilder, OrderBy, } from ".."; +import type { + JsonCountAdapterOptions, + JsonGroupCountAdapterOptions, + JsonGroupCountRow, + JsonPageAdapterOptions, + JsonStats, + JsonStatsAdapterOptions, + JsonTimeBucketAdapterOptions, + JsonTimeBucketRow, +} from "../aggregate"; import { buildCondition, createBuilder, type Condition } from "../condition-builder"; export interface CompiledJoin { @@ -311,6 +321,25 @@ export interface ORMAdapter { transaction: ( run: (transactionInstance: AbstractQuery) => Promise, ) => Promise; + + // --- JSON-document aggregation + keyset pagination (optional) ------------ + // Adapters that can push these into the database implement them; `toORM` + // throws a clear error for adapters that don't. + + jsonCount?: (table: AnyTable, options: JsonCountAdapterOptions) => Promise; + jsonGroupCount?: ( + table: AnyTable, + options: JsonGroupCountAdapterOptions, + ) => Promise; + jsonTimeBuckets?: ( + table: AnyTable, + options: JsonTimeBucketAdapterOptions, + ) => Promise; + jsonStats?: (table: AnyTable, options: JsonStatsAdapterOptions) => Promise; + jsonPage?: ( + table: AnyTable, + options: JsonPageAdapterOptions, + ) => Promise[]>; } export interface ToORMOptions { @@ -334,6 +363,33 @@ export function toORM( return table; } + function toColumn(table: AnyTable, name: string): AnyColumn { + const column = table.columns[name]; + if (!column) + throw new Error(`[FumaDB] Invalid column name ${name} in ${table.ormName}.`); + return column; + } + + // Compiles a public where builder, applies read policies, and yields the + // scoping condition. Returns `false` when the query is statically empty. + async function compileScopedWhere( + table: AnyTable, + where: + | ((eb: ReturnType) => Condition | boolean) + | undefined, + ): Promise { + let conditions = where ? buildCondition(table.columns, where) : undefined; + if (conditions === true) conditions = undefined; + if (conditions === false) return false; + return applyReadPolicies(table, conditions, context); + } + + function requireJsonOp(op: T | undefined, name: string): T { + if (!op) + throw new Error(`[FumaDB] ${name} is not supported by this adapter.`); + return op; + } + const query = { internal, async count(name, { where } = {}) { @@ -428,6 +484,69 @@ export function toORM( if (constrainedWhere === false) return; return internal.updateMany(table, { set, where: constrainedWhere }); }, + async jsonCount(name, { column, where, filter }) { + const op = requireJsonOp(internal.jsonCount, "jsonCount"); + const table = toTable(name); + const scopedWhere = await compileScopedWhere(table, where); + if (scopedWhere === false) return 0; + return op(table, { column: toColumn(table, column), where: scopedWhere, filter }); + }, + async jsonGroupCount(name, { column, where, filter, path, valueType }) { + const op = requireJsonOp(internal.jsonGroupCount, "jsonGroupCount"); + const table = toTable(name); + const scopedWhere = await compileScopedWhere(table, where); + if (scopedWhere === false) return []; + return op(table, { + column: toColumn(table, column), + where: scopedWhere, + filter, + path, + valueType, + }); + }, + async jsonTimeBuckets(name, { column, where, filter, path, bucketMs }) { + const op = requireJsonOp(internal.jsonTimeBuckets, "jsonTimeBuckets"); + const table = toTable(name); + const scopedWhere = await compileScopedWhere(table, where); + if (scopedWhere === false) return []; + return op(table, { + column: toColumn(table, column), + where: scopedWhere, + filter, + path, + bucketMs, + }); + }, + async jsonStats(name, { column, where, filter, path, percentiles }) { + const op = requireJsonOp(internal.jsonStats, "jsonStats"); + const table = toTable(name); + const scopedWhere = await compileScopedWhere(table, where); + if (scopedWhere === false) + return { count: 0, min: null, max: null, percentiles: [] }; + return op(table, { + column: toColumn(table, column), + where: scopedWhere, + filter, + path, + percentiles, + }); + }, + async jsonPage(name, options) { + const op = requireJsonOp(internal.jsonPage, "jsonPage"); + const table = toTable(name); + const scopedWhere = await compileScopedWhere(table, options.where); + if (scopedWhere === false) return []; + return op(table, { + column: toColumn(table, options.column), + where: scopedWhere, + filter: options.filter, + orderBy: options.orderBy, + keyColumn: toColumn(table, options.keyColumn), + keyDirection: options.keyDirection ?? "asc", + cursor: options.cursor, + limit: options.limit, + }) as Promise[]>; + }, async transaction(run) { return internal.transaction((transactionInstance) => run(withQueryContext(transactionInstance, context)), diff --git a/packages/core/fumadb/src/query/table-policy.test.ts b/packages/core/fumadb/src/query/table-policy.test.ts index 51d0c2b8c..d325894aa 100644 --- a/packages/core/fumadb/src/query/table-policy.test.ts +++ b/packages/core/fumadb/src/query/table-policy.test.ts @@ -103,12 +103,35 @@ const comments = table("policy_comments", { }, }); +const events = table("policy_events", { + id: idColumn("id", "varchar(255)"), + tenantId: column("tenant_id", "varchar(255)"), + data: column("data", "json"), +}).policy({ + name: "tenant.events", + onRead: ({ builder, context }) => { + if (isReadDenied("events", context)) return false; + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, + onCreate: ({ values, context }) => assertTenantAllowed("events", context, values.tenantId), + onUpdate: ({ builder, set, context }) => { + observe(context, "events:update"); + if (set.tenantId !== undefined) assertTenantAllowed("events", context, set.tenantId); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, + onDelete: ({ builder, context }) => { + observe(context, "events:delete"); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, +}); + const v1 = schema({ version: "1.0.0", tables: { authors, posts, comments, + events, }, relations: { authors: ({ many }) => ({ @@ -235,6 +258,33 @@ const seedTenants = async (orm: TablePolicyQuery) => { body: "B comment", }, ]); + + await seed.createMany("events", [ + { + id: "event-a-1", + tenantId: "tenant-a", + data: { + status: "completed", + startedAt: 1000, + }, + }, + { + id: "event-a-2", + tenantId: "tenant-a", + data: { + status: "failed", + startedAt: 2000, + }, + }, + { + id: "event-b-1", + tenantId: "tenant-b", + data: { + status: "completed", + startedAt: 3000, + }, + }, + ]); }; describe("FumaDB table policies", () => { @@ -460,6 +510,67 @@ describe("FumaDB table policies", () => { }), ); + it.effect("applies read policies before JSON aggregation and keyset queries", () => + useHarness(async (orm) => { + await seedTenants(orm); + + const tenantAContext = makeContext(["tenant-a"], "tenant-a"); + const tenantA = withQueryContext(orm, tenantAContext); + + await expect(tenantA.jsonCount("events", { column: "data" })).resolves.toBe(2); + await expect( + tenantA.jsonCount("events", { + column: "data", + filter: { + kind: "compare", + path: ["status"], + valueType: "text", + operator: "=", + value: "completed", + }, + }), + ).resolves.toBe(1); + + const groups = await tenantA.jsonGroupCount("events", { + column: "data", + path: ["status"], + }); + expect(Object.fromEntries(groups.map((row) => [row.value, row.count]))).toEqual({ + completed: 1, + failed: 1, + }); + + await expect( + tenantA.jsonPage("events", { + column: "data", + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "asc" }], + keyColumn: "id", + keyDirection: "asc", + limit: 10, + }), + ).resolves.toMatchObject([{ id: "event-a-1" }, { id: "event-a-2" }]); + + const deniedEvents = withQueryContext( + orm, + makeContext(["tenant-a"], "denied-events", ["events"]), + ); + await expect(deniedEvents.jsonCount("events", { column: "data" })).resolves.toBe(0); + await expect( + deniedEvents.jsonPage("events", { + column: "data", + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "asc" }], + keyColumn: "id", + keyDirection: "asc", + limit: 10, + }), + ).resolves.toEqual([]); + + expect(tenantAContext.observed).toEqual( + expect.arrayContaining(["tenant-a:events:read"]), + ); + }), + ); + it.effect("fails closed when a query wrapper does not forward context rebinding", () => useHarness(async (orm) => { const wrapped = { ...orm }; diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 38e12a3c6..28809622e 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -2,7 +2,20 @@ import { Effect, Inspectable, Layer, Option, Predicate, Schema } from "effect"; import { FetchHttpClient, type HttpClient } from "effect/unstable/http"; import { fumadb } from "@executor-js/fumadb"; import { memoryAdapter } from "@executor-js/fumadb/adapters/memory"; -import { withQueryContext, type Condition, type ConditionBuilder } from "@executor-js/fumadb/query"; +import { + withQueryContext, + type Condition, + type ConditionBuilder, + type JsonFilter, + type JsonGroupCountRow, + type JsonKeysetCursor, + type JsonKeysetOrder, + type JsonPath, + type JsonScalar, + type JsonStats, + type JsonTimeBucketRow, + type JsonValueType, +} from "@executor-js/fumadb/query"; import { schema as fumaSchema, type RelationsMap } from "@executor-js/fumadb/schema"; import type { AnyColumn } from "@executor-js/fumadb/schema"; import { generateKeyBetween } from "fractional-indexing"; @@ -591,6 +604,31 @@ type CoreProjectedRow = TSelect extends re ? Pick, Extract>> : CoreRow; +type CoreJsonBase = { + readonly column: string; + readonly where?: CoreWhere; + readonly filter?: JsonFilter; +}; +type CoreJsonGroupCountOptions = CoreJsonBase & { + readonly path: JsonPath; + readonly valueType?: JsonValueType; +}; +type CoreJsonTimeBucketOptions = CoreJsonBase & { + readonly path: JsonPath; + readonly bucketMs: number; +}; +type CoreJsonStatsOptions = CoreJsonBase & { + readonly path: JsonPath; + readonly percentiles?: readonly number[]; +}; +type CoreJsonPageOptions = CoreJsonBase & { + readonly orderBy: readonly JsonKeysetOrder[]; + readonly keyColumn: string; + readonly keyDirection?: "asc" | "desc"; + readonly cursor?: JsonKeysetCursor; + readonly limit: number; +}; + type LooseStorageDb = { readonly count: (tableName: string, options?: unknown) => Promise; readonly create: ( @@ -611,6 +649,20 @@ type LooseStorageDb = { options?: unknown, ) => Promise[]>; readonly updateMany: (tableName: string, options: unknown) => Promise; + readonly jsonCount: (tableName: string, options: unknown) => Promise; + readonly jsonGroupCount: ( + tableName: string, + options: unknown, + ) => Promise; + readonly jsonTimeBuckets: ( + tableName: string, + options: unknown, + ) => Promise; + readonly jsonStats: (tableName: string, options: unknown) => Promise; + readonly jsonPage: ( + tableName: string, + options: unknown, + ) => Promise[]>; }; const asLooseStorageDb = (db: unknown): LooseStorageDb => db as LooseStorageDb; @@ -668,6 +720,37 @@ const makeCoreDb = (fuma: ReturnType) => ({ fuma.use(`${tableName}.updateMany`, (db) => asLooseStorageDb(db).updateMany(tableName, options), ), + jsonCount: ( + tableName: CoreTableName, + options: CoreJsonBase, + ): Effect.Effect => + fuma.use(`${tableName}.jsonCount`, (db) => asLooseStorageDb(db).jsonCount(tableName, options)), + jsonGroupCount: ( + tableName: CoreTableName, + options: CoreJsonGroupCountOptions, + ): Effect.Effect => + fuma.use(`${tableName}.jsonGroupCount`, (db) => + asLooseStorageDb(db).jsonGroupCount(tableName, options), + ), + jsonTimeBuckets: ( + tableName: CoreTableName, + options: CoreJsonTimeBucketOptions, + ): Effect.Effect => + fuma.use(`${tableName}.jsonTimeBuckets`, (db) => + asLooseStorageDb(db).jsonTimeBuckets(tableName, options), + ), + jsonStats: ( + tableName: CoreTableName, + options: CoreJsonStatsOptions, + ): Effect.Effect => + fuma.use(`${tableName}.jsonStats`, (db) => asLooseStorageDb(db).jsonStats(tableName, options)), + jsonPage: ( + tableName: TName, + options: CoreJsonPageOptions, + ): Effect.Effect[], StorageFailure> => + fuma.use(`${tableName}.jsonPage`, (db) => + asLooseStorageDb(db).jsonPage(tableName, options), + ) as Effect.Effect[], StorageFailure>, }); type CoreDb = ReturnType; @@ -803,6 +886,97 @@ const rowMatchesPluginStorageWhere = ( return true; }; +const inferJsonValueType = (value: unknown): JsonValueType => { + if (value instanceof Date) return "number"; + if (typeof value === "number") return "number"; + if (typeof value === "boolean") return "boolean"; + return "text"; +}; + +const pluginStorageJsonScalar = (value: unknown): JsonScalar => pluginStorageComparableValue(value); + +// Translate the facade's indexed-field `where` into a JSON-path filter the SQL +// pushdown understands. Field value types are inferred from the operands. +const pluginStorageWhereToJsonFilter = ( + where: Readonly> | undefined, +): JsonFilter | undefined => { + if (!where) return undefined; + const items: JsonFilter[] = []; + for (const [field, condition] of Object.entries(where)) { + const path: JsonPath = [field]; + if (isPluginStorageWhereFilter(condition)) { + for (const [operator, operand] of Object.entries(condition)) { + if (operator === "in") { + const values = Array.isArray(operand) ? operand : []; + items.push({ + kind: "array", + path, + valueType: inferJsonValueType(values[0]), + operator: "in", + values: values.map(pluginStorageJsonScalar), + }); + continue; + } + const compareOperator = + operator === "gt" + ? ">" + : operator === "gte" + ? ">=" + : operator === "lt" + ? "<" + : operator === "lte" + ? "<=" + : "="; + items.push({ + kind: "compare", + path, + valueType: inferJsonValueType(operand), + operator: compareOperator, + value: pluginStorageJsonScalar(operand), + }); + } + } else { + items.push({ + kind: "compare", + path, + valueType: inferJsonValueType(condition), + operator: "=", + value: pluginStorageJsonScalar(condition), + }); + } + } + if (items.length === 0) return undefined; + if (items.length === 1) return items[0]!; + return { kind: "and", items }; +}; + +const pluginStorageFieldsValidationError = ( + definition: PluginStorageRuntimeCollectionDefinition, + fields: Iterable, +): StorageError | null => { + const indexedFields = pluginStorageCollectionIndexedFields(definition); + for (const field of fields) { + if (!indexedFields.has(field)) { + return new StorageError({ + message: `Plugin storage collection "${definition.name}" cannot query field "${field}" because it is not declared as an index`, + cause: undefined, + }); + } + } + return null; +}; + +const pluginStorageInvalidLimitError = ( + definition: PluginStorageRuntimeCollectionDefinition, + limit: number, +): StorageError | null => + Number.isInteger(limit) && limit >= 0 + ? null + : new StorageError({ + message: `Plugin storage collection "${definition.name}" received an invalid query limit`, + cause: undefined, + }); + const makePluginStorageFacade = (input: { readonly core: CoreDb; readonly pluginId: string; @@ -828,6 +1002,18 @@ const makePluginStorageFacade = (input: { key === undefined ? true : b("key", "=", key), ); + // Like `whereFor` but with an optional key-prefix match, used by the + // pushdown aggregate/keyset paths. Owner/tenant scoping is added by the + // table policy, exactly as for `whereFor`. + const whereForPrefix = + (collection: string, keyPrefix?: string): CoreWhere => + (b: AnyCb) => + b.and( + b("plugin_id", "=", input.pluginId), + b("collection", "=", collection), + keyPrefix === undefined ? true : b("key", "starts with", keyPrefix), + ); + const whereOwner = (owner: Owner, collection: string, key: string): CoreWhere => { const os = ownerSubject(owner); return (b: AnyCb) => @@ -1112,6 +1298,120 @@ const makePluginStorageFacade = (input: { query: (storageInput) => queryCollection(definition, storageInput), count: (storageInput) => queryCollection(definition, storageInput).pipe(Effect.map((rows) => rows.length)), + queryKeyset: (storageInput) => + Effect.gen(function* () { + const fields = new Set([ + ...Object.keys(storageInput.where ?? {}), + ...storageInput.orderBy.map((order) => order.field), + ]); + const validationError = pluginStorageFieldsValidationError(definition, fields); + if (validationError) return yield* validationError; + const limitError = pluginStorageInvalidLimitError(definition, storageInput.limit); + if (limitError) return yield* limitError; + + const orderBy: JsonKeysetOrder[] = storageInput.orderBy.map((order) => ({ + path: [order.field], + valueType: order.valueType ?? "text", + direction: order.direction ?? "asc", + })); + const keyDirection = orderBy[0]?.direction ?? "asc"; + + const rows = yield* input.core.jsonPage("plugin_storage", { + column: "data", + where: whereForPrefix(definition.name, storageInput.keyPrefix), + filter: pluginStorageWhereToJsonFilter( + storageInput.where as Readonly> | undefined, + ), + orderBy, + keyColumn: "key", + keyDirection, + cursor: storageInput.cursor, + limit: storageInput.limit, + }); + + const entries = rows.map((row) => + pluginStorageEntryFromRow>(row), + ); + const last = entries.at(-1); + const nextCursor = + last && entries.length >= storageInput.limit + ? { + values: storageInput.orderBy.map((order) => + pluginStorageJsonScalar(pluginStorageDataField(last.data, order.field)), + ), + key: last.key, + } + : null; + return { entries, nextCursor }; + }), + aggregate: { + count: (storageInput) => + Effect.gen(function* () { + const validationError = pluginStorageFieldsValidationError( + definition, + Object.keys(storageInput?.where ?? {}), + ); + if (validationError) return yield* validationError; + return yield* input.core.jsonCount("plugin_storage", { + column: "data", + where: whereForPrefix(definition.name, storageInput?.keyPrefix), + filter: pluginStorageWhereToJsonFilter( + storageInput?.where as Readonly> | undefined, + ), + }); + }), + groupCount: (storageInput) => + Effect.gen(function* () { + const validationError = pluginStorageFieldsValidationError(definition, [ + storageInput.field, + ...Object.keys(storageInput.where ?? {}), + ]); + if (validationError) return yield* validationError; + return yield* input.core.jsonGroupCount("plugin_storage", { + column: "data", + where: whereForPrefix(definition.name, storageInput.keyPrefix), + filter: pluginStorageWhereToJsonFilter( + storageInput.where as Readonly> | undefined, + ), + path: [storageInput.field], + valueType: storageInput.valueType ?? "text", + }); + }), + timeBuckets: (storageInput) => + Effect.gen(function* () { + const validationError = pluginStorageFieldsValidationError(definition, [ + storageInput.field, + ...Object.keys(storageInput.where ?? {}), + ]); + if (validationError) return yield* validationError; + return yield* input.core.jsonTimeBuckets("plugin_storage", { + column: "data", + where: whereForPrefix(definition.name, storageInput.keyPrefix), + filter: pluginStorageWhereToJsonFilter( + storageInput.where as Readonly> | undefined, + ), + path: [storageInput.field], + bucketMs: storageInput.bucketMs, + }); + }), + stats: (storageInput) => + Effect.gen(function* () { + const validationError = pluginStorageFieldsValidationError(definition, [ + storageInput.field, + ...Object.keys(storageInput.where ?? {}), + ]); + if (validationError) return yield* validationError; + return yield* input.core.jsonStats("plugin_storage", { + column: "data", + where: whereForPrefix(definition.name, storageInput.keyPrefix), + filter: pluginStorageWhereToJsonFilter( + storageInput.where as Readonly> | undefined, + ), + path: [storageInput.field], + percentiles: storageInput.percentiles, + }); + }), + }, remove: (storageInput) => removeImpl(storageInput.owner, definition.name, storageInput.key), }), get: (storageInput) => getVisible(storageInput.collection, storageInput.key), diff --git a/packages/core/sdk/src/fuma-runtime.ts b/packages/core/sdk/src/fuma-runtime.ts index e560b1b09..89f7b7e16 100644 --- a/packages/core/sdk/src/fuma-runtime.ts +++ b/packages/core/sdk/src/fuma-runtime.ts @@ -139,6 +139,11 @@ const makeSafeFumaQuery = ( deleteMany: (name, value) => db.deleteMany(table(name), value), findFirst: (name, value) => db.findFirst(table(name), value), findMany: (name, value) => db.findMany(table(name), value), + jsonCount: (name, value) => db.jsonCount(table(name), value), + jsonGroupCount: (name, value) => db.jsonGroupCount(table(name), value), + jsonTimeBuckets: (name, value) => db.jsonTimeBuckets(table(name), value), + jsonStats: (name, value) => db.jsonStats(table(name), value), + jsonPage: (name, value) => db.jsonPage(table(name), value), transaction: (run) => db.transaction((transactionDb) => run(makeSafeFumaQuery(transactionDb, options))), updateMany: (name, value) => db.updateMany(table(name), value), diff --git a/packages/core/sdk/src/plugin-storage-aggregate.test.ts b/packages/core/sdk/src/plugin-storage-aggregate.test.ts new file mode 100644 index 000000000..3a2575e84 --- /dev/null +++ b/packages/core/sdk/src/plugin-storage-aggregate.test.ts @@ -0,0 +1,200 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect, Schema } from "effect"; + +import type { StorageFailure } from "./fuma-runtime"; +import { Owner } from "./ids"; +import { definePlugin } from "./plugin"; +import { + definePluginStorageCollection, + type PluginStorageAggregateFilter, + type PluginStorageGroupCountInput, + type PluginStorageQueryKeysetInput, + type PluginStorageStatsInput, + type PluginStorageTimeBucketInput, +} from "./plugin-storage"; +import { makeTestExecutor } from "./testing"; + +const Run = Schema.Struct({ + status: Schema.Literals(["completed", "failed", "running"]), + triggerKind: Schema.NullOr(Schema.String), + startedAt: Schema.Number, + durationMs: Schema.NullOr(Schema.Number), + hadInteraction: Schema.Boolean, +}); +type Run = typeof Run.Type; + +const runs = definePluginStorageCollection("runs", Run, { + indexes: ["status", "triggerKind", "startedAt", "durationMs", "hadInteraction"], +}); + +const runsPlugin = definePlugin(() => ({ + id: "runs" as const, + pluginStorage: { runs }, + storage: ({ pluginStorage }) => ({ runs: pluginStorage.collection(runs) }), + extension: (ctx) => ({ + record: (owner: Owner, key: string, data: Run) => ctx.storage.runs.put({ owner, key, data }), + count: (input?: PluginStorageAggregateFilter) => + ctx.storage.runs.aggregate.count(input), + groupCount: (input: PluginStorageGroupCountInput) => + ctx.storage.runs.aggregate.groupCount(input), + timeBuckets: (input: PluginStorageTimeBucketInput) => + ctx.storage.runs.aggregate.timeBuckets(input), + stats: (input: PluginStorageStatsInput) => ctx.storage.runs.aggregate.stats(input), + keyset: (input: PluginStorageQueryKeysetInput) => + ctx.storage.runs.queryKeyset(input), + }), +}))(); + +const seedRows: readonly (readonly [string, Run])[] = [ + [ + "r1", + { + status: "completed", + triggerKind: "cli", + startedAt: 1000, + durationMs: 100, + hadInteraction: false, + }, + ], + [ + "r2", + { + status: "completed", + triggerKind: "cli", + startedAt: 2000, + durationMs: 200, + hadInteraction: false, + }, + ], + [ + "r3", + { + status: "failed", + triggerKind: "http", + startedAt: 3000, + durationMs: 300, + hadInteraction: true, + }, + ], + [ + "r4", + { + status: "running", + triggerKind: "http", + startedAt: 4000, + durationMs: null, + hadInteraction: false, + }, + ], +]; + +const seed = (executor: { + runs: { + record: (owner: Owner, key: string, data: Run) => Effect.Effect; + }; +}) => + Effect.forEach(seedRows, ([key, data]) => executor.runs.record("org", key, data), { + discard: true, + }); + +describe("plugin storage aggregate + keyset (SQLite pushdown)", () => { + it.effect("counts with JSON filters and numeric ranges", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ plugins: [runsPlugin] as const }); + yield* seed(executor); + + expect(yield* executor.runs.count()).toBe(4); + expect(yield* executor.runs.count({ where: { status: "completed" } })).toBe(2); + expect(yield* executor.runs.count({ where: { startedAt: { gte: 3000 } } })).toBe(2); + expect(yield* executor.runs.count({ where: { durationMs: { gte: 200 } } })).toBe(2); + expect(yield* executor.runs.count({ where: { hadInteraction: true } })).toBe(1); + }), + ); + + it.effect("groups counts by a JSON field (facets)", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ plugins: [runsPlugin] as const }); + yield* seed(executor); + + const byStatus = yield* executor.runs.groupCount({ field: "status" }); + expect(Object.fromEntries(byStatus.map((row) => [row.value, row.count]))).toEqual({ + completed: 2, + failed: 1, + running: 1, + }); + + const byTrigger = yield* executor.runs.groupCount({ field: "triggerKind" }); + expect(Object.fromEntries(byTrigger.map((row) => [row.value, row.count]))).toEqual({ + cli: 2, + http: 2, + }); + }), + ); + + it.effect("buckets a numeric field over time", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ plugins: [runsPlugin] as const }); + yield* seed(executor); + + const buckets = yield* executor.runs.timeBuckets({ field: "startedAt", bucketMs: 2000 }); + expect(buckets).toEqual([ + { bucket: 0, count: 1 }, + { bucket: 2000, count: 2 }, + { bucket: 4000, count: 1 }, + ]); + }), + ); + + it.effect("computes duration stats and percentiles", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ plugins: [runsPlugin] as const }); + yield* seed(executor); + + const stats = yield* executor.runs.stats({ field: "durationMs", percentiles: [0, 0.5, 1] }); + expect(stats.count).toBe(3); + expect(stats.min).toBe(100); + expect(stats.max).toBe(300); + expect(stats.percentiles).toEqual([ + { fraction: 0, value: 100 }, + { fraction: 0.5, value: 200 }, + { fraction: 1, value: 300 }, + ]); + }), + ); + + it.effect("keyset-paginates with a cursor and JSON filter", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ plugins: [runsPlugin] as const }); + yield* seed(executor); + + const page1 = yield* executor.runs.keyset({ + orderBy: [{ field: "startedAt", direction: "desc", valueType: "number" }], + limit: 2, + }); + expect(page1.entries.map((entry) => entry.key)).toEqual(["r4", "r3"]); + expect(page1.nextCursor).not.toBeNull(); + + const page2 = yield* executor.runs.keyset({ + orderBy: [{ field: "startedAt", direction: "desc", valueType: "number" }], + limit: 2, + cursor: page1.nextCursor ?? undefined, + }); + expect(page2.entries.map((entry) => entry.key)).toEqual(["r2", "r1"]); + + const page3 = yield* executor.runs.keyset({ + orderBy: [{ field: "startedAt", direction: "desc", valueType: "number" }], + limit: 2, + cursor: page2.nextCursor ?? undefined, + }); + expect(page3.entries).toHaveLength(0); + expect(page3.nextCursor).toBeNull(); + + const completed = yield* executor.runs.keyset({ + where: { status: "completed" }, + orderBy: [{ field: "startedAt", direction: "asc", valueType: "number" }], + limit: 10, + }); + expect(completed.entries.map((entry) => entry.key)).toEqual(["r1", "r2"]); + }), + ); +}); diff --git a/packages/core/sdk/src/plugin-storage.ts b/packages/core/sdk/src/plugin-storage.ts index e854feb51..b926965b6 100644 --- a/packages/core/sdk/src/plugin-storage.ts +++ b/packages/core/sdk/src/plugin-storage.ts @@ -164,6 +164,103 @@ export interface PluginStorageEntry { readonly updatedAt: Date; } +export type PluginStorageScalar = string | number | boolean | null; + +export type PluginStorageFieldType = "text" | "number" | "boolean"; + +export interface PluginStorageAggregateFilter { + readonly where?: PluginStorageCollectionWhere; + readonly keyPrefix?: string; +} + +export interface PluginStorageGroupCountInput< + TDefinition, +> extends PluginStorageAggregateFilter { + readonly field: PluginStorageCollectionIndexedField; + readonly valueType?: PluginStorageFieldType; +} + +export interface PluginStorageTimeBucketInput< + TDefinition, +> extends PluginStorageAggregateFilter { + /** Numeric (epoch-ms) field the buckets are computed from. */ + readonly field: PluginStorageCollectionIndexedField; + readonly bucketMs: number; +} + +export interface PluginStorageStatsInput< + TDefinition, +> extends PluginStorageAggregateFilter { + /** Numeric field the stats are computed over. */ + readonly field: PluginStorageCollectionIndexedField; + /** Percentile fractions in [0, 1] (e.g. 0.5, 0.95). */ + readonly percentiles?: readonly number[]; +} + +export interface PluginStorageGroupCount { + readonly value: PluginStorageScalar; + readonly count: number; +} + +export interface PluginStorageTimeBucket { + readonly bucket: number; + readonly count: number; +} + +export interface PluginStoragePercentile { + readonly fraction: number; + readonly value: number; +} + +export interface PluginStorageStats { + readonly count: number; + readonly min: number | null; + readonly max: number | null; + readonly percentiles: readonly PluginStoragePercentile[]; +} + +export interface PluginStorageKeysetOrderBy { + readonly field: PluginStorageCollectionIndexedField; + readonly direction?: "asc" | "desc"; + readonly valueType?: PluginStorageFieldType; +} + +export interface PluginStorageKeysetCursor { + readonly values: readonly PluginStorageScalar[]; + readonly key: string; +} + +export interface PluginStorageQueryKeysetInput< + TDefinition, +> extends PluginStorageAggregateFilter { + readonly orderBy: readonly PluginStorageKeysetOrderBy[]; + readonly limit: number; + readonly cursor?: PluginStorageKeysetCursor; +} + +export interface PluginStorageKeysetPage { + readonly entries: readonly PluginStorageEntry[]; + readonly nextCursor: PluginStorageKeysetCursor | null; +} + +/** SQL-pushed aggregation over a collection's JSON documents. */ +export interface PluginStorageAggregateFacade< + TDefinition extends PluginStorageCollectionDefinition, +> { + readonly count: ( + input?: PluginStorageAggregateFilter, + ) => Effect.Effect; + readonly groupCount: ( + input: PluginStorageGroupCountInput, + ) => Effect.Effect; + readonly timeBuckets: ( + input: PluginStorageTimeBucketInput, + ) => Effect.Effect; + readonly stats: ( + input: PluginStorageStatsInput, + ) => Effect.Effect; +} + export interface PluginStorageCollectionFacade< TDefinition extends PluginStorageCollectionDefinition = PluginStorageCollectionDefinition, > { @@ -197,6 +294,15 @@ export interface PluginStorageCollectionFacade< readonly count: ( input?: Omit, "orderBy" | "limit" | "offset">, ) => Effect.Effect; + /** SQL-pushed keyset pagination ordered by indexed JSON fields. */ + readonly queryKeyset: ( + input: PluginStorageQueryKeysetInput, + ) => Effect.Effect< + PluginStorageKeysetPage>, + StorageFailure + >; + /** SQL-pushed aggregation (counts, facets, time buckets, percentiles). */ + readonly aggregate: PluginStorageAggregateFacade; readonly remove: ( input: PluginStorageCollectionScopedKeyInput, ) => Effect.Effect; diff --git a/packages/core/sdk/src/test-config.ts b/packages/core/sdk/src/test-config.ts index 7aba38ff0..66948f5c3 100644 --- a/packages/core/sdk/src/test-config.ts +++ b/packages/core/sdk/src/test-config.ts @@ -62,6 +62,11 @@ const makeLazyTestFumaDb = (options: { "deleteMany", "findFirst", "findMany", + "jsonCount", + "jsonGroupCount", + "jsonTimeBuckets", + "jsonStats", + "jsonPage", "transaction", "updateMany", "upsert", From de40957974aaf6bf7ef6c2eaae2c72c935197247 Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 24 Jun 2026 18:01:31 +0530 Subject: [PATCH 2/4] fix(fumadb): address aggregate review notes (greptile) --- packages/core/fumadb/src/query/aggregate.ts | 15 ++++++++++++- packages/core/sdk/src/executor.ts | 25 +++++++++++++-------- packages/core/sdk/src/plugin-storage.ts | 8 ++++++- 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/packages/core/fumadb/src/query/aggregate.ts b/packages/core/fumadb/src/query/aggregate.ts index 6d6e3fcbf..5f3075440 100644 --- a/packages/core/fumadb/src/query/aggregate.ts +++ b/packages/core/fumadb/src/query/aggregate.ts @@ -91,7 +91,13 @@ export interface JsonTimeBucketRow { export interface JsonStatsAdapterOptions extends JsonAdapterBase { /** Numeric path the stats are computed over. */ readonly path: JsonPath; - /** Percentile fractions in [0, 1] (e.g. 0.5, 0.95). */ + /** + * Percentile fractions in [0, 1] (e.g. 0.5, 0.95). + * + * SQLite adapters compute percentiles from all matching numeric values after + * projection because SQLite has no native percentile aggregate. Count, min, + * and max still run as SQL aggregates. + */ readonly percentiles?: readonly number[]; } @@ -155,6 +161,13 @@ export interface JsonTimeBucketOptions> extends JsonPublicBase { readonly path: JsonPath; + /** + * Percentile fractions in [0, 1] (e.g. 0.5, 0.95). + * + * SQLite adapters compute percentiles from all matching numeric values after + * projection because SQLite has no native percentile aggregate. Count, min, + * and max still run as SQL aggregates. + */ readonly percentiles?: readonly number[]; } diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 28809622e..f5316d05b 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -6,6 +6,7 @@ import { withQueryContext, type Condition, type ConditionBuilder, + type JsonCompareOperator, type JsonFilter, type JsonGroupCountRow, type JsonKeysetCursor, @@ -824,6 +825,17 @@ const pluginStorageWhereOperators = ["eq", "in", "gt", "gte", "lt", "lte"] as co const isPluginStorageWhereFilter = (value: unknown): value is Readonly> => isPluginStorageRecord(value) && pluginStorageWhereOperators.some((operator) => operator in value); +const pluginStorageJsonCompareOperators = { + eq: "=", + gt: ">", + gte: ">=", + lt: "<", + lte: "<=", +} satisfies Record< + Exclude<(typeof pluginStorageWhereOperators)[number], "in">, + JsonCompareOperator +>; + const pluginStorageComparableValue = (value: unknown): string | number | boolean | null => { if (value instanceof Date) return value.getTime(); if (typeof value === "number" || typeof value === "string" || typeof value === "boolean") { @@ -917,16 +929,11 @@ const pluginStorageWhereToJsonFilter = ( }); continue; } + if (!(operator in pluginStorageJsonCompareOperators)) continue; const compareOperator = - operator === "gt" - ? ">" - : operator === "gte" - ? ">=" - : operator === "lt" - ? "<" - : operator === "lte" - ? "<=" - : "="; + pluginStorageJsonCompareOperators[ + operator as keyof typeof pluginStorageJsonCompareOperators + ]; items.push({ kind: "compare", path, diff --git a/packages/core/sdk/src/plugin-storage.ts b/packages/core/sdk/src/plugin-storage.ts index b926965b6..9a560e321 100644 --- a/packages/core/sdk/src/plugin-storage.ts +++ b/packages/core/sdk/src/plugin-storage.ts @@ -193,7 +193,13 @@ export interface PluginStorageStatsInput< > extends PluginStorageAggregateFilter { /** Numeric field the stats are computed over. */ readonly field: PluginStorageCollectionIndexedField; - /** Percentile fractions in [0, 1] (e.g. 0.5, 0.95). */ + /** + * Percentile fractions in [0, 1] (e.g. 0.5, 0.95). + * + * SQLite has no native percentile aggregate, so SQLite-backed storage computes + * percentiles from all matching numeric values after projection. Count, min, + * and max still run as SQL aggregates. + */ readonly percentiles?: readonly number[]; } From 1c77dbd4f275506be3423bc8cdd992c136458c0a Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 24 Jun 2026 18:06:43 +0530 Subject: [PATCH 3/4] fix(openapi): update storage facade mock (greptile) --- packages/plugins/openapi/src/sdk/store.test.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/plugins/openapi/src/sdk/store.test.ts b/packages/plugins/openapi/src/sdk/store.test.ts index ef4a4ed2c..bab86fbbe 100644 --- a/packages/plugins/openapi/src/sdk/store.test.ts +++ b/packages/plugins/openapi/src/sdk/store.test.ts @@ -51,6 +51,13 @@ describe("OpenAPI operation store", () => { ), query: () => Effect.succeed([]), count: () => Effect.succeed(0), + queryKeyset: () => Effect.succeed({ entries: [], nextCursor: null }), + aggregate: { + count: () => Effect.succeed(0), + groupCount: () => Effect.succeed([]), + timeBuckets: () => Effect.succeed([]), + stats: () => Effect.succeed({ count: 0, min: null, max: null, percentiles: [] }), + }, remove: () => Effect.void, }), get: (input: { readonly collection: string; readonly key: string }) => From 6ed7d99abc3b2eefd281ed3327a508a6eb95fbf1 Mon Sep 17 00:00:00 2001 From: Saatvik Arya Date: Wed, 24 Jun 2026 18:17:36 +0530 Subject: [PATCH 4/4] fix(fumadb): preserve empty or filter semantics (greptile) --- .../core/fumadb/src/adapters/drizzle/query.ts | 1 + .../core/fumadb/src/query/aggregate.test.ts | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/packages/core/fumadb/src/adapters/drizzle/query.ts b/packages/core/fumadb/src/adapters/drizzle/query.ts index 82f3a81e3..8158c690f 100644 --- a/packages/core/fumadb/src/adapters/drizzle/query.ts +++ b/packages/core/fumadb/src/adapters/drizzle/query.ts @@ -352,6 +352,7 @@ export function fromDrizzle( ); } if (filter.kind === "or") { + if (filter.items.length === 0) return Drizzle.sql`1 = 0`; return Drizzle.or( ...filter.items.map((item) => buildJsonFilter(jsonColumn, item)), ); diff --git a/packages/core/fumadb/src/query/aggregate.test.ts b/packages/core/fumadb/src/query/aggregate.test.ts index ab7a5a3a3..1a80049ea 100644 --- a/packages/core/fumadb/src/query/aggregate.test.ts +++ b/packages/core/fumadb/src/query/aggregate.test.ts @@ -129,6 +129,24 @@ const runSuite = (name: string, makeHarness: () => Promise) => { ).toBe(2); }); + it("matches empty composite filter semantics", async () => { + const { orm } = harness; + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { kind: "or", items: [] }, + }), + ).toBe(0); + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { kind: "and", items: [] }, + }), + ).toBe(4); + }); + it("jsonGroupCount counts distinct values", async () => { const rows = await harness.orm.jsonGroupCount("events", { column: "data",