Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions stovepipe/core/filter/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "filter",
srcs = ["filter.go"],
importpath = "github.com/uber/submitqueue/stovepipe/core/filter",
visibility = ["//visibility:public"],
)
18 changes: 16 additions & 2 deletions stovepipe/entity/entity.go → stovepipe/core/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package entity holds Stovepipe-specific domain types (distinct from shared repo entity/).
package entity
// Package filter implements a filter for commit events.
package filter

import "strings"

// ShouldProcess reports whether a commit URI should be processed by the pipeline.
// watchedPrefixes is a list of URI prefixes to match against the commit URI.
// Example prefix: "git://github.com/uber/go-code/refs/heads/main"
func ShouldProcess(uri string, watchedPrefixes []string) bool {
for _, prefix := range watchedPrefixes {
if strings.HasPrefix(uri, prefix) {
return true
}
}
return false
}
5 changes: 1 addition & 4 deletions stovepipe/entity/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "entity",
srcs = [
"entity.go",
"ingest_request.go",
],
srcs = ["ingest_request.go"],
importpath = "github.com/uber/submitqueue/stovepipe/entity",
visibility = ["//visibility:public"],
deps = ["//platform/base/change"],
Expand Down
25 changes: 23 additions & 2 deletions stovepipe/gateway/controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,47 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "controller",
srcs = ["ping.go"],
srcs = [
"ingest.go",
"ping.go",
],
importpath = "github.com/uber/submitqueue/stovepipe/gateway/controller",
visibility = ["//visibility:public"],
deps = [
"//api/stovepipe/gateway/protopb",
"//platform/base/change",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/errs",
"//platform/extension/counter",
"//platform/metrics",
"//stovepipe/core/topickey",
"//stovepipe/entity",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "controller_test",
srcs = ["ping_test.go"],
srcs = [
"ingest_test.go",
"ping_test.go",
],
embed = [":controller"],
deps = [
"//api/base/change/protopb",
"//api/stovepipe/gateway/protopb",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/extension/counter/mock",
"//platform/extension/messagequeue/mock",
"//stovepipe/core/topickey",
"//stovepipe/entity",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//:zap",
],
)
138 changes: 138 additions & 0 deletions stovepipe/gateway/controller/ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) 2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally"
pb "github.com/uber/submitqueue/api/stovepipe/gateway/protopb"
"github.com/uber/submitqueue/platform/base/change"
entityqueue "github.com/uber/submitqueue/platform/base/messagequeue"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/errs"
"github.com/uber/submitqueue/platform/extension/counter"
"github.com/uber/submitqueue/platform/metrics"
"github.com/uber/submitqueue/stovepipe/core/topickey"
"github.com/uber/submitqueue/stovepipe/entity"
"go.uber.org/zap"
)

// ErrInvalidRequest is returned when the request fails validation.
// This error should be mapped to codes.InvalidArgument at the gRPC layer.
var ErrInvalidRequest = errs.NewUserError(errors.New("invalid request"))

// IsInvalidRequest returns true if any error in the error chain is ErrInvalidRequest.
func IsInvalidRequest(err error) bool {
return errors.Is(err, ErrInvalidRequest)
}

// IngestController handles ingest business logic for the stovepipe gateway.
type IngestController struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
counter counter.Counter
registry consumer.TopicRegistry
}

// NewIngestController creates a new instance of the stovepipe ingest controller.
// The controller publishes ingest requests to the topic registered under
// topickey.TopicKeyStart in the registry.
func NewIngestController(logger *zap.SugaredLogger, scope tally.Scope, counter counter.Counter, registry consumer.TopicRegistry) *IngestController {
return &IngestController{
logger: logger,
metricsScope: scope.SubScope("ingest_controller"),
counter: counter,
registry: registry,
}
}

// Ingest validates the request, generates a SPID, publishes the ingest request
// to the pipeline queue, and returns the SPID for tracking.
func (c *IngestController) Ingest(ctx context.Context, req *pb.IngestRequest) (resp *pb.IngestResponse, retErr error) {
const opName = "ingest"

op := metrics.Begin(c.metricsScope, opName)
defer func() { op.Complete(retErr) }()

if req.Queue == "" {
return nil, fmt.Errorf("IngestController requires the request to have a queue name specified: %w", ErrInvalidRequest)
}
if req.Change == nil || len(req.Change.Uris) == 0 {
return nil, fmt.Errorf("IngestController requires the request to have at least one change URI specified: %w", ErrInvalidRequest)
}

queue := req.Queue

seq, err := c.counter.Next(ctx, "ingest/"+queue)
if err != nil {
return nil, fmt.Errorf("IngestController failed to generate SPID for queue=%s: %w", queue, err)
}

ingestRequest := entity.IngestRequest{
ID: fmt.Sprintf("%s/%d", queue, seq),
Queue: queue,
Change: change.Change{URIs: req.Change.GetUris()},
}

c.logger.Debugw("ingest request created",
"queue", queue,
"spid", ingestRequest.ID,
"change_uris", ingestRequest.Change.URIs,
)

if err := c.publishToQueue(ctx, ingestRequest); err != nil {
return nil, fmt.Errorf("IngestController failed to publish request to queue: %w", err)
}

c.logger.Infow("ingest request published to queue",
"queue", queue,
"spid", ingestRequest.ID,
"topic_key", topickey.TopicKeyStart,
)
metrics.NamedCounter(c.metricsScope, opName, "publish_success", 1)

return &pb.IngestResponse{
Spid: ingestRequest.ID,
}, nil
}

// publishToQueue serializes the ingest request and publishes it to the start topic.
func (c *IngestController) publishToQueue(ctx context.Context, ingestRequest entity.IngestRequest) error {
payload, err := ingestRequest.ToBytes()
if err != nil {
return fmt.Errorf("failed to serialize ingest request: %w", err)
}

msg := entityqueue.NewMessage(ingestRequest.ID, payload, ingestRequest.Queue, nil)

q, ok := c.registry.Queue(topickey.TopicKeyStart)
if !ok {
return fmt.Errorf("no queue registered for topic key %s", topickey.TopicKeyStart)
}

topicName, ok := c.registry.TopicName(topickey.TopicKeyStart)
if !ok {
return fmt.Errorf("no topic name registered for topic key %s", topickey.TopicKeyStart)
}

if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
return fmt.Errorf("failed to publish ingest request message: %w", err)
}

return nil
}
Loading
Loading