diff --git a/.changeset/fumadb-json-pushdown.md b/.changeset/fumadb-json-pushdown.md new file mode 100644 index 000000000..2f728d491 --- /dev/null +++ b/.changeset/fumadb-json-pushdown.md @@ -0,0 +1,16 @@ +--- +"@executor-js/fumadb": minor +"@executor-js/sdk": minor +--- + +Push JSON-document aggregation and keyset pagination down to SQL. FumaDB's query +layer gains `jsonCount`, `jsonGroupCount`, `jsonTimeBuckets`, `jsonStats` +(min/max + continuous percentiles), and `jsonPage` (cursor pagination) over a +JSON document column, implemented natively for the memory and Drizzle +(SQLite/Postgres) adapters and failing loudly on adapters that don't support +them. Plugin storage exposes this as `collection.aggregate.{count,groupCount, +timeBuckets,stats}` and `collection.queryKeyset(...)`, translating a +collection's indexed-field `where` into JSON-path predicates while owner/tenant +scoping stays enforced by the storage policy. Aggregates and pages are computed +in the database instead of by fetching the whole collection and reducing in +memory. 
diff --git a/packages/core/fumadb/src/adapters/drizzle/query.ts b/packages/core/fumadb/src/adapters/drizzle/query.ts index 9471ecda0..8158c690f 100644 --- a/packages/core/fumadb/src/adapters/drizzle/query.ts +++ b/packages/core/fumadb/src/adapters/drizzle/query.ts @@ -1,6 +1,13 @@ import * as Drizzle from "drizzle-orm"; import type * as PostgreSQL from "drizzle-orm/pg-core"; import type { AbstractQuery, FindManyOptions } from "../../query"; +import type { + JsonFilter, + JsonPath, + JsonScalar, + JsonValueType, +} from "../../query/aggregate"; +import { coerceJsonValue, computePercentiles } from "../../query/aggregate-eval"; import { type Condition, ConditionType } from "../../query/condition-builder"; import { type SimplifyFindOptions, toORM } from "../../query/orm"; import { @@ -263,6 +270,120 @@ export function fromDrizzle( return out; } + // --- JSON-document aggregation helpers ---------------------------------- + // Extract a JSON path as a typed SQL expression. SQLite `json_extract` is + // natively typed; Postgres `#>>` returns text and needs explicit casts. + function jsonExtractSql( + jsonColumn: ColumnType, + path: JsonPath, + valueType: JsonValueType, + ): Drizzle.SQL { + if (provider === "postgresql") { + // Build a `text[]` array literal with each segment double-quoted so a + // segment containing a comma (the array element separator) descends the + // intended key instead of splitting into two. Inside a quoted element a + // backslash and a double-quote must be backslash-escaped. + const pgPath = `{${path + .map((segment) => `"${segment.replace(/\\/g, "\\\\").replace(/"/g, '\\"')}"`) + .join(",")}}`; + const text = Drizzle.sql`(${jsonColumn} #>> ${pgPath})`; + if (valueType === "number") return Drizzle.sql`${text}::numeric`; + if (valueType === "boolean") return Drizzle.sql`${text}::boolean`; + return text; + } + // Build a quoted SQLite JSON path (`$."seg1"."seg2"`) so a segment + // containing a dot (the path separator) is treated as one key. 
Inside a + // quoted segment an embedded backslash or double-quote is backslash-escaped. + const jsonPath = `$${path + .map((segment) => `."${segment.replace(/\\/g, "\\\\").replace(/"/g, '\\"')}"`) + .join("")}`; + return Drizzle.sql`json_extract(${jsonColumn}, ${jsonPath})`; + } + + // Escape LIKE wildcards (`%`, `_`) and the escape character itself so the + // value matches literally, matching the memory adapter's + // includes/startsWith/endsWith. Paired with an explicit `escape '\'` clause. + function escapeLikePattern(value: string): string { + return value.replace(/[\\%_]/g, (char) => `\\${char}`); + } + + function jsonCompareSql( + operator: string, + expr: Drizzle.SQL, + value: JsonScalar, + ): Drizzle.SQL { + switch (operator) { + case "=": + return Drizzle.eq(expr, value); + case "!=": + return Drizzle.ne(expr, value); + case ">": + return Drizzle.gt(expr, value); + case ">=": + return Drizzle.gte(expr, value); + case "<": + return Drizzle.lt(expr, value); + case "<=": + return Drizzle.lte(expr, value); + case "contains": { + const escaped = escapeLikePattern(String(value)); + return Drizzle.sql`${expr} like ${`%${escaped}%`} escape '\\'`; + } + case "starts with": { + const escaped = escapeLikePattern(String(value)); + return Drizzle.sql`${expr} like ${`${escaped}%`} escape '\\'`; + } + case "ends with": { + const escaped = escapeLikePattern(String(value)); + return Drizzle.sql`${expr} like ${`%${escaped}`} escape '\\'`; + } + default: + throw new Error(`[FumaDB Drizzle] Unsupported JSON operator: ${operator}`); + } + } + + function buildJsonFilter( + jsonColumn: ColumnType, + filter: JsonFilter, + ): Drizzle.SQL | undefined { + if (filter.kind === "and") { + return Drizzle.and( + ...filter.items.map((item) => buildJsonFilter(jsonColumn, item)), + ); + } + if (filter.kind === "or") { + if (filter.items.length === 0) return Drizzle.sql`1 = 0`; + return Drizzle.or( + ...filter.items.map((item) => buildJsonFilter(jsonColumn, item)), + ); + } + const expr = 
jsonExtractSql(jsonColumn, filter.path, filter.valueType); + if (filter.kind === "array") { + return filter.operator === "in" + ? Drizzle.inArray(expr, filter.values as unknown[]) + : Drizzle.notInArray(expr, filter.values as unknown[]); + } + return jsonCompareSql(filter.operator, expr, filter.value); + } + + function buildScopedConditions( + jsonColumn: ColumnType, + where: Condition | undefined, + filter: JsonFilter | undefined, + ): Drizzle.SQL | undefined { + const parts: Drizzle.SQL[] = []; + if (where) { + const compiled = buildWhere(toDrizzleColumn, where); + if (compiled) parts.push(compiled); + } + if (filter) { + const compiled = buildJsonFilter(jsonColumn, filter); + if (compiled) parts.push(compiled); + } + if (parts.length === 0) return undefined; + return Drizzle.and(...parts); + } + return toORM({ tables: schema.tables, async count(table, v) { @@ -402,6 +523,182 @@ export function fromDrizzle( await query; }, + async jsonCount(table, { column, where, filter }) { + const conditions = buildScopedConditions(toDrizzleColumn(column), where, filter); + return await db.$count(toDrizzle(table), conditions); + }, + async jsonGroupCount(table, { column, where, filter, path, valueType }) { + const drizzleTable = toDrizzle(table); + const jsonColumn = toDrizzleColumn(column); + const groupExpr = jsonExtractSql(jsonColumn, path, valueType ?? "text"); + const conditions = buildScopedConditions(jsonColumn, where, filter); + const rows = await db + .select({ value: groupExpr, count: Drizzle.sql`count(*)` }) + .from(drizzleTable) + .where(conditions) + .groupBy(groupExpr); + return rows.map((row) => ({ + value: coerceJsonValue(row.value, valueType ?? 
"text"), + count: Number(row.count), + })); + }, + async jsonTimeBuckets(table, { column, where, filter, path, bucketMs }) { + const drizzleTable = toDrizzle(table); + const jsonColumn = toDrizzleColumn(column); + const valueExpr = jsonExtractSql(jsonColumn, path, "number"); + // `value - (value % bucket)` floors to the bucket start without relying + // on integer division (SQLite binds numeric params as REAL, so `/` would + // be float division). Matches `bucketFloor` for non-negative epochs. + const bucketExpr = Drizzle.sql`(${valueExpr} - (${valueExpr} % ${bucketMs}))`; + const conditions = Drizzle.and(buildScopedConditions(jsonColumn, where, filter), Drizzle.isNotNull(valueExpr)); // skip rows whose path is missing/null: they would form a NULL group that `Number(null)` reports as bucket 0, whereas the memory adapter ignores non-numeric values + const rows = await db + .select({ bucket: bucketExpr, count: Drizzle.sql`count(*)` }) + .from(drizzleTable) + .where(conditions) + .groupBy(bucketExpr) + .orderBy(bucketExpr); + return rows.map((row) => ({ bucket: Number(row.bucket), count: Number(row.count) })); + }, + async jsonStats(table, { column, where, filter, path, percentiles }) { + const drizzleTable = toDrizzle(table); + const jsonColumn = toDrizzleColumn(column); + const valueExpr = jsonExtractSql(jsonColumn, path, "number"); + const conditions = buildScopedConditions(jsonColumn, where, filter); + const aggregate = await db + .select({ + count: Drizzle.sql`count(${valueExpr})`, + min: Drizzle.sql`min(${valueExpr})`, + max: Drizzle.sql`max(${valueExpr})`, + }) + .from(drizzleTable) + .where(conditions); + const summary = aggregate[0]; + const count = Number(summary?.count ?? 0); + if (count === 0) return { count: 0, min: null, max: null, percentiles: [] }; + const min = summary?.min == null ? null : Number(summary.min); + const max = summary?.max == null ? null : Number(summary.max); + const fractions = percentiles ?? 
[]; + if (fractions.length === 0) return { count, min, max, percentiles: [] }; + + if (provider === "postgresql") { + const pctExpr = Drizzle.sql`percentile_cont(array[${Drizzle.sql.join( + fractions.map((fraction) => Drizzle.sql`${Math.min(1, Math.max(0, fraction))}`), + Drizzle.sql`, `, + )}]::double precision[]) within group (order by ${valueExpr})`; // fractions are clamped to [0, 1] like computePercentiles (Postgres rejects values outside that range), and the array is cast so the bound parameters are typed float8[] instead of resolving to text[] + const pctRows = await db + .select({ values: Drizzle.sql`${pctExpr}` }) + .from(drizzleTable) + .where(conditions); + const values = pctRows[0]?.values ?? []; + return { + count, + min, + max, + percentiles: fractions.map((fraction, index) => ({ + fraction, + value: Number(values[index]), + })), + }; + } + + // SQLite has no percentile_cont, so compute over the projected values. + const valueRows = await db + .select({ value: valueExpr }) + .from(drizzleTable) + .where(conditions) + .orderBy(valueExpr); + const sorted = valueRows + .map((row) => row.value) + .filter((value): value is number | string => value != null) + .map((value) => Number(value)) + .filter((value) => !Number.isNaN(value)) + .sort((a, b) => a - b); + return { count, min, max, percentiles: computePercentiles(sorted, fractions) }; + }, + async jsonPage(table, { column, where, filter, orderBy, keyColumn, keyDirection, cursor, limit }) { + const drizzleTable = toDrizzle(table); + const jsonColumn = toDrizzleColumn(column); + const keyDrizzle = toDrizzleColumn(keyColumn); + const scoped = buildScopedConditions(jsonColumn, where, filter); + + const orderExprs: Drizzle.SQL[] = orderBy.map((entry) => { + const expr = jsonExtractSql(jsonColumn, entry.path, entry.valueType); + // Mirror the memory adapter's null ordering (null first in asc, last in + // desc, see compareNullableAscending). SQLite defaults to this; Postgres + // defaults to the opposite, so make it explicit on both dialects. + return entry.direction === "desc" + ? Drizzle.sql`${expr} desc nulls last` + : Drizzle.sql`${expr} asc nulls first`; + }); + orderExprs.push(keyDirection === "desc" ? 
Drizzle.desc(keyDrizzle) : Drizzle.asc(keyDrizzle)); + + // Keyset boundary terms with SQL three-valued-logic null handling. A naive + // `expr > NULL` / `expr < NULL` is always unknown, which silently empties + // every page once the cursor row's sort value is null. These mirror + // compareNullableAscending so a nullable sort column paginates correctly. + const eqTerm = (expr: Drizzle.SQL, cv: unknown): Drizzle.SQL => + cv == null ? Drizzle.isNull(expr) : Drizzle.eq(expr, cv); + const strictTerm = ( + expr: Drizzle.SQL, + cv: unknown, + dir: "asc" | "desc", + ): Drizzle.SQL | null => { + if (dir === "asc") { + // null is first: everything non-null is after a null cursor. + return cv == null ? Drizzle.isNotNull(expr) : Drizzle.gt(expr, cv); + } + // desc, null last: nothing is strictly after a null cursor on this field; + // and null rows fall after any non-null cursor value. + if (cv == null) return null; + return Drizzle.or(Drizzle.lt(expr, cv), Drizzle.isNull(expr)) ?? null; + }; + + let conditions = scoped; + if (cursor) { + const orTerms: Drizzle.SQL[] = []; + for (let boundary = 0; boundary <= orderBy.length; boundary += 1) { + const andTerms: Drizzle.SQL[] = []; + for (let prior = 0; prior < boundary; prior += 1) { + const entry = orderBy[prior]!; + andTerms.push( + eqTerm( + jsonExtractSql(jsonColumn, entry.path, entry.valueType), + cursor.values[prior] as unknown, + ), + ); + } + if (boundary < orderBy.length) { + const entry = orderBy[boundary]!; + const expr = jsonExtractSql(jsonColumn, entry.path, entry.valueType); + const strict = strictTerm(expr, cursor.values[boundary] as unknown, entry.direction); + if (strict === null) continue; + andTerms.push(strict); + } else { + andTerms.push( + keyDirection === "asc" + ? 
Drizzle.gt(keyDrizzle, cursor.key) + : Drizzle.lt(keyDrizzle, cursor.key), + ); + } + const combined = Drizzle.and(...andTerms); + if (combined) orTerms.push(combined); + } + const afterCursor = Drizzle.or(...orTerms); + conditions = scoped && afterCursor ? Drizzle.and(scoped, afterCursor) : (afterCursor ?? scoped); + } + + const projection: Record = {}; + for (const tableColumn of Object.values(table.columns)) { + projection[tableColumn.ormName] = drizzleTable[tableColumn.names.drizzle]; + } + + return await db + .select(projection) + .from(drizzleTable) + .where(conditions) + .orderBy(...orderExprs) + .limit(limit); + }, async transaction(run) { // Some SQLite-compatible engines (Cloudflare D1) reject interactive // transactions — both raw BEGIN/COMMIT and the driver's `.transaction()`. diff --git a/packages/core/fumadb/src/adapters/memory/index.ts b/packages/core/fumadb/src/adapters/memory/index.ts index a596373f5..7b931407b 100644 --- a/packages/core/fumadb/src/adapters/memory/index.ts +++ b/packages/core/fumadb/src/adapters/memory/index.ts @@ -1,5 +1,14 @@ import type { FumaDBAdapter } from "../"; import type { AbstractQuery } from "../../query"; +import type { JsonScalar } from "../../query/aggregate"; +import { + bucketFloor, + coerceJsonValue, + compareNullableAscending, + computePercentiles, + extractJsonPath, + matchesJsonFilter, +} from "../../query/aggregate-eval"; import { ConditionType, type Condition } from "../../query/condition-builder"; import { toORM, type SimplifyFindOptions } from "../../query/orm"; import type { AnyColumn, AnySchema, AnyTable } from "../../schema"; @@ -189,6 +198,100 @@ export function memoryAdapter(options: MemoryAdapterOptions = {}): FumaDBAdapter const rows = tableRows(db, table); db[table.ormName] = rows.filter((row) => !matchesCondition(row, v.where)); }, + async jsonCount(table, { column, where, filter }) { + return tableRows(db, table).filter( + (row) => + matchesCondition(row, where) && + (!filter || 
matchesJsonFilter(row[column.ormName], filter)), + ).length; + }, + async jsonGroupCount(table, { column, where, filter, path, valueType }) { + const counts = new Map(); + for (const row of tableRows(db, table)) { + if (!matchesCondition(row, where)) continue; + if (filter && !matchesJsonFilter(row[column.ormName], filter)) continue; + const value = coerceJsonValue( + extractJsonPath(row[column.ormName], path), + valueType ?? "text", + ); + counts.set(value, (counts.get(value) ?? 0) + 1); + } + return [...counts.entries()].map(([value, count]) => ({ value, count })); + }, + async jsonTimeBuckets(table, { column, where, filter, path, bucketMs }) { + const counts = new Map(); + for (const row of tableRows(db, table)) { + if (!matchesCondition(row, where)) continue; + if (filter && !matchesJsonFilter(row[column.ormName], filter)) continue; + const raw = coerceJsonValue(extractJsonPath(row[column.ormName], path), "number"); + if (typeof raw !== "number") continue; + const bucket = bucketFloor(raw, bucketMs); + counts.set(bucket, (counts.get(bucket) ?? 0) + 1); + } + return [...counts.entries()] + .map(([bucket, count]) => ({ bucket, count })) + .sort((a, b) => a.bucket - b.bucket); + }, + async jsonStats(table, { column, where, filter, path, percentiles }) { + const values: number[] = []; + for (const row of tableRows(db, table)) { + if (!matchesCondition(row, where)) continue; + if (filter && !matchesJsonFilter(row[column.ormName], filter)) continue; + const raw = coerceJsonValue(extractJsonPath(row[column.ormName], path), "number"); + if (typeof raw === "number") values.push(raw); + } + if (values.length === 0) return { count: 0, min: null, max: null, percentiles: [] }; + values.sort((a, b) => a - b); + return { + count: values.length, + min: values[0]!, + max: values[values.length - 1]!, + percentiles: computePercentiles(values, percentiles ?? 
[]), + }; + }, + async jsonPage(table, { column, where, filter, orderBy, keyColumn, keyDirection, cursor, limit }) { + const sortValue = (row: Record, index: number): JsonScalar => + coerceJsonValue( + extractJsonPath(row[column.ormName], orderBy[index]!.path), + orderBy[index]!.valueType, + ); + const keyOf = (row: Record): string => String(row[keyColumn.ormName]); + const keyDir = keyDirection === "desc" ? -1 : 1; + + let rows = tableRows(db, table).filter( + (row) => + matchesCondition(row, where) && + (!filter || matchesJsonFilter(row[column.ormName], filter)), + ); + + rows = [...rows].sort((a, b) => { + for (let index = 0; index < orderBy.length; index += 1) { + const direction = orderBy[index]!.direction === "desc" ? -1 : 1; + const compared = + compareNullableAscending(sortValue(a, index), sortValue(b, index)) * direction; + if (compared !== 0) return compared; + } + return compareNullableAscending(keyOf(a), keyOf(b)) * keyDir; + }); + + if (cursor) { + const cursorValues = cursor.values.map((value, index) => + coerceJsonValue(value, orderBy[index]!.valueType), + ); + rows = rows.filter((row) => { + for (let index = 0; index < orderBy.length; index += 1) { + const direction = orderBy[index]!.direction === "desc" ? -1 : 1; + const compared = + compareNullableAscending(sortValue(row, index), cursorValues[index] ?? 
null) * + direction; + if (compared !== 0) return compared > 0; + } + return compareNullableAscending(keyOf(row), cursor.key) * keyDir > 0; + }); + } + + return rows.slice(0, limit).map((row) => selectRow(table, row, true)); + }, async transaction(run: (transactionInstance: AbstractQuery) => Promise) { const snapshot = cloneValue(db); try { diff --git a/packages/core/fumadb/src/query/aggregate-eval.ts b/packages/core/fumadb/src/query/aggregate-eval.ts new file mode 100644 index 000000000..55f2c7f3c --- /dev/null +++ b/packages/core/fumadb/src/query/aggregate-eval.ts @@ -0,0 +1,151 @@ +import type { + JsonFilter, + JsonPath, + JsonPercentile, + JsonScalar, + JsonValueType, +} from "./aggregate"; + +// --------------------------------------------------------------------------- +// Pure, in-memory evaluation of the JSON-document aggregation primitives. +// Shared by the memory adapter and used as the per-dialect fallback where a +// database can't express an operation natively (e.g. SQLite percentiles). The +// percentile definition matches Postgres `percentile_cont` (continuous, linear +// interpolation) so every backend agrees. +// --------------------------------------------------------------------------- + +export const extractJsonPath = (doc: unknown, path: JsonPath): unknown => { + let current: unknown = doc; + for (const segment of path) { + if (current == null || typeof current !== "object") return undefined; + current = (current as Record)[segment]; + } + return current; +}; + +/** Coerce a raw value to a comparable scalar per its declared extraction type. */ +export const coerceJsonValue = ( + value: unknown, + valueType: JsonValueType, +): JsonScalar => { + if (value == null) return null; + if (valueType === "number") { + const numeric = typeof value === "number" ? value : Number(value); + return Number.isNaN(numeric) ? 
null : numeric; + } + if (valueType === "boolean") { + if (typeof value === "boolean") return value; + if (value === "true" || value === 1) return true; + if (value === "false" || value === 0) return false; + return null; + } + return typeof value === "string" ? value : String(value); +}; + +const scalarsEqual = (left: JsonScalar, right: JsonScalar): boolean => left === right; + +/** Three-way compare; `null` when either side is null (SQL-like incomparable). */ +const compareScalars = (left: JsonScalar, right: JsonScalar): number | null => { + if (left == null || right == null) return null; + if (left < right) return -1; + if (left > right) return 1; + return 0; +}; + +const matchesCompareOperator = ( + operator: string, + left: JsonScalar, + right: JsonScalar, +): boolean => { + switch (operator) { + // SQL three-valued logic: any comparison with NULL is unknown and excluded + // from a WHERE clause, so `= NULL` and `!= NULL` both match nothing. Mirror + // that here so the memory adapter agrees with the SQL adapters. 
+ case "=": + return left != null && right != null && scalarsEqual(left, right); + case "!=": + return left != null && right != null && !scalarsEqual(left, right); + case ">": { + const compared = compareScalars(left, right); + return compared != null && compared > 0; + } + case ">=": { + const compared = compareScalars(left, right); + return compared != null && compared >= 0; + } + case "<": { + const compared = compareScalars(left, right); + return compared != null && compared < 0; + } + case "<=": { + const compared = compareScalars(left, right); + return compared != null && compared <= 0; + } + case "contains": + return typeof left === "string" && typeof right === "string" && left.includes(right); + case "starts with": + return typeof left === "string" && typeof right === "string" && left.startsWith(right); + case "ends with": + return typeof left === "string" && typeof right === "string" && left.endsWith(right); + default: // fail loudly on an unknown operator, as the Drizzle adapter does, instead of silently matching no rows + throw new Error(`[FumaDB] Unsupported JSON operator: ${operator}`); + } +}; + +export const matchesJsonFilter = (doc: unknown, filter: JsonFilter): boolean => { + switch (filter.kind) { + case "and": + return filter.items.every((item) => matchesJsonFilter(doc, item)); + case "or": + return filter.items.some((item) => matchesJsonFilter(doc, item)); + case "compare": { + const left = coerceJsonValue(extractJsonPath(doc, filter.path), filter.valueType); + const right = coerceJsonValue(filter.value, filter.valueType); + return matchesCompareOperator(filter.operator, left, right); + } + case "array": { + const left = coerceJsonValue(extractJsonPath(doc, filter.path), filter.valueType); + // SQL: `NULL IN (...)` and `NULL NOT IN (...)` are both unknown, so the + // row is excluded either way. Mirror that (the Drizzle adapter does this + // implicitly via NULL semantics). + if (left == null) return false; + const found = filter.values.some((candidate) => + scalarsEqual(left, coerceJsonValue(candidate, filter.valueType)), + ); + return filter.operator === "in" ? 
found : !found; + } + } +}; + +export const bucketFloor = (value: number, bucketMs: number): number => + Math.floor(value / bucketMs) * bucketMs; + +/** Percentiles over an ascending-sorted numeric array (Postgres `percentile_cont`). */ +export const computePercentiles = ( + sortedAscending: readonly number[], + fractions: readonly number[], +): JsonPercentile[] => { + const length = sortedAscending.length; + if (length === 0) return []; + return fractions.map((fraction) => { + const clamped = Math.min(1, Math.max(0, fraction)); + const rank = clamped * (length - 1); + const lower = Math.floor(rank); + const upper = Math.ceil(rank); + const value = + lower === upper + ? sortedAscending[lower]! + : sortedAscending[lower]! + (sortedAscending[upper]! - sortedAscending[lower]!) * (rank - lower); + return { fraction, value }; + }); +}; + +/** Null-aware ascending compare (null sorts first). */ +export const compareNullableAscending = (left: JsonScalar, right: JsonScalar): number => { + if (left == null && right == null) return 0; + if (left == null) return -1; + if (right == null) return 1; + if (left < right) return -1; + if (left > right) return 1; + return 0; +}; diff --git a/packages/core/fumadb/src/query/aggregate.test.ts b/packages/core/fumadb/src/query/aggregate.test.ts new file mode 100644 index 000000000..1a80049ea --- /dev/null +++ b/packages/core/fumadb/src/query/aggregate.test.ts @@ -0,0 +1,406 @@ +import Database from "better-sqlite3"; +import { drizzle } from "drizzle-orm/better-sqlite3"; +import { afterEach, beforeEach, describe, expect, it } from "@effect/vitest"; +import { fumadb } from "@executor-js/fumadb"; +import { + createDrizzleRuntimeSchemaFromTables, + createDrizzleRuntimeSchemaSqlFromTables, + drizzleAdapter, +} from "@executor-js/fumadb/adapters/drizzle"; +import { memoryAdapter } from "@executor-js/fumadb/adapters/memory"; +import type { AbstractQuery, JsonFilter } from "@executor-js/fumadb/query"; +import { column, idColumn, schema, table } 
from "@executor-js/fumadb/schema"; +import { toORM, type ORMAdapter } from "./orm"; + +const events = table("events", { + id: idColumn("id", "varchar(255)"), + tenant: column("tenant", "varchar(255)"), + data: column("data", "json"), +}); + +const v1 = schema({ version: "1.0.0", tables: { events } }); + +type EventsQuery = AbstractQuery; + +interface SeedRow { + readonly id: string; + readonly tenant: string; + readonly data: { + readonly status: string; + readonly trigger: string; + readonly startedAt: number; + readonly durationMs: number | null; + }; +} + +const seedRows: readonly SeedRow[] = [ + { id: "e1", tenant: "t1", data: { status: "completed", trigger: "cli", startedAt: 1000, durationMs: 100 } }, + { id: "e2", tenant: "t1", data: { status: "completed", trigger: "cli", startedAt: 2000, durationMs: 200 } }, + { id: "e3", tenant: "t1", data: { status: "failed", trigger: "http", startedAt: 3000, durationMs: 300 } }, + { id: "e4", tenant: "t1", data: { status: "running", trigger: "http", startedAt: 4000, durationMs: null } }, + { id: "e5", tenant: "t2", data: { status: "completed", trigger: "cli", startedAt: 5000, durationMs: 500 } }, +]; + +// Rows whose status carries literal LIKE wildcards (`_`, `%`). Kept out of the +// shared seed (existing assertions count exact t1 rows) and seeded only by the +// wildcard-parity test under a dedicated tenant. `a_b%c` must be matched +// literally; `axbyc` is the lookalike an unescaped LIKE would wrongly catch. 
+const wildcardRows: readonly SeedRow[] = [ + { id: "w1", tenant: "tw", data: { status: "a_b%c", trigger: "cli", startedAt: 6000, durationMs: 600 } }, + { id: "w2", tenant: "tw", data: { status: "axbyc", trigger: "cli", startedAt: 7000, durationMs: 700 } }, +]; + +const inT1 = (eb: Parameters[1]["where"]>>[0]) => + eb("tenant", "=", "t1"); + +const statusIn = (values: readonly string[]): JsonFilter => ({ + kind: "array", + path: ["status"], + valueType: "text", + operator: "in", + values, +}); + +const statusCompare = ( + operator: "=" | "contains" | "starts with" | "ends with", + value: string, +): JsonFilter => ({ + kind: "compare", + path: ["status"], + valueType: "text", + operator, + value, +}); + +const seed = async (orm: EventsQuery) => { + await orm.createMany("events", seedRows.map((row) => ({ ...row }))); +}; + +interface Harness { + readonly orm: EventsQuery; + readonly close: () => Promise; +} + +const makeMemoryHarness = async (): Promise => { + const client = fumadb({ namespace: "aggregate_test", schemas: [v1] }).client(memoryAdapter()); + return { orm: client.orm("1.0.0") as EventsQuery, close: async () => {} }; +}; + +const makeSqliteHarness = async (): Promise => { + const sqlite = new Database(":memory:"); + const schemaArgs = { + tables: v1.tables, + namespace: "aggregate_test", + version: "1.0.0", + provider: "sqlite", + } as const; + const drizzleDb = drizzle(sqlite, { schema: createDrizzleRuntimeSchemaFromTables(schemaArgs) }); + for (const statement of createDrizzleRuntimeSchemaSqlFromTables(schemaArgs)) { + sqlite.exec(statement); + } + const client = fumadb({ namespace: "aggregate_test", schemas: [v1] }).client( + drizzleAdapter({ db: drizzleDb, provider: "sqlite" }), + ); + return { + orm: client.orm("1.0.0") as EventsQuery, + close: async () => { + sqlite.close(); + }, + }; +}; + +const runSuite = (name: string, makeHarness: () => Promise) => { + describe(`json aggregation (${name})`, () => { + let harness: Harness; + beforeEach(async () 
=> { + harness = await makeHarness(); + await seed(harness.orm); + }); + afterEach(async () => { + await harness.close(); + }); + + it("jsonCount scopes by real columns and JSON filter", async () => { + const { orm } = harness; + expect(await orm.jsonCount("events", { column: "data", where: inT1 })).toBe(4); + expect(await orm.jsonCount("events", { column: "data", where: (eb) => eb("tenant", "=", "t2") })).toBe(1); + expect( + await orm.jsonCount("events", { column: "data", where: inT1, filter: statusIn(["completed"]) }), + ).toBe(2); + }); + + it("matches empty composite filter semantics", async () => { + const { orm } = harness; + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { kind: "or", items: [] }, + }), + ).toBe(0); + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { kind: "and", items: [] }, + }), + ).toBe(4); + }); + + it("jsonGroupCount counts distinct values", async () => { + const rows = await harness.orm.jsonGroupCount("events", { + column: "data", + where: inT1, + path: ["status"], + }); + const byValue = Object.fromEntries(rows.map((row) => [row.value, row.count])); + expect(byValue).toEqual({ completed: 2, failed: 1, running: 1 }); + }); + + it("jsonTimeBuckets buckets a numeric path", async () => { + const rows = await harness.orm.jsonTimeBuckets("events", { + column: "data", + where: inT1, + path: ["startedAt"], + bucketMs: 2000, + }); + expect(rows).toEqual([ + { bucket: 0, count: 1 }, + { bucket: 2000, count: 2 }, + { bucket: 4000, count: 1 }, + ]); + }); + + it("jsonStats computes count/min/max and continuous percentiles", async () => { + const stats = await harness.orm.jsonStats("events", { + column: "data", + where: inT1, + path: ["durationMs"], + percentiles: [0, 0.5, 1], + }); + expect(stats.count).toBe(3); + expect(stats.min).toBe(100); + expect(stats.max).toBe(300); + expect(stats.percentiles).toEqual([ + { fraction: 0, value: 100 }, + { fraction: 0.5, value: 200 
}, + { fraction: 1, value: 300 }, + ]); + }); + + it("jsonPage keyset-paginates by a JSON path with a real-column tiebreak", async () => { + const { orm } = harness; + const page1 = await orm.jsonPage("events", { + column: "data", + where: inT1, + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "desc" }], + keyColumn: "id", + keyDirection: "desc", + limit: 2, + }); + expect(page1.map((row) => row.id)).toEqual(["e4", "e3"]); + + const page2 = await orm.jsonPage("events", { + column: "data", + where: inT1, + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "desc" }], + keyColumn: "id", + keyDirection: "desc", + limit: 2, + cursor: { values: [3000], key: "e3" }, + }); + expect(page2.map((row) => row.id)).toEqual(["e2", "e1"]); + }); + + it("jsonPage applies the JSON filter", async () => { + const rows = await harness.orm.jsonPage("events", { + column: "data", + where: inT1, + filter: statusIn(["completed", "failed"]), + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "asc" }], + keyColumn: "id", + keyDirection: "asc", + limit: 10, + }); + expect(rows.map((row) => row.id)).toEqual(["e1", "e2", "e3"]); + }); + + it("excludes null-path rows from = / != filters (SQL three-valued logic)", async () => { + const { orm } = harness; + // e4 has durationMs: null. `!= 200` excludes e2 (it matches 200) AND e4 + // (NULL comparisons are unknown in SQL), leaving e1 + e3. + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { kind: "compare", path: ["durationMs"], valueType: "number", operator: "!=", value: 200 }, + }), + ).toBe(2); + // `= 100` matches only e1; the null row never matches `=`. 
+ expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { kind: "compare", path: ["durationMs"], valueType: "number", operator: "=", value: 100 }, + }), + ).toBe(1); + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { + kind: "array", + path: ["durationMs"], + valueType: "number", + operator: "in", + values: [100, 300], + }, + }), + ).toBe(2); + expect( + await orm.jsonCount("events", { + column: "data", + where: inT1, + filter: { + kind: "array", + path: ["durationMs"], + valueType: "number", + operator: "not in", + values: [200], + }, + }), + ).toBe(2); + }); + + it("quotes JSON path segments for nested object keys", async () => { + const { orm } = harness; + await orm.create("events", { + id: "path-1", + tenant: "tp", + data: { + "a.b": { + 'c"d': 7, + }, + a: { + b: { + 'c"d': 99, + }, + }, + }, + }); + + await expect( + orm.jsonCount("events", { + column: "data", + where: (eb) => eb("tenant", "=", "tp"), + filter: { + kind: "compare", + path: ["a.b", 'c"d'], + valueType: "number", + operator: "=", + value: 7, + }, + }), + ).resolves.toBe(1); + }); + + it("keyset-paginates a nullable sort column without truncating", async () => { + const { orm } = harness; + const collected: string[] = []; + let cursor: { readonly values: readonly (number | null)[]; readonly key: string } | undefined; + for (let i = 0; i < 10; i += 1) { + const rows = await orm.jsonPage("events", { + column: "data", + where: inT1, + orderBy: [{ path: ["durationMs"], valueType: "number", direction: "asc" }], + keyColumn: "id", + keyDirection: "asc", + limit: 1, + cursor, + }); + if (rows.length === 0) break; + const last = rows[rows.length - 1]!; + const key = last.id as string; + collected.push(key); + cursor = { values: [(last.data as { durationMs: number | null }).durationMs], key }; + } + // asc, nulls first: e4(null), then e1(100), e2(200), e3(300). 
The page + // whose cursor is the null row must still return the non-null rows, a + // naive `durationMs > NULL` predicate would truncate here. + expect(collected).toEqual(["e4", "e1", "e2", "e3"]); + }); + }); +}; + +runSuite("memory", makeMemoryHarness); +runSuite("sqlite", makeSqliteHarness); + +// LIKE-wildcard parity: a value carrying literal `_`/`%` must match the same +// rows under the drizzle (sqlite) adapter as under the literal memory adapter. +// Without ESCAPE these wildcards would make sqlite over-match (`a_b%c` would +// also catch `axbyc`). +describe("json filter LIKE-wildcard parity (memory vs sqlite)", () => { + let memory: Harness; + let sqlite: Harness; + beforeEach(async () => { + memory = await makeMemoryHarness(); + sqlite = await makeSqliteHarness(); + await memory.orm.createMany("events", wildcardRows.map((row) => ({ ...row }))); + await sqlite.orm.createMany("events", wildcardRows.map((row) => ({ ...row }))); + }); + afterEach(async () => { + await memory.close(); + await sqlite.close(); + }); + + const idsFor = async (orm: EventsQuery, filter: JsonFilter): Promise => { + const rows = await orm.jsonPage("events", { + column: "data", + where: (eb) => eb("tenant", "=", "tw"), + filter, + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "asc" }], + keyColumn: "id", + keyDirection: "asc", + limit: 100, + }); + return rows.map((row) => row.id as string); + }; + + it.each([ + ["contains", statusCompare("contains", "_b%")], + ["eq", statusCompare("=", "a_b%c")], + ["starts with", statusCompare("starts with", "a_")], + ["ends with", statusCompare("ends with", "%c")], + ] as const)("matches identically for %s", async (_label, filter) => { + const fromMemory = await idsFor(memory.orm, filter); + const fromSqlite = await idsFor(sqlite.orm, filter); + expect(fromSqlite).toEqual(fromMemory); + // The wildcard row must be selected, the lookalike (`axbyc`) must not. 
+ expect(fromMemory).toContain("w1"); + expect(fromMemory).not.toContain("w2"); + }); +}); + +describe("json aggregation unsupported adapters", () => { + it("fails loudly when the adapter has no JSON aggregate hooks", async () => { + let unsupported: EventsQuery; + const adapter: ORMAdapter = { + tables: v1.tables, + count: async () => 0, + findFirst: async () => null, + findMany: async () => [], + updateMany: async () => {}, + upsert: async () => {}, + create: async () => ({}), + createMany: async () => [], + deleteMany: async () => {}, + transaction: async (run: (transactionInstance: EventsQuery) => Promise): Promise => + run(unsupported), + }; + unsupported = toORM(adapter); + + await expect(unsupported.jsonCount("events", { column: "data" })).rejects.toThrow( + "[FumaDB] jsonCount is not supported by this adapter.", + ); + }); +}); diff --git a/packages/core/fumadb/src/query/aggregate.ts b/packages/core/fumadb/src/query/aggregate.ts new file mode 100644 index 000000000..5f3075440 --- /dev/null +++ b/packages/core/fumadb/src/query/aggregate.ts @@ -0,0 +1,181 @@ +import type { AnyColumn } from "../schema/create"; +import type { Condition, ConditionBuilder } from "./condition-builder"; + +// --------------------------------------------------------------------------- +// JSON-document aggregation + keyset pagination. +// +// These operate over values addressed *inside* a JSON document column (e.g. +// `plugin_storage.data`), not over real typed columns. The addressing model is +// therefore path-based and carries an explicit extraction type so adapters can +// emit correct per-dialect SQL (Postgres `->>` returns text and needs casts; +// SQLite `json_extract` is natively typed). Only adapters that implement the +// matching `ORMAdapter` hooks support these; others throw loudly. 
/**
 * A predicate tree over JSON-document paths, ANDed into the real-column where.
 *
 * Leaves address a value inside the JSON column by `path` and state, via
 * `valueType`, how that value is extracted before it is compared; `and` / `or`
 * nodes combine child predicates.
 *
 * Behaviour pinned by the adapter test-suite (memory and sqlite):
 * - a row whose path holds `null` matches neither `=` / `!=` nor
 *   `in` / `not in` (SQL three-valued logic);
 * - `contains`, `starts with`, `ends with` and `=` treat `_` and `%` in the
 *   operand as literal characters, not as LIKE wildcards.
 */
