diff --git a/runway/orchestrator/controller/merge/BUILD.bazel b/runway/orchestrator/controller/merge/BUILD.bazel new file mode 100644 index 00000000..964b3812 --- /dev/null +++ b/runway/orchestrator/controller/merge/BUILD.bazel @@ -0,0 +1,40 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "merge", + srcs = ["merge.go"], + importpath = "github.com/uber/submitqueue/runway/orchestrator/controller/merge", + visibility = ["//visibility:public"], + deps = [ + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/metrics", + "//runway/core/topickey", + "//runway/entity", + "//runway/extension/vcs", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "merge_test", + srcs = ["merge_test.go"], + embed = [":merge"], + deps = [ + "//platform/base/change", + "//platform/base/mergestrategy", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/extension/messagequeue/mock", + "//runway/core/topickey", + "//runway/entity", + "//runway/extension/vcs", + "//runway/extension/vcs/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/runway/orchestrator/controller/merge/merge.go b/runway/orchestrator/controller/merge/merge.go new file mode 100644 index 00000000..521e0e0a --- /dev/null +++ b/runway/orchestrator/controller/merge/merge.go @@ -0,0 +1,161 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merge + +import ( + "context" + "errors" + "fmt" + + "github.com/uber-go/tally" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/runway/core/topickey" + "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/runway/extension/vcs" + "go.uber.org/zap" +) + +var _ consumer.Controller = (*Controller)(nil) + +// Controller handles merge queue messages. It applies the ordered steps, +// commits the result to the remote, and publishes per-step outcomes back +// to the signal queue. Conflicts are expected outcomes (ack + publish a +// failure result); infrastructure errors are nacked for retry. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + vcsFactory vcs.Factory + topicKey consumer.TopicKey + consumerGroup string +} + +// Params are the parameters for creating a new merge controller. +type Params struct { + Registry consumer.TopicRegistry + VCSFactory vcs.Factory + TopicKey consumer.TopicKey + ConsumerGroup string + + Scope tally.Scope + Logger *zap.SugaredLogger +} + +// NewController creates a new merge controller. +func NewController(p Params) *Controller { + return &Controller{ + logger: p.Logger.Named("merge_controller"), + metricsScope: p.Scope.SubScope("merge_controller"), + registry: p.Registry, + vcsFactory: p.VCSFactory, + topicKey: p.TopicKey, + consumerGroup: p.ConsumerGroup, + } +} + +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + req, err := entity.MergeRequestFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to deserialize merge request: %w", err) + } + + c.logger.Infow("received merge request", + "request_id", req.ID, + "queue", req.QueueName, + "step_count", len(req.Steps), + "attempt", delivery.Attempt(), + ) + + v, err := c.vcsFactory.For(vcs.Config{QueueName: req.QueueName}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1) + return fmt.Errorf("failed to build VCS for queue %s: %w", req.QueueName, err) + } + + result, err := v.Land(ctx, req) + switch { + case err == nil: + // Success — publish result with per-step output IDs. + case errors.Is(err, vcs.ErrConflict): + metrics.NamedCounter(c.metricsScope, opName, "conflicts", 1) + c.logger.Infow("merge conflict", + "request_id", req.ID, + ) + conflictResult := entity.MergeResult{ + ID: req.ID, + Success: false, + Reason: err.Error(), + } + if pubErr := c.publishResult(ctx, conflictResult, req.QueueName); pubErr != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish conflict result for request %s: %w", req.ID, pubErr) + } + return nil + default: + metrics.NamedCounter(c.metricsScope, opName, "land_errors", 1) + return fmt.Errorf("merge failed for request %s: %w", req.ID, err) + } + + if err := c.publishResult(ctx, result, req.QueueName); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish merge result for request %s: %w", req.ID, err) + } + + c.logger.Infow("published merge result", + "request_id", req.ID, + "success", result.Success, + "step_count", len(result.Steps), + ) + + return nil +} + +func (c *Controller) publishResult(ctx context.Context, result entity.MergeResult, partitionKey string) error { + payload, err := result.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize merge result: %w", err) + } + + q, ok := c.registry.Queue(topickey.TopicKeyMergeSignal) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyMergeSignal) + } + + topicName, ok := c.registry.TopicName(topickey.TopicKeyMergeSignal) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyMergeSignal) + } + + msg := entityqueue.NewMessage(result.ID, payload, partitionKey, nil) + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +func (c *Controller) Name() string { return "merge" } +func (c *Controller) TopicKey() consumer.TopicKey { return c.topicKey } +func (c *Controller) ConsumerGroup() string { return c.consumerGroup } diff --git a/runway/orchestrator/controller/merge/merge_test.go b/runway/orchestrator/controller/merge/merge_test.go new file mode 100644 index 00000000..9b616ec7 --- /dev/null +++ b/runway/orchestrator/controller/merge/merge_test.go @@ -0,0 +1,199 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package merge + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/submitqueue/platform/base/change" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/base/mergestrategy" + "github.com/uber/submitqueue/platform/consumer" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/runway/core/topickey" + "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/runway/extension/vcs" + vcsmock "github.com/uber/submitqueue/runway/extension/vcs/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func testRequest() entity.MergeRequest { + return entity.MergeRequest{ + ID: "queue-a/42", + QueueName: "queue-a", + Steps: []entity.MergeStep{ + { + StepID: "queue-a/1", + Changes: []change.Change{{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}}, + Strategy: mergestrategy.MergeStrategyRebase, + }, + }, + } +} + +func captureRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error, captured *entityqueue.Message) consumer.TopicRegistry { + t.Helper() + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, msg entityqueue.Message) error { + if captured != nil { + *captured = msg + } + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: topickey.TopicKeyMergeSignal, Name: "merger-signal", Queue: mockQ}, + }) + require.NoError(t, err) + return registry +} + +func makeDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { + t.Helper() + msg := entityqueue.NewMessage("queue-a/42", payload, "queue-a", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + return delivery +} + +func newTestController(t *testing.T, ctrl *gomock.Controller, mockVCS *vcsmock.MockVCS, publishErr error, captured *entityqueue.Message) *Controller { + t.Helper() + + factory := vcsmock.NewMockFactory(ctrl) + factory.EXPECT().For(gomock.Any()).Return(mockVCS, nil).AnyTimes() + + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + Registry: captureRegistry(t, ctrl, publishErr, captured), + VCSFactory: factory, + TopicKey: topickey.TopicKeyMerge, + ConsumerGroup: "runway-merge", + }) +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + controller := newTestController(t, ctrl, mockVCS, nil, nil) + + assert.Equal(t, "merge", controller.Name()) + assert.Equal(t, topickey.TopicKeyMerge, controller.TopicKey()) + assert.Equal(t, "runway-merge", controller.ConsumerGroup()) +} + +func TestProcess_Success(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + + expectedResult := entity.MergeResult{ + ID: "queue-a/42", + Success: true, + Steps: []entity.StepResult{{StepID: "queue-a/1", OutputIDs: []string{"abc123"}}}, + } + mockVCS.EXPECT().Land(gomock.Any(), gomock.Any()).Return(expectedResult, nil) + + var captured entityqueue.Message + controller := newTestController(t, ctrl, mockVCS, nil, &captured) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.NoError(t, controller.Process(context.Background(), delivery)) + + assert.Equal(t, "queue-a/42", captured.ID) + assert.Equal(t, "queue-a", captured.PartitionKey) + + result, err := entity.MergeResultFromBytes(captured.Payload) + require.NoError(t, err) + assert.True(t, result.Success) + require.Len(t, result.Steps, 1) + assert.Equal(t, []string{"abc123"}, result.Steps[0].OutputIDs) +} + +func TestProcess_Conflict(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + mockVCS.EXPECT().Land(gomock.Any(), gomock.Any()).Return(entity.MergeResult{}, fmt.Errorf("conflict in foo.go: %w", vcs.ErrConflict)) + + var captured entityqueue.Message + controller := newTestController(t, ctrl, mockVCS, nil, &captured) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.NoError(t, controller.Process(context.Background(), delivery)) + + result, err := entity.MergeResultFromBytes(captured.Payload) + require.NoError(t, err) + assert.False(t, result.Success) + assert.Contains(t, result.Reason, "conflict") +} + +func TestProcess_InfraError(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + mockVCS.EXPECT().Land(gomock.Any(), gomock.Any()).Return(entity.MergeResult{}, fmt.Errorf("vcs unavailable")) + + controller := newTestController(t, ctrl, mockVCS, nil, nil) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.Error(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_DeserializeError(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + controller := newTestController(t, ctrl, mockVCS, nil, nil) + delivery := makeDelivery(t, ctrl, []byte(`not json`)) + + require.Error(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_PublishError(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + mockVCS.EXPECT().Land(gomock.Any(), gomock.Any()).Return(entity.MergeResult{ID: "queue-a/42", Success: true}, nil) + + controller := newTestController(t, ctrl, mockVCS, assert.AnError, nil) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.Error(t, controller.Process(context.Background(), delivery)) +} diff --git a/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel b/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel new file mode 100644 index 00000000..9a906fe7 --- /dev/null +++ b/runway/orchestrator/controller/mergeconflictcheck/BUILD.bazel @@ -0,0 +1,39 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "mergeconflictcheck", + srcs = ["mergeconflictcheck.go"], + importpath = "github.com/uber/submitqueue/runway/orchestrator/controller/mergeconflictcheck", + visibility = ["//visibility:public"], + deps = [ + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/metrics", + "//runway/core/topickey", + "//runway/entity", + "//runway/extension/vcs", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_zap//:zap", + ], +) + +go_test( + name = "mergeconflictcheck_test", + srcs = ["mergeconflictcheck_test.go"], + embed = [":mergeconflictcheck"], + deps = [ + "//platform/base/change", + "//platform/base/mergestrategy", + "//platform/base/messagequeue", + "//platform/consumer", + "//platform/extension/messagequeue/mock", + "//runway/core/topickey", + "//runway/entity", + "//runway/extension/vcs/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@com_github_uber_go_tally//:tally", + "@org_uber_go_mock//gomock", + "@org_uber_go_zap//zaptest", + ], +) diff --git a/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go new file mode 100644 index 00000000..b9805b55 --- /dev/null +++ b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck.go @@ -0,0 +1,139 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mergeconflictcheck + +import ( + "context" + "fmt" + + "github.com/uber-go/tally" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" + "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/runway/core/topickey" + "github.com/uber/submitqueue/runway/entity" + "github.com/uber/submitqueue/runway/extension/vcs" + "go.uber.org/zap" +) + +var _ consumer.Controller = (*Controller)(nil) + +// Controller handles merge-conflict-check queue messages. It performs a dry-run +// merge and publishes per-step mergeability results back to the signal queue. +type Controller struct { + logger *zap.SugaredLogger + metricsScope tally.Scope + registry consumer.TopicRegistry + vcsFactory vcs.Factory + topicKey consumer.TopicKey + consumerGroup string +} + +// Params are the parameters for creating a new merge-conflict-check controller. +type Params struct { + Registry consumer.TopicRegistry + VCSFactory vcs.Factory + TopicKey consumer.TopicKey + ConsumerGroup string + + Scope tally.Scope + Logger *zap.SugaredLogger +} + +// NewController creates a new merge-conflict-check controller. +func NewController(p Params) *Controller { + return &Controller{ + logger: p.Logger.Named("merge_conflict_check_controller"), + metricsScope: p.Scope.SubScope("merge_conflict_check_controller"), + registry: p.Registry, + vcsFactory: p.VCSFactory, + topicKey: p.TopicKey, + consumerGroup: p.ConsumerGroup, + } +} + +func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { + const opName = "process" + + op := metrics.Begin(c.metricsScope, opName) + defer func() { op.Complete(retErr) }() + + msg := delivery.Message() + + req, err := entity.MergeRequestFromBytes(msg.Payload) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) + return fmt.Errorf("failed to deserialize merge request: %w", err) + } + + c.logger.Infow("received merge-conflict-check request", + "request_id", req.ID, + "queue", req.QueueName, + "step_count", len(req.Steps), + "attempt", delivery.Attempt(), + ) + + v, err := c.vcsFactory.For(vcs.Config{QueueName: req.QueueName}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1) + return fmt.Errorf("failed to build VCS for queue %s: %w", req.QueueName, err) + } + + result, err := v.CheckMergeability(ctx, req) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "check_errors", 1) + return fmt.Errorf("merge-conflict check failed for request %s: %w", req.ID, err) + } + + if err := c.publishResult(ctx, result, req.QueueName); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish check result for request %s: %w", req.ID, err) + } + + c.logger.Infow("published merge-conflict-check result", + "request_id", req.ID, + "success", result.Success, + ) + + return nil +} + +func (c *Controller) publishResult(ctx context.Context, result entity.MergeResult, partitionKey string) error { + payload, err := result.ToBytes() + if err != nil { + return fmt.Errorf("failed to serialize merge result: %w", err) + } + + q, ok := c.registry.Queue(topickey.TopicKeyMergeConflictCheckSignal) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyMergeConflictCheckSignal) + } + + topicName, ok := c.registry.TopicName(topickey.TopicKeyMergeConflictCheckSignal) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyMergeConflictCheckSignal) + } + + msg := entityqueue.NewMessage(result.ID, payload, partitionKey, nil) + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + + return nil +} + +func (c *Controller) Name() string { return "merge-conflict-check" } +func (c *Controller) TopicKey() consumer.TopicKey { return c.topicKey } +func (c *Controller) ConsumerGroup() string { return c.consumerGroup } diff --git a/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go new file mode 100644 index 00000000..921dd954 --- /dev/null +++ b/runway/orchestrator/controller/mergeconflictcheck/mergeconflictcheck_test.go @@ -0,0 +1,188 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mergeconflictcheck + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber-go/tally" + "github.com/uber/submitqueue/platform/base/change" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/base/mergestrategy" + "github.com/uber/submitqueue/platform/consumer" + queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/runway/core/topickey" + "github.com/uber/submitqueue/runway/entity" + vcsmock "github.com/uber/submitqueue/runway/extension/vcs/mock" + "go.uber.org/mock/gomock" + "go.uber.org/zap/zaptest" +) + +func testRequest() entity.MergeRequest { + return entity.MergeRequest{ + ID: "queue-a/42", + QueueName: "queue-a", + Steps: []entity.MergeStep{ + { + StepID: "queue-a/1", + Changes: []change.Change{{URIs: []string{"github://uber/repo/pull/1/abcdef0123456789abcdef0123456789abcdef01"}}}, + Strategy: mergestrategy.MergeStrategyRebase, + }, + }, + } +} + +func captureRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error, captured *entityqueue.Message) consumer.TopicRegistry { + t.Helper() + + mockPub := queuemock.NewMockPublisher(ctrl) + mockPub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, msg entityqueue.Message) error { + if captured != nil { + *captured = msg + } + return publishErr + }, + ).AnyTimes() + + mockQ := queuemock.NewMockQueue(ctrl) + mockQ.EXPECT().Publisher().Return(mockPub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: topickey.TopicKeyMergeConflictCheckSignal, Name: "merge-conflict-checker-signal", Queue: mockQ}, + }) + require.NoError(t, err) + return registry +} + +func makeDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { + t.Helper() + msg := entityqueue.NewMessage("queue-a/42", payload, "queue-a", nil) + delivery := queuemock.NewMockDelivery(ctrl) + delivery.EXPECT().Message().Return(msg).AnyTimes() + delivery.EXPECT().Attempt().Return(1).AnyTimes() + return delivery +} + +func newTestController(t *testing.T, ctrl *gomock.Controller, mockVCS *vcsmock.MockVCS, publishErr error, captured *entityqueue.Message) *Controller { + t.Helper() + + factory := vcsmock.NewMockFactory(ctrl) + factory.EXPECT().For(gomock.Any()).Return(mockVCS, nil).AnyTimes() + + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + Registry: captureRegistry(t, ctrl, publishErr, captured), + VCSFactory: factory, + TopicKey: topickey.TopicKeyMergeConflictCheck, + ConsumerGroup: "runway-merge-conflict-check", + }) +} + +func TestNewController(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + controller := newTestController(t, ctrl, mockVCS, nil, nil) + + assert.Equal(t, "merge-conflict-check", controller.Name()) + assert.Equal(t, topickey.TopicKeyMergeConflictCheck, controller.TopicKey()) + assert.Equal(t, "runway-merge-conflict-check", controller.ConsumerGroup()) +} + +func TestProcess_Success(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + + expectedResult := entity.MergeResult{ + ID: "queue-a/42", + Success: true, + Steps: []entity.StepResult{{StepID: "queue-a/1"}}, + } + mockVCS.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(expectedResult, nil) + + var captured entityqueue.Message + controller := newTestController(t, ctrl, mockVCS, nil, &captured) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.NoError(t, controller.Process(context.Background(), delivery)) + + assert.Equal(t, "queue-a/42", captured.ID) + assert.Equal(t, "queue-a", captured.PartitionKey) + + var result entity.MergeResult + result, err = entity.MergeResultFromBytes(captured.Payload) + require.NoError(t, err) + assert.True(t, result.Success) + assert.Equal(t, "queue-a/42", result.ID) +} + +func TestProcess_Errors(t *testing.T) { + tests := []struct { + name string + payload []byte + }{ + {name: "invalid json", payload: []byte(`not json`)}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + controller := newTestController(t, ctrl, mockVCS, nil, nil) + delivery := makeDelivery(t, ctrl, tt.payload) + + require.Error(t, controller.Process(context.Background(), delivery)) + }) + } +} + +func TestProcess_VCSError(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + mockVCS.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(entity.MergeResult{}, fmt.Errorf("vcs unavailable")) + + controller := newTestController(t, ctrl, mockVCS, nil, nil) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.Error(t, controller.Process(context.Background(), delivery)) +} + +func TestProcess_PublishError(t *testing.T) { + ctrl := gomock.NewController(t) + mockVCS := vcsmock.NewMockVCS(ctrl) + mockVCS.EXPECT().CheckMergeability(gomock.Any(), gomock.Any()).Return(entity.MergeResult{ID: "queue-a/42", Success: true}, nil) + + controller := newTestController(t, ctrl, mockVCS, assert.AnError, nil) + + req := testRequest() + payload, err := req.ToBytes() + require.NoError(t, err) + + delivery := makeDelivery(t, ctrl, payload) + require.Error(t, controller.Process(context.Background(), delivery)) +}