From dc1a0d3af52e60ed400987a52b57164f77d88f3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammet=20=C5=9Eafak?= Date: Fri, 19 Jun 2026 08:11:29 +0300 Subject: [PATCH] =?UTF-8?q?feat:=20DLQ=20redrive=20tooling=20=E2=80=94=20s?= =?UTF-8?q?afe=20replay=20off=20the=20dead-letter=20queue=20(ADR-0026)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Redrive(ctx, transport, dlq, opts): reads dead-lettered messages, resets each (strip dead_letter, attempts->0, preserve job/trace_id/data/meta) and re-publishes to its dead_letter.original_queue or a chosen ToQueue. Options: ToQueue (sandbox replay), Max, DryRun (inspect+restore), Select (filter). Drains-then-processes; acks a DLQ message only after a successful re-publish; restores undecodable bodies. Pure transport+codec, no new dependency. Replay-Bypass header is a documented phase two. --- redrive.go | 147 ++++++++++++++++++++++++++++++ redrive_test.go | 237 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 384 insertions(+) create mode 100644 redrive.go create mode 100644 redrive_test.go diff --git a/redrive.go b/redrive.go new file mode 100644 index 0000000..d8188bb --- /dev/null +++ b/redrive.go @@ -0,0 +1,147 @@ +package babelqueue + +import ( + "context" + "time" +) + +// RedriveOptions configures a [Redrive] run. +type RedriveOptions struct { + // ToQueue overrides where messages are re-published. When empty, each message goes back + // to its own dead_letter.original_queue; set it to a sandbox queue to replay safely. + ToQueue string + // Max caps how many messages are pulled from the DLQ (0 = all currently available). + Max int + // DryRun inspects without redriving: every message is read, reported, and returned to the + // DLQ unchanged — nothing is re-published to a source/sandbox queue. + DryRun bool + // Select, when non-nil, picks which messages to redrive (e.g. by reason or URN). + // Unselected messages are returned to the DLQ unchanged. + Select func(Envelope) bool + // Timeout is the per-pop wait passed to the transport (default 1s). + Timeout time.Duration +} + +// RedriveItem records what happened to one message during a [Redrive] run. +type RedriveItem struct { + MessageID string // meta.id ("" if absent or undecodable) + TraceID string + URN string + Reason string // dead_letter.reason + From string // the DLQ it was read from + To string // target queue (the plan, even on a dry run; "" when skipped/undecodable) + Redriven bool // true only when actually re-published to To +} + +// RedriveResult summarizes a [Redrive] run. +type RedriveResult struct { + Redriven int + Skipped int + Items []RedriveItem +} + +// Redrive moves dead-lettered messages off the dlq queue and re-publishes each to its source +// queue (its dead_letter.original_queue) or to opts.ToQueue, resetting it for reprocessing: +// the dead_letter block is removed and attempts reset to 0, while job, trace_id, data and meta +// are preserved verbatim. It is the operator-side counterpart to the runtime's dead-letter +// routing — the contract leaves redrive to tooling, and this is that tool (ADR-0026). +// +// Messages are drained from the DLQ first and then processed, so restored messages (skipped, +// dry-run, or undecodable) are never re-encountered in the same run. A DLQ message is +// acknowledged only after its re-publish succeeds; an undecodable body is restored, not lost. +func Redrive(ctx context.Context, t Transport, dlq string, opts RedriveOptions) (RedriveResult, error) { + timeout := opts.Timeout + if timeout <= 0 { + timeout = time.Second + } + + type pending struct { + msg *ReceivedMessage + env Envelope + decoded bool + } + var batch []pending + for opts.Max == 0 || len(batch) < opts.Max { + msg, err := t.Pop(ctx, dlq, timeout) + if err != nil { + return RedriveResult{}, err + } + if msg == nil { + break + } + env, derr := Decode([]byte(msg.Body)) + batch = append(batch, pending{msg: msg, env: env, decoded: derr == nil}) + } + + res := RedriveResult{Items: make([]RedriveItem, 0, len(batch))} + for _, p := range batch { + if !p.decoded { + _ = t.Publish(ctx, dlq, p.msg.Body) // restore the poison body; never drop it + _ = t.Ack(ctx, p.msg) + res.Skipped++ + res.Items = append(res.Items, RedriveItem{From: dlq}) + continue + } + + item := RedriveItem{ + MessageID: p.env.Meta.ID, + TraceID: p.env.TraceID, + URN: p.env.URN(), + From: dlq, + } + if p.env.DeadLetter != nil { + item.Reason = p.env.DeadLetter.Reason + } + + if opts.Select != nil && !opts.Select(p.env) { + _ = t.Publish(ctx, dlq, p.msg.Body) // not selected: restore unchanged + _ = t.Ack(ctx, p.msg) + res.Skipped++ + res.Items = append(res.Items, item) + continue + } + + target := opts.ToQueue + if target == "" { + target = sourceQueueOf(p.env) + } + item.To = target + + if opts.DryRun { + _ = t.Publish(ctx, dlq, p.msg.Body) // report the plan; restore unchanged + _ = t.Ack(ctx, p.msg) + res.Skipped++ + res.Items = append(res.Items, item) + continue + } + + reset := p.env + reset.DeadLetter = nil + reset.Attempts = 0 + body, err := reset.Encode() + if err != nil { + _ = t.Publish(ctx, dlq, p.msg.Body) // restore on a re-encode failure + _ = t.Ack(ctx, p.msg) + return res, err + } + if err := t.Publish(ctx, target, string(body)); err != nil { + _ = t.Publish(ctx, dlq, p.msg.Body) // restore on a publish failure + _ = t.Ack(ctx, p.msg) + return res, err + } + _ = t.Ack(ctx, p.msg) + item.Redriven = true + res.Redriven++ + res.Items = append(res.Items, item) + } + return res, nil +} + +// sourceQueueOf returns where a redriven message should go by default: its +// dead_letter.original_queue, falling back to meta.queue. +func sourceQueueOf(env Envelope) string { + if env.DeadLetter != nil && env.DeadLetter.OriginalQueue != "" { + return env.DeadLetter.OriginalQueue + } + return env.Meta.Queue +} diff --git a/redrive_test.go b/redrive_test.go new file mode 100644 index 0000000..80a72ec --- /dev/null +++ b/redrive_test.go @@ -0,0 +1,237 @@ +package babelqueue + +import ( + "context" + "errors" + "testing" +) + +// deadLettered builds a dead-lettered envelope and puts it on the dlq queue. +func deadLettered(t *testing.T, tr *InMemoryTransport, dlq, urn, originalQueue string, data map[string]any) Envelope { + t.Helper() + env, err := Make(urn, data, WithQueue(originalQueue)) + if err != nil { + t.Fatal(err) + } + dl := Annotate(env, "failed", originalQueue, 3, errors.New("boom")) + body, err := dl.Encode() + if err != nil { + t.Fatal(err) + } + if err := tr.Publish(context.Background(), dlq, string(body)); err != nil { + t.Fatal(err) + } + return dl +} + +// drain reads and acks every message on a queue, returning the decoded envelopes. +func drain(t *testing.T, tr *InMemoryTransport, queue string) []Envelope { + t.Helper() + var out []Envelope + for { + msg, err := tr.Pop(context.Background(), queue, 0) + if err != nil { + t.Fatal(err) + } + if msg == nil { + break + } + env, derr := Decode([]byte(msg.Body)) + if derr != nil { + t.Fatalf("undecodable on %s: %v", queue, derr) + } + out = append(out, env) + _ = tr.Ack(context.Background(), msg) + } + return out +} + +func TestRedrive_ToSource(t *testing.T) { + tr := NewInMemoryTransport() + orig := deadLettered(t, tr, "orders.dlq", "urn:babel:orders:created", "orders", map[string]any{"order_id": 1}) + + res, err := Redrive(context.Background(), tr, "orders.dlq", RedriveOptions{}) + if err != nil { + t.Fatal(err) + } + if res.Redriven != 1 || res.Skipped != 0 { + t.Fatalf("res = %+v", res) + } + + got := drain(t, tr, "orders") + if len(got) != 1 { + t.Fatalf("source queue has %d messages, want 1", len(got)) + } + if got[0].DeadLetter != nil { + t.Error("dead_letter block was not stripped") + } + if got[0].Attempts != 0 { + t.Errorf("attempts = %d, want 0 (reset)", got[0].Attempts) + } + if got[0].TraceID != orig.TraceID { + t.Error("trace_id was not preserved") + } + if got[0].URN() != "urn:babel:orders:created" { + t.Errorf("urn = %q", got[0].URN()) + } + if d := drain(t, tr, "orders.dlq"); len(d) != 0 { + t.Errorf("DLQ not drained: %d left", len(d)) + } +} + +func TestRedrive_ToSandbox(t *testing.T) { + tr := NewInMemoryTransport() + deadLettered(t, tr, "orders.dlq", "urn:babel:orders:created", "orders", nil) + + res, err := Redrive(context.Background(), tr, "orders.dlq", RedriveOptions{ToQueue: "sandbox"}) + if err != nil { + t.Fatal(err) + } + if res.Redriven != 1 { + t.Fatalf("res = %+v", res) + } + if len(drain(t, tr, "orders")) != 0 { + t.Error("sandbox replay must not touch the source queue") + } + if len(drain(t, tr, "sandbox")) != 1 { + t.Error("message did not land on the sandbox queue") + } +} + +func TestRedrive_DryRun(t *testing.T) { + tr := NewInMemoryTransport() + deadLettered(t, tr, "orders.dlq", "urn:babel:orders:created", "orders", nil) + + res, err := Redrive(context.Background(), tr, "orders.dlq", RedriveOptions{DryRun: true}) + if err != nil { + t.Fatal(err) + } + if res.Redriven != 0 || res.Skipped != 1 { + t.Fatalf("res = %+v", res) + } + if len(res.Items) != 1 || res.Items[0].To != "orders" || res.Items[0].Redriven { + t.Errorf("dry-run should report the plan without redriving: %+v", res.Items) + } + if len(drain(t, tr, "orders")) != 0 { + t.Error("dry-run touched the source queue") + } + if d := drain(t, tr, "orders.dlq"); len(d) != 1 || d[0].DeadLetter == nil { + t.Error("dry-run altered the DLQ") + } +} + +func TestRedrive_Select(t *testing.T) { + tr := NewInMemoryTransport() + deadLettered(t, tr, "dlq", "urn:babel:orders:created", "orders", nil) + deadLettered(t, tr, "dlq", "urn:babel:emails:welcome", "emails", nil) + + res, err := Redrive(context.Background(), tr, "dlq", RedriveOptions{ + Select: func(e Envelope) bool { return e.URN() == "urn:babel:orders:created" }, + }) + if err != nil { + t.Fatal(err) + } + if res.Redriven != 1 || res.Skipped != 1 { + t.Fatalf("res = %+v", res) + } + if len(drain(t, tr, "orders")) != 1 { + t.Error("selected message was not redriven") + } + if len(drain(t, tr, "emails")) != 0 { + t.Error("unselected message was wrongly redriven") + } + if len(drain(t, tr, "dlq")) != 1 { + t.Error("unselected message was not restored to the DLQ") + } +} + +func TestRedrive_Max(t *testing.T) { + tr := NewInMemoryTransport() + for i := 0; i < 3; i++ { + deadLettered(t, tr, "dlq", "urn:babel:orders:created", "orders", nil) + } + res, err := Redrive(context.Background(), tr, "dlq", RedriveOptions{Max: 2}) + if err != nil { + t.Fatal(err) + } + if res.Redriven != 2 { + t.Fatalf("res = %+v", res) + } + if len(drain(t, tr, "dlq")) != 1 { + t.Error("Max was not respected — the DLQ should still hold 1") + } +} + +// failOnTarget refuses to publish to one queue, to exercise the restore-on-failure path. +type failOnTarget struct { + *InMemoryTransport + failQueue string +} + +func (f *failOnTarget) Publish(ctx context.Context, queue, body string) error { + if queue == f.failQueue { + return errors.New("publish refused") + } + return f.InMemoryTransport.Publish(ctx, queue, body) +} + +func TestRedrive_PublishFailureRestores(t *testing.T) { + base := NewInMemoryTransport() + deadLettered(t, base, "dlq", "urn:babel:orders:created", "orders", nil) + tr := &failOnTarget{InMemoryTransport: base, failQueue: "orders"} + + if _, err := Redrive(context.Background(), tr, "dlq", RedriveOptions{}); err == nil { + t.Fatal("expected the publish error to surface") + } + if len(drain(t, base, "dlq")) != 1 { + t.Error("a message must be restored to the DLQ when its re-publish fails") + } + if len(drain(t, base, "orders")) != 0 { + t.Error("nothing should have reached the source queue") + } +} + +func TestRedrive_NoDeadLetterFallsBackToMetaQueue(t *testing.T) { + tr := NewInMemoryTransport() + // a plain (never dead-lettered) envelope sitting on the DLQ — redrive falls back to meta.queue + env, err := Make("urn:babel:orders:created", nil, WithQueue("orders")) + if err != nil { + t.Fatal(err) + } + body, err := env.Encode() + if err != nil { + t.Fatal(err) + } + if err := tr.Publish(context.Background(), "dlq", string(body)); err != nil { + t.Fatal(err) + } + + res, err := Redrive(context.Background(), tr, "dlq", RedriveOptions{}) + if err != nil { + t.Fatal(err) + } + if res.Redriven != 1 { + t.Fatalf("res = %+v", res) + } + if len(drain(t, tr, "orders")) != 1 { + t.Error("a message with no dead_letter block should redrive to its meta.queue") + } +} + +func TestRedrive_UndecodableRestored(t *testing.T) { + tr := NewInMemoryTransport() + if err := tr.Publish(context.Background(), "dlq", "not-json{{{"); err != nil { + t.Fatal(err) + } + res, err := Redrive(context.Background(), tr, "dlq", RedriveOptions{}) + if err != nil { + t.Fatal(err) + } + if res.Redriven != 0 || res.Skipped != 1 { + t.Fatalf("res = %+v", res) + } + msg, _ := tr.Pop(context.Background(), "dlq", 0) + if msg == nil || msg.Body != "not-json{{{" { + t.Error("an undecodable body must be restored to the DLQ, not lost") + } +}