diff --git a/src/main/java/com/babelqueue/Redrive.java b/src/main/java/com/babelqueue/Redrive.java
new file mode 100644
index 0000000..b79eac3
--- /dev/null
+++ b/src/main/java/com/babelqueue/Redrive.java
@@ -0,0 +1,193 @@
+package com.babelqueue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Predicate;
+
+/**
+ * DLQ redrive tooling — safe replay off the dead-letter queue (ADR-0026).
+ *
+ *
The operator-side counterpart to the runtime's dead-letter routing: it reads dead-lettered
+ * messages off a DLQ and re-publishes each to its source queue (its {@code dead_letter
+ * .original_queue}) or to a chosen queue, {@linkplain #reset(Envelope) reset for reprocessing}
+ * — the {@code dead_letter} block removed and {@code attempts} reset to 0, while {@code job},
+ * {@code trace_id}, {@code data} and {@code meta} are preserved verbatim.
+ *
+ *
The Java core is codec-only (transports are separate artifacts), so {@link #redrive} works
+ * over a minimal {@link Transport} the caller implements over their broker — the same shape as
+ * the {@code otel} module's {@code Sender}. The wire envelope stays frozen (GR-1) and no
+ * dependency is added.
+ *
+ *
Safety in v1 is {@code dryRun} + sandbox routing ({@code toQueue}) + {@code select}. The
+ * Replay-Bypass guard (a {@code bq-replay-bypass} transport header surfaced to handlers
+ * so a replay can skip external side-effects) is a documented phase two — it touches the
+ * runtime and every transport, like ADR-0025's {@code traceparent} follow-up.
+ */
+public final class Redrive {
+
+ private Redrive() {
+ }
+
+ /** A reserved DLQ message: its raw body plus a transport-specific handle used to ack it. */
+ public record Reserved(String body, Object handle) {
+ }
+
+ /** The minimal transport surface {@link #redrive} needs, implemented over any broker. */
+ public interface Transport {
+
+ /** Reserve the next message from {@code queue}, or {@code null} when it is empty. */
+ Reserved pop(String queue) throws Exception;
+
+ /** Publish an already-encoded {@code body} to {@code queue}. */
+ void publish(String queue, String body) throws Exception;
+
+ /** Acknowledge (remove) a previously reserved message. */
+ void ack(Reserved message) throws Exception;
+ }
+
+ /**
+ * Options for a {@link #redrive} run; immutable, built with the fluent withers from
+ * {@link #all()}.
+ *
+ * @param toQueue overrides the target queue (sandbox/redirect); when blank, each message
+ * goes back to its own {@code dead_letter.original_queue}
+ * @param max caps how many messages are pulled from the DLQ (0 = all available)
+ * @param dryRun inspect and report the plan, restoring every message unchanged
+ * @param select picks which messages to redrive (unselected are restored unchanged)
+ */
+ public record Options(String toQueue, int max, boolean dryRun, Predicate select) {
+
+ /** Redrive every message back to its source queue. */
+ public static Options all() {
+ return new Options(null, 0, false, null);
+ }
+
+ public Options toQueue(String queue) {
+ return new Options(queue, max, dryRun, select);
+ }
+
+ public Options max(int limit) {
+ return new Options(toQueue, limit, dryRun, select);
+ }
+
+ public Options dryRun(boolean enabled) {
+ return new Options(toQueue, max, enabled, select);
+ }
+
+ public Options select(Predicate predicate) {
+ return new Options(toQueue, max, dryRun, predicate);
+ }
+ }
+
+ /** What happened to one message during a {@link #redrive} run. */
+ public record Item(
+ String messageId,
+ String traceId,
+ String urn,
+ String reason,
+ String from,
+ String to,
+ boolean redriven
+ ) {
+ }
+
+ /** Summary of a {@link #redrive} run. */
+ public record Result(int redriven, int skipped, List- items) {
+
+ public Result {
+ items = List.copyOf(items);
+ }
+ }
+
+ /**
+ * Returns a copy of {@code env} reset for reprocessing: the {@code dead_letter} block is
+ * removed and {@code attempts} reset to 0; {@code job}, {@code trace_id}, {@code data} and
+ * {@code meta} are preserved verbatim.
+ */
+ public static Envelope reset(Envelope env) {
+ return new Envelope(env.job(), env.traceId(), env.data(), env.meta(), 0, null);
+ }
+
+ /**
+ * Drains dead-lettered messages off {@code dlq} and re-publishes each (reset) to its source
+ * queue or {@code opts.toQueue}. Messages are drained first and then processed, so restored
+ * messages (skipped, dry-run, or undecodable) are not re-encountered; a DLQ message is
+ * acknowledged only after a successful re-publish, and an undecodable body is restored
+ * rather than dropped.
+ *
+ * @return a {@link Result} with redriven/skipped counts and a per-message breakdown
+ * @throws Exception if the transport fails; on a publish failure the message is restored to
+ * the DLQ before the error is re-thrown
+ */
+ public static Result redrive(Transport transport, String dlq, Options opts) throws Exception {
+ List batch = new ArrayList<>();
+ while (opts.max() == 0 || batch.size() < opts.max()) {
+ Reserved msg = transport.pop(dlq);
+ if (msg == null) {
+ break;
+ }
+ batch.add(msg);
+ }
+
+ int redriven = 0;
+ int skipped = 0;
+ List
- items = new ArrayList<>(batch.size());
+
+ for (Reserved msg : batch) {
+ // decode() never throws — a malformed/non-object body yields an empty envelope
+ // (a null job), which is not redrivable; restore it rather than drop it.
+ Envelope env = EnvelopeCodec.decode(msg.body());
+ if (env.job() == null || env.job().isBlank()) {
+ transport.publish(dlq, msg.body());
+ transport.ack(msg);
+ skipped++;
+ items.add(new Item(null, null, null, null, dlq, null, false));
+ continue;
+ }
+
+ String reason = env.deadLetter() == null ? null : env.deadLetter().reason();
+ String messageId = env.meta() == null ? null : env.meta().id();
+
+ if (opts.select() != null && !opts.select().test(env)) {
+ transport.publish(dlq, msg.body());
+ transport.ack(msg);
+ skipped++;
+ items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, null, false));
+ continue;
+ }
+
+ String target = opts.toQueue() != null && !opts.toQueue().isBlank()
+ ? opts.toQueue()
+ : sourceQueueOf(env);
+
+ if (opts.dryRun()) {
+ transport.publish(dlq, msg.body());
+ transport.ack(msg);
+ skipped++;
+ items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, false));
+ continue;
+ }
+
+ try {
+ transport.publish(target, EnvelopeCodec.encode(reset(env)));
+ } catch (Exception publishFailure) {
+ transport.publish(dlq, msg.body());
+ transport.ack(msg);
+ throw publishFailure;
+ }
+ transport.ack(msg);
+ redriven++;
+ items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, true));
+ }
+
+ return new Result(redriven, skipped, items);
+ }
+
+ private static String sourceQueueOf(Envelope env) {
+ DeadLetter dl = env.deadLetter();
+ if (dl != null && dl.originalQueue() != null && !dl.originalQueue().isBlank()) {
+ return dl.originalQueue();
+ }
+ return env.meta() == null ? null : env.meta().queue();
+ }
+}
diff --git a/src/test/java/com/babelqueue/RedriveTest.java b/src/test/java/com/babelqueue/RedriveTest.java
new file mode 100644
index 0000000..6f41b80
--- /dev/null
+++ b/src/test/java/com/babelqueue/RedriveTest.java
@@ -0,0 +1,186 @@
+package com.babelqueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+class RedriveTest {
+
+ /** In-memory transport for tests; optionally refuses to publish to one queue. */
+ private static final class MemoryTransport implements Redrive.Transport {
+ private final Map> queues = new HashMap<>();
+ private final String failQueue;
+
+ MemoryTransport() {
+ this(null);
+ }
+
+ MemoryTransport(String failQueue) {
+ this.failQueue = failQueue;
+ }
+
+ @Override
+ public Redrive.Reserved pop(String queue) {
+ Deque dq = queues.get(queue);
+ if (dq == null || dq.isEmpty()) {
+ return null;
+ }
+ return new Redrive.Reserved(dq.pollFirst(), null);
+ }
+
+ @Override
+ public void publish(String queue, String body) {
+ if (queue.equals(failQueue)) {
+ throw new IllegalStateException("publish refused");
+ }
+ queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(body);
+ }
+
+ @Override
+ public void ack(Redrive.Reserved message) {
+ // pop already removed it
+ }
+
+ int size(String queue) {
+ Deque dq = queues.get(queue);
+ return dq == null ? 0 : dq.size();
+ }
+ }
+
+ private static Envelope deadLettered(MemoryTransport t, String dlq, String urn, String originalQueue) {
+ Envelope env = EnvelopeCodec.make(urn, Map.of("order_id", 1), originalQueue, null);
+ Envelope dl = DeadLetters.annotate(env, "failed", originalQueue);
+ t.publish(dlq, EnvelopeCodec.encode(dl));
+ return dl;
+ }
+
+ private static List drain(MemoryTransport t, String queue) {
+ List out = new ArrayList<>();
+ Redrive.Reserved m;
+ while ((m = t.pop(queue)) != null) {
+ out.add(EnvelopeCodec.decode(m.body()));
+ }
+ return out;
+ }
+
+ @Test
+ void redrivesToSourceAndResets() throws Exception {
+ MemoryTransport t = new MemoryTransport();
+ Envelope orig = deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");
+
+ Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all());
+
+ assertEquals(1, res.redriven());
+ assertEquals(0, res.skipped());
+ List got = drain(t, "orders");
+ assertEquals(1, got.size());
+ assertNull(got.get(0).deadLetter(), "dead_letter must be stripped");
+ assertEquals(0, got.get(0).attempts(), "attempts must reset");
+ assertEquals(orig.traceId(), got.get(0).traceId(), "trace_id must be preserved");
+ assertEquals("urn:babel:orders:created", got.get(0).job());
+ assertEquals(0, t.size("orders.dlq"));
+ }
+
+ @Test
+ void redrivesToSandbox() throws Exception {
+ MemoryTransport t = new MemoryTransport();
+ deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");
+
+ Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all().toQueue("sandbox"));
+
+ assertEquals(1, res.redriven());
+ assertEquals(0, t.size("orders"));
+ assertEquals(1, t.size("sandbox"));
+ }
+
+ @Test
+ void dryRunReportsPlanAndChangesNothing() throws Exception {
+ MemoryTransport t = new MemoryTransport();
+ deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");
+
+ Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all().dryRun(true));
+
+ assertEquals(0, res.redriven());
+ assertEquals(1, res.skipped());
+ assertEquals("orders", res.items().get(0).to());
+ assertFalse(res.items().get(0).redriven());
+ assertEquals(0, t.size("orders"));
+ assertEquals(1, t.size("orders.dlq"));
+ assertNotNull(drain(t, "orders.dlq").get(0).deadLetter(), "DLQ message left unchanged");
+ }
+
+ @Test
+ void selectRedrivesOnlyMatching() throws Exception {
+ MemoryTransport t = new MemoryTransport();
+ deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
+ deadLettered(t, "dlq", "urn:babel:emails:welcome", "emails");
+
+ Redrive.Result res = Redrive.redrive(t, "dlq",
+ Redrive.Options.all().select(e -> "urn:babel:orders:created".equals(e.job())));
+
+ assertEquals(1, res.redriven());
+ assertEquals(1, res.skipped());
+ assertEquals(1, t.size("orders"));
+ assertEquals(0, t.size("emails"));
+ assertEquals(1, t.size("dlq"), "unselected message is restored to the DLQ");
+ }
+
+ @Test
+ void maxCapsHowManyArePulled() throws Exception {
+ MemoryTransport t = new MemoryTransport();
+ for (int i = 0; i < 3; i++) {
+ deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
+ }
+
+ Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all().max(2));
+
+ assertEquals(2, res.redriven());
+ assertEquals(1, t.size("dlq"));
+ }
+
+ @Test
+ void publishFailureRestoresToDlq() {
+ MemoryTransport t = new MemoryTransport("orders");
+ deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
+
+ assertThrows(IllegalStateException.class,
+ () -> Redrive.redrive(t, "dlq", Redrive.Options.all()));
+
+ assertEquals(1, t.size("dlq"), "a message must be restored when its re-publish fails");
+ assertEquals(0, t.size("orders"));
+ }
+
+ @Test
+ void undecodableBodyIsRestored() throws Exception {
+ MemoryTransport t = new MemoryTransport();
+ t.publish("dlq", "not-json{{{");
+
+ Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all());
+
+ assertEquals(0, res.redriven());
+ assertEquals(1, res.skipped());
+ assertEquals("not-json{{{", t.pop("dlq").body(), "undecodable body must be restored");
+ }
+
+ @Test
+ void noDeadLetterFallsBackToMetaQueue() throws Exception {
+ MemoryTransport t = new MemoryTransport();
+ Envelope env = EnvelopeCodec.make("urn:babel:orders:created", Map.of(), "orders", null);
+ t.publish("dlq", EnvelopeCodec.encode(env));
+
+ Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all());
+
+ assertEquals(1, res.redriven());
+ assertEquals(1, t.size("orders"));
+ }
+}