diff --git a/src/main/java/com/babelqueue/Redrive.java b/src/main/java/com/babelqueue/Redrive.java
new file mode 100644
index 0000000..b79eac3
--- /dev/null
+++ b/src/main/java/com/babelqueue/Redrive.java
@@ -0,0 +1,241 @@
+package com.babelqueue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * DLQ redrive tooling — safe replay off the dead-letter queue (ADR-0026).
+ *
+ * <p>The operator-side counterpart to the runtime's dead-letter routing: it reads dead-lettered
+ * messages off a DLQ and re-publishes each to its source queue (its {@code dead_letter
+ * .original_queue}) or to a chosen queue, {@linkplain #reset(Envelope) reset for reprocessing}
+ * — the {@code dead_letter} block removed and {@code attempts} reset to 0, while {@code job},
+ * {@code trace_id}, {@code data} and {@code meta} are preserved verbatim.
+ *
+ * <p>The Java core is codec-only (transports are separate artifacts), so {@link #redrive} works
+ * over a minimal {@link Transport} the caller implements over their broker — the same shape as
+ * the {@code otel} module's {@code Sender}. The wire envelope stays frozen (GR-1) and no
+ * dependency is added.
+ *
+ * <p>Safety in v1 is {@code dryRun} + sandbox routing ({@code toQueue}) + {@code select}. The
+ * Replay-Bypass guard (a {@code bq-replay-bypass} transport header surfaced to handlers
+ * so a replay can skip external side-effects) is a documented phase two — it touches the
+ * runtime and every transport, like ADR-0025's {@code traceparent} follow-up.
+ */
+public final class Redrive {
+
+    private Redrive() {
+    }
+
+    /** A reserved DLQ message: its raw body plus a transport-specific handle used to ack it. */
+    public record Reserved(String body, Object handle) {
+    }
+
+    /** The minimal transport surface {@link #redrive} needs, implemented over any broker. */
+    public interface Transport {
+
+        /** Reserve the next message from {@code queue}, or {@code null} when it is empty. */
+        Reserved pop(String queue) throws Exception;
+
+        /** Publish an already-encoded {@code body} to {@code queue}. */
+        void publish(String queue, String body) throws Exception;
+
+        /** Acknowledge (remove) a previously reserved message. */
+        void ack(Reserved message) throws Exception;
+    }
+
+    /**
+     * Options for a {@link #redrive} run; immutable, built with the fluent withers from
+     * {@link #all()}.
+     *
+     * @param toQueue overrides the target queue (sandbox/redirect); when blank, each message
+     *                goes back to its own {@code dead_letter.original_queue}
+     * @param max     caps how many messages are pulled from the DLQ (0 = all available)
+     * @param dryRun  inspect and report the plan, restoring every message unchanged
+     * @param select  picks which messages to redrive (unselected are restored unchanged)
+     */
+    public record Options(String toQueue, int max, boolean dryRun, Predicate<Envelope> select) {
+
+        /** Redrive every message back to its source queue. */
+        public static Options all() {
+            return new Options(null, 0, false, null);
+        }
+
+        /** Returns a copy with the target-queue override set to {@code queue}. */
+        public Options toQueue(String queue) {
+            return new Options(queue, max, dryRun, select);
+        }
+
+        /** Returns a copy that pulls at most {@code limit} messages (0 = all available). */
+        public Options max(int limit) {
+            return new Options(toQueue, limit, dryRun, select);
+        }
+
+        /** Returns a copy with dry-run switched on or off. */
+        public Options dryRun(boolean enabled) {
+            return new Options(toQueue, max, enabled, select);
+        }
+
+        /** Returns a copy that redrives only messages accepted by {@code predicate}. */
+        public Options select(Predicate<Envelope> predicate) {
+            return new Options(toQueue, max, dryRun, predicate);
+        }
+    }
+
+    /**
+     * What happened to one message during a {@link #redrive} run.
+     *
+     * @param messageId {@code meta.id} of the envelope; {@code null} if it has no {@code meta}
+     * @param traceId   the envelope's trace id; {@code null} for an undecodable body
+     * @param urn       the envelope's {@code job}; {@code null} for an undecodable body
+     * @param reason    the dead-letter reason; {@code null} without a {@code dead_letter} block
+     * @param from      the DLQ the message was read from
+     * @param to        the resolved target queue; {@code null} if undecodable or unselected
+     * @param redriven  {@code true} only when the message was re-published and acknowledged
+     */
+    public record Item(
+            String messageId,
+            String traceId,
+            String urn,
+            String reason,
+            String from,
+            String to,
+            boolean redriven
+    ) {
+    }
+
+    /** Summary of a {@link #redrive} run. */
+    public record Result(int redriven, int skipped, List<Item> items) {
+
+        public Result {
+            items = List.copyOf(items);
+        }
+    }
+
+    /**
+     * Returns a copy of {@code env} reset for reprocessing: the {@code dead_letter} block is
+     * removed and {@code attempts} reset to 0; {@code job}, {@code trace_id}, {@code data} and
+     * {@code meta} are preserved verbatim.
+     */
+    public static Envelope reset(Envelope env) {
+        return new Envelope(env.job(), env.traceId(), env.data(), env.meta(), 0, null);
+    }
+
+    /**
+     * Drains dead-lettered messages off {@code dlq} and re-publishes each (reset) to its source
+     * queue or {@code opts.toQueue}. Messages are drained first and then processed, so restored
+     * messages (skipped, dry-run, or undecodable) are not re-encountered; a DLQ message is
+     * acknowledged only after a successful re-publish, and an undecodable body is restored
+     * rather than dropped. A message with no resolvable target queue is likewise restored and
+     * reported as skipped instead of being published to a {@code null} queue.
+     *
+     * @param transport the broker adapter used to pop, publish and acknowledge messages
+     * @param dlq       the dead-letter queue to drain
+     * @param opts      the run options; see {@link Options}
+     * @return a {@link Result} with redriven/skipped counts and a per-message breakdown
+     * @throws Exception if the transport fails; on a publish failure the failed message and
+     *                   every drained message not yet processed are restored to the DLQ (best
+     *                   effort, restore errors attached as suppressed) before the error is
+     *                   re-thrown
+     */
+    public static Result redrive(Transport transport, String dlq, Options opts) throws Exception {
+        List<Reserved> batch = new ArrayList<>();
+        while (opts.max() == 0 || batch.size() < opts.max()) {
+            Reserved msg = transport.pop(dlq);
+            if (msg == null) {
+                break;
+            }
+            batch.add(msg);
+        }
+
+        int redriven = 0;
+        int skipped = 0;
+        List<Item> items = new ArrayList<>(batch.size());
+
+        for (int i = 0; i < batch.size(); i++) {
+            Reserved msg = batch.get(i);
+
+            // decode() never throws — a malformed/non-object body yields an empty envelope
+            // (a null job), which is not redrivable; restore it rather than drop it.
+            Envelope env = EnvelopeCodec.decode(msg.body());
+            if (env.job() == null || env.job().isBlank()) {
+                transport.publish(dlq, msg.body());
+                transport.ack(msg);
+                skipped++;
+                items.add(new Item(null, null, null, null, dlq, null, false));
+                continue;
+            }
+
+            String reason = env.deadLetter() == null ? null : env.deadLetter().reason();
+            String messageId = env.meta() == null ? null : env.meta().id();
+
+            if (opts.select() != null && !opts.select().test(env)) {
+                transport.publish(dlq, msg.body());
+                transport.ack(msg);
+                skipped++;
+                items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, null, false));
+                continue;
+            }
+
+            String target = opts.toQueue() != null && !opts.toQueue().isBlank()
+                    ? opts.toQueue()
+                    : sourceQueueOf(env);
+
+            // A dry run never publishes, and a message with no resolvable target (no override,
+            // no dead_letter.original_queue, no meta.queue) cannot be redriven: either way the
+            // message goes back to the DLQ unchanged and is reported as skipped.
+            boolean routable = target != null && !target.isBlank();
+            if (opts.dryRun() || !routable) {
+                transport.publish(dlq, msg.body());
+                transport.ack(msg);
+                skipped++;
+                items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, false));
+                continue;
+            }
+
+            try {
+                transport.publish(target, EnvelopeCodec.encode(reset(env)));
+            } catch (Exception publishFailure) {
+                // The run aborts here: put back the failed message and everything drained
+                // after it, so no message already popped off the DLQ is left behind.
+                restoreAll(transport, dlq, batch.subList(i, batch.size()), publishFailure);
+                throw publishFailure;
+            }
+            transport.ack(msg);
+            redriven++;
+            items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, true));
+        }
+
+        return new Result(redriven, skipped, items);
+    }
+
+    /**
+     * Best-effort restore of {@code pending} to {@code dlq} while a run is aborting: each message
+     * is re-published unchanged and then acknowledged. A failure to restore one message is
+     * attached to {@code cause} as suppressed, so it neither masks the original error nor stops
+     * the remaining messages from being restored.
+     */
+    private static void restoreAll(
+            Transport transport, String dlq, List<Reserved> pending, Exception cause) {
+        for (Reserved msg : pending) {
+            try {
+                transport.publish(dlq, msg.body());
+                transport.ack(msg);
+            } catch (Exception restoreFailure) {
+                if (restoreFailure != cause) { // addSuppressed rejects self-suppression
+                    cause.addSuppressed(restoreFailure);
+                }
+            }
+        }
+    }
+
+    /** Source queue: {@code dead_letter.original_queue}, else {@code meta.queue}, else null. */
+    private static String sourceQueueOf(Envelope env) {
+        DeadLetter dl = env.deadLetter();
+        if (dl != null && dl.originalQueue() != null && !dl.originalQueue().isBlank()) {
+            return dl.originalQueue();
+        }
+        return env.meta() == null ? null : env.meta().queue();
+    }
+}
diff --git a/src/test/java/com/babelqueue/RedriveTest.java b/src/test/java/com/babelqueue/RedriveTest.java
new file mode 100644
index 0000000..6f41b80
--- /dev/null
+++ b/src/test/java/com/babelqueue/RedriveTest.java
@@ -0,0 +1,199 @@
+package com.babelqueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+class RedriveTest {
+
+    /** In-memory transport for tests; optionally refuses to publish to one queue. */
+    private static final class MemoryTransport implements Redrive.Transport {
+        private final Map<String, Deque<String>> queues = new HashMap<>();
+        private final String failQueue;
+
+        MemoryTransport() {
+            this(null);
+        }
+
+        MemoryTransport(String failQueue) {
+            this.failQueue = failQueue;
+        }
+
+        @Override
+        public Redrive.Reserved pop(String queue) {
+            Deque<String> dq = queues.get(queue);
+            if (dq == null || dq.isEmpty()) {
+                return null;
+            }
+            return new Redrive.Reserved(dq.pollFirst(), null);
+        }
+
+        @Override
+        public void publish(String queue, String body) {
+            if (queue.equals(failQueue)) {
+                throw new IllegalStateException("publish refused");
+            }
+            queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(body);
+        }
+
+        @Override
+        public void ack(Redrive.Reserved message) {
+            // pop already removed it
+        }
+
+        int size(String queue) {
+            Deque<String> dq = queues.get(queue);
+            return dq == null ? 0 : dq.size();
+        }
+    }
+
+    private static Envelope deadLettered(MemoryTransport t, String dlq, String urn, String originalQueue) {
+        Envelope env = EnvelopeCodec.make(urn, Map.of("order_id", 1), originalQueue, null);
+        Envelope dl = DeadLetters.annotate(env, "failed", originalQueue);
+        t.publish(dlq, EnvelopeCodec.encode(dl));
+        return dl;
+    }
+
+    private static List<Envelope> drain(MemoryTransport t, String queue) {
+        List<Envelope> out = new ArrayList<>();
+        Redrive.Reserved m;
+        while ((m = t.pop(queue)) != null) {
+            out.add(EnvelopeCodec.decode(m.body()));
+        }
+        return out;
+    }
+
+    @Test
+    void redrivesToSourceAndResets() throws Exception {
+        MemoryTransport t = new MemoryTransport();
+        Envelope orig = deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");
+
+        Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all());
+
+        assertEquals(1, res.redriven());
+        assertEquals(0, res.skipped());
+        List<Envelope> got = drain(t, "orders");
+        assertEquals(1, got.size());
+        assertNull(got.get(0).deadLetter(), "dead_letter must be stripped");
+        assertEquals(0, got.get(0).attempts(), "attempts must reset");
+        assertEquals(orig.traceId(), got.get(0).traceId(), "trace_id must be preserved");
+        assertEquals("urn:babel:orders:created", got.get(0).job());
+        assertEquals(0, t.size("orders.dlq"));
+    }
+
+    @Test
+    void redrivesToSandbox() throws Exception {
+        MemoryTransport t = new MemoryTransport();
+        deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");
+
+        Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all().toQueue("sandbox"));
+
+        assertEquals(1, res.redriven());
+        assertEquals(0, t.size("orders"));
+        assertEquals(1, t.size("sandbox"));
+    }
+
+    @Test
+    void dryRunReportsPlanAndChangesNothing() throws Exception {
+        MemoryTransport t = new MemoryTransport();
+        deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");
+
+        Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all().dryRun(true));
+
+        assertEquals(0, res.redriven());
+        assertEquals(1, res.skipped());
+        assertEquals("orders", res.items().get(0).to());
+        assertFalse(res.items().get(0).redriven());
+        assertEquals(0, t.size("orders"));
+        assertEquals(1, t.size("orders.dlq"));
+        assertNotNull(drain(t, "orders.dlq").get(0).deadLetter(), "DLQ message left unchanged");
+    }
+
+    @Test
+    void selectRedrivesOnlyMatching() throws Exception {
+        MemoryTransport t = new MemoryTransport();
+        deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
+        deadLettered(t, "dlq", "urn:babel:emails:welcome", "emails");
+
+        Redrive.Result res = Redrive.redrive(t, "dlq",
+                Redrive.Options.all().select(e -> "urn:babel:orders:created".equals(e.job())));
+
+        assertEquals(1, res.redriven());
+        assertEquals(1, res.skipped());
+        assertEquals(1, t.size("orders"));
+        assertEquals(0, t.size("emails"));
+        assertEquals(1, t.size("dlq"), "unselected message is restored to the DLQ");
+    }
+
+    @Test
+    void maxCapsHowManyArePulled() throws Exception {
+        MemoryTransport t = new MemoryTransport();
+        for (int i = 0; i < 3; i++) {
+            deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
+        }
+
+        Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all().max(2));
+
+        assertEquals(2, res.redriven());
+        assertEquals(1, t.size("dlq"));
+    }
+
+    @Test
+    void publishFailureRestoresToDlq() {
+        MemoryTransport t = new MemoryTransport("orders");
+        deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
+
+        assertThrows(IllegalStateException.class,
+                () -> Redrive.redrive(t, "dlq", Redrive.Options.all()));
+
+        assertEquals(1, t.size("dlq"), "a message must be restored when its re-publish fails");
+        assertEquals(0, t.size("orders"));
+    }
+
+    @Test
+    void publishFailureRestoresUnprocessedBatch() {
+        MemoryTransport t = new MemoryTransport("orders");
+        deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
+        deadLettered(t, "dlq", "urn:babel:emails:welcome", "emails");
+
+        assertThrows(IllegalStateException.class,
+                () -> Redrive.redrive(t, "dlq", Redrive.Options.all()));
+
+        assertEquals(2, t.size("dlq"), "drained but unprocessed messages must be restored too");
+        assertEquals(0, t.size("emails"));
+    }
+
+    @Test
+    void undecodableBodyIsRestored() throws Exception {
+        MemoryTransport t = new MemoryTransport();
+        t.publish("dlq", "not-json{{{");
+
+        Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all());
+
+        assertEquals(0, res.redriven());
+        assertEquals(1, res.skipped());
+        assertEquals("not-json{{{", t.pop("dlq").body(), "undecodable body must be restored");
+    }
+
+    @Test
+    void noDeadLetterFallsBackToMetaQueue() throws Exception {
+        MemoryTransport t = new MemoryTransport();
+        Envelope env = EnvelopeCodec.make("urn:babel:orders:created", Map.of(), "orders", null);
+        t.publish("dlq", EnvelopeCodec.encode(env));
+
+        Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all());
+
+        assertEquals(1, res.redriven());
+        assertEquals(1, t.size("orders"));
+    }
+}