From eacf6e31fe00d9f41ed926562382f47f5290e8f8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Muhammet=20=C5=9Eafak?=
Date: Fri, 19 Jun 2026 08:45:04 +0300
Subject: [PATCH] =?UTF-8?q?feat(redrive):=20DLQ=20redrive=20tooling=20?=
 =?UTF-8?q?=E2=80=94=20safe=20replay=20(ADR-0026)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

.NET mirror of babelqueue-go Redrive: Redrive.RedriveAsync(transport, dlq,
options) drains a DLQ and re-publishes each message (Reset: strip
dead_letter, attempts->0, preserve job/trace_id/data/meta via a record
with-expression) to its dead_letter.original_queue or options.ToQueue.
Options ToQueue/Max/DryRun/Select. The codec-only core has no transport, so
it works over a minimal Redrive.ITransport seam (async pop/publish/ack).
Drains-then-processes; acks only after a successful re-publish; restores
undecodable bodies (a null-Job envelope), the failed message on publish
failure, and the not-yet-processed rest of the drained batch on any
failure. No new dependency; envelope frozen. Replay-Bypass header
documented as phase two.
---
 src/BabelQueue.Core/Redrive.cs              | 269 ++++++++++++++++++++++
 tests/BabelQueue.Core.Tests/RedriveTests.cs | 214 +++++++++++++++++
 2 files changed, 483 insertions(+)
 create mode 100644 src/BabelQueue.Core/Redrive.cs
 create mode 100644 tests/BabelQueue.Core.Tests/RedriveTests.cs

