Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .changeset/fumadb-json-pushdown.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
---
"@executor-js/fumadb": minor
"@executor-js/sdk": minor
---

Push JSON-document aggregation and keyset pagination down to SQL. FumaDB's query
layer gains `jsonCount`, `jsonGroupCount`, `jsonTimeBuckets`, `jsonStats`
(min/max + continuous percentiles), and `jsonPage` (cursor pagination) over a
JSON document column, implemented natively for the memory and Drizzle
(SQLite/Postgres) adapters and failing loudly on adapters that don't support
them. Plugin storage exposes this as `collection.aggregate.{count,groupCount,
timeBuckets,stats}` and `collection.queryKeyset(...)`, translating a
collection's indexed-field `where` into JSON-path predicates while owner/tenant
scoping stays enforced by the storage policy. Aggregates and pages are computed
in the database instead of by fetching the whole collection and reducing in
memory.
297 changes: 297 additions & 0 deletions packages/core/fumadb/src/adapters/drizzle/query.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import * as Drizzle from "drizzle-orm";
import type * as PostgreSQL from "drizzle-orm/pg-core";
import type { AbstractQuery, FindManyOptions } from "../../query";
import type {
JsonFilter,
JsonPath,
JsonScalar,
JsonValueType,
} from "../../query/aggregate";
import { coerceJsonValue, computePercentiles } from "../../query/aggregate-eval";
import { type Condition, ConditionType } from "../../query/condition-builder";
import { type SimplifyFindOptions, toORM } from "../../query/orm";
import {
Expand Down Expand Up @@ -263,6 +270,120 @@ export function fromDrizzle(
return out;
}

// --- JSON-document aggregation helpers ----------------------------------
// Extract a JSON path as a typed SQL expression. SQLite `json_extract` is
// natively typed; Postgres `#>>` returns text and needs explicit casts.
function jsonExtractSql(
jsonColumn: ColumnType,
path: JsonPath,
valueType: JsonValueType,
): Drizzle.SQL {
if (provider === "postgresql") {
// Build a `text[]` array literal with each segment double-quoted so a
// segment containing a comma (the array element separator) descends the
// intended key instead of splitting into two. Inside a quoted element a
// backslash and a double-quote must be backslash-escaped.
const pgPath = `{${path
.map((segment) => `"${segment.replace(/\\/g, "\\\\").replace(/"/g, '\\"')}"`)
.join(",")}}`;
const text = Drizzle.sql`(${jsonColumn} #>> ${pgPath})`;
if (valueType === "number") return Drizzle.sql`${text}::numeric`;
if (valueType === "boolean") return Drizzle.sql`${text}::boolean`;
return text;
}
// Build a quoted SQLite JSON path (`$."seg1"."seg2"`) so a segment
// containing a dot (the path separator) is treated as one key. Inside a
// quoted segment an embedded backslash or double-quote is backslash-escaped.
const jsonPath = `$${path
.map((segment) => `."${segment.replace(/\\/g, "\\\\").replace(/"/g, '\\"')}"`)
.join("")}`;
return Drizzle.sql`json_extract(${jsonColumn}, ${jsonPath})`;
}

// Escape LIKE wildcards (`%`, `_`) and the escape character itself so the
// value matches literally, matching the memory adapter's
// includes/startsWith/endsWith. Paired with an explicit `escape '\'` clause.
function escapeLikePattern(value: string): string {
return value.replace(/[\\%_]/g, (char) => `\\${char}`);
}

function jsonCompareSql(
operator: string,
expr: Drizzle.SQL,
value: JsonScalar,
): Drizzle.SQL {
switch (operator) {
case "=":
return Drizzle.eq(expr, value);
case "!=":
return Drizzle.ne(expr, value);
case ">":
return Drizzle.gt(expr, value);
case ">=":
return Drizzle.gte(expr, value);
case "<":
return Drizzle.lt(expr, value);
case "<=":
return Drizzle.lte(expr, value);
case "contains": {
const escaped = escapeLikePattern(String(value));
return Drizzle.sql`${expr} like ${`%${escaped}%`} escape '\\'`;
}
case "starts with": {
const escaped = escapeLikePattern(String(value));
return Drizzle.sql`${expr} like ${`${escaped}%`} escape '\\'`;
}
case "ends with": {
const escaped = escapeLikePattern(String(value));
return Drizzle.sql`${expr} like ${`%${escaped}`} escape '\\'`;
}
default:
throw new Error(`[FumaDB Drizzle] Unsupported JSON operator: ${operator}`);
}
}

