Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ request.Version = newVersion

```
submitqueue/ # repo root (Go module github.com/uber/submitqueue)
├── api/ # Wire contracts (proto) by domain/service
│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/
│ └── stovepipe/{gateway,orchestrator}/{proto,protopb}/
├── api/ # Published wire contracts (cross-domain/external)
│ ├── submitqueue/{gateway,orchestrator}/{proto,protopb}/ # RPC (proto)
│ ├── stovepipe/{gateway,orchestrator}/{proto,protopb}/
│ └── runway/messagequeue/ # external queue contracts (proto + protojson)
├── platform/ # SHARED cross-domain packages — no domain deps
│ ├── errs/, metrics/, consumer/, http/
│ ├── base/ # SHARED entities (change/, messagequeue/, …)
Expand Down Expand Up @@ -68,7 +69,7 @@ submitqueue/ # repo root (Go module github.com/uber/submi

The `platform/` tree holds code reused across domains (infrastructure, shared entities, shared extension contracts). Each **domain** (`submitqueue/`, `stovepipe/`, …) keeps the same internal layout (`gateway/`, `orchestrator/`, `entity/`, `extension/`, `core/`); a domain's own `core/` (e.g. `submitqueue/core/`) holds infra shared only between that domain's services.

The `api/` tree holds all wire contracts (proto definitions and their committed generated stubs), organized by `domain/service`: `api/{domain}/{service}/proto/` for `.proto` sources and `api/{domain}/{service}/protopb/` for generated Go. A service package may hold multiple `.proto` files — its RPC contract (`{service}.proto`) alongside messagequeue contracts (queue payload schemas) — all generating into the same `protopb/`.
The `api/` tree holds **published** wire contracts — those depended on from outside the owning domain. RPC contracts live at `api/{domain}/{service}/` (`proto/` for `.proto` sources, `protopb/` for committed generated Go); a service package may hold multiple `.proto` files, all generating into the same `protopb/`. External message-queue contracts live at `api/{domain}/messagequeue/` (see Message Queue Contracts below). Internal queue contracts do **not** go here — they live under `{domain}/core/messagequeue/`.

### Platform notes

Expand Down Expand Up @@ -156,9 +157,10 @@ Paths follow the directory layout: shared packages live under `platform/` at the
- RPC Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller` (e.g. `.../submitqueue/gateway/controller`)
- Queue Controllers: `github.com/uber/submitqueue/{domain}/{service}/controller/{step}`
- Proto (generated): `github.com/uber/submitqueue/api/{domain}/{service}/protopb`
- Queue contracts: external `github.com/uber/submitqueue/api/{domain}/messagequeue`; internal `github.com/uber/submitqueue/{domain}/core/messagequeue`
- Domain entities: `github.com/uber/submitqueue/{domain}/entity` (e.g. `.../submitqueue/entity`)
- Domain extensions: `github.com/uber/submitqueue/{domain}/extension/{ext}[/{impl}]` (e.g. `.../submitqueue/extension/storage/mysql`)
- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; domain pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey`
- Cross-domain consumer framework: `github.com/uber/submitqueue/platform/consumer`; internal pipeline topic keys: `github.com/uber/submitqueue/{domain}/core/topickey` (external queue topic keys live with their contract package, e.g. `api/runway/messagequeue`)
- Domain-internal infra: `github.com/uber/submitqueue/{domain}/core/{pkg}` (e.g. `.../submitqueue/core/request`)
- Shared entities: `github.com/uber/submitqueue/platform/base/{pkg}` (e.g. `.../platform/base/messagequeue`)
- Shared extensions: `github.com/uber/submitqueue/platform/extension/{ext}[/{impl}]` (e.g. `.../platform/extension/messagequeue/mysql`)
Expand All @@ -182,7 +184,13 @@ Generated proto files are committed. When modifying `.proto` files:
2. `make proto` (generates `*.pb.go`, `*_grpc.pb.go`, `*.pb.yarpc.go` into `api/{domain}/{service}/protopb/`)
3. Commit all generated files

To add a new `.proto` to a service (e.g. messagequeue contracts), drop it in the service's `api/{domain}/{service}/proto/` dir, add it to that package's `srcs` in `api/{domain}/{service}/proto/BUILD.bazel` and its `exports_files`, then `make proto && make gazelle`. The codegen and `make proto` copy loop already handle multiple `.proto` files per package.
To add a new `.proto` to a service, drop it in the service's `api/{domain}/{service}/proto/` dir, add it to that package's `srcs` in `api/{domain}/{service}/proto/BUILD.bazel` and its `exports_files`, then `make proto && make gazelle`. The codegen and `make proto` copy loop already handle multiple `.proto` files per package.

### Message Queue Contracts

Queue payloads are defined in **proto3** (`.proto` under `proto/`, generated Go in `protopb/` as the binding) and serialized as **protobuf JSON** (protojson) so the queue keeps storing self-describing JSON. Location follows audience: external/cross-domain contracts go under `api/{domain}/messagequeue/`; internal contracts (used only within the owning domain) go under `{domain}/core/messagequeue/`. Bazel `visibility` enforces the split — internal targets are domain-scoped, `api/` targets are public. See [doc/rfc/messagequeue-contract.md](doc/rfc/messagequeue-contract.md).

The message types are generated; the contract package adds only generic `protojson` glue — `Marshal(m)` / `Unmarshal[T](b, m)` — owning the wire conventions: `UseProtoNames` (snake_case fields), UPPER_SNAKE enum values, int64-as-string, unknown fields discarded on read (additive evolution). The topic key(s) carrying a message are declared on the message via the `topic_keys` proto option — a `google.protobuf.MessageOptions` extension defined in `api/base/messagequeue`. A topic key is a stable logical name, not a concrete wire topic; each implementer maps it to its backend's topic name, and a `TopicKeys(msg)` reflection helper reads the option back. It is contract metadata, not the hot path — publish/consume still routes on `consumer.TopicKey` + `TopicRegistry`. The contract package owns both halves: the proto payload and the `TopicKey` constants for its topic keys. A contract test round-trips the payloads and asserts every topic key is bound to exactly one message. Shared field types (`Change`, `Strategy`) are shared protos under `api/base/{change,mergestrategy}`. `api/runway/messagequeue/` is the reference example.

### Naming Conventions

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ GOIMPORTS_VERSION ?= v0.33.0
# (the out_dir convention in tool/proto/BUILD.bazel) and copied back here. A
# package may hold multiple .proto files (e.g. an RPC contract plus messagequeue
# contracts); all generated stubs land in the same protopb/ dir.
PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator
PROTO_PACKAGES = api/base/change api/base/mergestrategy api/base/messagequeue api/runway/messagequeue api/submitqueue/gateway api/submitqueue/orchestrator api/stovepipe/gateway api/stovepipe/orchestrator

# Set REPO_ROOT for docker-compose
export REPO_ROOT := $(shell pwd)
Expand Down
31 changes: 31 additions & 0 deletions api/runway/messagequeue/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "messagequeue",
srcs = [
"merge.go",
"topics.go",
],
importpath = "github.com/uber/submitqueue/api/runway/messagequeue",
visibility = ["//visibility:public"],
deps = [
"//api/base/messagequeue/protopb", # keep
"//api/runway/messagequeue/protopb", # keep
"//platform/consumer",
"@org_golang_google_protobuf//encoding/protojson",
"@org_golang_google_protobuf//proto",
],
)

go_test(
name = "messagequeue_test",
srcs = ["merge_test.go"],
embed = [":messagequeue"],
deps = [
"//api/base/change/protopb",
"//api/base/mergestrategy/protopb",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_protobuf//proto",
],
)
26 changes: 26 additions & 0 deletions api/runway/messagequeue/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Runway message queue contract

The published, language-neutral contract for the merge queues Runway owns. A client — in any language — publishes a merge request and consumes the result without access to Runway's Go types or storage. See [the message queue contract RFC](../../../doc/rfc/messagequeue-contract.md) for the design.

Payloads are defined as proto3 messages in [`proto/merge.proto`](proto/merge.proto) and generated into [`protopb/`](protopb); the proto is the authority and a non-Go client compiles against it directly. On the wire, payloads are serialized as protobuf JSON (`protojson`), so the queue keeps storing self-describing JSON. The message types are generated, so the Go helpers in this package are just generic `protojson` glue — `Marshal(m)` and `Unmarshal[T](b, m)` — for Go callers; field names stay snake_case (`UseProtoNames`) and enums serialize as their UPPER_SNAKE value name.

The shared field types `Change` and `MergeStrategy` come from `api/base/change` and `api/base/mergestrategy`, imported by the contract.

## Topic keys

The binding between a topic key and its payload lives in each message's `topic_keys` option (defined in `api/base/messagequeue`); `TopicKeys` reads it back by reflection. A topic key is a stable logical name, not a concrete wire topic — each implementer maps the key to whatever topic name its broker/queue requires. Our Go wiring maps it via `consumer.TopicRegistry`.

| Message | Direction | Topic keys |
|---|---|---|
| `MergeRequest` | client → Runway | `merge-conflict-check`, `merge` |
| `MergeResult` | Runway → client | `merge-conflict-check-signal`, `merge-signal` |

One message serves a queue pair because a merge-conflict check is a dry run of a merge: Runway applies the same ordered steps onto the same target branch, and the topic key the request arrives on decides whether it commits the result and reports the produced revisions. A request on `merge-conflict-check` is a dry run; a request on `merge` commits.

## Result shape

`MergeResult.outcome` is an `Outcome` enum (`OUTCOME_UNSPECIFIED`/`SUCCEEDED`/`FAILED`): `SUCCEEDED` means mergeable (check) or merged (commit), `FAILED` a conflict or a failed apply; `reason` carries the explanation when `FAILED`. Per-step detail is in `steps` (request order): each `StepResult.outputs` is the list of `StepOutput`s the step produced on the target branch, **in application order** (the order they were created). A committing merge populates `outputs`; a dry-run check, an already-present change, or a failed step leaves them empty. `StepOutput.id` is the VCS-neutral revision identifier (git SHA, Mercurial hash, Subversion revision, Perforce changelist, …), with room to grow (author, timestamp, …).

## Evolution

Contract changes are additive-only: add new fields; never remove, rename, repurpose, or retype an existing field, and never reuse a field number. protojson ignores unknown fields on read and omits zero-valued fields on write, so a new optional field is backward-compatible in both directions.
98 changes: 98 additions & 0 deletions api/runway/messagequeue/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2025 Uber Technologies, Inc.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be autogenerated?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

porbably but it's marshaling using protojson right now

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package messagequeue holds Runway's external message-queue contract: the wire
// payloads for the merge queues Runway owns, defined by the proto files in
// proto/ and generated into protopb/. The proto is the language-neutral
// authority; the generated Go types in protopb are the binding for Go callers.
//
// The message types are generated into protopb; this package adds only generic
// protojson glue (Marshal/Unmarshal) and the topic-key reflection lookup
// (TopicKeys), so there is no per-message serialization code. Payloads are
// serialized as protobuf JSON, not binary, so the MySQL-backed queue keeps
// storing self-describing JSON. The topic key that carries each payload is
// declared on the message itself via the topic_keys proto option (see
// api/base/messagequeue).
//
// One contract serves two queue pairs because a merge-conflict check is a dry
// run of a merge: Runway applies the same ordered steps onto the same target
// branch, and the only difference is whether it commits the result and reports
// the revisions it produced. The topic key a request arrives on encodes that choice
// — the merge-conflict-check pair for a dry run, the merge pair for a
// committing merge — so MergeRequest and MergeResult are identical on both.
package messagequeue

import (
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

basemqpb "github.com/uber/submitqueue/api/base/messagequeue/protopb"
"github.com/uber/submitqueue/api/runway/messagequeue/protopb"
)

// Wire payload types. These alias the generated protobuf bindings so callers
// reference the contract through this curated package rather than protopb.
type (
// MergeRequest is the payload a client publishes to one of Runway's merge
// queues: the merge-conflict-check topic for a dry-run check, the merge
// topic for a committing merge.
MergeRequest = protopb.MergeRequest
// MergeStep is one step of an ordered merge: a single set of change(s)
// applied with a strategy.
MergeStep = protopb.MergeStep
// MergeResult is the payload Runway publishes to the corresponding signal
// queue once a request completes.
MergeResult = protopb.MergeResult
// StepResult reports what happened to a single MergeStep.
StepResult = protopb.StepResult
// StepOutput is a single revision a merge step produced on the target branch.
StepOutput = protopb.StepOutput
)

// marshalOpts keeps the JSON field names identical to the proto field names
// (snake_case), so the wire shape matches the declared contract rather than
// protojson's default lowerCamelCase. Zero-valued fields are omitted.
var marshalOpts = protojson.MarshalOptions{UseProtoNames: true}

// unmarshalOpts tolerates unknown fields so an additive contract change (a new
// field a producer sends but this consumer does not yet know) is ignored rather
// than rejected.
var unmarshalOpts = protojson.UnmarshalOptions{DiscardUnknown: true}

// Marshal serializes any contract message to protojson bytes for the queue
// payload, keeping the proto field names (snake_case) on the wire.
func Marshal(m proto.Message) ([]byte, error) {
return marshalOpts.Marshal(m)
}

// Unmarshal deserializes protojson bytes into the contract message m, tolerating
// unknown fields so an additive contract change is ignored rather than rejected.
func Unmarshal[T proto.Message](b []byte, m T) error {
return unmarshalOpts.Unmarshal(b, m)
}

// TopicKeys returns the stable logical topic keys bound to a message via the
// topic_keys proto option — not concrete wire names; a caller maps each key to
// its backend's topic name. Returns nil for a message that declares no keys.
func TopicKeys(m proto.Message) []string {
opts := m.ProtoReflect().Descriptor().Options()
if opts == nil {
return nil
}
keys, ok := proto.GetExtension(opts, basemqpb.E_TopicKeys).([]string)
if !ok {
return nil
}
return keys
}
Loading
Loading