diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts index f984562eb3..c3a99fcec4 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts @@ -9,6 +9,7 @@ import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server"; import { logger } from "~/services/logger.server"; import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server"; import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server"; +import { runStore } from "~/v3/runStore.server"; // Pull the existing tags out of a buffer entry's serialised payload so // the buffer-path response can dedup against them, matching the @@ -84,14 +85,7 @@ export async function action({ request, params }: ActionFunctionArgs) { if (newTags.length === 0) { return json({ message: "No new tags to add" }, { status: 200 }); } - const updated = await prisma.taskRun.update({ - where: { - id: taskRun.id, - runtimeEnvironmentId: env.id, - }, - data: { runTags: { push: newTags } }, - select: { updatedAt: true }, - }); + const updated = await runStore.pushTags(taskRun.id, newTags, { runtimeEnvironmentId: env.id }, prisma); // Publish a run-changed record with the NEW tag set so tag feeds reindex // (no-op unless enabled). updatedAt is the read-your-writes watermark. publishChangeRecord({ diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts index ec5800c1f9..11074840a3 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.append.ts @@ -6,6 +6,7 @@ import { $replica, prisma } from "~/db.server"; import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; import { ServiceValidationError } from "~/v3/services/common.server"; +import { runStore } from "~/v3/runStore.server"; const ParamsSchema = z.object({ runId: z.string(), @@ -87,16 +88,7 @@ const { action } = createActionApiRoute( } if (!targetRun.realtimeStreams.includes(params.streamId)) { - await prisma.taskRun.update({ - where: { - id: targetRun.id, - }, - data: { - realtimeStreams: { - push: params.streamId, - }, - }, - }); + await runStore.pushRealtimeStream(targetRun.id, params.streamId, prisma); } const part = await request.text(); diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts index dd3d3bf31d..cdee9567b7 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts @@ -6,6 +6,7 @@ import { createActionApiRoute, createLoaderApiRoute, } from "~/services/routeBuilders/apiBuilder.server"; +import { runStore } from "~/v3/runStore.server"; const ParamsSchema = z.object({ runId: z.string(), @@ -86,12 +87,7 @@ const { action } = createActionApiRoute( } if (!target.realtimeStreams.includes(params.streamId)) { - await prisma.taskRun.update({ - where: { id: target.id }, - data: { - realtimeStreams: { push: params.streamId }, - }, - }); + await runStore.pushRealtimeStream(target.id, params.streamId, prisma); } const realtimeStream = getRealtimeStreamInstance( diff --git a/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts b/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts index 493c5c1ce4..2bdf95eb9a 100644 --- a/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts +++ b/apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts @@ -10,6 +10,7 @@ import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server"; import { makeResolveMollifierFlag } from "~/v3/mollifier/mollifierGate.server"; +import { runStore } from "~/v3/runStore.server"; import type { TraceEventConcern, TriggerTaskRequest } from "../types"; // In-memory per-org mollifier-enabled check, shared with `evaluateGate` @@ -190,10 +191,10 @@ export class IdempotencyKeyConcern { }); // Update the existing run to remove the idempotency key - await this.prisma.taskRun.updateMany({ - where: { id: existingRun.id, idempotencyKey }, - data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, - }); + await runStore.clearIdempotencyKey( + { byId: { runId: existingRun.id, idempotencyKey } }, + this.prisma + ); return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt }; } @@ -207,10 +208,10 @@ export class IdempotencyKeyConcern { }); // Update the existing run to remove the idempotency key - await this.prisma.taskRun.updateMany({ - where: { id: existingRun.id, idempotencyKey }, - data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, - }); + await runStore.clearIdempotencyKey( + { byId: { runId: existingRun.id, idempotencyKey } }, + this.prisma + ); return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt }; } diff --git a/apps/webapp/app/services/metadata/updateMetadata.server.ts b/apps/webapp/app/services/metadata/updateMetadata.server.ts index 7b87034a30..2cc057f10f 100644 --- a/apps/webapp/app/services/metadata/updateMetadata.server.ts +++ b/apps/webapp/app/services/metadata/updateMetadata.server.ts @@ -13,6 +13,7 @@ import { Effect, Schedule, Duration, Fiber } from "effect"; import { type RuntimeFiber } from "effect/Fiber"; import { setTimeout } from "timers/promises"; import { Logger, LogLevel } from "@trigger.dev/core/logger"; +import type { RunStore } from "@internal/run-store"; const RUN_UPDATABLE_WINDOW_MS = 60 * 60 * 1000; // 1 hour @@ -24,6 +25,7 @@ type BufferedRunMetadataChangeOperation = { export type UpdateMetadataServiceOptions = { prisma: PrismaClientOrTransaction; + runStore: RunStore; flushIntervalMs?: number; flushEnabled?: boolean; flushLoggingEnabled?: boolean; @@ -49,6 +51,7 @@ export class UpdateMetadataService { private _bufferedOperations: Map = new Map(); private _flushFiber: RuntimeFiber | null = null; private readonly _prisma: PrismaClientOrTransaction; + private readonly _runStore: RunStore; private readonly flushIntervalMs: number; private readonly flushEnabled: boolean; private readonly flushLoggingEnabled: boolean; @@ -57,6 +60,7 @@ export class UpdateMetadataService { constructor(private readonly options: UpdateMetadataServiceOptions) { this._prisma = options.prisma; + this._runStore = options.runStore; this.flushIntervalMs = options.flushIntervalMs ?? 5000; this.flushEnabled = options.flushEnabled ?? true; this.flushLoggingEnabled = options.flushLoggingEnabled ?? true; @@ -260,17 +264,16 @@ export class UpdateMetadataService { const writeTime = new Date(); const result = yield* _( Effect.tryPromise(() => - this._prisma.taskRun.updateMany({ - where: { - id: runId, - metadataVersion: run.metadataVersion, - }, - data: { - metadata: newMetadataPacket.data, + this._runStore.updateMetadata( + runId, + { + metadata: newMetadataPacket.data!, metadataVersion: { increment: 1 }, updatedAt: writeTime, }, - }) + { expectedMetadataVersion: run.metadataVersion }, + this._prisma + ) ) ); @@ -469,20 +472,19 @@ export class UpdateMetadataService { // Update with optimistic locking; updatedAt stamped explicitly so the caller can // publish the exact committed watermark without a follow-up read. const writeTime = new Date(); - const result = await this._prisma.taskRun.updateMany({ - where: { - id: runId, - metadataVersion: run.metadataVersion, - }, - data: { - metadata: newMetadataPacket.data, + const result = await this._runStore.updateMetadata( + runId, + { + metadata: newMetadataPacket.data!, metadataType: newMetadataPacket.dataType, metadataVersion: { increment: 1, }, updatedAt: writeTime, }, - }); + { expectedMetadataVersion: run.metadataVersion }, + this._prisma + ); if (result.count === 0) { if (this.flushLoggingEnabled) { @@ -564,19 +566,19 @@ export class UpdateMetadataService { // Update the metadata without version check; updatedAt stamped explicitly so the // caller can publish the exact committed watermark. const writeTime = new Date(); - await this._prisma.taskRun.update({ - where: { - id: runId, - }, - data: { - metadata: metadataPacket?.data, + await this._runStore.updateMetadata( + runId, + { + metadata: metadataPacket?.data!, metadataType: metadataPacket?.dataType, metadataVersion: { increment: 1, }, updatedAt: writeTime, }, - }); + {}, + this._prisma + ); updatedAtMs = writeTime.getTime(); } diff --git a/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts b/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts index 9f1818e5ed..147df2bca2 100644 --- a/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts +++ b/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts @@ -2,6 +2,7 @@ import { singleton } from "~/utils/singleton"; import { env } from "~/env.server"; import { UpdateMetadataService } from "./updateMetadata.server"; import { prisma } from "~/db.server"; +import { runStore } from "~/v3/runStore.server"; import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server"; export const updateMetadataService = singleton( @@ -9,6 +10,7 @@ export const updateMetadataService = singleton( () => new UpdateMetadataService({ prisma, + runStore, flushIntervalMs: env.BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS, flushEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_ENABLED === "1", flushLoggingEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1", diff --git a/apps/webapp/app/v3/runStore.server.ts b/apps/webapp/app/v3/runStore.server.ts new file mode 100644 index 0000000000..2993597ea1 --- /dev/null +++ b/apps/webapp/app/v3/runStore.server.ts @@ -0,0 +1,8 @@ +import { PostgresRunStore } from "@internal/run-store"; +import { $replica, prisma } from "~/db.server"; +import { singleton } from "~/utils/singleton"; + +export const runStore = singleton( + "PostgresRunStore", + () => new PostgresRunStore({ prisma, readOnlyPrisma: $replica }) +); diff --git a/apps/webapp/app/v3/services/baseService.server.ts b/apps/webapp/app/v3/services/baseService.server.ts index 06c8bd33ea..9dc3a33d08 100644 --- a/apps/webapp/app/v3/services/baseService.server.ts +++ b/apps/webapp/app/v3/services/baseService.server.ts @@ -1,8 +1,10 @@ import { Span, SpanKind } from "@opentelemetry/api"; +import type { RunStore } from "@internal/run-store"; import { $replica, PrismaClientOrTransaction, prisma } from "~/db.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server"; import { engine, RunEngine } from "../runEngine.server"; +import { runStore as defaultRunStore } from "../runStore.server"; import { ServiceValidationError } from "./common.server"; export { ServiceValidationError }; @@ -10,7 +12,8 @@ export { ServiceValidationError }; export abstract class BaseService { constructor( protected readonly _prisma: PrismaClientOrTransaction = prisma, - protected readonly _replica: PrismaClientOrTransaction = $replica + protected readonly _replica: PrismaClientOrTransaction = $replica, + protected readonly runStore: RunStore = defaultRunStore ) {} protected async traceWithEnv( diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index 22aa64b5e1..3303687159 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -408,10 +408,10 @@ export class BatchTriggerV3Service extends BaseService { // Expire the cached runs that are no longer valid if (expiredRunIds.size) { - await this._prisma.taskRun.updateMany({ - where: { friendlyId: { in: Array.from(expiredRunIds) } }, - data: { idempotencyKey: null }, - }); + await this.runStore.clearIdempotencyKey( + { byFriendlyIds: Array.from(expiredRunIds) }, + this._prisma + ); } return runs; diff --git a/apps/webapp/app/v3/services/rescheduleTaskRun.server.ts b/apps/webapp/app/v3/services/rescheduleTaskRun.server.ts index 43163fb4fb..707473167e 100644 --- a/apps/webapp/app/v3/services/rescheduleTaskRun.server.ts +++ b/apps/webapp/app/v3/services/rescheduleTaskRun.server.ts @@ -17,15 +17,14 @@ export class RescheduleTaskRunService extends BaseService { throw new ServiceValidationError(`Invalid delay: ${body.delay}`); } - const updatedRun = await this._prisma.taskRun.update({ - where: { - id: taskRun.id, - }, - data: { + const updatedRun = await this.runStore.rescheduleRun( + taskRun.id, + { delayUntil: delay, queueTimestamp: delay, }, - }); + this._prisma + ); if (updatedRun.engine === "V1") { await EnqueueDelayedRunService.reschedule(taskRun.id, delay); diff --git a/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts b/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts index 8273d8c9d9..0aa44e9466 100644 --- a/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts +++ b/apps/webapp/app/v3/services/resetIdempotencyKey.server.ts @@ -9,17 +9,16 @@ export class ResetIdempotencyKeyService extends BaseService { taskIdentifier: string, authenticatedEnv: AuthenticatedEnvironment ): Promise<{ id: string }> { - const { count: pgCount } = await this._prisma.taskRun.updateMany({ - where: { - idempotencyKey, - taskIdentifier, - runtimeEnvironmentId: authenticatedEnv.id, - }, - data: { - idempotencyKey: null, - idempotencyKeyExpiresAt: null, + const { count: pgCount } = await this.runStore.clearIdempotencyKey( + { + byPredicate: { + idempotencyKey, + taskIdentifier, + runtimeEnvironmentId: authenticatedEnv.id, + }, }, - }); + this._prisma + ); // Buffer-side reset: the key may belong to a buffered run that // hasn't materialised yet. The PG updateMany above can't see it. @@ -75,17 +74,16 @@ export class ResetIdempotencyKeyService extends BaseService { // lookup against the writer when there's nothing to find; // otherwise the exact write the customer asked for (i.e., not // duplicative — without it the reset is silently lost). - const { count: handoffPgCount } = await this._prisma.taskRun.updateMany({ - where: { - idempotencyKey, - taskIdentifier, - runtimeEnvironmentId: authenticatedEnv.id, - }, - data: { - idempotencyKey: null, - idempotencyKeyExpiresAt: null, + const { count: handoffPgCount } = await this.runStore.clearIdempotencyKey( + { + byPredicate: { + idempotencyKey, + taskIdentifier, + runtimeEnvironmentId: authenticatedEnv.id, + }, }, - }); + this._prisma + ); if (handoffPgCount > 0) { logger.info( `Reset idempotency key via handoff re-check: ${idempotencyKey} for task: ${taskIdentifier} in env: ${authenticatedEnv.id}, affected ${handoffPgCount} run(s)` diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 64aa62d328..effdc7296d 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -61,6 +61,7 @@ "@internal/llm-model-catalog": "workspace:*", "@internal/redis": "workspace:*", "@internal/run-engine": "workspace:*", + "@internal/run-store": "workspace:*", "@internal/schedule-engine": "workspace:*", "@internal/tracing": "workspace:*", "@internal/tsql": "workspace:*", diff --git a/apps/webapp/test/updateMetadata.test.ts b/apps/webapp/test/updateMetadata.test.ts index 6fa2605272..b78a1a50a9 100644 --- a/apps/webapp/test/updateMetadata.test.ts +++ b/apps/webapp/test/updateMetadata.test.ts @@ -2,6 +2,7 @@ import { containerTest } from "@internal/testcontainers"; import { parsePacket } from "@trigger.dev/core/v3"; import { setTimeout } from "timers/promises"; import { describe } from "vitest"; +import { PostgresRunStore } from "@internal/run-store"; import { UpdateMetadataService } from "~/services/metadata/updateMetadata.server"; import { MetadataTooLargeError } from "~/utils/packets"; @@ -13,6 +14,7 @@ describe("UpdateMetadataService.call", () => { async ({ prisma, redisOptions }) => { const service = new UpdateMetadataService({ prisma, + runStore: new PostgresRunStore({ prisma, readOnlyPrisma: prisma }), flushIntervalMs: 100, flushEnabled: true, flushLoggingEnabled: true, @@ -112,6 +114,7 @@ describe("UpdateMetadataService.call", () => { async ({ prisma, redisOptions }) => { const service = new UpdateMetadataService({ prisma, + runStore: new PostgresRunStore({ prisma, readOnlyPrisma: prisma }), flushIntervalMs: 100, flushEnabled: true, flushLoggingEnabled: true, @@ -280,6 +283,7 @@ describe("UpdateMetadataService.call", () => { async ({ prisma, redisOptions }) => { const service = new UpdateMetadataService({ prisma, + runStore: new PostgresRunStore({ prisma, readOnlyPrisma: prisma }), flushIntervalMs: 100, flushEnabled: true, flushLoggingEnabled: true, @@ -395,6 +399,7 @@ describe("UpdateMetadataService.call", () => { async ({ prisma, redisOptions }) => { const service = new UpdateMetadataService({ prisma, + runStore: new PostgresRunStore({ prisma, readOnlyPrisma: prisma }), flushIntervalMs: 100, flushEnabled: true, flushLoggingEnabled: true, @@ -587,6 +592,7 @@ describe("UpdateMetadataService.call", () => { async ({ prisma, redisOptions }) => { const service = new UpdateMetadataService({ prisma, + runStore: new PostgresRunStore({ prisma, readOnlyPrisma: prisma }), flushIntervalMs: 100, flushEnabled: true, flushLoggingEnabled: true, @@ -785,6 +791,7 @@ describe("UpdateMetadataService.call", () => { const service = new UpdateMetadataService({ prisma, + runStore: new PostgresRunStore({ prisma, readOnlyPrisma: prisma }), flushIntervalMs: 100000, // Very long interval so we can control flushing flushEnabled: true, flushLoggingEnabled: true, @@ -893,6 +900,7 @@ describe("UpdateMetadataService.call", () => { const service = new UpdateMetadataService({ prisma, + runStore: new PostgresRunStore({ prisma, readOnlyPrisma: prisma }), flushIntervalMs: 100, flushEnabled: true, flushLoggingEnabled: true, @@ -1004,6 +1012,7 @@ describe("UpdateMetadataService.call", () => { async ({ prisma, redisOptions }) => { const service = new UpdateMetadataService({ prisma, + runStore: new PostgresRunStore({ prisma, readOnlyPrisma: prisma }), flushIntervalMs: 100000, // Very long interval so we can control flushing flushEnabled: true, flushLoggingEnabled: true, @@ -1134,6 +1143,7 @@ describe("UpdateMetadataService.call", () => { async ({ prisma, redisOptions }) => { const service = new UpdateMetadataService({ prisma, + runStore: new PostgresRunStore({ prisma, readOnlyPrisma: prisma }), flushIntervalMs: 100, flushEnabled: true, flushLoggingEnabled: true, @@ -1209,6 +1219,7 @@ describe("UpdateMetadataService.call", () => { async ({ prisma, redisOptions }) => { const service = new UpdateMetadataService({ prisma, + runStore: new PostgresRunStore({ prisma, readOnlyPrisma: prisma }), flushIntervalMs: 100, flushEnabled: true, flushLoggingEnabled: true, diff --git a/internal-packages/run-engine/package.json b/internal-packages/run-engine/package.json index 8b876a1aab..414452da3b 100644 --- a/internal-packages/run-engine/package.json +++ b/internal-packages/run-engine/package.json @@ -21,6 +21,7 @@ }, "dependencies": { "@internal/redis": "workspace:*", + "@internal/run-store": "workspace:*", "@trigger.dev/redis-worker": "workspace:*", "@internal/tracing": "workspace:*", "@trigger.dev/core": "workspace:*", diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index 84941560a5..8d1f4c9c1f 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -73,6 +73,7 @@ import { RaceSimulationSystem } from "./systems/raceSimulationSystem.js"; import { RunAttemptSystem } from "./systems/runAttemptSystem.js"; import { NoopPendingVersionRunIdLookup } from "./services/pendingVersionLookup.js"; import { SystemResources } from "./systems/systems.js"; +import { PostgresRunStore, RunStore } from "@internal/run-store"; import { TtlSystem } from "./systems/ttlSystem.js"; import { WaitpointSystem } from "./systems/waitpointSystem.js"; import { @@ -102,6 +103,7 @@ export class RunEngine { prisma: PrismaClient; readOnlyPrisma: PrismaReplicaClient; + runStore: RunStore; runQueue: RunQueue; eventBus: EventBus = new EventEmitter(); executionSnapshotSystem: ExecutionSnapshotSystem; @@ -123,6 +125,7 @@ export class RunEngine { this.logger = options.logger ?? new Logger("RunEngine", this.options.logLevel ?? "info"); this.prisma = options.prisma; this.readOnlyPrisma = options.readOnlyPrisma ?? this.prisma; + this.runStore = new PostgresRunStore({ prisma: this.prisma, readOnlyPrisma: this.readOnlyPrisma }); this.runLockRedis = createRedisClient( { ...options.runLock.redis, @@ -313,6 +316,7 @@ export class RunEngine { const resources: SystemResources = { prisma: this.prisma, readOnlyPrisma: this.readOnlyPrisma, + runStore: this.runStore, worker: this.worker, eventBus: this.eventBus, logger: this.logger, @@ -532,84 +536,85 @@ export class RunEngine { const error: TaskRunError = { type: "STRING_ERROR", raw: cancelReason }; try { - const taskRun = await prisma.taskRun.create({ - data: { - id, - engine: "V2", - status: "CANCELED", - friendlyId: snapshot.friendlyId, - runtimeEnvironmentId: snapshot.environment.id, - environmentType: snapshot.environment.type, - organizationId: snapshot.environment.organization.id, - projectId: snapshot.environment.project.id, - idempotencyKey: snapshot.idempotencyKey, - idempotencyKeyExpiresAt: snapshot.idempotencyKeyExpiresAt, - idempotencyKeyOptions: snapshot.idempotencyKeyOptions, - taskIdentifier: snapshot.taskIdentifier, - payload: snapshot.payload, - payloadType: snapshot.payloadType, - context: snapshot.context, - traceContext: snapshot.traceContext, - traceId: snapshot.traceId, - spanId: snapshot.spanId, - parentSpanId: snapshot.parentSpanId, - lockedToVersionId: snapshot.lockedToVersionId, - taskVersion: snapshot.taskVersion, - sdkVersion: snapshot.sdkVersion, - cliVersion: snapshot.cliVersion, - concurrencyKey: snapshot.concurrencyKey, - queue: snapshot.queue, - lockedQueueId: snapshot.lockedQueueId, - workerQueue: snapshot.workerQueue, - isTest: snapshot.isTest, - taskEventStore: snapshot.taskEventStore, - // Defensive: the snapshot comes from a cjson-encoded buffer - // payload, where empty Lua tables encode as `{}` not `[]`. If - // the drainer pops a buffered run with no tags, snapshot.tags - // will be an empty object, which Prisma misreads as a relation - // update op. Normalise to a real array (or undefined for the - // empty case). - runTags: Array.isArray(snapshot.tags) && snapshot.tags.length > 0 - ? snapshot.tags - : undefined, - oneTimeUseToken: snapshot.oneTimeUseToken, - parentTaskRunId: snapshot.parentTaskRunId, - rootTaskRunId: snapshot.rootTaskRunId, - replayedFromTaskRunFriendlyId: snapshot.replayedFromTaskRunFriendlyId, - batchId: snapshot.batch?.id, - resumeParentOnCompletion: snapshot.resumeParentOnCompletion, - depth: snapshot.depth, - seedMetadata: snapshot.seedMetadata, - seedMetadataType: snapshot.seedMetadataType, - metadata: snapshot.metadata, - metadataType: snapshot.metadataType, - machinePreset: snapshot.machine, - scheduleId: snapshot.scheduleId, - scheduleInstanceId: snapshot.scheduleInstanceId, - createdAt: snapshot.createdAt, - bulkActionGroupIds: snapshot.bulkActionId ? [snapshot.bulkActionId] : undefined, - planType: snapshot.planType, - realtimeStreamsVersion: snapshot.realtimeStreamsVersion, - streamBasinName: snapshot.streamBasinName, - annotations: snapshot.annotations, - completedAt: cancelledAt, - updatedAt: cancelledAt, - error: error as unknown as Prisma.InputJsonValue, - attemptNumber: 0, - executionSnapshots: { - create: { - engine: "V2", - executionStatus: "FINISHED", - description: "Run cancelled before materialisation", - runStatus: "CANCELED", - environmentId: snapshot.environment.id, - environmentType: snapshot.environment.type, - projectId: snapshot.environment.project.id, - organizationId: snapshot.environment.organization.id, - }, + const taskRun = await this.runStore.createCancelledRun( + { + data: { + id, + engine: "V2", + status: "CANCELED", + friendlyId: snapshot.friendlyId, + runtimeEnvironmentId: snapshot.environment.id, + environmentType: snapshot.environment.type, + organizationId: snapshot.environment.organization.id, + projectId: snapshot.environment.project.id, + idempotencyKey: snapshot.idempotencyKey, + idempotencyKeyExpiresAt: snapshot.idempotencyKeyExpiresAt, + idempotencyKeyOptions: snapshot.idempotencyKeyOptions, + taskIdentifier: snapshot.taskIdentifier, + payload: snapshot.payload, + payloadType: snapshot.payloadType, + context: snapshot.context, + traceContext: snapshot.traceContext, + traceId: snapshot.traceId, + spanId: snapshot.spanId, + parentSpanId: snapshot.parentSpanId, + lockedToVersionId: snapshot.lockedToVersionId, + taskVersion: snapshot.taskVersion, + sdkVersion: snapshot.sdkVersion, + cliVersion: snapshot.cliVersion, + concurrencyKey: snapshot.concurrencyKey, + queue: snapshot.queue, + lockedQueueId: snapshot.lockedQueueId, + workerQueue: snapshot.workerQueue, + isTest: snapshot.isTest, + taskEventStore: snapshot.taskEventStore, + // Defensive: the snapshot comes from a cjson-encoded buffer + // payload, where empty Lua tables encode as `{}` not `[]`. If + // the drainer pops a buffered run with no tags, snapshot.tags + // will be an empty object, which Prisma misreads as a relation + // update op. Normalise to a real array (or undefined for the + // empty case). + runTags: Array.isArray(snapshot.tags) && snapshot.tags.length > 0 + ? snapshot.tags + : undefined, + oneTimeUseToken: snapshot.oneTimeUseToken, + parentTaskRunId: snapshot.parentTaskRunId, + rootTaskRunId: snapshot.rootTaskRunId, + replayedFromTaskRunFriendlyId: snapshot.replayedFromTaskRunFriendlyId, + batchId: snapshot.batch?.id, + resumeParentOnCompletion: snapshot.resumeParentOnCompletion, + depth: snapshot.depth, + seedMetadata: snapshot.seedMetadata, + seedMetadataType: snapshot.seedMetadataType, + metadata: snapshot.metadata, + metadataType: snapshot.metadataType, + machinePreset: snapshot.machine, + scheduleId: snapshot.scheduleId, + scheduleInstanceId: snapshot.scheduleInstanceId, + createdAt: snapshot.createdAt, + bulkActionGroupIds: snapshot.bulkActionId ? [snapshot.bulkActionId] : undefined, + planType: snapshot.planType, + realtimeStreamsVersion: snapshot.realtimeStreamsVersion, + streamBasinName: snapshot.streamBasinName, + annotations: snapshot.annotations, + completedAt: cancelledAt, + updatedAt: cancelledAt, + error: error as unknown as Prisma.InputJsonValue, + attemptNumber: 0, + }, + snapshot: { + engine: "V2", + executionStatus: "FINISHED", + description: "Run cancelled before materialisation", + runStatus: "CANCELED", + environmentId: snapshot.environment.id, + environmentType: snapshot.environment.type, + projectId: snapshot.environment.project.id, + organizationId: snapshot.environment.organization.id, }, }, - }); + prisma + ); if (emitRunCancelledEvent) { this.eventBus.emit("runCancelled", { @@ -826,112 +831,108 @@ export class RunEngine { let taskRun: TaskRun & { associatedWaitpoint: Waitpoint | null }; const taskRunId = RunId.fromFriendlyId(friendlyId); try { - taskRun = await prisma.taskRun.create({ - include: { - associatedWaitpoint: true, - }, - data: { - id: taskRunId, - engine: "V2", - status, - friendlyId, - runtimeEnvironmentId: environment.id, - environmentType: environment.type, - organizationId: environment.organization.id, - projectId: environment.project.id, - idempotencyKey, - idempotencyKeyExpiresAt, - idempotencyKeyOptions, - taskIdentifier, - payload, - payloadType, - context, - traceContext, - traceId, - spanId, - parentSpanId, - lockedToVersionId, - taskVersion, - sdkVersion, - cliVersion, - concurrencyKey, - queue, - lockedQueueId, - workerQueue, - region, - isTest, - delayUntil, - queuedAt, - maxAttempts, - taskEventStore, - priorityMs, - queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(), - ttl: resolvedTtl, - // Defensive: when the mollifier drainer replays a buffered - // snapshot whose payload was rewritten by a buffer-side Lua - // mutate (e.g. append_tags clears an empty list), cjson - // encodes an empty Lua table as `{}` rather than `[]`. JS - // parses that back as an empty object, and `{}.length` is - // undefined — the original `tags.length === 0` check would - // pass `{}` straight to Prisma's `String[]` column. Mirror - // the same Array.isArray guard that `createCancelledRun` - // uses for symmetry with the trigger replay path. - runTags: Array.isArray(tags) && tags.length > 0 ? tags : undefined, - oneTimeUseToken, - parentTaskRunId, - rootTaskRunId, - replayedFromTaskRunFriendlyId, - batchId: batch?.id, - resumeParentOnCompletion, - depth, - metadata, - metadataType, - seedMetadata, - seedMetadataType, - maxDurationInSeconds, - machinePreset: machine, - scheduleId, - scheduleInstanceId, - createdAt, - bulkActionGroupIds: bulkActionId ? [bulkActionId] : undefined, - planType, - realtimeStreamsVersion, - streamBasinName, - debounce: debounce - ? { - key: debounce.key, - delay: debounce.delay, - createdAt: new Date(), - } - : undefined, - annotations, - executionSnapshots: { - create: { - engine: "V2", - executionStatus: delayUntil ? "DELAYED" : "RUN_CREATED", - description: delayUntil ? "Run is delayed" : "Run was created", - runStatus: status, - environmentId: environment.id, - environmentType: environment.type, - projectId: environment.project.id, - organizationId: environment.organization.id, - workerId, - runnerId, - }, + taskRun = await this.runStore.createRun( + { + data: { + id: taskRunId, + engine: "V2", + status, + friendlyId, + runtimeEnvironmentId: environment.id, + environmentType: environment.type, + organizationId: environment.organization.id, + projectId: environment.project.id, + idempotencyKey, + idempotencyKeyExpiresAt, + idempotencyKeyOptions, + taskIdentifier, + payload, + payloadType, + context, + traceContext, + traceId, + spanId, + parentSpanId, + lockedToVersionId, + taskVersion, + sdkVersion, + cliVersion, + concurrencyKey, + queue, + lockedQueueId, + workerQueue, + region, + isTest, + delayUntil, + queuedAt, + maxAttempts, + taskEventStore, + priorityMs, + queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(), + ttl: resolvedTtl, + // Defensive: when the mollifier drainer replays a buffered + // snapshot whose payload was rewritten by a buffer-side Lua + // mutate (e.g. append_tags clears an empty list), cjson + // encodes an empty Lua table as `{}` rather than `[]`. JS + // parses that back as an empty object, and `{}.length` is + // undefined — the original `tags.length === 0` check would + // pass `{}` straight to Prisma's `String[]` column. Mirror + // the same Array.isArray guard that `createCancelledRun` + // uses for symmetry with the trigger replay path. + runTags: Array.isArray(tags) && tags.length > 0 ? tags : undefined, + oneTimeUseToken, + parentTaskRunId, + rootTaskRunId, + replayedFromTaskRunFriendlyId, + batchId: batch?.id, + resumeParentOnCompletion, + depth, + metadata, + metadataType, + seedMetadata, + seedMetadataType, + maxDurationInSeconds, + machinePreset: machine, + scheduleId, + scheduleInstanceId, + createdAt, + bulkActionGroupIds: bulkActionId ? [bulkActionId] : undefined, + planType, + realtimeStreamsVersion, + streamBasinName, + debounce: debounce + ? { + key: debounce.key, + delay: debounce.delay, + createdAt: new Date(), + } + : undefined, + annotations, + }, + snapshot: { + engine: "V2", + executionStatus: delayUntil ? "DELAYED" : "RUN_CREATED", + description: delayUntil ? "Run is delayed" : "Run was created", + runStatus: status, + environmentId: environment.id, + environmentType: environment.type, + projectId: environment.project.id, + organizationId: environment.organization.id, + workerId, + runnerId, }, // Only create waitpoint if parent is waiting for this run to complete // For standalone triggers (no waiting parent), waitpoint is created lazily if needed later associatedWaitpoint: resumeParentOnCompletion && parentTaskRunId - ? { - create: this.waitpointSystem.buildRunAssociatedWaitpoint({ - projectId: environment.project.id, - environmentId: environment.id, - }), - } + ? this.waitpointSystem.buildRunAssociatedWaitpoint({ + projectId: environment.project.id, + environmentId: environment.id, + }) : undefined, }, - }); + prisma + ); } catch (error) { if (error instanceof Prisma.PrismaClientKnownRequestError) { this.logger.debug("engine.trigger(): Prisma transaction error", { @@ -1176,42 +1177,40 @@ export class RunEngine { // Create the run in terminal SYSTEM_FAILURE status. // No execution snapshot is needed: this run never gets dequeued, executed, // or heartbeated, so nothing will call getLatestExecutionSnapshot on it. - const taskRun = await this.prisma.taskRun.create({ - include: { - associatedWaitpoint: true, - }, - data: { - id: taskRunId, - engine: "V2", - status: "SYSTEM_FAILURE", - friendlyId, - runtimeEnvironmentId: environment.id, - environmentType: environment.type, - organizationId: environment.organization.id, - projectId: environment.project.id, - taskIdentifier, - payload: payload ?? "", - payloadType: payloadType ?? "application/json", - context: {}, - traceContext: (traceContext ?? {}) as Record, - traceId: traceId ?? "", - spanId: spanId ?? "", - queue: queueOverride ?? `task/${taskIdentifier}`, - lockedQueueId: lockedQueueIdOverride, - isTest: false, - completedAt: new Date(), - error: error as unknown as Prisma.InputJsonObject, - parentTaskRunId, - rootTaskRunId, - depth: depth ?? 0, - batchId: batch?.id, - resumeParentOnCompletion, - taskEventStore, - associatedWaitpoint: waitpointData - ? { create: waitpointData } - : undefined, + const taskRun = await this.runStore.createFailedRun( + { + data: { + id: taskRunId, + engine: "V2", + status: "SYSTEM_FAILURE", + friendlyId, + runtimeEnvironmentId: environment.id, + environmentType: environment.type, + organizationId: environment.organization.id, + projectId: environment.project.id, + taskIdentifier, + payload: payload ?? "", + payloadType: payloadType ?? "application/json", + context: {}, + traceContext: (traceContext ?? {}) as Record, + traceId: traceId ?? "", + spanId: spanId ?? "", + queue: queueOverride ?? `task/${taskIdentifier}`, + lockedQueueId: lockedQueueIdOverride, + isTest: false, + completedAt: new Date(), + error: error as unknown as Prisma.InputJsonObject, + parentTaskRunId, + rootTaskRunId, + depth: depth ?? 0, + batchId: batch?.id, + resumeParentOnCompletion, + taskEventStore, + }, + associatedWaitpoint: waitpointData, }, - }); + this.prisma + ); span.setAttribute("runId", taskRun.id); diff --git a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts index 6c66591e28..b956a0f01a 100644 --- a/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/checkpointSystem.ts @@ -115,22 +115,20 @@ export class CheckpointSystem { } // Get the run and update the status - const run = await this.$.prisma.taskRun.update({ - where: { - id: runId, - }, - data: { - status: "WAITING_TO_RESUME", - }, - include: { - runtimeEnvironment: { - include: { - project: true, - organization: true, + const run = await this.$.runStore.suspendForCheckpoint( + runId, + { + include: { + runtimeEnvironment: { + include: { + project: true, + organization: true, + }, }, }, }, - }); + this.$.prisma + ); if (!run) { this.$.logger.error("Run not found for createCheckpoint", { @@ -294,26 +292,24 @@ export class CheckpointSystem { } // Get the run and update the status - const run = await this.$.prisma.taskRun.update({ - where: { - id: runId, - }, - data: { - status: "EXECUTING", - }, - select: { - id: true, - status: true, - attemptNumber: true, - organizationId: true, - runtimeEnvironmentId: true, - projectId: true, - updatedAt: true, - createdAt: true, - runTags: true, - batchId: true, + const run = await this.$.runStore.resumeFromCheckpoint( + runId, + { + select: { + id: true, + status: true, + attemptNumber: true, + organizationId: true, + runtimeEnvironmentId: true, + projectId: true, + updatedAt: true, + createdAt: true, + runTags: true, + batchId: true, + }, }, - }); + this.$.prisma + ); if (!run) { this.$.logger.error("Run not found for createCheckpoint", { diff --git a/internal-packages/run-engine/src/engine/systems/debounceSystem.ts b/internal-packages/run-engine/src/engine/systems/debounceSystem.ts index 0e59d1d69d..5b9d851d0f 100644 --- a/internal-packages/run-engine/src/engine/systems/debounceSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/debounceSystem.ts @@ -1160,13 +1160,7 @@ return 0 updatePayload.runTags = updateData.tags; } - const updatedRun = await prisma.taskRun.update({ - where: { id: runId }, - data: updatePayload, - include: { - associatedWaitpoint: true, - }, - }); + const updatedRun = await this.$.runStore.rewriteDebouncedRun(runId, updatePayload, prisma); return updatedRun; } diff --git a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts index 10c965741c..cff29a75a4 100644 --- a/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/delayedRunSystem.ts @@ -48,26 +48,19 @@ export class DelayedRunSystem { throw new ServiceValidationError("Cannot reschedule a run that is not delayed"); } - const updatedRun = await prisma.taskRun.update({ - where: { - id: runId, - }, - data: { + const updatedRun = await this.$.runStore.rescheduleRun( + runId, + { delayUntil: delayUntil, - executionSnapshots: { - create: { - engine: "V2", - executionStatus: "DELAYED", - description: "Delayed run was rescheduled to a future date", - runStatus: "DELAYED", - environmentId: snapshot.environmentId, - environmentType: snapshot.environmentType, - projectId: snapshot.projectId, - organizationId: snapshot.organizationId, - }, + snapshot: { + environmentId: snapshot.environmentId, + environmentType: snapshot.environmentType, + projectId: snapshot.projectId, + organizationId: snapshot.organizationId, }, }, - }); + prisma + ); await this.$.worker.reschedule(`enqueueDelayedRun:${updatedRun.id}`, delayUntil); @@ -178,13 +171,13 @@ export class DelayedRunSystem { const queuedAt = new Date(); - const updatedRun = await this.$.prisma.taskRun.update({ - where: { id: runId }, - data: { - status: "PENDING", + const updatedRun = await this.$.runStore.enqueueDelayedRun( + runId, + { queuedAt, }, - }); + this.$.prisma + ); this.$.eventBus.emit("runEnqueuedAfterDelay", { time: new Date(), diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 7c811ebfdf..26ea7866a6 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -419,17 +419,14 @@ export class DequeueSystem { // Pre-generate snapshot ID so we can construct the result without an extra read const snapshotId = generateInternalId(); - const lockedTaskRun = await prisma.taskRun.update({ - where: { - id: runId, - }, - data: { + const lockedTaskRun = await this.$.runStore.lockRunToWorker( + runId, + { lockedAt, lockedById: result.task.id, lockedToVersionId: result.worker.id, lockedQueueId: result.queue.id, lockedRetryConfig: lockedRetryConfig ?? undefined, - status: "DEQUEUED", startedAt, baseCostInCents: this.options.machines.baseCostInCents, machinePreset: machinePreset.name, @@ -438,38 +435,27 @@ export class DequeueSystem { cliVersion: result.worker.cliVersion, maxDurationInSeconds, maxAttempts: maxAttempts ?? undefined, - executionSnapshots: { - create: { - id: snapshotId, - engine: "V2", - executionStatus: "PENDING_EXECUTING", - description: "Run was dequeued for execution", - // Map DEQUEUED -> PENDING for backwards compatibility with older runners - runStatus: "PENDING", - attemptNumber: result.run.attemptNumber ?? undefined, - previousSnapshotId: snapshot.id, - environmentId: snapshot.environmentId, - environmentType: snapshot.environmentType, - projectId: snapshot.projectId, - organizationId: snapshot.organizationId, - checkpointId: snapshot.checkpointId ?? undefined, - batchId: snapshot.batchId ?? undefined, - completedWaitpoints: { - connect: snapshot.completedWaitpoints.map((w) => ({ id: w.id })), - }, - completedWaitpointOrder: snapshot.completedWaitpoints - .filter((c) => c.index !== undefined) - .sort((a, b) => a.index! - b.index!) - .map((w) => w.id), - workerId, - runnerId, - }, + snapshot: { + id: snapshotId, + previousSnapshotId: snapshot.id, + attemptNumber: result.run.attemptNumber ?? undefined, + environmentId: snapshot.environmentId, + environmentType: snapshot.environmentType, + projectId: snapshot.projectId, + organizationId: snapshot.organizationId, + checkpointId: snapshot.checkpointId ?? undefined, + batchId: snapshot.batchId ?? undefined, + completedWaitpointIds: snapshot.completedWaitpoints.map((w) => w.id), + completedWaitpointOrder: snapshot.completedWaitpoints + .filter((c) => c.index !== undefined) + .sort((a, b) => a.index! - b.index!) + .map((w) => w.id), + workerId, + runnerId, }, }, - include: { - runtimeEnvironment: true, - }, - }); + prisma + ); this.$.eventBus.emit("runLocked", { time: new Date(), @@ -741,30 +727,32 @@ export class DequeueSystem { }); //mark run as waiting for deploy - const run = await prisma.taskRun.update({ - where: { id: runId }, - data: { - status: "PENDING_VERSION", + const run = await this.$.runStore.parkPendingVersion( + runId, + { statusReason, }, - select: { - id: true, - status: true, - attemptNumber: true, - updatedAt: true, - createdAt: true, - runTags: true, - batchId: true, - runtimeEnvironment: { - select: { - id: true, - type: true, - projectId: true, - project: { select: { id: true, organizationId: true } }, + { + select: { + id: true, + status: true, + attemptNumber: true, + updatedAt: true, + createdAt: true, + runTags: true, + batchId: true, + runtimeEnvironment: { + select: { + id: true, + type: true, + projectId: true, + project: { select: { id: true, organizationId: true } }, + }, }, }, }, - }); + prisma + ); this.$.logger.debug("RunEngine.dequeueFromWorkerQueue(): Pending version", { runId, diff --git a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts index b46b857f02..59d72c4c46 100644 --- a/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/pendingVersionSystem.ts @@ -129,10 +129,7 @@ export class PendingVersionSystem { // Idempotency guard: only flips PENDING_VERSION → PENDING. If another // worker already promoted this run between our findMany and the // update, count is 0 and we skip the enqueue. - const updateResult = await tx.taskRun.updateMany({ - where: { id: run.id, status: "PENDING_VERSION" }, - data: { status: "PENDING" }, - }); + const updateResult = await this.$.runStore.promotePendingVersionRuns(run.id, tx); if (updateResult.count === 0) { return false; diff --git a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts index 02fd83a7a2..1aa1738f3b 100644 --- a/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts @@ -397,67 +397,67 @@ export class RunAttemptSystem { const result = await $transaction( prisma, async (tx) => { - const run = await tx.taskRun.update({ - where: { - id: taskRun.id, - }, - data: { - status: "EXECUTING", + const run = await this.$.runStore.startAttempt( + taskRun.id, + { attemptNumber: nextAttemptNumber, executedAt: taskRun.attemptNumber === null ? new Date() : undefined, isWarmStart: isWarmStart ?? false, }, - select: { - id: true, - createdAt: true, - updatedAt: true, - executedAt: true, - baseCostInCents: true, - projectId: true, - organizationId: true, - friendlyId: true, - lockedById: true, - lockedQueueId: true, - queue: true, - attemptNumber: true, - status: true, - ttl: true, - metadata: true, - metadataType: true, - machinePreset: true, - payload: true, - payloadType: true, - runTags: true, - isTest: true, - replayedFromTaskRunFriendlyId: true, - idempotencyKey: true, - idempotencyKeyOptions: true, - startedAt: true, - maxAttempts: true, - taskVersion: true, - maxDurationInSeconds: true, - usageDurationMs: true, - costInCents: true, - traceContext: true, - priorityMs: true, - batchId: true, - realtimeStreamsVersion: true, - runtimeEnvironment: { - select: { - id: true, - slug: true, - type: true, - branchName: true, - git: true, - organizationId: true, + { + select: { + id: true, + createdAt: true, + updatedAt: true, + executedAt: true, + baseCostInCents: true, + projectId: true, + organizationId: true, + friendlyId: true, + lockedById: true, + lockedQueueId: true, + queue: true, + attemptNumber: true, + status: true, + ttl: true, + metadata: true, + metadataType: true, + machinePreset: true, + payload: true, + payloadType: true, + runTags: true, + isTest: true, + replayedFromTaskRunFriendlyId: true, + idempotencyKey: true, + idempotencyKeyOptions: true, + startedAt: true, + maxAttempts: true, + taskVersion: true, + maxDurationInSeconds: true, + usageDurationMs: true, + costInCents: true, + traceContext: true, + priorityMs: true, + batchId: true, + realtimeStreamsVersion: true, + runtimeEnvironment: { + select: { + id: true, + slug: true, + type: true, + branchName: true, + git: true, + organizationId: true, + }, }, + parentTaskRunId: true, + rootTaskRunId: true, + workerQueue: true, + taskEventStore: true, }, - parentTaskRunId: true, - rootTaskRunId: true, - workerQueue: true, - taskEventStore: true, }, - }); + tx + ); const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(tx, { run, @@ -740,58 +740,58 @@ export class RunAttemptSystem { environmentType: latestSnapshot.environmentType, }); - const run = await prisma.taskRun.update({ - where: { id: runId }, - data: { - status: "COMPLETED_SUCCESSFULLY", + const run = await this.$.runStore.completeAttemptSuccess( + runId, + { completedAt, output: completion.output, outputType: completion.outputType, usageDurationMs: updatedUsage.usageDurationMs, costInCents: updatedUsage.costInCents, - executionSnapshots: { - create: { - executionStatus: "FINISHED", - description: "Task completed successfully", - runStatus: "COMPLETED_SUCCESSFULLY", - attemptNumber: latestSnapshot.attemptNumber, - environmentId: latestSnapshot.environmentId, - environmentType: latestSnapshot.environmentType, - projectId: latestSnapshot.projectId, - organizationId: latestSnapshot.organizationId, - workerId, - runnerId, - }, + snapshot: { + executionStatus: "FINISHED", + description: "Task completed successfully", + runStatus: "COMPLETED_SUCCESSFULLY", + attemptNumber: latestSnapshot.attemptNumber, + environmentId: latestSnapshot.environmentId, + environmentType: latestSnapshot.environmentType, + projectId: latestSnapshot.projectId, + organizationId: latestSnapshot.organizationId, + workerId, + runnerId, }, }, - select: { - id: true, - friendlyId: true, - status: true, - attemptNumber: true, - spanId: true, - updatedAt: true, - associatedWaitpoint: { - select: { - id: true, + { + select: { + id: true, + friendlyId: true, + status: true, + attemptNumber: true, + spanId: true, + updatedAt: true, + associatedWaitpoint: { + select: { + id: true, + }, }, - }, - project: { - select: { - organizationId: true, + project: { + select: { + organizationId: true, + }, }, + batchId: true, + createdAt: true, + completedAt: true, + taskEventStore: true, + parentTaskRunId: true, + usageDurationMs: true, + costInCents: true, + runtimeEnvironmentId: true, + projectId: true, }, - batchId: true, - createdAt: true, - completedAt: true, - taskEventStore: true, - parentTaskRunId: true, - usageDurationMs: true, - costInCents: true, - runtimeEnvironmentId: true, - projectId: true, }, - }); + prisma + ); const newSnapshot = await getLatestExecutionSnapshot(prisma, runId); await this.$.runQueue.acknowledgeMessage(run.project.organizationId, runId); @@ -997,25 +997,26 @@ export class RunAttemptSystem { environmentType: latestSnapshot.environmentType, }); - const run = await prisma.taskRun.update({ - where: { - id: runId, - }, - data: { + const run = await this.$.runStore.recordRetryOutcome( + runId, + { machinePreset: retryResult.machine, usageDurationMs: updatedUsage.usageDurationMs, costInCents: updatedUsage.costInCents, }, - include: { - runtimeEnvironment: { - include: { - project: true, - organization: true, - orgMember: true, + { + include: { + runtimeEnvironment: { + include: { + project: true, + organization: true, + orgMember: true, + }, }, }, }, - }); + this.$.prisma + ); const nextAttemptNumber = latestSnapshot.attemptNumber === null ? 1 : latestSnapshot.attemptNumber + 1; @@ -1250,19 +1251,17 @@ export class RunAttemptSystem { return { wasRequeued: false, ...result }; } - const requeuedRun = await prisma.taskRun.update({ - where: { - id: run.id, - }, - data: { - status: "PENDING", - }, - select: { - id: true, - status: true, - attemptNumber: true, + const requeuedRun = await this.$.runStore.requeueRun( + run.id, + { + select: { + id: true, + status: true, + attemptNumber: true, + }, }, - }); + prisma + ); const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { run: requeuedRun, @@ -1338,14 +1337,7 @@ export class RunAttemptSystem { //already finished, do nothing if (latestSnapshot.executionStatus === "FINISHED") { if (bulkActionId) { - await prisma.taskRun.update({ - where: { id: runId }, - data: { - bulkActionGroupIds: { - push: bulkActionId, - }, - }, - }); + await this.$.runStore.recordBulkActionMembership(runId, bulkActionId, prisma); } return { alreadyFinished: true, @@ -1398,52 +1390,50 @@ export class RunAttemptSystem { }); } - const run = await prisma.taskRun.update({ - where: { id: runId }, - data: { - status: "CANCELED", + const run = await this.$.runStore.cancelRun( + runId, + { completedAt: finalizeRun ? completedAt ?? new Date() : completedAt, error, - bulkActionGroupIds: bulkActionId - ? { - push: bulkActionId, - } - : undefined, + ...(bulkActionId && { bulkActionId }), ...(usageUpdate && { usageDurationMs: usageUpdate.usageDurationMs, costInCents: usageUpdate.costInCents, }), }, - select: { - id: true, - friendlyId: true, - status: true, - attemptNumber: true, - spanId: true, - batchId: true, - createdAt: true, - completedAt: true, - taskEventStore: true, - parentTaskRunId: true, - delayUntil: true, - updatedAt: true, - runtimeEnvironment: { - select: { - organizationId: true, + { + select: { + id: true, + friendlyId: true, + status: true, + attemptNumber: true, + spanId: true, + batchId: true, + createdAt: true, + completedAt: true, + taskEventStore: true, + parentTaskRunId: true, + delayUntil: true, + updatedAt: true, + runtimeEnvironment: { + select: { + organizationId: true, + }, }, - }, - associatedWaitpoint: { - select: { - id: true, + associatedWaitpoint: { + select: { + id: true, + }, }, - }, - childRuns: { - select: { - id: true, + childRuns: { + select: { + id: true, + }, }, }, }, - }); + prisma + ); //if the run is delayed and hasn't started yet, we need to prevent it being added to the queue in future if (isInitialState(latestSnapshot.executionStatus) && run.delayUntil) { @@ -1612,51 +1602,52 @@ export class RunAttemptSystem { }); //run permanently failed - const run = await prisma.taskRun.update({ - where: { - id: runId, - }, - data: { + const run = await this.$.runStore.failRunPermanently( + runId, + { status, completedAt: failedAt, error: truncatedError, usageDurationMs: updatedUsage.usageDurationMs, costInCents: updatedUsage.costInCents, }, - select: { - id: true, - friendlyId: true, - status: true, - attemptNumber: true, - spanId: true, - batchId: true, - parentTaskRunId: true, - updatedAt: true, - usageDurationMs: true, - costInCents: true, - associatedWaitpoint: { - select: { - id: true, + { + select: { + id: true, + friendlyId: true, + status: true, + attemptNumber: true, + spanId: true, + batchId: true, + parentTaskRunId: true, + updatedAt: true, + usageDurationMs: true, + costInCents: true, + associatedWaitpoint: { + select: { + id: true, + }, }, - }, - runtimeEnvironment: { - select: { - id: true, - type: true, - organizationId: true, - project: { - select: { - id: true, - organizationId: true, + runtimeEnvironment: { + select: { + id: true, + type: true, + organizationId: true, + project: { + select: { + id: true, + organizationId: true, + }, }, }, }, + taskEventStore: true, + createdAt: true, + completedAt: true, }, - taskEventStore: true, - createdAt: true, - completedAt: true, }, - }); + this.$.prisma + ); const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, { run, diff --git a/internal-packages/run-engine/src/engine/systems/systems.ts b/internal-packages/run-engine/src/engine/systems/systems.ts index e21f95958d..1b2f1d64c5 100644 --- a/internal-packages/run-engine/src/engine/systems/systems.ts +++ b/internal-packages/run-engine/src/engine/systems/systems.ts @@ -1,4 +1,5 @@ import { Meter, Tracer } from "@internal/tracing"; +import { RunStore } from "@internal/run-store"; import { Logger } from "@trigger.dev/core/logger"; import { PrismaClient, PrismaReplicaClient } from "@trigger.dev/database"; import { RunQueue } from "../../run-queue/index.js"; @@ -11,6 +12,7 @@ import { RaceSimulationSystem } from "./raceSimulationSystem.js"; export type SystemResources = { prisma: PrismaClient; readOnlyPrisma: PrismaReplicaClient; + runStore: RunStore; worker: EngineWorker; eventBus: EventBus; logger: Logger; diff --git a/internal-packages/run-engine/src/engine/systems/ttlSystem.ts b/internal-packages/run-engine/src/engine/systems/ttlSystem.ts index 8d078c8889..ebd1cbdd80 100644 --- a/internal-packages/run-engine/src/engine/systems/ttlSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/ttlSystem.ts @@ -1,6 +1,6 @@ import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic"; import { TaskRunError } from "@trigger.dev/core/v3/schemas"; -import { Prisma, PrismaClientOrTransaction, TaskRunStatus } from "@trigger.dev/database"; +import { PrismaClientOrTransaction, TaskRunStatus } from "@trigger.dev/database"; import { isExecuting } from "../statuses.js"; import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js"; import { SystemResources } from "./systems.js"; @@ -61,51 +61,51 @@ export class TtlSystem { raw: `Run expired because the TTL (${run.ttl}) was reached`, }; - const updatedRun = await prisma.taskRun.update({ - where: { id: runId }, - data: { - status: "EXPIRED", + const updatedRun = await this.$.runStore.expireRun( + runId, + { + error, completedAt: new Date(), expiredAt: new Date(), - error, - executionSnapshots: { - create: { - engine: "V2", - executionStatus: "FINISHED", - description: "Run was expired because the TTL was reached", - runStatus: "EXPIRED", - environmentId: snapshot.environmentId, - environmentType: snapshot.environmentType, - projectId: snapshot.projectId, - organizationId: snapshot.organizationId, - }, + snapshot: { + engine: "V2", + executionStatus: "FINISHED", + description: "Run was expired because the TTL was reached", + runStatus: "EXPIRED", + environmentId: snapshot.environmentId, + environmentType: snapshot.environmentType, + projectId: snapshot.projectId, + organizationId: snapshot.organizationId, }, }, - select: { - id: true, - spanId: true, - ttl: true, - updatedAt: true, - associatedWaitpoint: { - select: { - id: true, + { + select: { + id: true, + spanId: true, + ttl: true, + updatedAt: true, + associatedWaitpoint: { + select: { + id: true, + }, }, - }, - runtimeEnvironment: { - select: { - organizationId: true, - projectId: true, - id: true, + runtimeEnvironment: { + select: { + organizationId: true, + projectId: true, + id: true, + }, }, + createdAt: true, + completedAt: true, + taskEventStore: true, + parentTaskRunId: true, + expiredAt: true, + status: true, }, - createdAt: true, - completedAt: true, - taskEventStore: true, - parentTaskRunId: true, - expiredAt: true, - status: true, }, - }); + prisma + ); await this.$.runQueue.acknowledgeMessage( updatedRun.runtimeEnvironment.organizationId, @@ -228,15 +228,7 @@ export class TtlSystem { raw: "Run expired because the TTL was reached", }; - await this.$.prisma.$executeRaw` - UPDATE "TaskRun" - SET "status" = 'EXPIRED'::"TaskRunStatus", - "completedAt" = ${now}, - "expiredAt" = ${now}, - "updatedAt" = ${now}, - "error" = ${JSON.stringify(error)}::jsonb - WHERE "id" IN (${Prisma.join(runIdsToExpire)}) - `; + await this.$.runStore.expireRunsBatch(runIdsToExpire, { error, now }, this.$.prisma); // Process each run: enqueue waitpoint completion jobs and emit events await pMap( diff --git a/internal-packages/run-store/package.json b/internal-packages/run-store/package.json new file mode 100644 index 0000000000..096888c4e9 --- /dev/null +++ b/internal-packages/run-store/package.json @@ -0,0 +1,31 @@ +{ + "name": "@internal/run-store", + "private": true, + "version": "0.0.1", + "main": "./dist/src/index.js", + "types": "./dist/src/index.d.ts", + "type": "module", + "exports": { + ".": { + "@triggerdotdev/source": "./src/index.ts", + "import": "./dist/src/index.js", + "types": "./dist/src/index.d.ts", + "default": "./dist/src/index.js" + } + }, + "dependencies": { + "@trigger.dev/core": "workspace:*", + "@trigger.dev/database": "workspace:*" + }, + "devDependencies": { + "@internal/testcontainers": "workspace:*", + "rimraf": "6.0.1" + }, + "scripts": { + "clean": "rimraf dist", + "typecheck": "tsc --noEmit -p tsconfig.build.json", + "test": "vitest --sequence.concurrent=false --no-file-parallelism", + "build": "pnpm run clean && tsc -p tsconfig.build.json", + "dev": "tsc --watch -p tsconfig.build.json" + } +} diff --git a/internal-packages/run-store/src/NoopRunStore.ts b/internal-packages/run-store/src/NoopRunStore.ts new file mode 100644 index 0000000000..3b4fb0a36f --- /dev/null +++ b/internal-packages/run-store/src/NoopRunStore.ts @@ -0,0 +1,32 @@ +import type { RunStore } from "./types.js"; + +/** Test double: throws on any call. Inject into units that must not write runs. */ +export class NoopRunStore implements RunStore { + private fail(method: string): never { + throw new Error(`NoopRunStore.${method} called`); + } + createRun(): never { return this.fail("createRun"); } + createCancelledRun(): never { return this.fail("createCancelledRun"); } + createFailedRun(): never { return this.fail("createFailedRun"); } + startAttempt(): never { return this.fail("startAttempt"); } + completeAttemptSuccess(): never { return this.fail("completeAttemptSuccess"); } + recordRetryOutcome(): never { return this.fail("recordRetryOutcome"); } + requeueRun(): never { return this.fail("requeueRun"); } + recordBulkActionMembership(): never { return this.fail("recordBulkActionMembership"); } + cancelRun(): never { return this.fail("cancelRun"); } + failRunPermanently(): never { return this.fail("failRunPermanently"); } + expireRun(): never { return this.fail("expireRun"); } + expireRunsBatch(): never { return this.fail("expireRunsBatch"); } + lockRunToWorker(): never { return this.fail("lockRunToWorker"); } + parkPendingVersion(): never { return this.fail("parkPendingVersion"); } + promotePendingVersionRuns(): never { return this.fail("promotePendingVersionRuns"); } + suspendForCheckpoint(): never { return this.fail("suspendForCheckpoint"); } + resumeFromCheckpoint(): never { return this.fail("resumeFromCheckpoint"); } + rescheduleRun(): never { return this.fail("rescheduleRun"); } + enqueueDelayedRun(): never { return this.fail("enqueueDelayedRun"); } + rewriteDebouncedRun(): never { return this.fail("rewriteDebouncedRun"); } + updateMetadata(): never { return this.fail("updateMetadata"); } + clearIdempotencyKey(): never { return this.fail("clearIdempotencyKey"); } + pushTags(): never { return this.fail("pushTags"); } + pushRealtimeStream(): never { return this.fail("pushRealtimeStream"); } +} diff --git a/internal-packages/run-store/src/PostgresRunStore.test.ts b/internal-packages/run-store/src/PostgresRunStore.test.ts new file mode 100644 index 0000000000..f2fb2969e6 --- /dev/null +++ b/internal-packages/run-store/src/PostgresRunStore.test.ts @@ -0,0 +1,1530 @@ +import { postgresTest } from "@internal/testcontainers"; +import type { PrismaClient } from "@trigger.dev/database"; +import { describe, expect } from "vitest"; +import { PostgresRunStore } from "./PostgresRunStore.js"; +import type { CreateCancelledRunInput, CreateFailedRunInput, CreateRunInput } from "./types.js"; + +async function seedEnvironment(prisma: PrismaClient) { + const organization = await prisma.organization.create({ + data: { + title: "Test Organization", + slug: "test-organization", + }, + }); + + const project = await prisma.project.create({ + data: { + name: "Test Project", + slug: "test-project", + externalRef: "proj_1234", + organizationId: organization.id, + }, + }); + + const environment = await prisma.runtimeEnvironment.create({ + data: { + type: "DEVELOPMENT", + slug: "dev", + projectId: project.id, + organizationId: organization.id, + apiKey: "tr_dev_apikey", + pkApiKey: "pk_dev_apikey", + shortcode: "short_code", + }, + }); + + return { organization, project, environment }; +} + +function buildCreateRunInput(params: { + runId: string; + organizationId: string; + projectId: string; + runtimeEnvironmentId: string; +}): CreateRunInput { + return { + data: { + id: params.runId, + engine: "V2", + status: "PENDING", + friendlyId: "run_friendly_1", + runtimeEnvironmentId: params.runtimeEnvironmentId, + environmentType: "DEVELOPMENT", + organizationId: params.organizationId, + projectId: params.projectId, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: "trace_1", + spanId: "span_1", + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + }, + snapshot: { + engine: "V2", + executionStatus: "RUN_CREATED", + description: "Run was created", + runStatus: "PENDING", + environmentId: params.runtimeEnvironmentId, + environmentType: "DEVELOPMENT", + projectId: params.projectId, + organizationId: params.organizationId, + }, + }; +} + +describe("PostgresRunStore", () => { + postgresTest("createRun creates the run with one snapshot and no waitpoint", async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ + prisma, + // The read-only client just needs to be a PrismaClient for these tests. + readOnlyPrisma: prisma, + }); + + const runId = "run_test_1"; + + const run = await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + expect(run.id).toBe(runId); + expect(run.status).toBe("PENDING"); + expect(run.associatedWaitpoint).toBeNull(); + + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId }, + }); + + expect(snapshots).toHaveLength(1); + expect(snapshots[0]?.executionStatus).toBe("RUN_CREATED"); + expect(snapshots[0]?.runStatus).toBe("PENDING"); + }); + + postgresTest( + "createCancelledRun creates a CANCELED run with one FINISHED/CANCELED execution snapshot", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ + prisma, + readOnlyPrisma: prisma, + }); + + const runId = "run_cancelled_1"; + const cancelledAt = new Date("2026-01-01T00:00:00.000Z"); + const error = { type: "STRING_ERROR", raw: "cancelled before dispatch" }; + + const input: CreateCancelledRunInput = { + data: { + id: runId, + engine: "V2", + status: "CANCELED", + friendlyId: "run_cancelled_friendly_1", + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: "trace_c1", + spanId: "span_c1", + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + error: error as unknown as import("@trigger.dev/database").Prisma.InputJsonValue, + completedAt: cancelledAt, + updatedAt: cancelledAt, + attemptNumber: 0, + }, + snapshot: { + engine: "V2", + executionStatus: "FINISHED", + description: "Run cancelled before materialisation", + runStatus: "CANCELED", + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + }, + }; + + const run = await store.createCancelledRun(input); + + expect(run.id).toBe(runId); + expect(run.status).toBe("CANCELED"); + expect(run.attemptNumber).toBe(0); + expect(run.completedAt).toEqual(cancelledAt); + + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId }, + }); + + expect(snapshots).toHaveLength(1); + expect(snapshots[0]?.executionStatus).toBe("FINISHED"); + expect(snapshots[0]?.runStatus).toBe("CANCELED"); + } + ); + + postgresTest( + "createFailedRun creates a SYSTEM_FAILURE run with no execution snapshot and null associatedWaitpoint when not provided", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ + prisma, + readOnlyPrisma: prisma, + }); + + const runId = "run_failed_1"; + const completedAt = new Date("2026-01-01T00:00:00.000Z"); + const error = { type: "STRING_ERROR", raw: "system failure" }; + + const input: CreateFailedRunInput = { + data: { + id: runId, + engine: "V2", + status: "SYSTEM_FAILURE", + friendlyId: "run_failed_friendly_1", + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "trace_f1", + spanId: "span_f1", + queue: "task/my-task", + isTest: false, + completedAt, + error: error as unknown as import("@trigger.dev/database").Prisma.InputJsonObject, + depth: 0, + taskEventStore: "taskEvent", + }, + }; + + const run = await store.createFailedRun(input); + + expect(run.id).toBe(runId); + expect(run.status).toBe("SYSTEM_FAILURE"); + expect(run.completedAt).toEqual(completedAt); + expect(run.associatedWaitpoint).toBeNull(); + + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId }, + }); + + expect(snapshots).toHaveLength(0); + } + ); + + postgresTest("startAttempt sets status to EXECUTING and records attempt fields", async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_start_attempt_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + const executedAt = new Date("2026-03-01T10:00:00.000Z"); + + const run = await store.startAttempt( + runId, + { attemptNumber: 1, executedAt, isWarmStart: true }, + { select: { id: true, status: true, attemptNumber: true, executedAt: true, isWarmStart: true } } + ); + + expect(run.id).toBe(runId); + expect(run.status).toBe("EXECUTING"); + expect(run.attemptNumber).toBe(1); + expect(run.executedAt).toEqual(executedAt); + expect(run.isWarmStart).toBe(true); + }); + + postgresTest( + "completeAttemptSuccess sets status to COMPLETED_SUCCESSFULLY and creates a FINISHED snapshot", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_complete_success_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + const completedAt = new Date("2026-03-01T11:00:00.000Z"); + + const run = await store.completeAttemptSuccess( + runId, + { + completedAt, + output: '{"result":"ok"}', + outputType: "application/json", + usageDurationMs: 500, + costInCents: 10, + snapshot: { + executionStatus: "FINISHED", + description: "Task completed successfully", + runStatus: "COMPLETED_SUCCESSFULLY", + attemptNumber: 1, + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + }, + }, + { + select: { + id: true, + status: true, + completedAt: true, + output: true, + outputType: true, + usageDurationMs: true, + costInCents: true, + }, + } + ); + + expect(run.id).toBe(runId); + expect(run.status).toBe("COMPLETED_SUCCESSFULLY"); + expect(run.completedAt).toEqual(completedAt); + expect(run.output).toBe('{"result":"ok"}'); + expect(run.usageDurationMs).toBe(500); + expect(run.costInCents).toBe(10); + + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId, executionStatus: "FINISHED" }, + }); + + expect(snapshots).toHaveLength(1); + expect(snapshots[0]?.runStatus).toBe("COMPLETED_SUCCESSFULLY"); + } + ); + + postgresTest("recordRetryOutcome updates machine/usage/cost but leaves status unchanged", async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_retry_outcome_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + // Set status to EXECUTING first so we know what to verify against + await store.startAttempt(runId, { attemptNumber: 1, isWarmStart: false }, { select: { id: true } }); + + const run = await store.recordRetryOutcome( + runId, + { machinePreset: "large-1x", usageDurationMs: 200, costInCents: 5 }, + { include: { runtimeEnvironment: true } } + ); + + // Status must be unchanged (EXECUTING — not PENDING, not CANCELED) + expect(run.status).toBe("EXECUTING"); + expect(run.machinePreset).toBe("large-1x"); + expect(run.usageDurationMs).toBe(200); + expect(run.costInCents).toBe(5); + }); + + postgresTest("requeueRun sets status to PENDING", async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_requeue_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + await store.startAttempt(runId, { attemptNumber: 1, isWarmStart: false }, { select: { id: true } }); + + const run = await store.requeueRun(runId, { select: { id: true, status: true } }); + + expect(run.id).toBe(runId); + expect(run.status).toBe("PENDING"); + }); + + postgresTest("recordBulkActionMembership appends bulkActionId to existing array", async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_bulk_action_1"; + + // Seed a run with an existing bulk action id + await prisma.taskRun.create({ + data: { + id: runId, + engine: "V2", + status: "CANCELED", + friendlyId: "run_bulk_action_friendly_1", + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: "trace_b1", + spanId: "span_b1", + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + bulkActionGroupIds: ["existing-bulk-id"], + }, + }); + + await store.recordBulkActionMembership(runId, "new-bulk-id"); + + const updated = await prisma.taskRun.findUnique({ + where: { id: runId }, + select: { bulkActionGroupIds: true }, + }); + + expect(updated?.bulkActionGroupIds).toContain("existing-bulk-id"); + expect(updated?.bulkActionGroupIds).toContain("new-bulk-id"); + expect(updated?.bulkActionGroupIds).toHaveLength(2); + }); + + postgresTest( + "cancelRun sets status to CANCELED; without bulkActionId/usage those fields are untouched", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_cancel_no_bulk_1"; + + // Seed with a pre-existing bulk action id so we can verify it stays + await prisma.taskRun.create({ + data: { + id: runId, + engine: "V2", + status: "PENDING", + friendlyId: "run_cancel_no_bulk_friendly_1", + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: "trace_cn1", + spanId: "span_cn1", + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + bulkActionGroupIds: ["x"], + }, + }); + + const cancelledAt = new Date("2026-04-01T00:00:00.000Z"); + const error = { type: "STRING_ERROR" as const, raw: "Canceled by user" }; + + const run = await store.cancelRun( + runId, + { completedAt: cancelledAt, error }, + { select: { id: true, status: true, completedAt: true, bulkActionGroupIds: true, usageDurationMs: true, costInCents: true } } + ); + + expect(run.id).toBe(runId); + expect(run.status).toBe("CANCELED"); + expect(run.completedAt).toEqual(cancelledAt); + // bulkActionGroupIds must be unchanged (still just ["x"]) + expect(run.bulkActionGroupIds).toEqual(["x"]); + // usage fields were not passed — should remain at default (0) + expect(run.usageDurationMs).toBe(0); + expect(run.costInCents).toBe(0); + } + ); + + postgresTest( + "cancelRun with bulkActionId and usage applies all optional fields", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_cancel_with_bulk_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + const cancelledAt = new Date("2026-04-01T01:00:00.000Z"); + const error = { type: "STRING_ERROR" as const, raw: "Canceled by user" }; + + const run = await store.cancelRun( + runId, + { completedAt: cancelledAt, error, bulkActionId: "bulk-abc", usageDurationMs: 300, costInCents: 7 }, + { select: { id: true, status: true, bulkActionGroupIds: true, usageDurationMs: true, costInCents: true } } + ); + + expect(run.status).toBe("CANCELED"); + expect(run.bulkActionGroupIds).toContain("bulk-abc"); + expect(run.usageDurationMs).toBe(300); + expect(run.costInCents).toBe(7); + } + ); + + postgresTest("failRunPermanently sets the passed status with completedAt/error/usage/cost", async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_fail_permanently_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + const completedAt = new Date("2026-05-01T00:00:00.000Z"); + const error = { type: "STRING_ERROR" as const, raw: "permanent failure" }; + + const run = await store.failRunPermanently( + runId, + { status: "SYSTEM_FAILURE", completedAt, error, usageDurationMs: 150, costInCents: 3 }, + { + select: { + id: true, + status: true, + completedAt: true, + usageDurationMs: true, + costInCents: true, + }, + } + ); + + expect(run.id).toBe(runId); + expect(run.status).toBe("SYSTEM_FAILURE"); + expect(run.completedAt).toEqual(completedAt); + expect(run.usageDurationMs).toBe(150); + expect(run.costInCents).toBe(3); + }); + + postgresTest( + "expireRun sets status to EXPIRED with distinct completedAt/expiredAt, error set, and one FINISHED/EXPIRED snapshot", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_expire_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + const completedAt = new Date("2026-06-01T10:00:00.000Z"); + const expiredAt = new Date("2026-06-01T10:00:01.000Z"); + const error = { type: "STRING_ERROR" as const, raw: "Run expired because the TTL was reached" }; + + const run = await store.expireRun( + runId, + { + error, + completedAt, + expiredAt, + snapshot: { + engine: "V2", + executionStatus: "FINISHED", + description: "Run was expired because the TTL was reached", + runStatus: "EXPIRED", + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + }, + }, + { + select: { + id: true, + status: true, + completedAt: true, + expiredAt: true, + error: true, + }, + } + ); + + expect(run.id).toBe(runId); + expect(run.status).toBe("EXPIRED"); + expect(run.completedAt).toEqual(completedAt); + expect(run.expiredAt).toEqual(expiredAt); + // completedAt and expiredAt are distinct + expect(run.completedAt?.getTime()).not.toBe(run.expiredAt?.getTime()); + + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId, executionStatus: "FINISHED", runStatus: "EXPIRED" }, + }); + expect(snapshots).toHaveLength(1); + } + ); + + postgresTest( + "expireRunsBatch sets EXPIRED status with all four timestamps equal to now and error set; returns correct count", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const runId1 = "run_expire_batch_1"; + const runId2 = "run_expire_batch_2"; + + for (const id of [runId1, runId2]) { + await prisma.taskRun.create({ + data: { + id, + engine: "V2", + status: "PENDING", + friendlyId: `run_expire_batch_friendly_${id}`, + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: `trace_${id}`, + spanId: `span_${id}`, + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + }, + }); + } + + const now = new Date("2026-06-01T12:00:00.000Z"); + const error = { type: "STRING_ERROR" as const, raw: "Run expired because the TTL was reached" }; + + const count = await store.expireRunsBatch([runId1, runId2], { error, now }); + + expect(count).toBe(2); + + for (const id of [runId1, runId2]) { + const row = await prisma.taskRun.findUniqueOrThrow({ + where: { id }, + select: { status: true, completedAt: true, expiredAt: true, updatedAt: true }, + }); + expect(row.status).toBe("EXPIRED"); + expect(row.completedAt).toEqual(now); + expect(row.expiredAt).toEqual(now); + expect(row.updatedAt).toEqual(now); + } + } + ); + + postgresTest( + "expireRunsBatch returns 0 and writes nothing when runIds is empty", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + const runId = "run_expire_batch_empty"; + await prisma.taskRun.create({ + data: { + id: runId, + engine: "V2", + status: "PENDING", + friendlyId: "run_expire_batch_empty_friendly", + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: "trace_empty", + spanId: "span_empty", + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + }, + }); + + const error = { type: "STRING_ERROR" as const, raw: "unused" }; + + // Must not throw (Prisma.join([]) would build an invalid `IN ()` clause). + const count = await store.expireRunsBatch([], { error, now: new Date() }); + + expect(count).toBe(0); + + const row = await prisma.taskRun.findUniqueOrThrow({ + where: { id: runId }, + select: { status: true, expiredAt: true }, + }); + expect(row.status).toBe("PENDING"); + expect(row.expiredAt).toBeNull(); + } + ); + + postgresTest( + "lockRunToWorker sets status to DEQUEUED with lock columns, includes runtimeEnvironment, and creates one PENDING_EXECUTING snapshot", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_lock_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + // Seed a background worker task to use as lockedById + const backgroundWorker = await prisma.backgroundWorker.create({ + data: { + friendlyId: "worker_friendly_1", + version: "20260601.1", + runtimeEnvironmentId: environment.id, + projectId: project.id, + contentHash: "abc123", + sdkVersion: "3.0.0", + cliVersion: "3.0.0", + metadata: {}, + }, + }); + + const workerTask = await prisma.backgroundWorkerTask.create({ + data: { + friendlyId: "task_friendly_1", + slug: "my-task", + filePath: "src/my-task.ts", + exportName: "myTask", + workerId: backgroundWorker.id, + runtimeEnvironmentId: environment.id, + projectId: project.id, + }, + }); + + const queue = await prisma.taskQueue.create({ + data: { + friendlyId: "queue_friendly_1", + name: "task/my-task", + runtimeEnvironmentId: environment.id, + projectId: project.id, + }, + }); + + // Seed a prior snapshot to use as previousSnapshotId + const priorSnapshot = await prisma.taskRunExecutionSnapshot.create({ + data: { + engine: "V2", + executionStatus: "RUN_CREATED", + description: "prior", + runStatus: "PENDING", + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + runId, + }, + }); + + const lockedAt = new Date("2026-06-01T13:00:00.000Z"); + const startedAt = new Date("2026-06-01T13:00:01.000Z"); + const snapshotId = "snap_lock_1"; + + const locked = await store.lockRunToWorker(runId, { + lockedAt, + lockedById: workerTask.id, + lockedToVersionId: backgroundWorker.id, + lockedQueueId: queue.id, + startedAt, + baseCostInCents: 5, + machinePreset: "small-1x", + taskVersion: "20260601.1", + sdkVersion: "3.0.0", + cliVersion: "3.0.0", + maxDurationInSeconds: null, + snapshot: { + id: snapshotId, + previousSnapshotId: priorSnapshot.id, + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + completedWaitpointIds: [], + completedWaitpointOrder: [], + }, + }); + + expect(locked.status).toBe("DEQUEUED"); + expect(locked.lockedAt).toEqual(lockedAt); + expect(locked.lockedById).toBe(workerTask.id); + expect(locked.lockedToVersionId).toBe(backgroundWorker.id); + expect(locked.lockedQueueId).toBe(queue.id); + expect(locked.runtimeEnvironment).toBeDefined(); + expect(locked.runtimeEnvironment.id).toBe(environment.id); + + const snap = await prisma.taskRunExecutionSnapshot.findUnique({ where: { id: snapshotId } }); + expect(snap).not.toBeNull(); + expect(snap?.executionStatus).toBe("PENDING_EXECUTING"); + expect(snap?.runStatus).toBe("PENDING"); + } + ); + + postgresTest("parkPendingVersion sets status to PENDING_VERSION and stores statusReason", async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_park_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + const run = await store.parkPendingVersion( + runId, + { statusReason: "No background worker found" }, + { select: { id: true, status: true, statusReason: true } } + ); + + expect(run.id).toBe(runId); + expect(run.status).toBe("PENDING_VERSION"); + expect(run.statusReason).toBe("No background worker found"); + }); + + postgresTest( + "promotePendingVersionRuns flips PENDING_VERSION to PENDING and returns count 1; run in another status returns count 0 and is unchanged", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + + // Seed a PENDING_VERSION run + const pendingVersionId = "run_promote_pv_1"; + await prisma.taskRun.create({ + data: { + id: pendingVersionId, + engine: "V2", + status: "PENDING_VERSION", + friendlyId: "run_promote_pv_friendly_1", + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: "trace_pv1", + spanId: "span_pv1", + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + }, + }); + + const result = await store.promotePendingVersionRuns(pendingVersionId); + + expect(result.count).toBe(1); + + const promoted = await prisma.taskRun.findUniqueOrThrow({ where: { id: pendingVersionId }, select: { status: true } }); + expect(promoted.status).toBe("PENDING"); + + // Seed a run NOT in PENDING_VERSION (e.g. EXECUTING) + const executingId = "run_promote_exec_1"; + await prisma.taskRun.create({ + data: { + id: executingId, + engine: "V2", + status: "EXECUTING", + friendlyId: "run_promote_exec_friendly_1", + runtimeEnvironmentId: environment.id, + environmentType: "DEVELOPMENT", + organizationId: organization.id, + projectId: project.id, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: "trace_exec1", + spanId: "span_exec1", + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + }, + }); + + const result2 = await store.promotePendingVersionRuns(executingId); + + expect(result2.count).toBe(0); + + const unchanged = await prisma.taskRun.findUniqueOrThrow({ where: { id: executingId }, select: { status: true } }); + expect(unchanged.status).toBe("EXECUTING"); + } + ); + + postgresTest("suspendForCheckpoint sets status to WAITING_TO_RESUME", async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_suspend_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + const run = await store.suspendForCheckpoint(runId, { + include: { runtimeEnvironment: true }, + }); + + expect(run.id).toBe(runId); + expect(run.status).toBe("WAITING_TO_RESUME"); + expect(run.runtimeEnvironment).toBeDefined(); + }); + + postgresTest("resumeFromCheckpoint sets status to EXECUTING", async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_resume_1"; + + await store.createRun( + buildCreateRunInput({ + runId, + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + }) + ); + + // Suspend first so we start from a realistic state + await store.suspendForCheckpoint(runId, { include: {} }); + + const run = await store.resumeFromCheckpoint(runId, { + select: { id: true, status: true }, + }); + + expect(run.id).toBe(runId); + expect(run.status).toBe("EXECUTING"); + }); +}); + +describe("PostgresRunStore — delayed / debounce / metadata / idempotency / array-append", () => { + // Helper: seed a run with idempotency key and expiry set + async function seedRunWithIdempotency( + prisma: PrismaClient, + params: { + runId: string; + friendlyId: string; + organizationId: string; + projectId: string; + runtimeEnvironmentId: string; + taskIdentifier?: string; + idempotencyKey: string; + idempotencyKeyExpiresAt?: Date; + status?: string; + } + ) { + return prisma.taskRun.create({ + data: { + id: params.runId, + engine: "V2", + status: (params.status as any) ?? "PENDING", + friendlyId: params.friendlyId, + runtimeEnvironmentId: params.runtimeEnvironmentId, + environmentType: "DEVELOPMENT", + organizationId: params.organizationId, + projectId: params.projectId, + taskIdentifier: params.taskIdentifier ?? "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: `trace_${params.runId}`, + spanId: `span_${params.runId}`, + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + idempotencyKey: params.idempotencyKey, + idempotencyKeyExpiresAt: params.idempotencyKeyExpiresAt ?? null, + }, + }); + } + + // Helper: seed a plain run (no idempotency) + async function seedRun( + prisma: PrismaClient, + params: { + runId: string; + friendlyId: string; + organizationId: string; + projectId: string; + runtimeEnvironmentId: string; + status?: string; + runTags?: string[]; + realtimeStreams?: string[]; + metadata?: string; + metadataType?: string; + metadataVersion?: number; + } + ) { + return prisma.taskRun.create({ + data: { + id: params.runId, + engine: "V2", + status: (params.status as any) ?? "PENDING", + friendlyId: params.friendlyId, + runtimeEnvironmentId: params.runtimeEnvironmentId, + environmentType: "DEVELOPMENT", + organizationId: params.organizationId, + projectId: params.projectId, + taskIdentifier: "my-task", + payload: "{}", + payloadType: "application/json", + traceContext: {}, + traceId: `trace_${params.runId}`, + spanId: `span_${params.runId}`, + queue: "task/my-task", + isTest: false, + taskEventStore: "taskEvent", + depth: 0, + runTags: params.runTags ?? [], + realtimeStreams: params.realtimeStreams ?? [], + ...(params.metadata !== undefined && { metadata: params.metadata }), + ...(params.metadataType !== undefined && { metadataType: params.metadataType }), + ...(params.metadataVersion !== undefined && { metadataVersion: params.metadataVersion }), + }, + }); + } + + // --------------------------------------------------------------------------- + // rescheduleRun + // --------------------------------------------------------------------------- + + postgresTest( + "rescheduleRun with snapshot: writes delayUntil and creates a DELAYED snapshot", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_reschedule_snapshot_1"; + + await seedRun(prisma, { + runId, + friendlyId: "run_reschedule_snap_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + status: "DELAYED", + }); + + const delayUntil = new Date("2027-01-01T00:00:00.000Z"); + + const updated = await store.rescheduleRun(runId, { + delayUntil, + snapshot: { + environmentId: environment.id, + environmentType: "DEVELOPMENT", + projectId: project.id, + organizationId: organization.id, + }, + }); + + expect(updated.id).toBe(runId); + expect(updated.delayUntil).toEqual(delayUntil); + + const snapshots = await prisma.taskRunExecutionSnapshot.findMany({ + where: { runId, executionStatus: "DELAYED" }, + }); + expect(snapshots).toHaveLength(1); + expect(snapshots[0]?.runStatus).toBe("DELAYED"); + } + ); + + postgresTest( + "rescheduleRun with queueTimestamp and no snapshot: writes delayUntil + queueTimestamp, no new snapshot", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_reschedule_notimestamp_1"; + + await seedRun(prisma, { + runId, + friendlyId: "run_reschedule_notimestamp_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + status: "DELAYED", + }); + + const delayUntil = new Date("2027-02-01T00:00:00.000Z"); + const queueTimestamp = new Date("2027-02-01T00:00:00.000Z"); + + const updated = await store.rescheduleRun(runId, { delayUntil, queueTimestamp }); + + expect(updated.delayUntil).toEqual(delayUntil); + expect(updated.queueTimestamp).toEqual(queueTimestamp); + + const snapshotCount = await prisma.taskRunExecutionSnapshot.count({ where: { runId } }); + expect(snapshotCount).toBe(0); + } + ); + + // --------------------------------------------------------------------------- + // enqueueDelayedRun + // --------------------------------------------------------------------------- + + postgresTest( + "enqueueDelayedRun sets status to PENDING and writes queuedAt", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_enqueue_delayed_1"; + + await seedRun(prisma, { + runId, + friendlyId: "run_enqueue_delayed_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + status: "DELAYED", + }); + + const queuedAt = new Date("2026-06-17T10:00:00.000Z"); + const updated = await store.enqueueDelayedRun(runId, { queuedAt }); + + expect(updated.id).toBe(runId); + expect(updated.status).toBe("PENDING"); + expect(updated.queuedAt).toEqual(queuedAt); + } + ); + + // --------------------------------------------------------------------------- + // rewriteDebouncedRun + // --------------------------------------------------------------------------- + + postgresTest( + "rewriteDebouncedRun updates the requested columns and returns the run with associatedWaitpoint key", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_rewrite_debounced_1"; + + await seedRun(prisma, { + runId, + friendlyId: "run_rewrite_debounced_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + runTags: ["original-tag"], + }); + + const result = await store.rewriteDebouncedRun(runId, { + payload: '{"key":"newvalue"}', + payloadType: "application/json", + runTags: ["new-tag"], + }); + + expect(result.id).toBe(runId); + expect(result.payload).toBe('{"key":"newvalue"}'); + expect(result.runTags).toEqual(["new-tag"]); + // associatedWaitpoint key must exist in the result (even if null) + expect("associatedWaitpoint" in result).toBe(true); + } + ); + + // --------------------------------------------------------------------------- + // updateMetadata + // --------------------------------------------------------------------------- + + postgresTest( + "updateMetadata optimistic-lock: matching version writes metadata and returns count 1", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_update_meta_match_1"; + + await seedRun(prisma, { + runId, + friendlyId: "run_update_meta_match_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + metadata: '{"old":"data"}', + metadataType: "application/json", + metadataVersion: 1, + }); + + const updatedAt = new Date("2026-06-17T11:00:00.000Z"); + const result = await store.updateMetadata( + runId, + { + metadata: '{"new":"data"}', + metadataType: "application/json", + metadataVersion: { increment: 1 }, + updatedAt, + }, + { expectedMetadataVersion: 1 } + ); + + expect(result.count).toBe(1); + + const row = await prisma.taskRun.findFirst({ + where: { id: runId }, + select: { metadata: true, metadataVersion: true, updatedAt: true }, + }); + expect(row?.metadata).toBe('{"new":"data"}'); + expect(row?.metadataVersion).toBe(2); + expect(row?.updatedAt).toEqual(updatedAt); + } + ); + + postgresTest( + "updateMetadata optimistic-lock: non-matching version returns count 0, row unchanged", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_update_meta_mismatch_1"; + + await seedRun(prisma, { + runId, + friendlyId: "run_update_meta_mismatch_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + metadata: '{"original":"data"}', + metadataType: "application/json", + metadataVersion: 5, + }); + + const result = await store.updateMetadata( + runId, + { + metadata: '{"new":"data"}', + metadataVersion: { increment: 1 }, + updatedAt: new Date(), + }, + { expectedMetadataVersion: 3 } // wrong version + ); + + expect(result.count).toBe(0); + + const row = await prisma.taskRun.findFirst({ + where: { id: runId }, + select: { metadata: true, metadataVersion: true }, + }); + expect(row?.metadata).toBe('{"original":"data"}'); + expect(row?.metadataVersion).toBe(5); + } + ); + + postgresTest( + "updateMetadata direct (no expectedMetadataVersion): writes metadata and returns count 1", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_update_meta_direct_1"; + + await seedRun(prisma, { + runId, + friendlyId: "run_update_meta_direct_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + metadataVersion: 0, + }); + + const result = await store.updateMetadata( + runId, + { + metadata: '{"direct":"write"}', + metadataType: "application/json", + metadataVersion: { increment: 1 }, + updatedAt: new Date(), + }, + {} + ); + + expect(result.count).toBe(1); + + const row = await prisma.taskRun.findFirst({ + where: { id: runId }, + select: { metadata: true, metadataVersion: true }, + }); + expect(row?.metadata).toBe('{"direct":"write"}'); + expect(row?.metadataVersion).toBe(1); + } + ); + + // --------------------------------------------------------------------------- + // clearIdempotencyKey + // --------------------------------------------------------------------------- + + postgresTest( + "clearIdempotencyKey byId: clears both idempotencyKey and idempotencyKeyExpiresAt when key matches", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_clear_idempotency_byid_1"; + const expiresAt = new Date("2028-01-01T00:00:00.000Z"); + + await seedRunWithIdempotency(prisma, { + runId, + friendlyId: "run_clear_byid_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-key-abc", + idempotencyKeyExpiresAt: expiresAt, + }); + + const result = await store.clearIdempotencyKey({ + byId: { runId, idempotencyKey: "idem-key-abc" }, + }); + + expect(result.count).toBe(1); + + const row = await prisma.taskRun.findFirst({ + where: { id: runId }, + select: { idempotencyKey: true, idempotencyKeyExpiresAt: true }, + }); + expect(row?.idempotencyKey).toBeNull(); + expect(row?.idempotencyKeyExpiresAt).toBeNull(); + } + ); + + postgresTest( + "clearIdempotencyKey byId: returns count 0 when idempotencyKey does not match", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_clear_byid_mismatch_1"; + + await seedRunWithIdempotency(prisma, { + runId, + friendlyId: "run_clear_byid_mismatch_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + idempotencyKey: "idem-key-real", + }); + + const result = await store.clearIdempotencyKey({ + byId: { runId, idempotencyKey: "idem-key-wrong" }, + }); + + expect(result.count).toBe(0); + + // key still set + const row = await prisma.taskRun.findFirst({ + where: { id: runId }, + select: { idempotencyKey: true }, + }); + expect(row?.idempotencyKey).toBe("idem-key-real"); + } + ); + + postgresTest( + "clearIdempotencyKey byPredicate: clears both columns when predicate matches", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_clear_predicate_1"; + const expiresAt = new Date("2028-06-01T00:00:00.000Z"); + + await seedRunWithIdempotency(prisma, { + runId, + friendlyId: "run_clear_predicate_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + taskIdentifier: "predicate-task", + idempotencyKey: "pred-idem-key", + idempotencyKeyExpiresAt: expiresAt, + }); + + const result = await store.clearIdempotencyKey({ + byPredicate: { + idempotencyKey: "pred-idem-key", + taskIdentifier: "predicate-task", + runtimeEnvironmentId: environment.id, + }, + }); + + expect(result.count).toBe(1); + + const row = await prisma.taskRun.findFirst({ + where: { id: runId }, + select: { idempotencyKey: true, idempotencyKeyExpiresAt: true }, + }); + expect(row?.idempotencyKey).toBeNull(); + expect(row?.idempotencyKeyExpiresAt).toBeNull(); + } + ); + + postgresTest( + "clearIdempotencyKey byFriendlyIds: clears ONLY idempotencyKey, leaves idempotencyKeyExpiresAt intact", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_clear_friendly_1"; + const expiresAt = new Date("2028-07-01T00:00:00.000Z"); + + await seedRunWithIdempotency(prisma, { + runId, + friendlyId: "run_clear_friendly_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + idempotencyKey: "friendly-idem-key", + idempotencyKeyExpiresAt: expiresAt, + }); + + const result = await store.clearIdempotencyKey({ + byFriendlyIds: ["run_clear_friendly_friendly_1"], + }); + + expect(result.count).toBe(1); + + const row = await prisma.taskRun.findFirst({ + where: { id: runId }, + select: { idempotencyKey: true, idempotencyKeyExpiresAt: true }, + }); + // idempotencyKey cleared + expect(row?.idempotencyKey).toBeNull(); + // idempotencyKeyExpiresAt NOT cleared (byFriendlyIds only clears the key) + expect(row?.idempotencyKeyExpiresAt).toEqual(expiresAt); + } + ); + + // --------------------------------------------------------------------------- + // pushTags + // --------------------------------------------------------------------------- + + postgresTest( + "pushTags appends to existing runTags (seed [a], push [b,c] → [a,b,c]) and returns updatedAt", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_push_tags_1"; + + await seedRun(prisma, { + runId, + friendlyId: "run_push_tags_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + runTags: ["a"], + }); + + const result = await store.pushTags(runId, ["b", "c"], { + runtimeEnvironmentId: environment.id, + }); + + expect(result.updatedAt).toBeInstanceOf(Date); + + const row = await prisma.taskRun.findFirst({ + where: { id: runId }, + select: { runTags: true }, + }); + expect(row?.runTags).toEqual(["a", "b", "c"]); + } + ); + + // --------------------------------------------------------------------------- + // pushRealtimeStream + // --------------------------------------------------------------------------- + + postgresTest( + "pushRealtimeStream appends streamId to existing realtimeStreams", + async ({ prisma }) => { + const { organization, project, environment } = await seedEnvironment(prisma); + const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma }); + const runId = "run_push_stream_1"; + + await seedRun(prisma, { + runId, + friendlyId: "run_push_stream_friendly_1", + organizationId: organization.id, + projectId: project.id, + runtimeEnvironmentId: environment.id, + realtimeStreams: ["existing-stream"], + }); + + await store.pushRealtimeStream(runId, "new-stream"); + + const row = await prisma.taskRun.findFirst({ + where: { id: runId }, + select: { realtimeStreams: true }, + }); + expect(row?.realtimeStreams).toEqual(["existing-stream", "new-stream"]); + } + ); +}); diff --git a/internal-packages/run-store/src/PostgresRunStore.ts b/internal-packages/run-store/src/PostgresRunStore.ts new file mode 100644 index 0000000000..925a39425b --- /dev/null +++ b/internal-packages/run-store/src/PostgresRunStore.ts @@ -0,0 +1,620 @@ +import { Prisma } from "@trigger.dev/database"; +import type { + PrismaClient, + PrismaClientOrTransaction, + PrismaReplicaClient, + TaskRun, + TaskRunStatus, +} from "@trigger.dev/database"; +import type { + ClearIdempotencyKeyInput, + CompletionSnapshotInput, + CreateCancelledRunInput, + CreateFailedRunInput, + CreateRunInput, + ExpireSnapshotInput, + LockRunData, + RescheduleSnapshotInput, + RewriteDebouncedRunData, + RunStore, + TaskRunWithWaitpoint, +} from "./types.js"; +import type { TaskRunError } from "@trigger.dev/core/v3/schemas"; + +export type PostgresRunStoreOptions = { + prisma: PrismaClient; + readOnlyPrisma: PrismaReplicaClient; +}; + +/** + * Typed write layer for the task-run row, backed by the `taskRun` Prisma model. + * + * Each method is a verbatim relocation of the Prisma statement that lives at a + * specific call site today. Methods write through `(tx ?? this.prisma).taskRun` + * so callers can opt into an existing transaction. Errors (including unique + * constraint violations) propagate to the caller unchanged. + */ +export class PostgresRunStore implements RunStore { + private readonly prisma: PrismaClient; + private readonly readOnlyPrisma: PrismaReplicaClient; + + constructor(options: PostgresRunStoreOptions) { + this.prisma = options.prisma; + this.readOnlyPrisma = options.readOnlyPrisma; + } + + async createRun( + params: CreateRunInput, + tx?: PrismaClientOrTransaction + ): Promise { + const client = tx ?? this.prisma; + + return client.taskRun.create({ + include: { + associatedWaitpoint: true, + }, + data: { + ...params.data, + executionSnapshots: { + create: { + engine: params.snapshot.engine, + executionStatus: params.snapshot.executionStatus, + description: params.snapshot.description, + runStatus: params.snapshot.runStatus, + environmentId: params.snapshot.environmentId, + environmentType: params.snapshot.environmentType, + projectId: params.snapshot.projectId, + organizationId: params.snapshot.organizationId, + workerId: params.snapshot.workerId, + runnerId: params.snapshot.runnerId, + }, + }, + associatedWaitpoint: params.associatedWaitpoint + ? { + create: params.associatedWaitpoint, + } + : undefined, + }, + }); + } + + async createCancelledRun( + params: CreateCancelledRunInput, + tx?: PrismaClientOrTransaction + ): Promise { + const client = tx ?? this.prisma; + + return client.taskRun.create({ + data: { + ...params.data, + executionSnapshots: { + create: { + engine: params.snapshot.engine, + executionStatus: params.snapshot.executionStatus, + description: params.snapshot.description, + runStatus: params.snapshot.runStatus, + environmentId: params.snapshot.environmentId, + environmentType: params.snapshot.environmentType, + projectId: params.snapshot.projectId, + organizationId: params.snapshot.organizationId, + workerId: params.snapshot.workerId, + runnerId: params.snapshot.runnerId, + }, + }, + }, + }); + } + + async createFailedRun( + params: CreateFailedRunInput, + tx?: PrismaClientOrTransaction + ): Promise { + const client = tx ?? this.prisma; + + return client.taskRun.create({ + include: { + associatedWaitpoint: true, + }, + data: { + ...params.data, + associatedWaitpoint: params.associatedWaitpoint + ? { + create: params.associatedWaitpoint, + } + : undefined, + }, + }); + } + + async startAttempt( + runId: string, + data: { attemptNumber: number; executedAt?: Date; isWarmStart: boolean }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { + status: "EXECUTING", + attemptNumber: data.attemptNumber, + executedAt: data.executedAt, + isWarmStart: data.isWarmStart, + }, + select: args.select, + }) as Promise>; + } + + async completeAttemptSuccess( + runId: string, + data: { + completedAt: Date; + output?: string; + outputType: string; + usageDurationMs: number; + costInCents: number; + snapshot: CompletionSnapshotInput; + }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { + status: "COMPLETED_SUCCESSFULLY", + completedAt: data.completedAt, + output: data.output, + outputType: data.outputType, + usageDurationMs: data.usageDurationMs, + costInCents: data.costInCents, + executionSnapshots: { + create: { + executionStatus: data.snapshot.executionStatus, + description: data.snapshot.description, + runStatus: data.snapshot.runStatus, + attemptNumber: data.snapshot.attemptNumber, + environmentId: data.snapshot.environmentId, + environmentType: data.snapshot.environmentType, + projectId: data.snapshot.projectId, + organizationId: data.snapshot.organizationId, + workerId: data.snapshot.workerId, + runnerId: data.snapshot.runnerId, + }, + }, + }, + select: args.select, + }) as Promise>; + } + + async recordRetryOutcome( + runId: string, + data: { machinePreset?: string; usageDurationMs: number; costInCents: number }, + args: { include: I }, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { + machinePreset: data.machinePreset, + usageDurationMs: data.usageDurationMs, + costInCents: data.costInCents, + }, + include: args.include, + }) as Promise>; + } + + async requeueRun( + runId: string, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { status: "PENDING" }, + select: args.select, + }) as Promise>; + } + + async recordBulkActionMembership( + runId: string, + bulkActionId: string, + tx?: PrismaClientOrTransaction + ): Promise { + const prisma = tx ?? this.prisma; + + await prisma.taskRun.update({ + where: { id: runId }, + data: { + bulkActionGroupIds: { + push: bulkActionId, + }, + }, + }); + } + + async cancelRun( + runId: string, + data: { + completedAt?: Date; + error: TaskRunError; + bulkActionId?: string; + usageDurationMs?: number; + costInCents?: number; + }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { + status: "CANCELED", + ...(data.completedAt !== undefined && { completedAt: data.completedAt }), + error: data.error as Prisma.InputJsonValue, + ...(data.bulkActionId !== undefined && { + bulkActionGroupIds: { push: data.bulkActionId }, + }), + ...(data.usageDurationMs !== undefined && { usageDurationMs: data.usageDurationMs }), + ...(data.costInCents !== undefined && { costInCents: data.costInCents }), + }, + select: args.select, + }) as Promise>; + } + + async failRunPermanently( + runId: string, + data: { + status: TaskRunStatus; + completedAt: Date; + error: TaskRunError; + usageDurationMs: number; + costInCents: number; + }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { + status: data.status, + completedAt: data.completedAt, + error: data.error as Prisma.InputJsonValue, + usageDurationMs: data.usageDurationMs, + costInCents: data.costInCents, + }, + select: args.select, + }) as Promise>; + } + + async expireRun( + runId: string, + data: { error: TaskRunError; completedAt: Date; expiredAt: Date; snapshot: ExpireSnapshotInput }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { + status: "EXPIRED", + completedAt: data.completedAt, + expiredAt: data.expiredAt, + error: data.error as Prisma.InputJsonValue, + executionSnapshots: { + create: { + engine: data.snapshot.engine, + executionStatus: data.snapshot.executionStatus, + description: data.snapshot.description, + runStatus: data.snapshot.runStatus, + environmentId: data.snapshot.environmentId, + environmentType: data.snapshot.environmentType, + projectId: data.snapshot.projectId, + organizationId: data.snapshot.organizationId, + }, + }, + }, + select: args.select, + }) as Promise>; + } + + async expireRunsBatch( + runIds: string[], + data: { error: TaskRunError; now: Date }, + tx?: PrismaClientOrTransaction + ): Promise { + const prisma = tx ?? this.prisma; + + // Nothing to do for an empty set, and Prisma.join would build an invalid + // `IN ()` clause, so short-circuit before touching the database. + if (runIds.length === 0) { + return 0; + } + + return prisma.$executeRaw` + UPDATE "TaskRun" + SET "status" = 'EXPIRED'::"TaskRunStatus", + "completedAt" = ${data.now}, + "expiredAt" = ${data.now}, + "updatedAt" = ${data.now}, + "error" = ${JSON.stringify(data.error)}::jsonb + WHERE "id" IN (${Prisma.join(runIds)}) + `; + } + + async lockRunToWorker( + runId: string, + data: LockRunData, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { + status: "DEQUEUED", + lockedAt: data.lockedAt, + lockedById: data.lockedById, + lockedToVersionId: data.lockedToVersionId, + lockedQueueId: data.lockedQueueId, + lockedRetryConfig: data.lockedRetryConfig ?? undefined, + startedAt: data.startedAt, + baseCostInCents: data.baseCostInCents, + machinePreset: data.machinePreset, + taskVersion: data.taskVersion, + sdkVersion: data.sdkVersion ?? undefined, + cliVersion: data.cliVersion ?? undefined, + maxDurationInSeconds: data.maxDurationInSeconds ?? undefined, + maxAttempts: data.maxAttempts ?? undefined, + executionSnapshots: { + create: { + id: data.snapshot.id, + engine: "V2", + executionStatus: "PENDING_EXECUTING", + description: "Run was dequeued for execution", + runStatus: "PENDING", + attemptNumber: data.snapshot.attemptNumber ?? undefined, + previousSnapshotId: data.snapshot.previousSnapshotId, + environmentId: data.snapshot.environmentId, + environmentType: data.snapshot.environmentType, + projectId: data.snapshot.projectId, + organizationId: data.snapshot.organizationId, + checkpointId: data.snapshot.checkpointId ?? undefined, + batchId: data.snapshot.batchId ?? undefined, + completedWaitpoints: { + connect: data.snapshot.completedWaitpointIds.map((id) => ({ id })), + }, + completedWaitpointOrder: data.snapshot.completedWaitpointOrder, + workerId: data.snapshot.workerId ?? undefined, + runnerId: data.snapshot.runnerId ?? undefined, + }, + }, + }, + include: { + runtimeEnvironment: true, + }, + }); + } + + async parkPendingVersion( + runId: string, + data: { statusReason: string }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { + status: "PENDING_VERSION", + statusReason: data.statusReason, + }, + select: args.select, + }) as Promise>; + } + + async promotePendingVersionRuns( + runId: string, + tx?: PrismaClientOrTransaction + ): Promise<{ count: number }> { + const prisma = tx ?? this.prisma; + + const result = await prisma.taskRun.updateMany({ + where: { id: runId, status: "PENDING_VERSION" }, + data: { status: "PENDING" }, + }); + + return { count: result.count }; + } + + async suspendForCheckpoint( + runId: string, + args: { include: I }, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { status: "WAITING_TO_RESUME" }, + include: args.include, + }) as Promise>; + } + + async resumeFromCheckpoint( + runId: string, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { status: "EXECUTING" }, + select: args.select, + }) as Promise>; + } + + async rescheduleRun( + runId: string, + data: { delayUntil: Date; queueTimestamp?: Date; snapshot?: RescheduleSnapshotInput }, + tx?: PrismaClientOrTransaction + ): Promise { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { + delayUntil: data.delayUntil, + ...(data.queueTimestamp !== undefined && { queueTimestamp: data.queueTimestamp }), + ...(data.snapshot && { + executionSnapshots: { + create: { + engine: "V2", + executionStatus: "DELAYED", + description: "Delayed run was rescheduled to a future date", + runStatus: "DELAYED", + environmentId: data.snapshot.environmentId, + environmentType: data.snapshot.environmentType, + projectId: data.snapshot.projectId, + organizationId: data.snapshot.organizationId, + }, + }, + }), + }, + }); + } + + async enqueueDelayedRun( + runId: string, + data: { queuedAt: Date }, + tx?: PrismaClientOrTransaction + ): Promise { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data: { + status: "PENDING", + queuedAt: data.queuedAt, + }, + }); + } + + async rewriteDebouncedRun( + runId: string, + data: RewriteDebouncedRunData, + tx?: PrismaClientOrTransaction + ): Promise { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId }, + data, + include: { + associatedWaitpoint: true, + }, + }); + } + + async updateMetadata( + runId: string, + data: { + metadata: string | null; + metadataType?: string; + metadataVersion: { increment: number }; + updatedAt: Date; + }, + options: { expectedMetadataVersion?: number }, + tx?: PrismaClientOrTransaction + ): Promise<{ count: number }> { + const prisma = tx ?? this.prisma; + + if (options.expectedMetadataVersion !== undefined) { + const result = await prisma.taskRun.updateMany({ + where: { id: runId, metadataVersion: options.expectedMetadataVersion }, + data, + }); + return { count: result.count }; + } + + await prisma.taskRun.update({ + where: { id: runId }, + data, + }); + return { count: 1 }; + } + + async clearIdempotencyKey( + params: ClearIdempotencyKeyInput, + tx?: PrismaClientOrTransaction + ): Promise<{ count: number }> { + const prisma = tx ?? this.prisma; + + if (params.byId) { + const result = await prisma.taskRun.updateMany({ + where: { id: params.byId.runId, idempotencyKey: params.byId.idempotencyKey }, + data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, + }); + return { count: result.count }; + } + + if (params.byPredicate) { + const result = await prisma.taskRun.updateMany({ + where: { + idempotencyKey: params.byPredicate.idempotencyKey, + taskIdentifier: params.byPredicate.taskIdentifier, + runtimeEnvironmentId: params.byPredicate.runtimeEnvironmentId, + }, + data: { idempotencyKey: null, idempotencyKeyExpiresAt: null }, + }); + return { count: result.count }; + } + + // byFriendlyIds — only clears idempotencyKey, not idempotencyKeyExpiresAt + const result = await prisma.taskRun.updateMany({ + where: { friendlyId: { in: params.byFriendlyIds } }, + data: { idempotencyKey: null }, + }); + return { count: result.count }; + } + + async pushTags( + runId: string, + tags: string[], + where: { runtimeEnvironmentId: string }, + tx?: PrismaClientOrTransaction + ): Promise<{ updatedAt: Date }> { + const prisma = tx ?? this.prisma; + + return prisma.taskRun.update({ + where: { id: runId, runtimeEnvironmentId: where.runtimeEnvironmentId }, + data: { runTags: { push: tags } }, + select: { updatedAt: true }, + }); + } + + async pushRealtimeStream( + runId: string, + streamId: string, + tx?: PrismaClientOrTransaction + ): Promise { + const prisma = tx ?? this.prisma; + + await prisma.taskRun.update({ + where: { id: runId }, + data: { realtimeStreams: { push: streamId } }, + }); + } +} diff --git a/internal-packages/run-store/src/index.ts b/internal-packages/run-store/src/index.ts new file mode 100644 index 0000000000..de9f7620d7 --- /dev/null +++ b/internal-packages/run-store/src/index.ts @@ -0,0 +1,3 @@ +export * from "./types.js"; +export * from "./PostgresRunStore.js"; +export * from "./NoopRunStore.js"; diff --git a/internal-packages/run-store/src/types.ts b/internal-packages/run-store/src/types.ts new file mode 100644 index 0000000000..e680f25463 --- /dev/null +++ b/internal-packages/run-store/src/types.ts @@ -0,0 +1,322 @@ +import type { + Prisma, + PrismaClientOrTransaction, + TaskRun, + TaskRunStatus, + TaskRunExecutionStatus, + RuntimeEnvironmentType, + Waitpoint, +} from "@trigger.dev/database"; +import type { TaskRunError } from "@trigger.dev/core/v3/schemas"; + +export type CreateRunSnapshotInput = { + engine: "V2"; + executionStatus: TaskRunExecutionStatus; + description: string; + runStatus: TaskRunStatus; + environmentId: string; + environmentType: RuntimeEnvironmentType; + projectId: string; + organizationId: string; + workerId?: string; + runnerId?: string; +}; + +export type CompletionSnapshotInput = { + executionStatus: "FINISHED"; + description: string; + runStatus: TaskRunStatus; + attemptNumber: number | null; + environmentId: string; + environmentType: RuntimeEnvironmentType; + projectId: string; + organizationId: string; + workerId?: string; + runnerId?: string; +}; + +export type ExpireSnapshotInput = { + engine: "V2"; + executionStatus: "FINISHED"; + description: string; + runStatus: TaskRunStatus; + environmentId: string; + environmentType: RuntimeEnvironmentType; + projectId: string; + organizationId: string; +}; + +export type RescheduleSnapshotInput = { + environmentId: string; + environmentType: RuntimeEnvironmentType; + projectId: string; + organizationId: string; +}; + +export type LockSnapshotInput = { + id: string; + previousSnapshotId: string; + attemptNumber?: number; + environmentId: string; + environmentType: RuntimeEnvironmentType; + projectId: string; + organizationId: string; + checkpointId?: string; + batchId?: string; + completedWaitpointIds: string[]; + completedWaitpointOrder: string[]; + workerId?: string; + runnerId?: string; +}; + +export type RunAssociatedWaitpointInput = { + id: string; + friendlyId: string; + type: "RUN"; + status: "PENDING"; + idempotencyKey: string; + userProvidedIdempotencyKey: boolean; + projectId: string; + environmentId: string; +}; + +// The ~60 trigger columns (the existing Prisma create `data` minus the nested relation creates). +export type CreateRunData = { + id: string; + engine: "V2"; + status: TaskRunStatus; + friendlyId: string; + runtimeEnvironmentId: string; + environmentType: RuntimeEnvironmentType; + organizationId: string; + projectId: string; + idempotencyKey?: string; + idempotencyKeyExpiresAt?: Date; + idempotencyKeyOptions?: Prisma.InputJsonValue; + taskIdentifier: string; + payload: string; + payloadType: string; + context?: Prisma.InputJsonValue; + traceContext: Prisma.InputJsonValue; + traceId: string; + spanId: string; + parentSpanId?: string; + lockedToVersionId?: string; + taskVersion?: string; + sdkVersion?: string; + cliVersion?: string; + concurrencyKey?: string; + queue: string; + lockedQueueId?: string; + workerQueue?: string; + region?: string | null; + isTest: boolean; + delayUntil?: Date; + queuedAt?: Date; + maxAttempts?: number; + taskEventStore?: string; + priorityMs?: number; + queueTimestamp?: Date; + ttl?: string; + runTags?: string[]; + oneTimeUseToken?: string; + parentTaskRunId?: string; + rootTaskRunId?: string; + replayedFromTaskRunFriendlyId?: string; + batchId?: string; + resumeParentOnCompletion?: boolean; + depth?: number; + metadata?: string; + metadataType?: string; + seedMetadata?: string; + seedMetadataType?: string; + maxDurationInSeconds?: number; + machinePreset?: string; + scheduleId?: string; + scheduleInstanceId?: string; + createdAt?: Date; + bulkActionGroupIds?: string[]; + planType?: string; + realtimeStreamsVersion?: string; + streamBasinName?: string | null; + debounce?: Prisma.InputJsonValue; + annotations?: Prisma.InputJsonValue; +}; + +export type CreateRunInput = { + data: CreateRunData; + snapshot: CreateRunSnapshotInput; + associatedWaitpoint?: RunAssociatedWaitpointInput; +}; + +export type CreateCancelledRunInput = { + data: CreateRunData & { error: Prisma.InputJsonValue; completedAt: Date; updatedAt: Date; attemptNumber: 0 }; + snapshot: CreateRunSnapshotInput; +}; + +export type CreateFailedRunData = { + id: string; + engine: "V2"; + status: "SYSTEM_FAILURE"; + friendlyId: string; + runtimeEnvironmentId: string; + environmentType: RuntimeEnvironmentType; + organizationId: string; + projectId: string; + taskIdentifier: string; + payload: string; + payloadType: string; + context: Prisma.InputJsonValue; + traceContext: Prisma.InputJsonValue; + traceId: string; + spanId: string; + queue: string; + lockedQueueId?: string; + isTest: false; + completedAt: Date; + error: Prisma.InputJsonObject; + parentTaskRunId?: string; + rootTaskRunId?: string; + depth: number; + batchId?: string; + resumeParentOnCompletion?: boolean; + taskEventStore?: string; +}; + +export type CreateFailedRunInput = { + data: CreateFailedRunData; + associatedWaitpoint?: RunAssociatedWaitpointInput; +}; + +export type LockRunData = { + lockedAt: Date; + lockedById: string; + lockedToVersionId: string; + lockedQueueId: string; + lockedRetryConfig?: Prisma.InputJsonValue; + startedAt: Date; + baseCostInCents: number; + machinePreset: string; + taskVersion: string; + sdkVersion: string | null; + cliVersion: string | null; + maxDurationInSeconds: number | null | undefined; + maxAttempts?: number; + snapshot: LockSnapshotInput; +}; + +export type RewriteDebouncedRunData = { + payload: string; + payloadType: string; + metadata?: string; + metadataType?: string; + maxAttempts?: number; + maxDurationInSeconds?: number; + machinePreset?: string; + runTags?: string[]; +}; + +export type ClearIdempotencyKeyInput = + | { byId: { runId: string; idempotencyKey: string }; byPredicate?: never; byFriendlyIds?: never } + | { byPredicate: { idempotencyKey: string; taskIdentifier: string; runtimeEnvironmentId: string }; byId?: never; byFriendlyIds?: never } + | { byFriendlyIds: string[]; byId?: never; byPredicate?: never }; + +export type TaskRunWithWaitpoint = TaskRun & { associatedWaitpoint: Waitpoint | null }; + +export interface RunStore { + // Create + createRun(params: CreateRunInput, tx?: PrismaClientOrTransaction): Promise; + createCancelledRun(params: CreateCancelledRunInput, tx?: PrismaClientOrTransaction): Promise; + createFailedRun(params: CreateFailedRunInput, tx?: PrismaClientOrTransaction): Promise; + + // Attempt lifecycle + startAttempt( + runId: string, + data: { attemptNumber: number; executedAt?: Date; isWarmStart: boolean }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise>; + completeAttemptSuccess( + runId: string, + data: { completedAt: Date; output?: string; outputType: string; usageDurationMs: number; costInCents: number; snapshot: CompletionSnapshotInput }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise>; + recordRetryOutcome( + runId: string, + data: { machinePreset?: string; usageDurationMs: number; costInCents: number }, + args: { include: I }, + tx?: PrismaClientOrTransaction + ): Promise>; + requeueRun( + runId: string, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise>; + recordBulkActionMembership(runId: string, bulkActionId: string, tx?: PrismaClientOrTransaction): Promise; + cancelRun( + runId: string, + data: { completedAt?: Date; error: TaskRunError; bulkActionId?: string; usageDurationMs?: number; costInCents?: number }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise>; + failRunPermanently( + runId: string, + data: { status: TaskRunStatus; completedAt: Date; error: TaskRunError; usageDurationMs: number; costInCents: number }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise>; + + // Expiry + expireRun( + runId: string, + data: { error: TaskRunError; completedAt: Date; expiredAt: Date; snapshot: ExpireSnapshotInput }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise>; + expireRunsBatch(runIds: string[], data: { error: TaskRunError; now: Date }, tx?: PrismaClientOrTransaction): Promise; + + // Dequeue / version / checkpoint + lockRunToWorker( + runId: string, + data: LockRunData, + tx?: PrismaClientOrTransaction + ): Promise>; + parkPendingVersion( + runId: string, + data: { statusReason: string }, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise>; + promotePendingVersionRuns(runId: string, tx?: PrismaClientOrTransaction): Promise<{ count: number }>; + suspendForCheckpoint( + runId: string, + args: { include: I }, + tx?: PrismaClientOrTransaction + ): Promise>; + resumeFromCheckpoint( + runId: string, + args: { select: S }, + tx?: PrismaClientOrTransaction + ): Promise>; + + // Delayed / debounce + rescheduleRun( + runId: string, + data: { delayUntil: Date; queueTimestamp?: Date; snapshot?: RescheduleSnapshotInput }, + tx?: PrismaClientOrTransaction + ): Promise; + enqueueDelayedRun(runId: string, data: { queuedAt: Date }, tx?: PrismaClientOrTransaction): Promise; + rewriteDebouncedRun(runId: string, data: RewriteDebouncedRunData, tx?: PrismaClientOrTransaction): Promise; + + // Field touches + updateMetadata( + runId: string, + data: { metadata: string | null; metadataType?: string; metadataVersion: { increment: number }; updatedAt: Date }, + options: { expectedMetadataVersion?: number }, + tx?: PrismaClientOrTransaction + ): Promise<{ count: number }>; + clearIdempotencyKey(params: ClearIdempotencyKeyInput, tx?: PrismaClientOrTransaction): Promise<{ count: number }>; + pushTags(runId: string, tags: string[], where: { runtimeEnvironmentId: string }, tx?: PrismaClientOrTransaction): Promise<{ updatedAt: Date }>; + pushRealtimeStream(runId: string, streamId: string, tx?: PrismaClientOrTransaction): Promise; +} diff --git a/internal-packages/run-store/tsconfig.build.json b/internal-packages/run-store/tsconfig.build.json new file mode 100644 index 0000000000..89c87a3dc6 --- /dev/null +++ b/internal-packages/run-store/tsconfig.build.json @@ -0,0 +1,21 @@ +{ + "include": ["src/**/*.ts"], + "exclude": ["src/**/*.test.ts"], + "compilerOptions": { + "composite": true, + "target": "ES2020", + "lib": ["ES2020", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], + "outDir": "dist", + "module": "Node16", + "moduleResolution": "Node16", + "moduleDetection": "force", + "verbatimModuleSyntax": false, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + "preserveWatchOutput": true, + "skipLibCheck": true, + "strict": true, + "declaration": true + } +} diff --git a/internal-packages/run-store/tsconfig.json b/internal-packages/run-store/tsconfig.json new file mode 100644 index 0000000000..af630abe1f --- /dev/null +++ b/internal-packages/run-store/tsconfig.json @@ -0,0 +1,8 @@ +{ + "references": [{ "path": "./tsconfig.src.json" }, { "path": "./tsconfig.test.json" }], + "compilerOptions": { + "moduleResolution": "Node16", + "module": "Node16", + "customConditions": ["@triggerdotdev/source"] + } +} diff --git a/internal-packages/run-store/tsconfig.src.json b/internal-packages/run-store/tsconfig.src.json new file mode 100644 index 0000000000..0df3d2d222 --- /dev/null +++ b/internal-packages/run-store/tsconfig.src.json @@ -0,0 +1,20 @@ +{ + "include": ["src/**/*.ts"], + "exclude": ["node_modules", "src/**/*.test.ts"], + "compilerOptions": { + "composite": true, + "target": "ES2020", + "lib": ["ES2020", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], + "module": "Node16", + "moduleResolution": "Node16", + "moduleDetection": "force", + "verbatimModuleSyntax": false, + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + "preserveWatchOutput": true, + "skipLibCheck": true, + "strict": true, + "customConditions": ["@triggerdotdev/source"] + } +} diff --git a/internal-packages/run-store/tsconfig.test.json b/internal-packages/run-store/tsconfig.test.json new file mode 100644 index 0000000000..4c06c9f57b --- /dev/null +++ b/internal-packages/run-store/tsconfig.test.json @@ -0,0 +1,21 @@ +{ + "include": ["src/**/*.test.ts"], + "references": [{ "path": "./tsconfig.src.json" }], + "compilerOptions": { + "composite": true, + "target": "ES2020", + "lib": ["ES2020", "DOM", "DOM.Iterable", "DOM.AsyncIterable"], + "module": "Node16", + "moduleResolution": "Node16", + "moduleDetection": "force", + "verbatimModuleSyntax": false, + "types": ["vitest/globals"], + "esModuleInterop": true, + "forceConsistentCasingInFileNames": true, + "isolatedModules": true, + "preserveWatchOutput": true, + "skipLibCheck": true, + "strict": true, + "customConditions": ["@triggerdotdev/source"] + } +} diff --git a/internal-packages/run-store/vitest.config.mts b/internal-packages/run-store/vitest.config.mts new file mode 100644 index 0000000000..9ba46467ca --- /dev/null +++ b/internal-packages/run-store/vitest.config.mts @@ -0,0 +1,11 @@ +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + test: { + include: ["**/*.test.ts"], + globals: true, + isolate: true, + fileParallelism: false, + testTimeout: 120_000, + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2c49a34702..27bc9dec87 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -361,6 +361,9 @@ importers: '@internal/run-engine': specifier: workspace:* version: link:../../internal-packages/run-engine + '@internal/run-store': + specifier: workspace:* + version: link:../../internal-packages/run-store '@internal/schedule-engine': specifier: workspace:* version: link:../../internal-packages/schedule-engine @@ -1308,6 +1311,9 @@ importers: '@internal/redis': specifier: workspace:* version: link:../redis + '@internal/run-store': + specifier: workspace:* + version: link:../run-store '@internal/tracing': specifier: workspace:* version: link:../tracing @@ -1352,6 +1358,22 @@ importers: specifier: 6.0.1 version: 6.0.1 + internal-packages/run-store: + dependencies: + '@trigger.dev/core': + specifier: workspace:* + version: link:../../packages/core + '@trigger.dev/database': + specifier: workspace:* + version: link:../database + devDependencies: + '@internal/testcontainers': + specifier: workspace:* + version: link:../testcontainers + rimraf: + specifier: 6.0.1 + version: 6.0.1 + internal-packages/schedule-engine: dependencies: '@internal/redis': @@ -13996,10 +14018,6 @@ packages: resolution: {integrity: sha512-Xa4Nw17FS9ApQFJ9umLiJS4orGjm7ZzwUrwamcGQuHSzDyth9boKDaycYdDcZDuqYATXw4HFXgaqWTctW/v1HA==} engines: {node: '>=16 || 14 >=14.18'} - path-scurry@2.0.0: - resolution: {integrity: sha512-ypGJsmGtdXUOeM5u93TyeIEfEhM6s+ljAhrk5vAvSx8uyY/02OvrZnA0YNGUrPXfpJMgI1ODd3nwz8Npx4O4cg==} - engines: {node: 20 || >=22} - path-scurry@2.0.2: resolution: {integrity: sha512-3O/iVVsJAPsOnpwWIeD+d6z/7PmqApyQePUtCndjatj/9I5LylHvt5qluFaBT3I5h3r1ejfR056c+FCv+NnNXg==} engines: {node: 18 || 20 || >=22} @@ -20770,7 +20788,7 @@ snapshots: '@isaacs/fs-minipass@4.0.1': dependencies: - minipass: 7.1.2 + minipass: 7.1.3 '@jridgewell/gen-mapping@0.3.13': dependencies: @@ -29428,7 +29446,7 @@ snapshots: fs-minipass@3.0.3: dependencies: - minipass: 7.1.2 + minipass: 7.1.3 fs.realpath@1.0.0: {} @@ -29568,7 +29586,7 @@ snapshots: foreground-child: 3.3.1 jackspeak: 3.4.3 minimatch: 9.0.5 - minipass: 7.1.2 + minipass: 7.1.3 package-json-from-dist: 1.0.0 path-scurry: 1.11.1 @@ -29577,9 +29595,9 @@ snapshots: foreground-child: 3.3.1 jackspeak: 4.2.3 minimatch: 10.2.5 - minipass: 7.1.2 + minipass: 7.1.3 package-json-from-dist: 1.0.0 - path-scurry: 2.0.0 + path-scurry: 2.0.2 glob@13.0.6: dependencies: @@ -31656,7 +31674,7 @@ snapshots: minizlib@3.1.0: dependencies: - minipass: 7.1.2 + minipass: 7.1.3 mixme@0.5.4: {} @@ -32332,12 +32350,7 @@ snapshots: path-scurry@1.11.1: dependencies: lru-cache: 10.4.3 - minipass: 7.1.2 - - path-scurry@2.0.0: - dependencies: - lru-cache: 11.2.4 - minipass: 7.1.2 + minipass: 7.1.3 path-scurry@2.0.2: dependencies: @@ -34242,7 +34255,7 @@ snapshots: ssri@10.0.5: dependencies: - minipass: 7.1.2 + minipass: 7.1.3 stack-generator@2.0.10: dependencies: @@ -34535,7 +34548,7 @@ snapshots: dependencies: glob: 11.1.0 mkdirp: 3.0.1 - path-scurry: 2.0.0 + path-scurry: 2.0.2 rimraf: 6.0.1 tshy: 3.0.2