Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions runway/orchestrator/controller/merge/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "merge",
srcs = ["merge.go"],
importpath = "github.com/uber/submitqueue/runway/orchestrator/controller/merge",
visibility = ["//visibility:public"],
deps = [
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/metrics",
"//runway/core/topickey",
"//runway/entity",
"//runway/extension/vcs",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "merge_test",
srcs = ["merge_test.go"],
embed = [":merge"],
deps = [
"//platform/base/change",
"//platform/base/mergestrategy",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/extension/messagequeue/mock",
"//runway/core/topickey",
"//runway/entity",
"//runway/extension/vcs",
"//runway/extension/vcs/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//zaptest",
],
)
161 changes: 161 additions & 0 deletions runway/orchestrator/controller/merge/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package merge

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally"
entityqueue "github.com/uber/submitqueue/platform/base/messagequeue"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/metrics"
"github.com/uber/submitqueue/runway/core/topickey"
"github.com/uber/submitqueue/runway/entity"
"github.com/uber/submitqueue/runway/extension/vcs"
"go.uber.org/zap"
)

var _ consumer.Controller = (*Controller)(nil)

// Controller handles merge queue messages. It applies the ordered steps,
// commits the result to the remote, and publishes per-step outcomes back
// to the signal queue. Conflicts are expected outcomes (ack + publish a
// failure result); infrastructure errors are nacked for retry.
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
registry consumer.TopicRegistry
vcsFactory vcs.Factory
topicKey consumer.TopicKey
consumerGroup string
}

// Params are the parameters for creating a new merge controller.
type Params struct {
Registry consumer.TopicRegistry
VCSFactory vcs.Factory
TopicKey consumer.TopicKey
ConsumerGroup string

Scope tally.Scope
Logger *zap.SugaredLogger
}

// NewController creates a new merge controller.
func NewController(p Params) *Controller {
return &Controller{
logger: p.Logger.Named("merge_controller"),
metricsScope: p.Scope.SubScope("merge_controller"),
registry: p.Registry,
vcsFactory: p.VCSFactory,
topicKey: p.TopicKey,
consumerGroup: p.ConsumerGroup,
}
}

func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
const opName = "process"

op := metrics.Begin(c.metricsScope, opName)
defer func() { op.Complete(retErr) }()

msg := delivery.Message()

req, err := entity.MergeRequestFromBytes(msg.Payload)
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
return fmt.Errorf("failed to deserialize merge request: %w", err)
}

c.logger.Infow("received merge request",
"request_id", req.ID,
"queue", req.QueueName,
"step_count", len(req.Steps),
"attempt", delivery.Attempt(),
)

v, err := c.vcsFactory.For(vcs.Config{QueueName: req.QueueName})
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1)
return fmt.Errorf("failed to build VCS for queue %s: %w", req.QueueName, err)
}

result, err := v.Land(ctx, req)
switch {
case err == nil:
// Success — publish result with per-step output IDs.
case errors.Is(err, vcs.ErrConflict):
metrics.NamedCounter(c.metricsScope, opName, "conflicts", 1)
c.logger.Infow("merge conflict",
"request_id", req.ID,
)
conflictResult := entity.MergeResult{
ID: req.ID,
Success: false,
Reason: err.Error(),
}
if pubErr := c.publishResult(ctx, conflictResult, req.QueueName); pubErr != nil {
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
return fmt.Errorf("failed to publish conflict result for request %s: %w", req.ID, pubErr)
}
return nil
default:
metrics.NamedCounter(c.metricsScope, opName, "land_errors", 1)
return fmt.Errorf("merge failed for request %s: %w", req.ID, err)
}

if err := c.publishResult(ctx, result, req.QueueName); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
return fmt.Errorf("failed to publish merge result for request %s: %w", req.ID, err)
}

c.logger.Infow("published merge result",
"request_id", req.ID,
"success", result.Success,
"step_count", len(result.Steps),
)

return nil
}

func (c *Controller) publishResult(ctx context.Context, result entity.MergeResult, partitionKey string) error {
payload, err := result.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize merge result: %w", err)
}

q, ok := c.registry.Queue(topickey.TopicKeyMergeSignal)
if !ok {
return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyMergeSignal)
}

topicName, ok := c.registry.TopicName(topickey.TopicKeyMergeSignal)
if !ok {
return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyMergeSignal)
}

msg := entityqueue.NewMessage(result.ID, payload, partitionKey, nil)
if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

return nil
}

func (c *Controller) Name() string { return "merge" }
func (c *Controller) TopicKey() consumer.TopicKey { return c.topicKey }
func (c *Controller) ConsumerGroup() string { return c.consumerGroup }
Loading