diff --git a/app.go b/app.go index 2106e94..969b6f6 100644 --- a/app.go +++ b/app.go @@ -160,6 +160,9 @@ func (a *App) Drain(ctx context.Context, queue string, max int) (int, error) { } func (a *App) dispatch(ctx context.Context, msg *ReceivedMessage) { + if msg.Headers[HeaderReplayBypass] != "" { + ctx = withReplay(ctx) + } env, _ := Decode([]byte(msg.Body)) urn := env.URN() diff --git a/redrive.go b/redrive.go index d8188bb..d4536ae 100644 --- a/redrive.go +++ b/redrive.go @@ -18,6 +18,11 @@ type RedriveOptions struct { // Select, when non-nil, picks which messages to redrive (e.g. by reason or URN). // Unselected messages are returned to the DLQ unchanged. Select func(Envelope) bool + // Bypass stamps the bq-replay-bypass transport header on each redriven message, so a + // handler can skip external side-effects that already ran (see [BypassExternalEffects]). + // It takes effect only when the transport is a [HeaderPublisher]; otherwise it is a no-op + // and the per-item Bypassed flag stays false (ADR-0027). + Bypass bool // Timeout is the per-pop wait passed to the transport (default 1s). Timeout time.Duration } @@ -31,6 +36,7 @@ type RedriveItem struct { From string // the DLQ it was read from To string // target queue (the plan, even on a dry run; "" when skipped/undecodable) Redriven bool // true only when actually re-published to To + Bypassed bool // true when the bq-replay-bypass header was stamped on the redriven message } // RedriveResult summarizes a [Redrive] run. @@ -124,13 +130,15 @@ func Redrive(ctx context.Context, t Transport, dlq string, opts RedriveOptions) _ = t.Ack(ctx, p.msg) return res, err } - if err := t.Publish(ctx, target, string(body)); err != nil { + bypassed, err := publishRedriven(ctx, t, target, string(body), opts.Bypass) + if err != nil { _ = t.Publish(ctx, dlq, p.msg.Body) // restore on a publish failure _ = t.Ack(ctx, p.msg) return res, err } _ = t.Ack(ctx, p.msg) item.Redriven = true + item.Bypassed = bypassed res.Redriven++ res.Items = append(res.Items, item) } @@ -145,3 +153,15 @@ func sourceQueueOf(env Envelope) string { } return env.Meta.Queue } + +// publishRedriven re-publishes a reset message to queue. When bypass is set and the transport +// is a [HeaderPublisher], it stamps the bq-replay-bypass header and reports bypassed=true; +// otherwise it publishes plainly and reports bypassed=false. +func publishRedriven(ctx context.Context, t Transport, queue, body string, bypass bool) (bool, error) { + if bypass { + if hp, ok := t.(HeaderPublisher); ok { + return true, hp.PublishWithHeaders(ctx, queue, body, map[string]string{HeaderReplayBypass: "1"}) + } + } + return false, t.Publish(ctx, queue, body) +} diff --git a/replay.go b/replay.go new file mode 100644 index 0000000..b40093c --- /dev/null +++ b/replay.go @@ -0,0 +1,42 @@ +package babelqueue + +import "context" + +// HeaderReplayBypass is the out-of-band transport header [Redrive] stamps (when its options +// set Bypass) on a replayed message, and that the [App] runtime surfaces to the handler as +// [IsReplay]. It lets a handler skip external side-effects that already ran, with no change to +// the frozen envelope (ADR-0027). +const HeaderReplayBypass = "bq-replay-bypass" + +type replayKey struct{} + +func withReplay(ctx context.Context) context.Context { + return context.WithValue(ctx, replayKey{}, true) +} + +// IsReplay reports whether the message currently being handled was redriven with the +// replay-bypass marker set — i.e. this is a deliberate replay, and external side-effects that +// already happened should be skipped. It reads the marker the runtime put on the context from +// the [HeaderReplayBypass] transport header. +func IsReplay(ctx context.Context) bool { + v, _ := ctx.Value(replayKey{}).(bool) + return v +} + +// BypassExternalEffects runs fn unless the current message is a replay (see [IsReplay]), in +// which case fn is skipped and nil is returned. Wrap the external, non-idempotent side of a +// handler — sending an email, charging a card, calling a third party — so a replay re-runs the +// idempotent core but does not re-fire effects that already happened: +// +// app.Handle("urn:babel:orders:created", func(ctx context.Context, env babelqueue.Envelope) error { +// saveOrder(env) // idempotent core — always runs +// return babelqueue.BypassExternalEffects(ctx, func() error { +// return sendConfirmationEmail(env) // external effect — skipped on replay +// }) +// }) +func BypassExternalEffects(ctx context.Context, fn func() error) error { + if IsReplay(ctx) { + return nil + } + return fn() +} diff --git a/replay_test.go b/replay_test.go new file mode 100644 index 0000000..3d00f70 --- /dev/null +++ b/replay_test.go @@ -0,0 +1,112 @@ +package babelqueue + +import ( + "context" + "errors" + "testing" + "time" +) + +func TestIsReplayAndBypassExternalEffects(t *testing.T) { + bg := context.Background() + if IsReplay(bg) { + t.Error("a plain context must not be a replay") + } + rp := withReplay(bg) + if !IsReplay(rp) { + t.Error("a withReplay context must be a replay") + } + + ran := false + if err := BypassExternalEffects(bg, func() error { ran = true; return nil }); err != nil { + t.Fatal(err) + } + if !ran { + t.Error("fn must run when not a replay") + } + + ran = false + if err := BypassExternalEffects(rp, func() error { ran = true; return errors.New("must not run") }); err != nil { + t.Fatalf("a replay must skip fn and return nil, got %v", err) + } + if ran { + t.Error("fn must be skipped on a replay") + } +} + +func TestInMemoryTransportHeadersRoundTrip(t *testing.T) { + tr := NewInMemoryTransport() + if err := tr.PublishWithHeaders(context.Background(), "q", "body", map[string]string{HeaderReplayBypass: "1"}); err != nil { + t.Fatal(err) + } + msg, _ := tr.Pop(context.Background(), "q", 0) + if msg == nil || msg.Headers[HeaderReplayBypass] != "1" { + t.Fatalf("headers not carried through Pop: %+v", msg) + } + _ = tr.Publish(context.Background(), "q", "plain") + m2, _ := tr.Pop(context.Background(), "q", 0) + if m2.Headers[HeaderReplayBypass] != "" { + t.Error("a plain Publish must carry no replay header") + } +} + +func TestRedriveBypassStampsHeaderAndConsumeSkipsEffects(t *testing.T) { + tr := NewInMemoryTransport() + deadLettered(t, tr, "orders.dlq", "urn:babel:orders:created", "orders", map[string]any{"order_id": 1}) + + res, err := Redrive(context.Background(), tr, "orders.dlq", RedriveOptions{Bypass: true}) + if err != nil { + t.Fatal(err) + } + if res.Redriven != 1 || len(res.Items) != 1 || !res.Items[0].Bypassed { + t.Fatalf("expected one bypassed redrive: %+v", res) + } + + msg, _ := tr.Pop(context.Background(), "orders", 0) + if msg == nil || msg.Headers[HeaderReplayBypass] != "1" { + t.Fatalf("redriven message is missing the bypass header: %+v", msg) + } + + emailed := false + app := NewApp(tr) + app.Handle("urn:babel:orders:created", func(ctx context.Context, _ Envelope) error { + if !IsReplay(ctx) { + t.Error("the handler should see this delivery as a replay") + } + return BypassExternalEffects(ctx, func() error { emailed = true; return nil }) + }) + app.dispatch(context.Background(), msg) + if emailed { + t.Error("the external side-effect must be skipped on a bypassed replay") + } +} + +func TestRedriveBypassWithoutHeaderSupportFallsBack(t *testing.T) { + base := NewInMemoryTransport() + deadLettered(t, base, "dlq", "urn:babel:orders:created", "orders", nil) + tr := plainTransport{inner: base} + + res, err := Redrive(context.Background(), tr, "dlq", RedriveOptions{Bypass: true}) + if err != nil { + t.Fatal(err) + } + if res.Redriven != 1 || res.Items[0].Bypassed { + t.Fatalf("Bypass must be a no-op when the transport is not a HeaderPublisher: %+v", res) + } +} + +// plainTransport wraps InMemoryTransport but exposes only the base Transport methods, so it is +// deliberately NOT a HeaderPublisher. +type plainTransport struct{ inner *InMemoryTransport } + +func (p plainTransport) Publish(ctx context.Context, q, b string) error { + return p.inner.Publish(ctx, q, b) +} + +func (p plainTransport) Pop(ctx context.Context, q string, d time.Duration) (*ReceivedMessage, error) { + return p.inner.Pop(ctx, q, d) +} + +func (p plainTransport) Ack(ctx context.Context, m *ReceivedMessage) error { + return p.inner.Ack(ctx, m) +} diff --git a/transport.go b/transport.go index 723a63f..4c82cbd 100644 --- a/transport.go +++ b/transport.go @@ -12,6 +12,10 @@ type ReceivedMessage struct { Body string Queue string Handle any + // Headers are out-of-band transport headers a [HeaderPublisher] carried with the + // message (e.g. the bq-replay-bypass marker). Nil for transports that don't surface + // them; reads are nil-safe. + Headers map[string]string } // Transport is the minimal broker contract the [App] runtime talks to: publish a @@ -29,24 +33,45 @@ type Transport interface { Ack(ctx context.Context, msg *ReceivedMessage) error } +// HeaderPublisher is an optional [Transport] capability: publish a body together with +// out-of-band transport headers (e.g. the replay-bypass marker), for brokers that carry +// per-message metadata. A transport that does not implement it simply does not propagate +// headers — callers fall back to plain Publish (ADR-0027). +type HeaderPublisher interface { + PublishWithHeaders(ctx context.Context, queue, body string, headers map[string]string) error +} + // InMemoryTransport is an in-process [Transport] for tests and broker-free local // runs. It is safe for concurrent use. Pop returns immediately (it does not block // for the timeout) — when the queue is empty it returns (nil, nil). +type inMemoryMessage struct { + body string + headers map[string]string +} + type InMemoryTransport struct { mu sync.Mutex - queues map[string][]string + queues map[string][]inMemoryMessage } // NewInMemoryTransport returns an empty in-process transport. func NewInMemoryTransport() *InMemoryTransport { - return &InMemoryTransport{queues: make(map[string][]string)} + return &InMemoryTransport{queues: make(map[string][]inMemoryMessage)} } // Publish appends body to the in-memory queue. func (t *InMemoryTransport) Publish(_ context.Context, queue, body string) error { t.mu.Lock() defer t.mu.Unlock() - t.queues[queue] = append(t.queues[queue], body) + t.queues[queue] = append(t.queues[queue], inMemoryMessage{body: body}) + return nil +} + +// PublishWithHeaders appends body plus out-of-band headers ([HeaderPublisher]). +func (t *InMemoryTransport) PublishWithHeaders(_ context.Context, queue, body string, headers map[string]string) error { + t.mu.Lock() + defer t.mu.Unlock() + t.queues[queue] = append(t.queues[queue], inMemoryMessage{body: body, headers: headers}) return nil } @@ -58,9 +83,9 @@ func (t *InMemoryTransport) Pop(_ context.Context, queue string, _ time.Duration if len(q) == 0 { return nil, nil } - body := q[0] + m := q[0] t.queues[queue] = q[1:] - return &ReceivedMessage{Body: body, Queue: queue}, nil + return &ReceivedMessage{Body: m.body, Queue: queue, Headers: m.headers}, nil } // Ack is a no-op: Pop already removed the message.