diff --git a/pom.xml b/pom.xml
index 6876866..adfd2dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
com.babelqueue
babelqueue-core
- 1.3.0
+ 1.4.0
jar
BabelQueue Core
diff --git a/src/main/java/com/babelqueue/Redrive.java b/src/main/java/com/babelqueue/Redrive.java
index b79eac3..67086e4 100644
--- a/src/main/java/com/babelqueue/Redrive.java
+++ b/src/main/java/com/babelqueue/Redrive.java
@@ -2,6 +2,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.function.Predicate;
/**
@@ -18,18 +19,30 @@
* the {@code otel} module's {@code Sender}. The wire envelope stays frozen (GR-1) and no
* dependency is added.
*
- *
Safety in v1 is {@code dryRun} + sandbox routing ({@code toQueue}) + {@code select}. The
- * Replay-Bypass guard (a {@code bq-replay-bypass} transport header surfaced to handlers
- * so a replay can skip external side-effects) is a documented phase two — it touches the
- * runtime and every transport, like ADR-0025's {@code traceparent} follow-up.
+ *
Safety is {@code dryRun} + sandbox routing ({@code toQueue}) + {@code select}. The
+ * {@code bypass} option stamps a {@code bq-replay-bypass} transport header (see {@link Replay})
+ * so a handler can skip external side-effects on a replay; it takes effect on transports that
+ * implement {@link HeaderPublisher} (ADR-0027).
*/
public final class Redrive {
private Redrive() {
}
- /** A reserved DLQ message: its raw body plus a transport-specific handle used to ack it. */
- public record Reserved(String body, Object handle) {
+ /**
+ * A reserved DLQ message: its raw body, a transport-specific handle used to ack it, and any
+ * out-of-band transport headers (e.g. the replay-bypass marker; empty when none).
+ */
+ public record Reserved(String body, Object handle, Map headers) {
+
+ /** A reserved message with no out-of-band headers. */
+ public Reserved(String body, Object handle) {
+ this(body, handle, Map.of());
+ }
+
+ public Reserved {
+ headers = headers == null ? Map.of() : Map.copyOf(headers);
+ }
}
/** The minimal transport surface {@link #redrive} needs, implemented over any broker. */
@@ -45,6 +58,16 @@ public interface Transport {
void ack(Reserved message) throws Exception;
}
+ /**
+ * An optional {@link Transport} capability: publish a body together with out-of-band transport
+ * headers (e.g. the replay-bypass marker), for brokers that carry per-message metadata. A
+ * transport that does not implement it simply does not propagate headers — {@link #redrive}
+ * falls back to plain {@link Transport#publish} (ADR-0027).
+ */
+ public interface HeaderPublisher {
+ void publishWithHeaders(String queue, String body, Map headers) throws Exception;
+ }
+
/**
* Options for a {@link #redrive} run; immutable, built with the fluent withers from
* {@link #all()}.
@@ -54,28 +77,34 @@ public interface Transport {
* @param max caps how many messages are pulled from the DLQ (0 = all available)
* @param dryRun inspect and report the plan, restoring every message unchanged
* @param select picks which messages to redrive (unselected are restored unchanged)
+ * @param bypass stamps the bq-replay-bypass header on each redriven message (see
+ * {@link Replay}); a no-op unless the transport is a {@link HeaderPublisher}
*/
- public record Options(String toQueue, int max, boolean dryRun, Predicate select) {
+ public record Options(String toQueue, int max, boolean dryRun, Predicate select, boolean bypass) {
/** Redrive every message back to its source queue. */
public static Options all() {
- return new Options(null, 0, false, null);
+ return new Options(null, 0, false, null, false);
}
public Options toQueue(String queue) {
- return new Options(queue, max, dryRun, select);
+ return new Options(queue, max, dryRun, select, bypass);
}
public Options max(int limit) {
- return new Options(toQueue, limit, dryRun, select);
+ return new Options(toQueue, limit, dryRun, select, bypass);
}
public Options dryRun(boolean enabled) {
- return new Options(toQueue, max, enabled, select);
+ return new Options(toQueue, max, enabled, select, bypass);
}
public Options select(Predicate predicate) {
- return new Options(toQueue, max, dryRun, predicate);
+ return new Options(toQueue, max, dryRun, predicate, bypass);
+ }
+
+ public Options bypass(boolean enabled) {
+ return new Options(toQueue, max, dryRun, select, enabled);
}
}
@@ -87,7 +116,8 @@ public record Item(
String reason,
String from,
String to,
- boolean redriven
+ boolean redriven,
+ boolean bypassed
) {
}
@@ -141,7 +171,7 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro
transport.publish(dlq, msg.body());
transport.ack(msg);
skipped++;
- items.add(new Item(null, null, null, null, dlq, null, false));
+ items.add(new Item(null, null, null, null, dlq, null, false, false));
continue;
}
@@ -152,7 +182,7 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro
transport.publish(dlq, msg.body());
transport.ack(msg);
skipped++;
- items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, null, false));
+ items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, null, false, false));
continue;
}
@@ -164,12 +194,19 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro
transport.publish(dlq, msg.body());
transport.ack(msg);
skipped++;
- items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, false));
+ items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, false, false));
continue;
}
+ boolean bypassed = false;
try {
- transport.publish(target, EnvelopeCodec.encode(reset(env)));
+ String encoded = EnvelopeCodec.encode(reset(env));
+ if (opts.bypass() && transport instanceof HeaderPublisher hp) {
+ hp.publishWithHeaders(target, encoded, Map.of(Replay.HEADER_REPLAY_BYPASS, "1"));
+ bypassed = true;
+ } else {
+ transport.publish(target, encoded);
+ }
} catch (Exception publishFailure) {
transport.publish(dlq, msg.body());
transport.ack(msg);
@@ -177,7 +214,7 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro
}
transport.ack(msg);
redriven++;
- items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, true));
+ items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, true, bypassed));
}
return new Result(redriven, skipped, items);
diff --git a/src/main/java/com/babelqueue/Replay.java b/src/main/java/com/babelqueue/Replay.java
new file mode 100644
index 0000000..b3fae35
--- /dev/null
+++ b/src/main/java/com/babelqueue/Replay.java
@@ -0,0 +1,94 @@
+package com.babelqueue;
+
+import java.util.Map;
+
+/**
+ * Replay-Bypass — a side-effect guard for DLQ replay (ADR-0027).
+ *
+ * A deliberate replay off the dead-letter queue ({@link Redrive}) re-runs the handler, and
+ * the handler's external side-effects re-fire: a second charge, a duplicate email.
+ * {@code Idempotent.wrap} stops an accidental duplicate; it does not stop the
+ * intended reprocess from re-firing effects that already happened. This closes that gap.
+ *
+ *
The marker that says "this is a replay, skip the external effects" rides out of band
+ * as the {@link #HEADER_REPLAY_BYPASS} transport header — never in the frozen envelope (GR-1).
+ * {@link Redrive#redrive} stamps it when its options set {@code bypass} and the transport is a
+ * {@link Redrive.HeaderPublisher}; a consume adapter, having reserved the message with its
+ * headers, surfaces the flag for the duration of the handler via {@link #process}:
+ *
+ *
{@code
+ * Redrive.Reserved msg = transport.pop(queue);
+ * Envelope env = EnvelopeCodec.decode(msg.body());
+ * Replay.process(msg.headers(), () -> handler.handle(env)); // handler can now query isReplay()
+ * }
+ *
+ * A handler wraps its external, non-idempotent side in {@link #bypassExternalEffects} so a
+ * replay re-runs the idempotent core but skips effects that already fired. The Java core is
+ * codec-only, so this is the core/runtime API + in-memory testing; a concrete broker transport
+ * carries the header once it implements {@link Redrive.HeaderPublisher} (a follow-up).
+ */
+public final class Replay {
+
+ /**
+ * The out-of-band transport header {@link Redrive} stamps (with {@code bypass}) on a replayed
+ * message, and that a consume adapter surfaces to the handler via {@link #process}.
+ */
+ public static final String HEADER_REPLAY_BYPASS = "bq-replay-bypass";
+
+ private static final ThreadLocal REPLAY = ThreadLocal.withInitial(() -> Boolean.FALSE);
+
+ private Replay() {
+ }
+
+ /** An action that may throw — the external side of a handler. */
+ @FunctionalInterface
+ public interface Effect {
+ void run() throws Exception;
+ }
+
+ /**
+ * Reports whether the message currently being handled was redriven with the replay-bypass
+ * marker — i.e. a deliberate replay whose external side-effects should be skipped. Meaningful
+ * only inside a {@link #process} scope the consumer established from the message's headers.
+ *
+ * @return whether the current handling is a bypassed replay
+ */
+ public static boolean isReplay() {
+ return REPLAY.get();
+ }
+
+ /**
+ * Runs {@code body} with the replay flag derived from a reserved message's transport headers
+ * (the presence of {@link #HEADER_REPLAY_BYPASS}), restoring the prior flag afterwards. A
+ * consume adapter wraps each handler invocation in this.
+ *
+ * @param headers the reserved message's out-of-band headers (may be null/empty)
+ * @param body the handler invocation to run within the replay scope
+ * @throws Exception whatever {@code body} throws
+ */
+ public static void process(Map headers, Effect body) throws Exception {
+ boolean replay = headers != null && headers.containsKey(HEADER_REPLAY_BYPASS);
+ boolean previous = REPLAY.get();
+ REPLAY.set(replay);
+ try {
+ body.run();
+ } finally {
+ REPLAY.set(previous);
+ }
+ }
+
+ /**
+ * Runs {@code effect} unless the current message is a {@linkplain #isReplay replay}, in which
+ * case it is skipped. Wrap the external, non-idempotent side of a handler — sending an email,
+ * charging a card, calling a third party — so a replay re-runs the idempotent core but does
+ * not re-fire effects that already happened.
+ *
+ * @param effect the external side-effect to run only when this is not a replay
+ * @throws Exception whatever {@code effect} throws
+ */
+ public static void bypassExternalEffects(Effect effect) throws Exception {
+ if (!isReplay()) {
+ effect.run();
+ }
+ }
+}
diff --git a/src/test/java/com/babelqueue/ReplayTest.java b/src/test/java/com/babelqueue/ReplayTest.java
new file mode 100644
index 0000000..23a4c81
--- /dev/null
+++ b/src/test/java/com/babelqueue/ReplayTest.java
@@ -0,0 +1,131 @@
+package com.babelqueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.babelqueue.idempotency.Handler;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.jupiter.api.Test;
+
+class ReplayTest {
+
+ /** In-memory transport that carries out-of-band headers (a {@link Redrive.HeaderPublisher}). */
+ private static final class MemoryTransport implements Redrive.Transport, Redrive.HeaderPublisher {
+ private final Map> queues = new HashMap<>();
+
+ @Override
+ public Redrive.Reserved pop(String queue) {
+ Deque dq = queues.get(queue);
+ return (dq == null || dq.isEmpty()) ? null : dq.pollFirst();
+ }
+
+ @Override
+ public void publish(String queue, String body) {
+ queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(new Redrive.Reserved(body, null));
+ }
+
+ @Override
+ public void publishWithHeaders(String queue, String body, Map headers) {
+ queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(new Redrive.Reserved(body, null, headers));
+ }
+
+ @Override
+ public void ack(Redrive.Reserved message) {
+ // pop already removed it
+ }
+ }
+
+ /** In-memory transport with no header capability (NOT a {@link Redrive.HeaderPublisher}). */
+ private static final class PlainTransport implements Redrive.Transport {
+ private final Map> queues = new HashMap<>();
+
+ @Override
+ public Redrive.Reserved pop(String queue) {
+ Deque dq = queues.get(queue);
+ return (dq == null || dq.isEmpty()) ? null : new Redrive.Reserved(dq.pollFirst(), null);
+ }
+
+ @Override
+ public void publish(String queue, String body) {
+ queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(body);
+ }
+
+ @Override
+ public void ack(Redrive.Reserved message) {
+ // pop already removed it
+ }
+ }
+
+ private static void deadLettered(Redrive.Transport t, String dlq, String urn, String originalQueue)
+ throws Exception {
+ Envelope env = EnvelopeCodec.make(urn, Map.of("order_id", 1), originalQueue, null);
+ Envelope dl = DeadLetters.annotate(env, "failed", originalQueue);
+ t.publish(dlq, EnvelopeCodec.encode(dl));
+ }
+
+ @Test
+ void isReplayDefaultsFalseAndBypassRunsEffect() throws Exception {
+ assertFalse(Replay.isReplay());
+ boolean[] ran = {false};
+ Replay.bypassExternalEffects(() -> ran[0] = true);
+ assertTrue(ran[0], "effect must run when not a replay");
+ }
+
+ @Test
+ void processEstablishesReplayScopeAndBypassSkips() throws Exception {
+ boolean[] ran = {false};
+ boolean[] sawReplay = {false};
+ Replay.process(Map.of(Replay.HEADER_REPLAY_BYPASS, "1"), () -> {
+ sawReplay[0] = Replay.isReplay();
+ Replay.bypassExternalEffects(() -> ran[0] = true);
+ });
+ assertTrue(sawReplay[0], "isReplay must be true inside the process scope");
+ assertFalse(ran[0], "the effect must be skipped on a replay");
+ assertFalse(Replay.isReplay(), "the flag must be restored after process");
+ }
+
+ @Test
+ void processWithoutHeaderIsNotReplay() throws Exception {
+ boolean[] ran = {false};
+ Replay.process(Map.of(), () -> Replay.bypassExternalEffects(() -> ran[0] = true));
+ assertTrue(ran[0], "a message with no replay header is not a replay");
+ }
+
+ @Test
+ void redriveBypassStampsHeaderAndConsumeSkipsEffect() throws Exception {
+ MemoryTransport t = new MemoryTransport();
+ deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");
+
+ Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all().bypass(true));
+ assertEquals(1, res.redriven());
+ assertTrue(res.items().get(0).bypassed(), "the item must be flagged bypassed");
+
+ Redrive.Reserved msg = t.pop("orders");
+ assertNotNull(msg);
+ assertEquals("1", msg.headers().get(Replay.HEADER_REPLAY_BYPASS), "redriven message carries the header");
+
+ Envelope env = EnvelopeCodec.decode(msg.body());
+ boolean[] emailed = {false};
+ Handler handler = e -> {
+ assertTrue(Replay.isReplay(), "the handler should see this as a replay");
+ Replay.bypassExternalEffects(() -> emailed[0] = true);
+ };
+ Replay.process(msg.headers(), () -> handler.handle(env));
+ assertFalse(emailed[0], "the external side-effect must be skipped on a bypassed replay");
+ }
+
+ @Test
+ void bypassWithoutHeaderPublisherFallsBack() throws Exception {
+ PlainTransport t = new PlainTransport();
+ deadLettered(t, "dlq", "urn:babel:orders:created", "orders");
+
+ Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all().bypass(true));
+ assertEquals(1, res.redriven());
+ assertFalse(res.items().get(0).bypassed(), "bypass must be a no-op without a HeaderPublisher");
+ }
+}