export type JsonFilter =
  | {
      // Single-operand comparison of the extracted value against `value`.
      readonly kind: "compare";
      readonly path: JsonPath;
      readonly valueType: JsonValueType;
      readonly operator: JsonCompareOperator;
      readonly value: JsonScalar;
    }
  | {
      // Set membership (`in` / `not in`) of the extracted value in `values`.
      readonly kind: "array";
      readonly path: JsonPath;
      readonly valueType: JsonValueType;
      readonly operator: JsonArrayOperator;
      readonly values: readonly JsonScalar[];
    }
  // Conjunction of the child predicates.
  | { readonly kind: "and"; readonly items: readonly JsonFilter[] }
  // Disjunction of the child predicates.
  | { readonly kind: "or"; readonly items: readonly JsonFilter[] };
/**
 * Result of `jsonStats` over a numeric JSON path.
 *
 * When the scoping condition is statically empty, `toORM` short-circuits and
 * returns `{ count: 0, min: null, max: null, percentiles: [] }` without
 * calling the adapter.
 */
export interface JsonStats {
  // NOTE(review): presumably the number of matching numeric values, not rows;
  // confirm against the adapter implementations.
  readonly count: number;
  // `null` when there is nothing to aggregate (see the empty result above).
  readonly min: number | null;
  readonly max: number | null;
  // NOTE(review): presumably one entry per requested fraction, in request
  // order; the short-circuit path returns an empty list regardless. Confirm.
  readonly percentiles: readonly JsonPercentile[];
}
*/ + readonly keyColumn: AnyColumn; + readonly keyDirection: "asc" | "desc"; + readonly cursor?: JsonKeysetCursor; + readonly limit: number; +} + +// --- Public (AbstractQuery) options: name strings + a where builder -------- + +export interface JsonPublicBase> { + readonly column: keyof TColumns & string; + readonly where?: (eb: ConditionBuilder) => Condition | boolean; + readonly filter?: JsonFilter; +} + +export type JsonCountOptions> = + JsonPublicBase; + +export interface JsonGroupCountOptions> + extends JsonPublicBase { + readonly path: JsonPath; + readonly valueType?: JsonValueType; +} + +export interface JsonTimeBucketOptions> + extends JsonPublicBase { + readonly path: JsonPath; + readonly bucketMs: number; +} + +export interface JsonStatsOptions> + extends JsonPublicBase { + readonly path: JsonPath; + /** + * Percentile fractions in [0, 1] (e.g. 0.5, 0.95). + * + * SQLite adapters compute percentiles from all matching numeric values after + * projection because SQLite has no native percentile aggregate. Count, min, + * and max still run as SQL aggregates. 
+ */ + readonly percentiles?: readonly number[]; +} + +export interface JsonPageOptions> + extends JsonPublicBase { + readonly orderBy: readonly JsonKeysetOrder[]; + readonly keyColumn: keyof TColumns & string; + readonly keyDirection?: "asc" | "desc"; + readonly cursor?: JsonKeysetCursor; + readonly limit: number; +} diff --git a/packages/core/fumadb/src/query/index.ts b/packages/core/fumadb/src/query/index.ts index 2128adb0e..a339b8b86 100644 --- a/packages/core/fumadb/src/query/index.ts +++ b/packages/core/fumadb/src/query/index.ts @@ -6,9 +6,20 @@ import type { TableInsertValues, TableUpdateValues, } from "../schema/create"; +import type { + JsonCountOptions, + JsonGroupCountOptions, + JsonGroupCountRow, + JsonPageOptions, + JsonStats, + JsonStatsOptions, + JsonTimeBucketOptions, + JsonTimeBucketRow, +} from "./aggregate"; import type { Condition, ConditionBuilder } from "./condition-builder"; import type { ORMAdapter } from "./orm"; +export type * from "./aggregate"; export type { Condition, ConditionBuilder } from "./condition-builder"; export { ConditionType } from "./condition-builder"; export { withQueryContext } from "./orm"; @@ -208,4 +219,39 @@ export interface AbstractQuery { eb: ConditionBuilder ) => Condition | boolean; }) => Promise; + + // --- JSON-document aggregation + keyset pagination ----------------------- + // Operate over values addressed inside a JSON document column. Only adapters + // that implement the matching `ORMAdapter` hooks support these; the rest + // throw `[FumaDB] is not supported by this adapter`. + + /** Count rows matching the real-column `where` and JSON-path `filter`. */ + jsonCount: ( + table: TableName, + options: JsonCountOptions + ) => Promise; + + /** Group by a JSON path and count rows per distinct value. */ + jsonGroupCount: ( + table: TableName, + options: JsonGroupCountOptions + ) => Promise; + + /** Bucket a numeric JSON path by `bucketMs` and count rows per bucket. 
*/ + jsonTimeBuckets: ( + table: TableName, + options: JsonTimeBucketOptions + ) => Promise; + + /** Compute count/min/max and percentiles over a numeric JSON path. */ + jsonStats: ( + table: TableName, + options: JsonStatsOptions + ) => Promise; + + /** Keyset-paginate rows ordered by JSON paths with a real-column tiebreak. */ + jsonPage: ( + table: TableName, + options: JsonPageOptions + ) => Promise[]>; } diff --git a/packages/core/fumadb/src/query/orm/index.ts b/packages/core/fumadb/src/query/orm/index.ts index f0bac42e7..1d76a758f 100644 --- a/packages/core/fumadb/src/query/orm/index.ts +++ b/packages/core/fumadb/src/query/orm/index.ts @@ -12,6 +12,16 @@ import type { JoinBuilder, OrderBy, } from ".."; +import type { + JsonCountAdapterOptions, + JsonGroupCountAdapterOptions, + JsonGroupCountRow, + JsonPageAdapterOptions, + JsonStats, + JsonStatsAdapterOptions, + JsonTimeBucketAdapterOptions, + JsonTimeBucketRow, +} from "../aggregate"; import { buildCondition, createBuilder, type Condition } from "../condition-builder"; export interface CompiledJoin { @@ -311,6 +321,25 @@ export interface ORMAdapter { transaction: ( run: (transactionInstance: AbstractQuery) => Promise, ) => Promise; + + // --- JSON-document aggregation + keyset pagination (optional) ------------ + // Adapters that can push these into the database implement them; `toORM` + // throws a clear error for adapters that don't. 
+ + jsonCount?: (table: AnyTable, options: JsonCountAdapterOptions) => Promise; + jsonGroupCount?: ( + table: AnyTable, + options: JsonGroupCountAdapterOptions, + ) => Promise; + jsonTimeBuckets?: ( + table: AnyTable, + options: JsonTimeBucketAdapterOptions, + ) => Promise; + jsonStats?: (table: AnyTable, options: JsonStatsAdapterOptions) => Promise; + jsonPage?: ( + table: AnyTable, + options: JsonPageAdapterOptions, + ) => Promise[]>; } export interface ToORMOptions { @@ -334,6 +363,33 @@ export function toORM( return table; } + function toColumn(table: AnyTable, name: string): AnyColumn { + const column = table.columns[name]; + if (!column) + throw new Error(`[FumaDB] Invalid column name ${name} in ${table.ormName}.`); + return column; + } + + // Compiles a public where builder, applies read policies, and yields the + // scoping condition. Returns `false` when the query is statically empty. + async function compileScopedWhere( + table: AnyTable, + where: + | ((eb: ReturnType) => Condition | boolean) + | undefined, + ): Promise { + let conditions = where ? 
buildCondition(table.columns, where) : undefined; + if (conditions === true) conditions = undefined; + if (conditions === false) return false; + return applyReadPolicies(table, conditions, context); + } + + function requireJsonOp(op: T | undefined, name: string): T { + if (!op) + throw new Error(`[FumaDB] ${name} is not supported by this adapter.`); + return op; + } + const query = { internal, async count(name, { where } = {}) { @@ -428,6 +484,69 @@ export function toORM( if (constrainedWhere === false) return; return internal.updateMany(table, { set, where: constrainedWhere }); }, + async jsonCount(name, { column, where, filter }) { + const op = requireJsonOp(internal.jsonCount, "jsonCount"); + const table = toTable(name); + const scopedWhere = await compileScopedWhere(table, where); + if (scopedWhere === false) return 0; + return op(table, { column: toColumn(table, column), where: scopedWhere, filter }); + }, + async jsonGroupCount(name, { column, where, filter, path, valueType }) { + const op = requireJsonOp(internal.jsonGroupCount, "jsonGroupCount"); + const table = toTable(name); + const scopedWhere = await compileScopedWhere(table, where); + if (scopedWhere === false) return []; + return op(table, { + column: toColumn(table, column), + where: scopedWhere, + filter, + path, + valueType, + }); + }, + async jsonTimeBuckets(name, { column, where, filter, path, bucketMs }) { + const op = requireJsonOp(internal.jsonTimeBuckets, "jsonTimeBuckets"); + const table = toTable(name); + const scopedWhere = await compileScopedWhere(table, where); + if (scopedWhere === false) return []; + return op(table, { + column: toColumn(table, column), + where: scopedWhere, + filter, + path, + bucketMs, + }); + }, + async jsonStats(name, { column, where, filter, path, percentiles }) { + const op = requireJsonOp(internal.jsonStats, "jsonStats"); + const table = toTable(name); + const scopedWhere = await compileScopedWhere(table, where); + if (scopedWhere === false) + return { count: 0, 
min: null, max: null, percentiles: [] }; + return op(table, { + column: toColumn(table, column), + where: scopedWhere, + filter, + path, + percentiles, + }); + }, + async jsonPage(name, options) { + const op = requireJsonOp(internal.jsonPage, "jsonPage"); + const table = toTable(name); + const scopedWhere = await compileScopedWhere(table, options.where); + if (scopedWhere === false) return []; + return op(table, { + column: toColumn(table, options.column), + where: scopedWhere, + filter: options.filter, + orderBy: options.orderBy, + keyColumn: toColumn(table, options.keyColumn), + keyDirection: options.keyDirection ?? "asc", + cursor: options.cursor, + limit: options.limit, + }) as Promise[]>; + }, async transaction(run) { return internal.transaction((transactionInstance) => run(withQueryContext(transactionInstance, context)), diff --git a/packages/core/fumadb/src/query/table-policy.test.ts b/packages/core/fumadb/src/query/table-policy.test.ts index 51d0c2b8c..d325894aa 100644 --- a/packages/core/fumadb/src/query/table-policy.test.ts +++ b/packages/core/fumadb/src/query/table-policy.test.ts @@ -103,12 +103,35 @@ const comments = table("policy_comments", { }, }); +const events = table("policy_events", { + id: idColumn("id", "varchar(255)"), + tenantId: column("tenant_id", "varchar(255)"), + data: column("data", "json"), +}).policy({ + name: "tenant.events", + onRead: ({ builder, context }) => { + if (isReadDenied("events", context)) return false; + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, + onCreate: ({ values, context }) => assertTenantAllowed("events", context, values.tenantId), + onUpdate: ({ builder, set, context }) => { + observe(context, "events:update"); + if (set.tenantId !== undefined) assertTenantAllowed("events", context, set.tenantId); + return builder("tenantId", "in", [...context.allowedTenantIds]); + }, + onDelete: ({ builder, context }) => { + observe(context, "events:delete"); + return builder("tenantId", "in", 
[...context.allowedTenantIds]); + }, +}); + const v1 = schema({ version: "1.0.0", tables: { authors, posts, comments, + events, }, relations: { authors: ({ many }) => ({ @@ -235,6 +258,33 @@ const seedTenants = async (orm: TablePolicyQuery) => { body: "B comment", }, ]); + + await seed.createMany("events", [ + { + id: "event-a-1", + tenantId: "tenant-a", + data: { + status: "completed", + startedAt: 1000, + }, + }, + { + id: "event-a-2", + tenantId: "tenant-a", + data: { + status: "failed", + startedAt: 2000, + }, + }, + { + id: "event-b-1", + tenantId: "tenant-b", + data: { + status: "completed", + startedAt: 3000, + }, + }, + ]); }; describe("FumaDB table policies", () => { @@ -460,6 +510,67 @@ describe("FumaDB table policies", () => { }), ); + it.effect("applies read policies before JSON aggregation and keyset queries", () => + useHarness(async (orm) => { + await seedTenants(orm); + + const tenantAContext = makeContext(["tenant-a"], "tenant-a"); + const tenantA = withQueryContext(orm, tenantAContext); + + await expect(tenantA.jsonCount("events", { column: "data" })).resolves.toBe(2); + await expect( + tenantA.jsonCount("events", { + column: "data", + filter: { + kind: "compare", + path: ["status"], + valueType: "text", + operator: "=", + value: "completed", + }, + }), + ).resolves.toBe(1); + + const groups = await tenantA.jsonGroupCount("events", { + column: "data", + path: ["status"], + }); + expect(Object.fromEntries(groups.map((row) => [row.value, row.count]))).toEqual({ + completed: 1, + failed: 1, + }); + + await expect( + tenantA.jsonPage("events", { + column: "data", + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "asc" }], + keyColumn: "id", + keyDirection: "asc", + limit: 10, + }), + ).resolves.toMatchObject([{ id: "event-a-1" }, { id: "event-a-2" }]); + + const deniedEvents = withQueryContext( + orm, + makeContext(["tenant-a"], "denied-events", ["events"]), + ); + await expect(deniedEvents.jsonCount("events", { column: "data" 
})).resolves.toBe(0); + await expect( + deniedEvents.jsonPage("events", { + column: "data", + orderBy: [{ path: ["startedAt"], valueType: "number", direction: "asc" }], + keyColumn: "id", + keyDirection: "asc", + limit: 10, + }), + ).resolves.toEqual([]); + + expect(tenantAContext.observed).toEqual( + expect.arrayContaining(["tenant-a:events:read"]), + ); + }), + ); + it.effect("fails closed when a query wrapper does not forward context rebinding", () => useHarness(async (orm) => { const wrapped = { ...orm }; diff --git a/packages/core/sdk/src/executor.ts b/packages/core/sdk/src/executor.ts index 38e12a3c6..f5316d05b 100644 --- a/packages/core/sdk/src/executor.ts +++ b/packages/core/sdk/src/executor.ts @@ -2,7 +2,21 @@ import { Effect, Inspectable, Layer, Option, Predicate, Schema } from "effect"; import { FetchHttpClient, type HttpClient } from "effect/unstable/http"; import { fumadb } from "@executor-js/fumadb"; import { memoryAdapter } from "@executor-js/fumadb/adapters/memory"; -import { withQueryContext, type Condition, type ConditionBuilder } from "@executor-js/fumadb/query"; +import { + withQueryContext, + type Condition, + type ConditionBuilder, + type JsonCompareOperator, + type JsonFilter, + type JsonGroupCountRow, + type JsonKeysetCursor, + type JsonKeysetOrder, + type JsonPath, + type JsonScalar, + type JsonStats, + type JsonTimeBucketRow, + type JsonValueType, +} from "@executor-js/fumadb/query"; import { schema as fumaSchema, type RelationsMap } from "@executor-js/fumadb/schema"; import type { AnyColumn } from "@executor-js/fumadb/schema"; import { generateKeyBetween } from "fractional-indexing"; @@ -591,6 +605,31 @@ type CoreProjectedRow = TSelect extends re ? 
Pick, Extract>> : CoreRow; +type CoreJsonBase = { + readonly column: string; + readonly where?: CoreWhere; + readonly filter?: JsonFilter; +}; +type CoreJsonGroupCountOptions = CoreJsonBase & { + readonly path: JsonPath; + readonly valueType?: JsonValueType; +}; +type CoreJsonTimeBucketOptions = CoreJsonBase & { + readonly path: JsonPath; + readonly bucketMs: number; +}; +type CoreJsonStatsOptions = CoreJsonBase & { + readonly path: JsonPath; + readonly percentiles?: readonly number[]; +}; +type CoreJsonPageOptions = CoreJsonBase & { + readonly orderBy: readonly JsonKeysetOrder[]; + readonly keyColumn: string; + readonly keyDirection?: "asc" | "desc"; + readonly cursor?: JsonKeysetCursor; + readonly limit: number; +}; + type LooseStorageDb = { readonly count: (tableName: string, options?: unknown) => Promise; readonly create: ( @@ -611,6 +650,20 @@ type LooseStorageDb = { options?: unknown, ) => Promise[]>; readonly updateMany: (tableName: string, options: unknown) => Promise; + readonly jsonCount: (tableName: string, options: unknown) => Promise; + readonly jsonGroupCount: ( + tableName: string, + options: unknown, + ) => Promise; + readonly jsonTimeBuckets: ( + tableName: string, + options: unknown, + ) => Promise; + readonly jsonStats: (tableName: string, options: unknown) => Promise; + readonly jsonPage: ( + tableName: string, + options: unknown, + ) => Promise[]>; }; const asLooseStorageDb = (db: unknown): LooseStorageDb => db as LooseStorageDb; @@ -668,6 +721,37 @@ const makeCoreDb = (fuma: ReturnType) => ({ fuma.use(`${tableName}.updateMany`, (db) => asLooseStorageDb(db).updateMany(tableName, options), ), + jsonCount: ( + tableName: CoreTableName, + options: CoreJsonBase, + ): Effect.Effect => + fuma.use(`${tableName}.jsonCount`, (db) => asLooseStorageDb(db).jsonCount(tableName, options)), + jsonGroupCount: ( + tableName: CoreTableName, + options: CoreJsonGroupCountOptions, + ): Effect.Effect => + fuma.use(`${tableName}.jsonGroupCount`, (db) => + 
/**
 * Pick the JSON extraction type for an operand: booleans map to "boolean",
 * numbers and `Date` instances to "number", and everything else (strings,
 * null, undefined, objects) to "text".
 */
const inferJsonValueType = (value: unknown): JsonValueType => {
  if (typeof value === "boolean") return "boolean";
  const isNumeric = typeof value === "number" || value instanceof Date;
  return isNumeric ? "number" : "text";
};
/**
 * Translate the facade's indexed-field `where` into a JSON-path filter the SQL
 * pushdown understands.
 *
 * Each entry of `where` is either a bare value (treated as equality) or an
 * operator object (`eq`, `in`, `gt`, `gte`, `lt`, `lte`). The extraction type
 * of every predicate is inferred from its operand(s).
 *
 * @param where Field name → value or operator object; `undefined` means "no
 *   filter".
 * @returns `undefined` when no predicate was produced, the single predicate
 *   when there is exactly one, otherwise an `and` node over all of them.
 */
const pluginStorageWhereToJsonFilter = (
  where: Readonly<Record<string, unknown>> | undefined,
): JsonFilter | undefined => {
  if (!where) return undefined;
  const items: JsonFilter[] = [];
  for (const [field, condition] of Object.entries(where)) {
    const path: JsonPath = [field];

    // Bare value: shorthand for equality.
    if (!isPluginStorageWhereFilter(condition)) {
      items.push({
        kind: "compare",
        path,
        valueType: inferJsonValueType(condition),
        operator: "=",
        value: pluginStorageJsonScalar(condition),
      });
      continue;
    }

    for (const [operator, operand] of Object.entries(condition)) {
      if (operator === "in") {
        // A non-array operand degrades to an empty list, which matches nothing.
        const values: readonly unknown[] = Array.isArray(operand) ? operand : [];
        // Infer the extraction type from the first non-null member: a leading
        // null would otherwise type the whole list as "text" and compare
        // numeric/boolean members against text.
        const representative = values.find((value) => value !== null && value !== undefined);
        items.push({
          kind: "array",
          path,
          valueType: inferJsonValueType(representative),
          operator: "in",
          values: values.map(pluginStorageJsonScalar),
        });
        continue;
      }
      // Own-property check: `in` would also accept inherited keys such as
      // `toString`, yielding a prototype function as the operator.
      if (!Object.prototype.hasOwnProperty.call(pluginStorageJsonCompareOperators, operator)) {
        continue;
      }
      const compareOperator =
        pluginStorageJsonCompareOperators[
          operator as keyof typeof pluginStorageJsonCompareOperators
        ];
      items.push({
        kind: "compare",
        path,
        valueType: inferJsonValueType(operand),
        operator: compareOperator,
        value: pluginStorageJsonScalar(operand),
      });
    }
  }
  if (items.length === 0) return undefined;
  if (items.length === 1) return items[0]!;
  return { kind: "and", items };
};
=> + Number.isInteger(limit) && limit >= 0 + ? null + : new StorageError({ + message: `Plugin storage collection "${definition.name}" received an invalid query limit`, + cause: undefined, + }); + const makePluginStorageFacade = (input: { readonly core: CoreDb; readonly pluginId: string; @@ -828,6 +1009,18 @@ const makePluginStorageFacade = (input: { key === undefined ? true : b("key", "=", key), ); + // Like `whereFor` but with an optional key-prefix match, used by the + // pushdown aggregate/keyset paths. Owner/tenant scoping is added by the + // table policy, exactly as for `whereFor`. + const whereForPrefix = + (collection: string, keyPrefix?: string): CoreWhere => + (b: AnyCb) => + b.and( + b("plugin_id", "=", input.pluginId), + b("collection", "=", collection), + keyPrefix === undefined ? true : b("key", "starts with", keyPrefix), + ); + const whereOwner = (owner: Owner, collection: string, key: string): CoreWhere => { const os = ownerSubject(owner); return (b: AnyCb) => @@ -1112,6 +1305,120 @@ const makePluginStorageFacade = (input: { query: (storageInput) => queryCollection(definition, storageInput), count: (storageInput) => queryCollection(definition, storageInput).pipe(Effect.map((rows) => rows.length)), + queryKeyset: (storageInput) => + Effect.gen(function* () { + const fields = new Set([ + ...Object.keys(storageInput.where ?? {}), + ...storageInput.orderBy.map((order) => order.field), + ]); + const validationError = pluginStorageFieldsValidationError(definition, fields); + if (validationError) return yield* validationError; + const limitError = pluginStorageInvalidLimitError(definition, storageInput.limit); + if (limitError) return yield* limitError; + + const orderBy: JsonKeysetOrder[] = storageInput.orderBy.map((order) => ({ + path: [order.field], + valueType: order.valueType ?? "text", + direction: order.direction ?? "asc", + })); + const keyDirection = orderBy[0]?.direction ?? 
"asc"; + + const rows = yield* input.core.jsonPage("plugin_storage", { + column: "data", + where: whereForPrefix(definition.name, storageInput.keyPrefix), + filter: pluginStorageWhereToJsonFilter( + storageInput.where as Readonly> | undefined, + ), + orderBy, + keyColumn: "key", + keyDirection, + cursor: storageInput.cursor, + limit: storageInput.limit, + }); + + const entries = rows.map((row) => + pluginStorageEntryFromRow>(row), + ); + const last = entries.at(-1); + const nextCursor = + last && entries.length >= storageInput.limit + ? { + values: storageInput.orderBy.map((order) => + pluginStorageJsonScalar(pluginStorageDataField(last.data, order.field)), + ), + key: last.key, + } + : null; + return { entries, nextCursor }; + }), + aggregate: { + count: (storageInput) => + Effect.gen(function* () { + const validationError = pluginStorageFieldsValidationError( + definition, + Object.keys(storageInput?.where ?? {}), + ); + if (validationError) return yield* validationError; + return yield* input.core.jsonCount("plugin_storage", { + column: "data", + where: whereForPrefix(definition.name, storageInput?.keyPrefix), + filter: pluginStorageWhereToJsonFilter( + storageInput?.where as Readonly> | undefined, + ), + }); + }), + groupCount: (storageInput) => + Effect.gen(function* () { + const validationError = pluginStorageFieldsValidationError(definition, [ + storageInput.field, + ...Object.keys(storageInput.where ?? {}), + ]); + if (validationError) return yield* validationError; + return yield* input.core.jsonGroupCount("plugin_storage", { + column: "data", + where: whereForPrefix(definition.name, storageInput.keyPrefix), + filter: pluginStorageWhereToJsonFilter( + storageInput.where as Readonly> | undefined, + ), + path: [storageInput.field], + valueType: storageInput.valueType ?? 
"text", + }); + }), + timeBuckets: (storageInput) => + Effect.gen(function* () { + const validationError = pluginStorageFieldsValidationError(definition, [ + storageInput.field, + ...Object.keys(storageInput.where ?? {}), + ]); + if (validationError) return yield* validationError; + return yield* input.core.jsonTimeBuckets("plugin_storage", { + column: "data", + where: whereForPrefix(definition.name, storageInput.keyPrefix), + filter: pluginStorageWhereToJsonFilter( + storageInput.where as Readonly> | undefined, + ), + path: [storageInput.field], + bucketMs: storageInput.bucketMs, + }); + }), + stats: (storageInput) => + Effect.gen(function* () { + const validationError = pluginStorageFieldsValidationError(definition, [ + storageInput.field, + ...Object.keys(storageInput.where ?? {}), + ]); + if (validationError) return yield* validationError; + return yield* input.core.jsonStats("plugin_storage", { + column: "data", + where: whereForPrefix(definition.name, storageInput.keyPrefix), + filter: pluginStorageWhereToJsonFilter( + storageInput.where as Readonly> | undefined, + ), + path: [storageInput.field], + percentiles: storageInput.percentiles, + }); + }), + }, remove: (storageInput) => removeImpl(storageInput.owner, definition.name, storageInput.key), }), get: (storageInput) => getVisible(storageInput.collection, storageInput.key), diff --git a/packages/core/sdk/src/fuma-runtime.ts b/packages/core/sdk/src/fuma-runtime.ts index e560b1b09..89f7b7e16 100644 --- a/packages/core/sdk/src/fuma-runtime.ts +++ b/packages/core/sdk/src/fuma-runtime.ts @@ -139,6 +139,11 @@ const makeSafeFumaQuery = ( deleteMany: (name, value) => db.deleteMany(table(name), value), findFirst: (name, value) => db.findFirst(table(name), value), findMany: (name, value) => db.findMany(table(name), value), + jsonCount: (name, value) => db.jsonCount(table(name), value), + jsonGroupCount: (name, value) => db.jsonGroupCount(table(name), value), + jsonTimeBuckets: (name, value) => 
db.jsonTimeBuckets(table(name), value), + jsonStats: (name, value) => db.jsonStats(table(name), value), + jsonPage: (name, value) => db.jsonPage(table(name), value), transaction: (run) => db.transaction((transactionDb) => run(makeSafeFumaQuery(transactionDb, options))), updateMany: (name, value) => db.updateMany(table(name), value), diff --git a/packages/core/sdk/src/plugin-storage-aggregate.test.ts b/packages/core/sdk/src/plugin-storage-aggregate.test.ts new file mode 100644 index 000000000..3a2575e84 --- /dev/null +++ b/packages/core/sdk/src/plugin-storage-aggregate.test.ts @@ -0,0 +1,200 @@ +import { describe, expect, it } from "@effect/vitest"; +import { Effect, Schema } from "effect"; + +import type { StorageFailure } from "./fuma-runtime"; +import { Owner } from "./ids"; +import { definePlugin } from "./plugin"; +import { + definePluginStorageCollection, + type PluginStorageAggregateFilter, + type PluginStorageGroupCountInput, + type PluginStorageQueryKeysetInput, + type PluginStorageStatsInput, + type PluginStorageTimeBucketInput, +} from "./plugin-storage"; +import { makeTestExecutor } from "./testing"; + +const Run = Schema.Struct({ + status: Schema.Literals(["completed", "failed", "running"]), + triggerKind: Schema.NullOr(Schema.String), + startedAt: Schema.Number, + durationMs: Schema.NullOr(Schema.Number), + hadInteraction: Schema.Boolean, +}); +type Run = typeof Run.Type; + +const runs = definePluginStorageCollection("runs", Run, { + indexes: ["status", "triggerKind", "startedAt", "durationMs", "hadInteraction"], +}); + +const runsPlugin = definePlugin(() => ({ + id: "runs" as const, + pluginStorage: { runs }, + storage: ({ pluginStorage }) => ({ runs: pluginStorage.collection(runs) }), + extension: (ctx) => ({ + record: (owner: Owner, key: string, data: Run) => ctx.storage.runs.put({ owner, key, data }), + count: (input?: PluginStorageAggregateFilter) => + ctx.storage.runs.aggregate.count(input), + groupCount: (input: PluginStorageGroupCountInput) => 
+ ctx.storage.runs.aggregate.groupCount(input), + timeBuckets: (input: PluginStorageTimeBucketInput) => + ctx.storage.runs.aggregate.timeBuckets(input), + stats: (input: PluginStorageStatsInput) => ctx.storage.runs.aggregate.stats(input), + keyset: (input: PluginStorageQueryKeysetInput) => + ctx.storage.runs.queryKeyset(input), + }), +}))(); + +const seedRows: readonly (readonly [string, Run])[] = [ + [ + "r1", + { + status: "completed", + triggerKind: "cli", + startedAt: 1000, + durationMs: 100, + hadInteraction: false, + }, + ], + [ + "r2", + { + status: "completed", + triggerKind: "cli", + startedAt: 2000, + durationMs: 200, + hadInteraction: false, + }, + ], + [ + "r3", + { + status: "failed", + triggerKind: "http", + startedAt: 3000, + durationMs: 300, + hadInteraction: true, + }, + ], + [ + "r4", + { + status: "running", + triggerKind: "http", + startedAt: 4000, + durationMs: null, + hadInteraction: false, + }, + ], +]; + +const seed = (executor: { + runs: { + record: (owner: Owner, key: string, data: Run) => Effect.Effect; + }; +}) => + Effect.forEach(seedRows, ([key, data]) => executor.runs.record("org", key, data), { + discard: true, + }); + +describe("plugin storage aggregate + keyset (SQLite pushdown)", () => { + it.effect("counts with JSON filters and numeric ranges", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ plugins: [runsPlugin] as const }); + yield* seed(executor); + + expect(yield* executor.runs.count()).toBe(4); + expect(yield* executor.runs.count({ where: { status: "completed" } })).toBe(2); + expect(yield* executor.runs.count({ where: { startedAt: { gte: 3000 } } })).toBe(2); + expect(yield* executor.runs.count({ where: { durationMs: { gte: 200 } } })).toBe(2); + expect(yield* executor.runs.count({ where: { hadInteraction: true } })).toBe(1); + }), + ); + + it.effect("groups counts by a JSON field (facets)", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ plugins: [runsPlugin] as 
const }); + yield* seed(executor); + + const byStatus = yield* executor.runs.groupCount({ field: "status" }); + expect(Object.fromEntries(byStatus.map((row) => [row.value, row.count]))).toEqual({ + completed: 2, + failed: 1, + running: 1, + }); + + const byTrigger = yield* executor.runs.groupCount({ field: "triggerKind" }); + expect(Object.fromEntries(byTrigger.map((row) => [row.value, row.count]))).toEqual({ + cli: 2, + http: 2, + }); + }), + ); + + it.effect("buckets a numeric field over time", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ plugins: [runsPlugin] as const }); + yield* seed(executor); + + const buckets = yield* executor.runs.timeBuckets({ field: "startedAt", bucketMs: 2000 }); + expect(buckets).toEqual([ + { bucket: 0, count: 1 }, + { bucket: 2000, count: 2 }, + { bucket: 4000, count: 1 }, + ]); + }), + ); + + it.effect("computes duration stats and percentiles", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ plugins: [runsPlugin] as const }); + yield* seed(executor); + + const stats = yield* executor.runs.stats({ field: "durationMs", percentiles: [0, 0.5, 1] }); + expect(stats.count).toBe(3); + expect(stats.min).toBe(100); + expect(stats.max).toBe(300); + expect(stats.percentiles).toEqual([ + { fraction: 0, value: 100 }, + { fraction: 0.5, value: 200 }, + { fraction: 1, value: 300 }, + ]); + }), + ); + + it.effect("keyset-paginates with a cursor and JSON filter", () => + Effect.gen(function* () { + const executor = yield* makeTestExecutor({ plugins: [runsPlugin] as const }); + yield* seed(executor); + + const page1 = yield* executor.runs.keyset({ + orderBy: [{ field: "startedAt", direction: "desc", valueType: "number" }], + limit: 2, + }); + expect(page1.entries.map((entry) => entry.key)).toEqual(["r4", "r3"]); + expect(page1.nextCursor).not.toBeNull(); + + const page2 = yield* executor.runs.keyset({ + orderBy: [{ field: "startedAt", direction: "desc", valueType: "number" }], + limit: 
2, + cursor: page1.nextCursor ?? undefined, + }); + expect(page2.entries.map((entry) => entry.key)).toEqual(["r2", "r1"]); + + const page3 = yield* executor.runs.keyset({ + orderBy: [{ field: "startedAt", direction: "desc", valueType: "number" }], + limit: 2, + cursor: page2.nextCursor ?? undefined, + }); + expect(page3.entries).toHaveLength(0); + expect(page3.nextCursor).toBeNull(); + + const completed = yield* executor.runs.keyset({ + where: { status: "completed" }, + orderBy: [{ field: "startedAt", direction: "asc", valueType: "number" }], + limit: 10, + }); + expect(completed.entries.map((entry) => entry.key)).toEqual(["r1", "r2"]); + }), + ); +}); diff --git a/packages/core/sdk/src/plugin-storage.ts b/packages/core/sdk/src/plugin-storage.ts index e854feb51..9a560e321 100644 --- a/packages/core/sdk/src/plugin-storage.ts +++ b/packages/core/sdk/src/plugin-storage.ts @@ -164,6 +164,109 @@ export interface PluginStorageEntry { readonly updatedAt: Date; } +export type PluginStorageScalar = string | number | boolean | null; + +export type PluginStorageFieldType = "text" | "number" | "boolean"; + +export interface PluginStorageAggregateFilter { + readonly where?: PluginStorageCollectionWhere; + readonly keyPrefix?: string; +} + +export interface PluginStorageGroupCountInput< + TDefinition, +> extends PluginStorageAggregateFilter { + readonly field: PluginStorageCollectionIndexedField; + readonly valueType?: PluginStorageFieldType; +} + +export interface PluginStorageTimeBucketInput< + TDefinition, +> extends PluginStorageAggregateFilter { + /** Numeric (epoch-ms) field the buckets are computed from. */ + readonly field: PluginStorageCollectionIndexedField; + readonly bucketMs: number; +} + +export interface PluginStorageStatsInput< + TDefinition, +> extends PluginStorageAggregateFilter { + /** Numeric field the stats are computed over. */ + readonly field: PluginStorageCollectionIndexedField; + /** + * Percentile fractions in [0, 1] (e.g. 0.5, 0.95). 
+ * + * SQLite has no native percentile aggregate, so SQLite-backed storage computes + * percentiles from all matching numeric values after projection. Count, min, + * and max still run as SQL aggregates. + */ + readonly percentiles?: readonly number[]; +} + +export interface PluginStorageGroupCount { + readonly value: PluginStorageScalar; + readonly count: number; +} + +export interface PluginStorageTimeBucket { + readonly bucket: number; + readonly count: number; +} + +export interface PluginStoragePercentile { + readonly fraction: number; + readonly value: number; +} + +export interface PluginStorageStats { + readonly count: number; + readonly min: number | null; + readonly max: number | null; + readonly percentiles: readonly PluginStoragePercentile[]; +} + +export interface PluginStorageKeysetOrderBy { + readonly field: PluginStorageCollectionIndexedField; + readonly direction?: "asc" | "desc"; + readonly valueType?: PluginStorageFieldType; +} + +export interface PluginStorageKeysetCursor { + readonly values: readonly PluginStorageScalar[]; + readonly key: string; +} + +export interface PluginStorageQueryKeysetInput< + TDefinition, +> extends PluginStorageAggregateFilter { + readonly orderBy: readonly PluginStorageKeysetOrderBy[]; + readonly limit: number; + readonly cursor?: PluginStorageKeysetCursor; +} + +export interface PluginStorageKeysetPage { + readonly entries: readonly PluginStorageEntry[]; + readonly nextCursor: PluginStorageKeysetCursor | null; +} + +/** SQL-pushed aggregation over a collection's JSON documents. 
*/ +export interface PluginStorageAggregateFacade< + TDefinition extends PluginStorageCollectionDefinition, +> { + readonly count: ( + input?: PluginStorageAggregateFilter, + ) => Effect.Effect; + readonly groupCount: ( + input: PluginStorageGroupCountInput, + ) => Effect.Effect; + readonly timeBuckets: ( + input: PluginStorageTimeBucketInput, + ) => Effect.Effect; + readonly stats: ( + input: PluginStorageStatsInput, + ) => Effect.Effect; +} + export interface PluginStorageCollectionFacade< TDefinition extends PluginStorageCollectionDefinition = PluginStorageCollectionDefinition, > { @@ -197,6 +300,15 @@ export interface PluginStorageCollectionFacade< readonly count: ( input?: Omit, "orderBy" | "limit" | "offset">, ) => Effect.Effect; + /** SQL-pushed keyset pagination ordered by indexed JSON fields. */ + readonly queryKeyset: ( + input: PluginStorageQueryKeysetInput, + ) => Effect.Effect< + PluginStorageKeysetPage>, + StorageFailure + >; + /** SQL-pushed aggregation (counts, facets, time buckets, percentiles). 
*/ + readonly aggregate: PluginStorageAggregateFacade; readonly remove: ( input: PluginStorageCollectionScopedKeyInput, ) => Effect.Effect; diff --git a/packages/core/sdk/src/test-config.ts b/packages/core/sdk/src/test-config.ts index 7aba38ff0..66948f5c3 100644 --- a/packages/core/sdk/src/test-config.ts +++ b/packages/core/sdk/src/test-config.ts @@ -62,6 +62,11 @@ const makeLazyTestFumaDb = (options: { "deleteMany", "findFirst", "findMany", + "jsonCount", + "jsonGroupCount", + "jsonTimeBuckets", + "jsonStats", + "jsonPage", "transaction", "updateMany", "upsert", diff --git a/packages/plugins/openapi/src/sdk/store.test.ts b/packages/plugins/openapi/src/sdk/store.test.ts index ef4a4ed2c..bab86fbbe 100644 --- a/packages/plugins/openapi/src/sdk/store.test.ts +++ b/packages/plugins/openapi/src/sdk/store.test.ts @@ -51,6 +51,13 @@ describe("OpenAPI operation store", () => { ), query: () => Effect.succeed([]), count: () => Effect.succeed(0), + queryKeyset: () => Effect.succeed({ entries: [], nextCursor: null }), + aggregate: { + count: () => Effect.succeed(0), + groupCount: () => Effect.succeed([]), + timeBuckets: () => Effect.succeed([]), + stats: () => Effect.succeed({ count: 0, min: null, max: null, percentiles: [] }), + }, remove: () => Effect.void, }), get: (input: { readonly collection: string; readonly key: string }) =>