function buildJsonFilter(
jsonColumn: ColumnType,
filter: JsonFilter,
): Drizzle.SQL | undefined {
if (filter.kind === "and") {
return Drizzle.and(
...filter.items.map((item) => buildJsonFilter(jsonColumn, item)),
);
}
if (filter.kind === "or") {
if (filter.items.length === 0) return Drizzle.sql`1 = 0`;
return Drizzle.or(
...filter.items.map((item) => buildJsonFilter(jsonColumn, item)),
);
Comment thread
greptile-apps[bot] marked this conversation as resolved.
}
const expr = jsonExtractSql(jsonColumn, filter.path, filter.valueType);
if (filter.kind === "array") {
return filter.operator === "in"
? Drizzle.inArray(expr, filter.values as unknown[])
: Drizzle.notInArray(expr, filter.values as unknown[]);
}
return jsonCompareSql(filter.operator, expr, filter.value);
}

function buildScopedConditions(
jsonColumn: ColumnType,
where: Condition | undefined,
filter: JsonFilter | undefined,
): Drizzle.SQL | undefined {
const parts: Drizzle.SQL[] = [];
if (where) {
const compiled = buildWhere(toDrizzleColumn, where);
if (compiled) parts.push(compiled);
}
if (filter) {
const compiled = buildJsonFilter(jsonColumn, filter);
if (compiled) parts.push(compiled);
}
if (parts.length === 0) return undefined;
return Drizzle.and(...parts);
}

