Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a86635c
chore(run-store): scaffold @internal/run-store package
d-cs Jun 17, 2026
d4c1ff4
feat(run-store): add shared types and the RunStore interface
d-cs Jun 17, 2026
6d7abab
chore(run-store): use .js extensions in index re-exports for Node16 r…
d-cs Jun 17, 2026
010cf17
feat(run-store): add NoopRunStore test double
d-cs Jun 17, 2026
72a7462
feat(run-store): add PostgresRunStore with createRun
d-cs Jun 17, 2026
2e63223
feat(run-store): implement createCancelledRun and createFailedRun
d-cs Jun 17, 2026
f8456c1
feat(run-store): implement attempt lifecycle, cancel, and fail methods
d-cs Jun 17, 2026
f1ab6ae
feat(run-store): implement expiry, dequeue-lock, version, and checkpo…
d-cs Jun 17, 2026
f66bbad
feat(run-store): implement reschedule, debounce, metadata, idempotenc…
d-cs Jun 17, 2026
56ec707
feat(run-store): wire RunStore into run-engine SystemResources and we…
d-cs Jun 17, 2026
01bbc67
fix(run-store): align create-input types with the columns callers act…
d-cs Jun 17, 2026
de52aaa
refactor(run-engine): route run creation through RunStore
d-cs Jun 17, 2026
4826117
fix(run-store): allow optional machinePreset in recordRetryOutcome (l…
d-cs Jun 17, 2026
8650e40
refactor(run-engine): route attempt lifecycle, cancel, and fail write…
d-cs Jun 17, 2026
d530eb1
refactor(run-engine): route expiry and dequeue-lock writes through Ru…
d-cs Jun 17, 2026
4ec5aab
fix(run-store): allow undefined maxDurationInSeconds in lockRunToWork…
d-cs Jun 17, 2026
109c6a7
refactor(run-engine): route checkpoint, delayed, pending-version, and…
d-cs Jun 17, 2026
2fbdc5d
refactor(webapp): route run metadata, idempotency-key, and reschedule…
d-cs Jun 17, 2026
1a5ccdc
refactor(webapp): route tag and realtime-stream appends through RunStore
d-cs Jun 17, 2026
60565cf
fix(run-store): short-circuit expireRunsBatch on an empty runIds array
d-cs Jun 18, 2026
3c22b32
Merge main into run-store-write-adapter
d-cs Jun 18, 2026
76f3494
fix(webapp): inject runStore into UpdateMetadataService
d-cs Jun 18, 2026
c5226a2
feat(run-store): add TaskRun read methods to the run store
d-cs Jun 18, 2026
13d5364
feat(run-store): add full-row read overload to the run store
d-cs Jun 18, 2026
cfa9052
refactor(run-engine): route TaskRun reads through the run store
d-cs Jun 18, 2026
5b74b48
refactor(webapp): route service-layer TaskRun reads through the run s…
d-cs Jun 18, 2026
5683952
refactor(webapp): route presenter TaskRun reads through the run store
d-cs Jun 18, 2026
126b05f
refactor(webapp): route API and loader TaskRun reads through the run …
d-cs Jun 18, 2026
f59abe7
refactor(webapp): hydrate parent-model TaskRun reads through the run …
d-cs Jun 18, 2026
cb12430
chore(scripts): flag recover-stuck-runs raw TaskRun read for table cu…
d-cs Jun 18, 2026
ae57f25
chore(webapp): add server-changes entry for run-store read routing
d-cs Jun 18, 2026
fcc26d4
test(webapp): mock db.server in the new run-store read tests
d-cs Jun 18, 2026
789e107
test(webapp): drop the cancelTaskAttemptDependencies container test
d-cs Jun 18, 2026
e20e451
Merge branch 'main' into runstore-read-path
d-cs Jun 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/route-taskrun-reads-through-run-store.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Route Postgres task run reads through the run store so they can be retargeted to a different backing store without changing call sites.
14 changes: 9 additions & 5 deletions apps/webapp/app/models/runtimeEnvironment.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { AuthenticatedEnvironment } from "@internal/run-engine";
import type { Prisma, PrismaClientOrTransaction, RuntimeEnvironment } from "@trigger.dev/database";
import { $replica, prisma } from "~/db.server";
import { runStore } from "~/v3/runStore.server";
import { logger } from "~/services/logger.server";
import { getUsername } from "~/utils/username";
import { sanitizeBranchName } from "@trigger.dev/core/v3/utils/gitBranch";
Expand Down Expand Up @@ -251,14 +252,17 @@ export async function findEnvironmentFromRun(
): Promise<EnvironmentFromRun | null> {
// The include (no select) already pulls every taskRun scalar, so runTags/batchId
// ride along for free — no extra query for the realtime publish to send a full record.
const taskRun = await (tx ?? $replica).taskRun.findFirst({
where: {
const taskRun = await runStore.findRun(
{
id: runId,
},
include: {
runtimeEnvironment: { include: authIncludeBase },
{
include: {
runtimeEnvironment: { include: authIncludeBase },
},
},
});
tx ?? $replica
);
if (!taskRun?.runtimeEnvironment) {
return null;
}
Expand Down
55 changes: 43 additions & 12 deletions apps/webapp/app/presenters/v3/ApiBatchResultsPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { BatchTaskRunExecutionResult } from "@trigger.dev/core/v3";
import { executionResultForTaskRun } from "~/models/taskRun.server";
import { executionResultForTaskRun, TaskRunWithAttempts } from "~/models/taskRun.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { runStore } from "~/v3/runStore.server";
import { BasePresenter } from "./basePresenter.server";

export class ApiBatchResultsPresenter extends BasePresenter {
Expand All @@ -16,16 +17,8 @@ export class ApiBatchResultsPresenter extends BasePresenter {
},
include: {
items: {
include: {
taskRun: {
include: {
attempts: {
orderBy: {
createdAt: "desc",
},
},
},
},
select: {
taskRunId: true,
},
},
},
Expand All @@ -35,10 +28,48 @@ export class ApiBatchResultsPresenter extends BasePresenter {
return undefined;
}

const taskRunIds = batchRun.items.map((item) => item.taskRunId);

if (taskRunIds.length === 0) {
return {
id: batchRun.friendlyId,
items: [],
};
}

const taskRuns = await runStore.findRuns(
{
where: { id: { in: taskRunIds } },
select: {
id: true,
friendlyId: true,
status: true,
taskIdentifier: true,
attempts: {
select: {
status: true,
output: true,
outputType: true,
error: true,
},
orderBy: {
createdAt: "desc",
},
},
},
},
this._prisma
);

const runMap = new Map(taskRuns.map((run) => [run.id, run]));

return {
id: batchRun.friendlyId,
items: batchRun.items
.map((item) => executionResultForTaskRun(item.taskRun))
.map((item) => {
const run = runMap.get(item.taskRunId);
return run ? executionResultForTaskRun(run as TaskRunWithAttempts) : undefined;
})
.filter(Boolean),
};
});
Expand Down
56 changes: 30 additions & 26 deletions apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
type SyntheticRun,
} from "~/v3/mollifier/readFallback.server";
import { generatePresignedUrl } from "~/v3/objectStore.server";
import { runStore } from "~/v3/runStore.server";
import { tracer } from "~/v3/tracer.server";
import { startSpanWithEnv } from "~/v3/tracing.server";

Expand Down Expand Up @@ -110,38 +111,41 @@ export class ApiRetrieveRunPresenter {
friendlyId: string,
env: AuthenticatedEnvironment,
): Promise<FoundRun | null> {
const pgRow = await $replica.taskRun.findFirst({
where: {
const pgRow = await runStore.findRun(
{
friendlyId,
runtimeEnvironmentId: env.id,
},
select: {
...commonRunSelect,
traceId: true,
payload: true,
payloadType: true,
output: true,
outputType: true,
error: true,
attempts: {
select: {
id: true,
{
select: {
...commonRunSelect,
traceId: true,
payload: true,
payloadType: true,
output: true,
outputType: true,
error: true,
attempts: {
select: {
id: true,
},
},
attemptNumber: true,
engine: true,
taskEventStore: true,
parentTaskRun: {
select: commonRunSelect,
},
rootTaskRun: {
select: commonRunSelect,
},
childRuns: {
select: commonRunSelect,
},
},
attemptNumber: true,
engine: true,
taskEventStore: true,
parentTaskRun: {
select: commonRunSelect,
},
rootTaskRun: {
select: commonRunSelect,
},
childRuns: {
select: commonRunSelect,
},
},
});
$replica
);

if (pgRow) return { ...pgRow, isBuffered: false };

Expand Down
18 changes: 11 additions & 7 deletions apps/webapp/app/presenters/v3/ApiRunResultPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { TaskRunExecutionResult } from "@trigger.dev/core/v3";
import { executionResultForTaskRun } from "~/models/taskRun.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { runStore } from "~/v3/runStore.server";
import { BasePresenter } from "./basePresenter.server";

export class ApiRunResultPresenter extends BasePresenter {
Expand All @@ -9,19 +10,22 @@ export class ApiRunResultPresenter extends BasePresenter {
env: AuthenticatedEnvironment
): Promise<TaskRunExecutionResult | undefined> {
return this.traceWithEnv("call", env, async (span) => {
const taskRun = await this._prisma.taskRun.findFirst({
where: {
const taskRun = await runStore.findRun(
{
friendlyId,
runtimeEnvironmentId: env.id,
},
include: {
attempts: {
orderBy: {
createdAt: "desc",
{
include: {
attempts: {
orderBy: {
createdAt: "desc",
},
},
},
},
});
this._prisma
);

if (!taskRun) {
return undefined;
Expand Down
8 changes: 5 additions & 3 deletions apps/webapp/app/presenters/v3/NextRunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { getTaskIdentifiers } from "~/models/task.server";
import { RunsRepository } from "~/services/runsRepository/runsRepository.server";
import { regionForDisplay } from "~/runEngine/concerns/workerQueueSplit.server";
import { machinePresetFromRun } from "~/v3/machinePresets.server";
import { runStore } from "~/v3/runStore.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { isCancellableRunStatus, isFinalRunStatus, isPendingRunStatus } from "~/v3/taskStatus";

Expand Down Expand Up @@ -206,11 +207,12 @@ export class NextRunListPresenter {
let hasAnyRuns = runs.length > 0;

if (!hasAnyRuns) {
const firstRun = await this.replica.taskRun.findFirst({
where: {
const firstRun = await runStore.findRun(
{
runtimeEnvironmentId: environmentId,
},
});
this.replica
);

if (firstRun) {
hasAnyRuns = true;
Expand Down
108 changes: 56 additions & 52 deletions apps/webapp/app/presenters/v3/RunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
import { isFinalRunStatus } from "~/v3/taskStatus";
import { env } from "~/env.server";
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
import { runStore } from "~/v3/runStore.server";

type Result = Awaited<ReturnType<RunPresenter["call"]>>;
export type Run = Result["run"];
Expand Down Expand Up @@ -62,57 +63,8 @@ export class RunPresenter {
// buffer view. `findFirstOrThrow` would log a `PrismaClient error`
// every tick of the page poll, masking real DB issues with synthetic
// not-found noise.
const run = await this.#prismaClient.taskRun.findFirst({
select: {
id: true,
createdAt: true,
taskEventStore: true,
taskIdentifier: true,
number: true,
traceId: true,
spanId: true,
parentSpanId: true,
friendlyId: true,
status: true,
startedAt: true,
completedAt: true,
logsDeletedAt: true,
annotations: true,
rootTaskRun: {
select: {
friendlyId: true,
spanId: true,
createdAt: true,
},
},
parentTaskRun: {
select: {
friendlyId: true,
spanId: true,
createdAt: true,
},
},
runtimeEnvironment: {
select: {
id: true,
type: true,
slug: true,
organizationId: true,
orgMember: {
select: {
user: {
select: {
id: true,
name: true,
displayName: true,
},
},
},
},
},
},
},
where: {
const run = await runStore.findRun(
{
friendlyId: runFriendlyId,
project: {
slug: projectSlug,
Expand All @@ -125,7 +77,59 @@ export class RunPresenter {
},
},
},
});
{
select: {
id: true,
createdAt: true,
taskEventStore: true,
taskIdentifier: true,
number: true,
traceId: true,
spanId: true,
parentSpanId: true,
friendlyId: true,
status: true,
startedAt: true,
completedAt: true,
logsDeletedAt: true,
annotations: true,
rootTaskRun: {
select: {
friendlyId: true,
spanId: true,
createdAt: true,
},
},
parentTaskRun: {
select: {
friendlyId: true,
spanId: true,
createdAt: true,
},
},
runtimeEnvironment: {
select: {
id: true,
type: true,
slug: true,
organizationId: true,
orgMember: {
select: {
user: {
select: {
id: true,
name: true,
displayName: true,
},
},
},
},
},
},
},
},
this.#prismaClient
);

if (!run) {
throw new RunNotInPgError(runFriendlyId);
Expand Down
Loading
Loading