diff --git a/src/BabelQueue.Core/Redrive.cs b/src/BabelQueue.Core/Redrive.cs
new file mode 100644
index 0000000..75d707a
--- /dev/null
+++ b/src/BabelQueue.Core/Redrive.cs
@@ -0,0 +1,269 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace BabelQueue;
+
+/// <summary>
+/// DLQ redrive tooling — safe replay off the dead-letter queue (ADR-0026).
+/// </summary>
+/// <remarks>
+/// <para>
+/// The operator-side counterpart to the runtime's dead-letter routing: it reads dead-lettered
+/// messages off a DLQ and re-publishes each to its source queue (its
+/// <c>dead_letter.original_queue</c>) or a chosen queue, for reprocessing —
+/// the <c>dead_letter</c> block removed and <c>attempts</c> reset to 0, while <c>job</c>,
+/// <c>trace_id</c>, <c>data</c> and <c>meta</c> are preserved verbatim.
+/// </para>
+/// <para>
+/// The codec-only .NET core has no transport, so <see cref="RedriveAsync"/> works over a minimal
+/// <see cref="ITransport"/> seam the caller implements over their broker. The wire envelope stays
+/// frozen (GR-1) and no dependency is added.
+/// </para>
+/// <para>
+/// Safety in v1 is <c>DryRun</c> + sandbox routing (<c>ToQueue</c>) + <c>Select</c>. The
+/// Replay-Bypass guard (a <c>bq-replay-bypass</c> transport header surfaced to handlers so a
+/// replay can skip external side-effects) is a documented phase two — it touches the runtime and
+/// every transport, like the OpenTelemetry <c>traceparent</c> follow-up.
+/// </para>
+/// </remarks>
+public static class Redrive
+{
+    /// <summary>A reserved DLQ message: its raw body plus a transport-specific ack handle.</summary>
+    public sealed record Reserved(string Body, object? Handle);
+
+    /// <summary>The minimal transport surface <see cref="RedriveAsync"/> needs.</summary>
+    public interface ITransport
+    {
+        /// <summary>Reserve the next message from <paramref name="queue"/>, or <c>null</c> when empty.</summary>
+        Task<Reserved?> PopAsync(string queue);
+
+        /// <summary>Publish an already-encoded <paramref name="body"/> to <paramref name="queue"/>.</summary>
+        Task PublishAsync(string queue, string body);
+
+        /// <summary>Acknowledge (remove) a previously reserved message.</summary>
+        Task AckAsync(Reserved message);
+    }
+
+    /// <summary>Options for a <see cref="RedriveAsync"/> run.</summary>
+    /// <param name="ToQueue">Overrides the target queue (sandbox/redirect); when blank, each message goes back to its own <c>dead_letter.original_queue</c>.</param>
+    /// <param name="Max">Caps how many messages are pulled from the DLQ (0 = all available; a negative value pulls none).</param>
+    /// <param name="DryRun">Inspect and report the plan, restoring every message unchanged.</param>
+    /// <param name="Select">Picks which messages to redrive (unselected are restored unchanged).</param>
+    public sealed record Options(
+        string? ToQueue = null,
+        int Max = 0,
+        bool DryRun = false,
+        Func<Envelope, bool>? Select = null);
+
+    /// <summary>What happened to one message during a run.</summary>
+    public sealed record Item(
+        string? MessageId,
+        string? TraceId,
+        string? Urn,
+        string? Reason,
+        string From,
+        string? To,
+        bool Redriven);
+
+    /// <summary>Summary of a <see cref="RedriveAsync"/> run.</summary>
+    public sealed record Result(int Redriven, int Skipped, IReadOnlyList<Item> Items);
+
+    /// <summary>
+    /// Returns a copy of <paramref name="envelope"/> reset for reprocessing: the
+    /// <c>dead_letter</c> block removed and <c>attempts</c> reset to 0; everything else preserved.
+    /// </summary>
+    public static Envelope Reset(Envelope envelope)
+    {
+        ArgumentNullException.ThrowIfNull(envelope);
+        return envelope with { Attempts = 0, DeadLetter = null };
+    }
+
+    /// <summary>
+    /// Drains dead-lettered messages off <paramref name="dlq"/> and re-publishes each (reset) to
+    /// its source queue or <c>options.ToQueue</c>. Messages are drained first and then processed,
+    /// so restored messages (skipped, dry-run, or undecodable) are not re-encountered; a DLQ
+    /// message is acknowledged only after a successful re-publish, and an undecodable body is
+    /// restored rather than dropped.
+    /// </summary>
+    /// <remarks>
+    /// On a publish failure the message is restored to the DLQ before the error propagates. Any
+    /// failure while processing also restores every drained message that has not been processed
+    /// yet, so a mid-batch error does not strand the rest of the batch.
+    /// </remarks>
+    /// <param name="transport">The broker seam used to pop, publish and acknowledge.</param>
+    /// <param name="dlq">The dead-letter queue to drain.</param>
+    /// <param name="options">Target, cap, dry-run and selection settings for this run.</param>
+    /// <returns>Redriven and skipped counts plus one <see cref="Item"/> per drained message.</returns>
+    /// <exception cref="ArgumentNullException">An argument is <c>null</c>.</exception>
+    /// <exception cref="ArgumentException"><paramref name="dlq"/> is blank.</exception>
+    /// <exception cref="AggregateException">
+    /// Processing failed and some pending messages could not be restored; the first inner
+    /// exception is the original failure, the rest are the restore failures.
+    /// </exception>
+    public static async Task<Result> RedriveAsync(ITransport transport, string dlq, Options options)
+    {
+        ArgumentNullException.ThrowIfNull(transport);
+        ArgumentNullException.ThrowIfNull(dlq);
+        ArgumentNullException.ThrowIfNull(options);
+        if (string.IsNullOrWhiteSpace(dlq))
+        {
+            throw new ArgumentException("The DLQ name must not be blank.", nameof(dlq));
+        }
+
+        var batch = new List<Reserved>();
+        while (options.Max == 0 || batch.Count < options.Max)
+        {
+            var message = await transport.PopAsync(dlq).ConfigureAwait(false);
+            if (message is null)
+            {
+                break;
+            }
+
+            batch.Add(message);
+        }
+
+        var redriven = 0;
+        var skipped = 0;
+        var items = new List<Item>(batch.Count);
+
+        for (var index = 0; index < batch.Count; index++)
+        {
+            Item item;
+            try
+            {
+                item = await ProcessAsync(transport, dlq, options, batch[index]).ConfigureAwait(false);
+            }
+            catch (Exception failure)
+            {
+                // Everything after this message was already popped off the DLQ; put it back
+                // before the error propagates so the rest of the batch is not stranded.
+                var restoreFailures = await RestorePendingAsync(transport, dlq, batch, index + 1)
+                    .ConfigureAwait(false);
+                if (restoreFailures.Count == 0)
+                {
+                    throw;
+                }
+
+                restoreFailures.Insert(0, failure);
+                throw new AggregateException(
+                    "Redrive failed and some drained messages could not be restored to the DLQ.",
+                    restoreFailures);
+            }
+
+            if (item.Redriven)
+            {
+                redriven++;
+            }
+            else
+            {
+                skipped++;
+            }
+
+            items.Add(item);
+        }
+
+        return new Result(redriven, skipped, items);
+    }
+
+    /// <summary>
+    /// Handles one drained message: restores it to the DLQ when it is undecodable, unselected,
+    /// has no target queue, or the run is a dry run; otherwise re-publishes it (reset) to the
+    /// target and acknowledges the original.
+    /// </summary>
+    private static async Task<Item> ProcessAsync(
+        ITransport transport, string dlq, Options options, Reserved message)
+    {
+        // Decode never throws — a malformed/non-object body yields an empty envelope
+        // (a null Job), which is not redrivable; restore it rather than drop it.
+        var envelope = EnvelopeCodec.Decode(message.Body);
+        if (string.IsNullOrWhiteSpace(envelope.Job))
+        {
+            await RestoreAsync(transport, dlq, message).ConfigureAwait(false);
+            return new Item(null, null, null, null, dlq, null, false);
+        }
+
+        var reason = envelope.DeadLetter?.Reason;
+        var messageId = envelope.Meta?.Id;
+
+        if (options.Select is not null && !options.Select(envelope))
+        {
+            await RestoreAsync(transport, dlq, message).ConfigureAwait(false);
+            return new Item(messageId, envelope.TraceId, envelope.Job, reason, dlq, null, false);
+        }
+
+        var target = !string.IsNullOrWhiteSpace(options.ToQueue) ? options.ToQueue : SourceQueueOf(envelope);
+        if (string.IsNullOrWhiteSpace(target))
+        {
+            // no source/sandbox queue to send it to — leave it on the DLQ
+            await RestoreAsync(transport, dlq, message).ConfigureAwait(false);
+            return new Item(messageId, envelope.TraceId, envelope.Job, reason, dlq, null, false);
+        }
+
+        if (options.DryRun)
+        {
+            await RestoreAsync(transport, dlq, message).ConfigureAwait(false);
+            return new Item(messageId, envelope.TraceId, envelope.Job, reason, dlq, target, false);
+        }
+
+        var encoded = EnvelopeCodec.Encode(Reset(envelope));
+        try
+        {
+            await transport.PublishAsync(target, encoded).ConfigureAwait(false);
+        }
+        catch
+        {
+            // restore the original to the DLQ before the error propagates
+            await RestoreAsync(transport, dlq, message).ConfigureAwait(false);
+            throw;
+        }
+
+        await transport.AckAsync(message).ConfigureAwait(false);
+        return new Item(messageId, envelope.TraceId, envelope.Job, reason, dlq, target, true);
+    }
+
+    /// <summary>Re-publishes a drained message to the DLQ unchanged, then acknowledges the reserved original.</summary>
+    private static async Task RestoreAsync(ITransport transport, string dlq, Reserved message)
+    {
+        await transport.PublishAsync(dlq, message.Body).ConfigureAwait(false);
+        await transport.AckAsync(message).ConfigureAwait(false);
+    }
+
+    /// <summary>
+    /// Best-effort restore of the <paramref name="batch"/> entries from <paramref name="start"/>
+    /// onward: keeps going past individual failures and returns them instead of throwing.
+    /// </summary>
+    private static async Task<List<Exception>> RestorePendingAsync(
+        ITransport transport, string dlq, IReadOnlyList<Reserved> batch, int start)
+    {
+        var failures = new List<Exception>();
+        for (var index = start; index < batch.Count; index++)
+        {
+            try
+            {
+                await RestoreAsync(transport, dlq, batch[index]).ConfigureAwait(false);
+            }
+            catch (Exception failure)
+            {
+                failures.Add(failure);
+            }
+        }
+
+        return failures;
+    }
+
+    /// <summary>
+    /// The queue a message came from: <c>DeadLetter.OriginalQueue</c> when present and
+    /// non-blank, otherwise the envelope's <c>Meta.Queue</c>.
+    /// </summary>
+    private static string? SourceQueueOf(Envelope envelope)
+    {
+        if (envelope.DeadLetter is { OriginalQueue: var originalQueue }
+            && !string.IsNullOrWhiteSpace(originalQueue))
+        {
+            return originalQueue;
+        }
+
+        return envelope.Meta?.Queue;
+    }
+}
diff --git a/tests/BabelQueue.Core.Tests/RedriveTests.cs b/tests/BabelQueue.Core.Tests/RedriveTests.cs
new file mode 100644
index 0000000..a60b176
--- /dev/null
+++ b/tests/BabelQueue.Core.Tests/RedriveTests.cs
@@ -0,0 +1,214 @@
+using System;
+using System.Collections.Generic;
+using System.Threading.Tasks;
+using BabelQueue;
+using Xunit;
+
+namespace BabelQueue.Tests;
+
+public sealed class RedriveTests
+{
+    private sealed class MemoryTransport : Redrive.ITransport
+    {
+        private readonly Dictionary<string, Queue<string>> _queues = new();
+        private readonly string? _failQueue;
+
+        public MemoryTransport(string? failQueue = null) => _failQueue = failQueue;
+
+        public Task<Redrive.Reserved?> PopAsync(string queue)
+        {
+            if (_queues.TryGetValue(queue, out var q) && q.Count > 0)
+            {
+                return Task.FromResult<Redrive.Reserved?>(new Redrive.Reserved(q.Dequeue(), null));
+            }
+
+            return Task.FromResult<Redrive.Reserved?>(null);
+        }
+
+        public Task PublishAsync(string queue, string body)
+        {
+            if (queue == _failQueue)
+            {
+                throw new InvalidOperationException("publish refused");
+            }
+
+            if (!_queues.TryGetValue(queue, out var q))
+            {
+                q = new Queue<string>();
+                _queues[queue] = q;
+            }
+
+            q.Enqueue(body);
+            return Task.CompletedTask;
+        }
+
+        public Task AckAsync(Redrive.Reserved message) => Task.CompletedTask;
+
+        public int Size(string queue) => _queues.TryGetValue(queue, out var q) ? q.Count : 0;
+    }
+
+    private static async Task<Envelope> DeadLetteredAsync(MemoryTransport transport, string dlq, string urn, string originalQueue)
+    {
+        var env = EnvelopeCodec.Make(urn, new Dictionary<string, object?> { ["order_id"] = 1 }, originalQueue);
+        var dead = DeadLetters.Annotate(env, "failed", originalQueue);
+        await transport.PublishAsync(dlq, EnvelopeCodec.Encode(dead));
+        return dead;
+    }
+
+    private static async Task<List<Envelope>> DrainAsync(MemoryTransport transport, string queue)
+    {
+        var result = new List<Envelope>();
+        while (await transport.PopAsync(queue) is { } message)
+        {
+            result.Add(EnvelopeCodec.Decode(message.Body));
+        }
+
+        return result;
+    }
+
+    [Fact]
+    public async Task RedrivesToSourceAndResets()
+    {
+        var transport = new MemoryTransport();
+        var orig = await DeadLetteredAsync(transport, "orders.dlq", "urn:babel:orders:created", "orders");
+
+        var result = await Redrive.RedriveAsync(transport, "orders.dlq", new Redrive.Options());
+
+        Assert.Equal(1, result.Redriven);
+        Assert.Equal(0, result.Skipped);
+        var got = await DrainAsync(transport, "orders");
+        Assert.Single(got);
+        Assert.Null(got[0].DeadLetter);
+        Assert.Equal(0, got[0].Attempts);
+        Assert.Equal(orig.TraceId, got[0].TraceId);
+        Assert.Equal("urn:babel:orders:created", got[0].Job);
+        Assert.Equal(0, transport.Size("orders.dlq"));
+    }
+
+    [Fact]
+    public async Task RedrivesToSandbox()
+    {
+        var transport = new MemoryTransport();
+        await DeadLetteredAsync(transport, "orders.dlq", "urn:babel:orders:created", "orders");
+
+        var result = await Redrive.RedriveAsync(transport, "orders.dlq", new Redrive.Options(ToQueue: "sandbox"));
+
+        Assert.Equal(1, result.Redriven);
+        Assert.Equal(0, transport.Size("orders"));
+        Assert.Equal(1, transport.Size("sandbox"));
+    }
+
+    [Fact]
+    public async Task DryRunReportsPlanAndChangesNothing()
+    {
+        var transport = new MemoryTransport();
+        await DeadLetteredAsync(transport, "orders.dlq", "urn:babel:orders:created", "orders");
+
+        var result = await Redrive.RedriveAsync(transport, "orders.dlq", new Redrive.Options(DryRun: true));
+
+        Assert.Equal(0, result.Redriven);
+        Assert.Equal(1, result.Skipped);
+        Assert.Equal("orders", result.Items[0].To);
+        Assert.False(result.Items[0].Redriven);
+        Assert.Equal(0, transport.Size("orders"));
+        var dlq = await DrainAsync(transport, "orders.dlq");
+        Assert.Single(dlq);
+        Assert.NotNull(dlq[0].DeadLetter);
+    }
+
+    [Fact]
+    public async Task SelectRedrivesOnlyMatching()
+    {
+        var transport = new MemoryTransport();
+        await DeadLetteredAsync(transport, "dlq", "urn:babel:orders:created", "orders");
+        await DeadLetteredAsync(transport, "dlq", "urn:babel:emails:welcome", "emails");
+
+        var result = await Redrive.RedriveAsync(transport, "dlq",
+            new Redrive.Options(Select: e => e.Job == "urn:babel:orders:created"));
+
+        Assert.Equal(1, result.Redriven);
+        Assert.Equal(1, result.Skipped);
+        Assert.Equal(1, transport.Size("orders"));
+        Assert.Equal(0, transport.Size("emails"));
+        Assert.Equal(1, transport.Size("dlq"));
+    }
+
+    [Fact]
+    public async Task MaxCapsHowManyArePulled()
+    {
+        var transport = new MemoryTransport();
+        for (var i = 0; i < 3; i++)
+        {
+            await DeadLetteredAsync(transport, "dlq", "urn:babel:orders:created", "orders");
+        }
+
+        var result = await Redrive.RedriveAsync(transport, "dlq", new Redrive.Options(Max: 2));
+
+        Assert.Equal(2, result.Redriven);
+        Assert.Equal(1, transport.Size("dlq"));
+    }
+
+    [Fact]
+    public async Task PublishFailureRestoresToDlq()
+    {
+        var transport = new MemoryTransport(failQueue: "orders");
+        await DeadLetteredAsync(transport, "dlq", "urn:babel:orders:created", "orders");
+
+        await Assert.ThrowsAsync<InvalidOperationException>(
+            () => Redrive.RedriveAsync(transport, "dlq", new Redrive.Options()));
+
+        Assert.Equal(1, transport.Size("dlq"));
+        Assert.Equal(0, transport.Size("orders"));
+    }
+
+    [Fact]
+    public async Task PublishFailureRestoresRestOfBatch()
+    {
+        var transport = new MemoryTransport(failQueue: "orders");
+        await DeadLetteredAsync(transport, "dlq", "urn:babel:orders:created", "orders");
+        await DeadLetteredAsync(transport, "dlq", "urn:babel:emails:welcome", "emails");
+
+        await Assert.ThrowsAsync<InvalidOperationException>(
+            () => Redrive.RedriveAsync(transport, "dlq", new Redrive.Options()));
+
+        Assert.Equal(2, transport.Size("dlq"));
+        Assert.Equal(0, transport.Size("emails"));
+    }
+
+    [Fact]
+    public async Task BlankDlqIsRejected()
+    {
+        var transport = new MemoryTransport();
+
+        await Assert.ThrowsAsync<ArgumentException>(
+            () => Redrive.RedriveAsync(transport, " ", new Redrive.Options()));
+    }
+
+    [Fact]
+    public async Task UndecodableBodyIsRestored()
+    {
+        var transport = new MemoryTransport();
+        await transport.PublishAsync("dlq", "not-json{{{");
+
+        var result = await Redrive.RedriveAsync(transport, "dlq", new Redrive.Options());
+
+        Assert.Equal(0, result.Redriven);
+        Assert.Equal(1, result.Skipped);
+        var restored = await transport.PopAsync("dlq");
+        Assert.NotNull(restored);
+        Assert.Equal("not-json{{{", restored!.Body);
+    }
+
+    [Fact]
+    public async Task NoDeadLetterFallsBackToMetaQueue()
+    {
+        var transport = new MemoryTransport();
+        var env = EnvelopeCodec.Make("urn:babel:orders:created", queue: "orders");
+        await transport.PublishAsync("dlq", EnvelopeCodec.Encode(env));
+
+        var result = await Redrive.RedriveAsync(transport, "dlq", new Redrive.Options());
+
+        Assert.Equal(1, result.Redriven);
+        Assert.Equal(1, transport.Size("orders"));
+    }
+}