return toORM({
tables: schema.tables,
async count(table, v) {
Expand Down Expand Up @@ -402,6 +523,182 @@ export function fromDrizzle(

await query;
},
async jsonCount(table, { column, where, filter }) {
const conditions = buildScopedConditions(toDrizzleColumn(column), where, filter);
return await db.$count(toDrizzle(table), conditions);
},
async jsonGroupCount(table, { column, where, filter, path, valueType }) {
const drizzleTable = toDrizzle(table);
const jsonColumn = toDrizzleColumn(column);
const groupExpr = jsonExtractSql(jsonColumn, path, valueType ?? "text");
const conditions = buildScopedConditions(jsonColumn, where, filter);
const rows = await db
.select({ value: groupExpr, count: Drizzle.sql<number>`count(*)` })
.from(drizzleTable)
.where(conditions)
.groupBy(groupExpr);
return rows.map((row) => ({
value: coerceJsonValue(row.value, valueType ?? "text"),
count: Number(row.count),
}));
},
async jsonTimeBuckets(table, { column, where, filter, path, bucketMs }) {
const drizzleTable = toDrizzle(table);
const jsonColumn = toDrizzleColumn(column);
const valueExpr = jsonExtractSql(jsonColumn, path, "number");
// `value - (value % bucket)` floors to the bucket start without relying
// on integer division (SQLite binds numeric params as REAL, so `/` would
// be float division). Matches `bucketFloor` for non-negative epochs.
const bucketExpr = Drizzle.sql`(${valueExpr} - (${valueExpr} % ${bucketMs}))`;
Comment thread
aryasaatvik marked this conversation as resolved.
const conditions = buildScopedConditions(jsonColumn, where, filter);
const rows = await db
.select({ bucket: bucketExpr, count: Drizzle.sql<number>`count(*)` })
.from(drizzleTable)
.where(conditions)
.groupBy(bucketExpr)
.orderBy(bucketExpr);
return rows.map((row) => ({ bucket: Number(row.bucket), count: Number(row.count) }));
},
async jsonStats(table, { column, where, filter, path, percentiles }) {
const drizzleTable = toDrizzle(table);
const jsonColumn = toDrizzleColumn(column);
const valueExpr = jsonExtractSql(jsonColumn, path, "number");
const conditions = buildScopedConditions(jsonColumn, where, filter);
const aggregate = await db
.select({
count: Drizzle.sql<number>`count(${valueExpr})`,
min: Drizzle.sql<number | null>`min(${valueExpr})`,
max: Drizzle.sql<number | null>`max(${valueExpr})`,
})
.from(drizzleTable)
.where(conditions);
const summary = aggregate[0];
Comment thread
aryasaatvik marked this conversation as resolved.
const count = Number(summary?.count ?? 0);
if (count === 0) return { count: 0, min: null, max: null, percentiles: [] };
const min = summary?.min == null ? null : Number(summary.min);
const max = summary?.max == null ? null : Number(summary.max);
const fractions = percentiles ?? [];
if (fractions.length === 0) return { count, min, max, percentiles: [] };

if (provider === "postgresql") {
const pctExpr = Drizzle.sql`percentile_cont(array[${Drizzle.sql.join(
fractions.map((fraction) => Drizzle.sql`${fraction}`),
Drizzle.sql`, `,
)}]) within group (order by ${valueExpr})`;
const pctRows = await db
.select({ values: Drizzle.sql<number[]>`${pctExpr}` })
.from(drizzleTable)
.where(conditions);
const values = pctRows[0]?.values ?? [];
return {
count,
min,
max,
percentiles: fractions.map((fraction, index) => ({
fraction,
value: Number(values[index]),
})),
};
}

// SQLite has no percentile_cont, so compute over the projected values.
const valueRows = await db
.select({ value: valueExpr })
.from(drizzleTable)
.where(conditions)
.orderBy(valueExpr);
const sorted = valueRows
.map((row) => row.value)
.filter((value): value is number | string => value != null)
.map((value) => Number(value))
.filter((value) => !Number.isNaN(value))
.sort((a, b) => a - b);
return { count, min, max, percentiles: computePercentiles(sorted, fractions) };
},
async jsonPage(table, { column, where, filter, orderBy, keyColumn, keyDirection, cursor, limit }) {
const drizzleTable = toDrizzle(table);
const jsonColumn = toDrizzleColumn(column);
const keyDrizzle = toDrizzleColumn(keyColumn);
const scoped = buildScopedConditions(jsonColumn, where, filter);

const orderExprs: Drizzle.SQL[] = orderBy.map((entry) => {
const expr = jsonExtractSql(jsonColumn, entry.path, entry.valueType);
// Mirror the memory adapter's null ordering (null first in asc, last in
// desc, see compareNullableAscending). SQLite defaults to this; Postgres
// defaults to the opposite, so make it explicit on both dialects.
return entry.direction === "desc"
? Drizzle.sql`${expr} desc nulls last`
: Drizzle.sql`${expr} asc nulls first`;
});
orderExprs.push(keyDirection === "desc" ? Drizzle.desc(keyDrizzle) : Drizzle.asc(keyDrizzle));

// Keyset boundary terms with SQL three-valued-logic null handling. A naive
// `expr > NULL` / `expr < NULL` is always unknown, which silently empties
// every page once the cursor row's sort value is null. These mirror
// compareNullableAscending so a nullable sort column paginates correctly.
const eqTerm = (expr: Drizzle.SQL, cv: unknown): Drizzle.SQL =>
cv == null ? Drizzle.isNull(expr) : Drizzle.eq(expr, cv);
const strictTerm = (
expr: Drizzle.SQL,
cv: unknown,
dir: "asc" | "desc",
): Drizzle.SQL | null => {
if (dir === "asc") {
// null is first: everything non-null is after a null cursor.
return cv == null ? Drizzle.isNotNull(expr) : Drizzle.gt(expr, cv);
}
// desc, null last: nothing is strictly after a null cursor on this field;
// and null rows fall after any non-null cursor value.
if (cv == null) return null;
return Drizzle.or(Drizzle.lt(expr, cv), Drizzle.isNull(expr)) ?? null;
};

let conditions = scoped;
if (cursor) {
const orTerms: Drizzle.SQL[] = [];
for (let boundary = 0; boundary <= orderBy.length; boundary += 1) {
const andTerms: Drizzle.SQL[] = [];
for (let prior = 0; prior < boundary; prior += 1) {
const entry = orderBy[prior]!;
andTerms.push(
eqTerm(
jsonExtractSql(jsonColumn, entry.path, entry.valueType),
cursor.values[prior] as unknown,
),
);
}
if (boundary < orderBy.length) {
const entry = orderBy[boundary]!;
const expr = jsonExtractSql(jsonColumn, entry.path, entry.valueType);
const strict = strictTerm(expr, cursor.values[boundary] as unknown, entry.direction);
if (strict === null) continue;
andTerms.push(strict);
} else {
andTerms.push(
keyDirection === "asc"
? Drizzle.gt(keyDrizzle, cursor.key)
: Drizzle.lt(keyDrizzle, cursor.key),
);
}
const combined = Drizzle.and(...andTerms);
if (combined) orTerms.push(combined);
}
const afterCursor = Drizzle.or(...orTerms);
conditions = scoped && afterCursor ? Drizzle.and(scoped, afterCursor) : (afterCursor ?? scoped);
}

const projection: Record<string, ColumnType> = {};
for (const tableColumn of Object.values(table.columns)) {
projection[tableColumn.ormName] = drizzleTable[tableColumn.names.drizzle];
}

return await db
.select(projection)
.from(drizzleTable)
.where(conditions)
.orderBy(...orderExprs)
.limit(limit);
},
async transaction(run) {
// Some SQLite-compatible engines (Cloudflare D1) reject interactive
// transactions — both raw BEGIN/COMMIT and the driver's `.transaction()`.
Expand Down
Loading
Loading