diff --git a/pom.xml b/pom.xml index 6876866..adfd2dc 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.babelqueue babelqueue-core - 1.3.0 + 1.4.0 jar BabelQueue Core diff --git a/src/main/java/com/babelqueue/Redrive.java b/src/main/java/com/babelqueue/Redrive.java index b79eac3..67086e4 100644 --- a/src/main/java/com/babelqueue/Redrive.java +++ b/src/main/java/com/babelqueue/Redrive.java @@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.function.Predicate; /** @@ -18,18 +19,30 @@ * the {@code otel} module's {@code Sender}. The wire envelope stays frozen (GR-1) and no * dependency is added. * - *

Safety in v1 is {@code dryRun} + sandbox routing ({@code toQueue}) + {@code select}. The - * Replay-Bypass guard (a {@code bq-replay-bypass} transport header surfaced to handlers - * so a replay can skip external side-effects) is a documented phase two — it touches the - * runtime and every transport, like ADR-0025's {@code traceparent} follow-up. + *

Safety is {@code dryRun} + sandbox routing ({@code toQueue}) + {@code select}. The + * {@code bypass} option stamps a {@code bq-replay-bypass} transport header (see {@link Replay}) + * so a handler can skip external side-effects on a replay; it takes effect on transports that + * implement {@link HeaderPublisher} (ADR-0027). */ public final class Redrive { private Redrive() { } - /** A reserved DLQ message: its raw body plus a transport-specific handle used to ack it. */ - public record Reserved(String body, Object handle) { + /** + * A reserved DLQ message: its raw body, a transport-specific handle used to ack it, and any + * out-of-band transport headers (e.g. the replay-bypass marker; empty when none). + */ + public record Reserved(String body, Object handle, Map headers) { + + /** A reserved message with no out-of-band headers. */ + public Reserved(String body, Object handle) { + this(body, handle, Map.of()); + } + + public Reserved { + headers = headers == null ? Map.of() : Map.copyOf(headers); + } } /** The minimal transport surface {@link #redrive} needs, implemented over any broker. */ @@ -45,6 +58,16 @@ public interface Transport { void ack(Reserved message) throws Exception; } + /** + * An optional {@link Transport} capability: publish a body together with out-of-band transport + * headers (e.g. the replay-bypass marker), for brokers that carry per-message metadata. A + * transport that does not implement it simply does not propagate headers — {@link #redrive} + * falls back to plain {@link Transport#publish} (ADR-0027). + */ + public interface HeaderPublisher { + void publishWithHeaders(String queue, String body, Map headers) throws Exception; + } + /** * Options for a {@link #redrive} run; immutable, built with the fluent withers from * {@link #all()}. @@ -54,28 +77,34 @@ public interface Transport { * @param max caps how many messages are pulled from the DLQ (0 = all available) * @param dryRun inspect and report the plan, restoring every message unchanged * @param select picks which messages to redrive (unselected are restored unchanged) + * @param bypass stamps the bq-replay-bypass header on each redriven message (see + * {@link Replay}); a no-op unless the transport is a {@link HeaderPublisher} */ - public record Options(String toQueue, int max, boolean dryRun, Predicate select) { + public record Options(String toQueue, int max, boolean dryRun, Predicate select, boolean bypass) { /** Redrive every message back to its source queue. */ public static Options all() { - return new Options(null, 0, false, null); + return new Options(null, 0, false, null, false); } public Options toQueue(String queue) { - return new Options(queue, max, dryRun, select); + return new Options(queue, max, dryRun, select, bypass); } public Options max(int limit) { - return new Options(toQueue, limit, dryRun, select); + return new Options(toQueue, limit, dryRun, select, bypass); } public Options dryRun(boolean enabled) { - return new Options(toQueue, max, enabled, select); + return new Options(toQueue, max, enabled, select, bypass); } public Options select(Predicate predicate) { - return new Options(toQueue, max, dryRun, predicate); + return new Options(toQueue, max, dryRun, predicate, bypass); + } + + public Options bypass(boolean enabled) { + return new Options(toQueue, max, dryRun, select, enabled); } } @@ -87,7 +116,8 @@ public record Item( String reason, String from, String to, - boolean redriven + boolean redriven, + boolean bypassed ) { } @@ -141,7 +171,7 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro transport.publish(dlq, msg.body()); transport.ack(msg); skipped++; - items.add(new Item(null, null, null, null, dlq, null, false)); + items.add(new Item(null, null, null, null, dlq, null, false, false)); continue; } @@ -152,7 +182,7 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro transport.publish(dlq, msg.body()); transport.ack(msg); skipped++; - items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, null, false)); + items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, null, false, false)); continue; } @@ -164,12 +194,19 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro transport.publish(dlq, msg.body()); transport.ack(msg); skipped++; - items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, false)); + items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, false, false)); continue; } + boolean bypassed = false; try { - transport.publish(target, EnvelopeCodec.encode(reset(env))); + String encoded = EnvelopeCodec.encode(reset(env)); + if (opts.bypass() && transport instanceof HeaderPublisher hp) { + hp.publishWithHeaders(target, encoded, Map.of(Replay.HEADER_REPLAY_BYPASS, "1")); + bypassed = true; + } else { + transport.publish(target, encoded); + } } catch (Exception publishFailure) { transport.publish(dlq, msg.body()); transport.ack(msg); @@ -177,7 +214,7 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro } transport.ack(msg); redriven++; - items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, true)); + items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, true, bypassed)); } return new Result(redriven, skipped, items); diff --git a/src/main/java/com/babelqueue/Replay.java b/src/main/java/com/babelqueue/Replay.java new file mode 100644 index 0000000..b3fae35 --- /dev/null +++ b/src/main/java/com/babelqueue/Replay.java @@ -0,0 +1,94 @@ +package com.babelqueue; + +import java.util.Map; + +/** + * Replay-Bypass — a side-effect guard for DLQ replay (ADR-0027). + * + *

A deliberate replay off the dead-letter queue ({@link Redrive}) re-runs the handler, and + * the handler's external side-effects re-fire: a second charge, a duplicate email. + * {@code Idempotent.wrap} stops an accidental duplicate; it does not stop the + * intended reprocess from re-firing effects that already happened. This closes that gap. + * + *

The marker that says "this is a replay, skip the external effects" rides out of band + * as the {@link #HEADER_REPLAY_BYPASS} transport header — never in the frozen envelope (GR-1). + * {@link Redrive#redrive} stamps it when its options set {@code bypass} and the transport is a + * {@link Redrive.HeaderPublisher}; a consume adapter, having reserved the message with its + * headers, surfaces the flag for the duration of the handler via {@link #process}: + * + *

{@code
+ * Redrive.Reserved msg = transport.pop(queue);
+ * Envelope env = EnvelopeCodec.decode(msg.body());
+ * Replay.process(msg.headers(), () -> handler.handle(env)); // handler can now query isReplay()
+ * }
+ * + *

A handler wraps its external, non-idempotent side in {@link #bypassExternalEffects} so a + * replay re-runs the idempotent core but skips effects that already fired. The Java core is + * codec-only, so this is the core/runtime API + in-memory testing; a concrete broker transport + * carries the header once it implements {@link Redrive.HeaderPublisher} (a follow-up). + */ +public final class Replay { + + /** + * The out-of-band transport header {@link Redrive} stamps (with {@code bypass}) on a replayed + * message, and that a consume adapter surfaces to the handler via {@link #process}. + */ + public static final String HEADER_REPLAY_BYPASS = "bq-replay-bypass"; + + private static final ThreadLocal REPLAY = ThreadLocal.withInitial(() -> Boolean.FALSE); + + private Replay() { + } + + /** An action that may throw — the external side of a handler. */ + @FunctionalInterface + public interface Effect { + void run() throws Exception; + } + + /** + * Reports whether the message currently being handled was redriven with the replay-bypass + * marker — i.e. a deliberate replay whose external side-effects should be skipped. Meaningful + * only inside a {@link #process} scope the consumer established from the message's headers. + * + * @return whether the current handling is a bypassed replay + */ + public static boolean isReplay() { + return REPLAY.get(); + } + + /** + * Runs {@code body} with the replay flag derived from a reserved message's transport headers + * (the presence of {@link #HEADER_REPLAY_BYPASS}), restoring the prior flag afterwards. A + * consume adapter wraps each handler invocation in this. + * + * @param headers the reserved message's out-of-band headers (may be null/empty) + * @param body the handler invocation to run within the replay scope + * @throws Exception whatever {@code body} throws + */ + public static void process(Map headers, Effect body) throws Exception { + boolean replay = headers != null && headers.containsKey(HEADER_REPLAY_BYPASS); + boolean previous = REPLAY.get(); + REPLAY.set(replay); + try { + body.run(); + } finally { + REPLAY.set(previous); + } + } + + /** + * Runs {@code effect} unless the current message is a {@linkplain #isReplay replay}, in which + * case it is skipped. Wrap the external, non-idempotent side of a handler — sending an email, + * charging a card, calling a third party — so a replay re-runs the idempotent core but does + * not re-fire effects that already happened. + * + * @param effect the external side-effect to run only when this is not a replay + * @throws Exception whatever {@code effect} throws + */ + public static void bypassExternalEffects(Effect effect) throws Exception { + if (!isReplay()) { + effect.run(); + } + } +} diff --git a/src/test/java/com/babelqueue/ReplayTest.java b/src/test/java/com/babelqueue/ReplayTest.java new file mode 100644 index 0000000..23a4c81 --- /dev/null +++ b/src/test/java/com/babelqueue/ReplayTest.java @@ -0,0 +1,131 @@ +package com.babelqueue; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.babelqueue.idempotency.Handler; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class ReplayTest { + + /** In-memory transport that carries out-of-band headers (a {@link Redrive.HeaderPublisher}). */ + private static final class MemoryTransport implements Redrive.Transport, Redrive.HeaderPublisher { + private final Map> queues = new HashMap<>(); + + @Override + public Redrive.Reserved pop(String queue) { + Deque dq = queues.get(queue); + return (dq == null || dq.isEmpty()) ? null : dq.pollFirst(); + } + + @Override + public void publish(String queue, String body) { + queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(new Redrive.Reserved(body, null)); + } + + @Override + public void publishWithHeaders(String queue, String body, Map headers) { + queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(new Redrive.Reserved(body, null, headers)); + } + + @Override + public void ack(Redrive.Reserved message) { + // pop already removed it + } + } + + /** In-memory transport with no header capability (NOT a {@link Redrive.HeaderPublisher}). */ + private static final class PlainTransport implements Redrive.Transport { + private final Map> queues = new HashMap<>(); + + @Override + public Redrive.Reserved pop(String queue) { + Deque dq = queues.get(queue); + return (dq == null || dq.isEmpty()) ? null : new Redrive.Reserved(dq.pollFirst(), null); + } + + @Override + public void publish(String queue, String body) { + queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(body); + } + + @Override + public void ack(Redrive.Reserved message) { + // pop already removed it + } + } + + private static void deadLettered(Redrive.Transport t, String dlq, String urn, String originalQueue) + throws Exception { + Envelope env = EnvelopeCodec.make(urn, Map.of("order_id", 1), originalQueue, null); + Envelope dl = DeadLetters.annotate(env, "failed", originalQueue); + t.publish(dlq, EnvelopeCodec.encode(dl)); + } + + @Test + void isReplayDefaultsFalseAndBypassRunsEffect() throws Exception { + assertFalse(Replay.isReplay()); + boolean[] ran = {false}; + Replay.bypassExternalEffects(() -> ran[0] = true); + assertTrue(ran[0], "effect must run when not a replay"); + } + + @Test + void processEstablishesReplayScopeAndBypassSkips() throws Exception { + boolean[] ran = {false}; + boolean[] sawReplay = {false}; + Replay.process(Map.of(Replay.HEADER_REPLAY_BYPASS, "1"), () -> { + sawReplay[0] = Replay.isReplay(); + Replay.bypassExternalEffects(() -> ran[0] = true); + }); + assertTrue(sawReplay[0], "isReplay must be true inside the process scope"); + assertFalse(ran[0], "the effect must be skipped on a replay"); + assertFalse(Replay.isReplay(), "the flag must be restored after process"); + } + + @Test + void processWithoutHeaderIsNotReplay() throws Exception { + boolean[] ran = {false}; + Replay.process(Map.of(), () -> Replay.bypassExternalEffects(() -> ran[0] = true)); + assertTrue(ran[0], "a message with no replay header is not a replay"); + } + + @Test + void redriveBypassStampsHeaderAndConsumeSkipsEffect() throws Exception { + MemoryTransport t = new MemoryTransport(); + deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders"); + + Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all().bypass(true)); + assertEquals(1, res.redriven()); + assertTrue(res.items().get(0).bypassed(), "the item must be flagged bypassed"); + + Redrive.Reserved msg = t.pop("orders"); + assertNotNull(msg); + assertEquals("1", msg.headers().get(Replay.HEADER_REPLAY_BYPASS), "redriven message carries the header"); + + Envelope env = EnvelopeCodec.decode(msg.body()); + boolean[] emailed = {false}; + Handler handler = e -> { + assertTrue(Replay.isReplay(), "the handler should see this as a replay"); + Replay.bypassExternalEffects(() -> emailed[0] = true); + }; + Replay.process(msg.headers(), () -> handler.handle(env)); + assertFalse(emailed[0], "the external side-effect must be skipped on a bypassed replay"); + } + + @Test + void bypassWithoutHeaderPublisherFallsBack() throws Exception { + PlainTransport t = new PlainTransport(); + deadLettered(t, "dlq", "urn:babel:orders:created", "orders"); + + Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all().bypass(true)); + assertEquals(1, res.redriven()); + assertFalse(res.items().get(0).bypassed(), "bypass must be a no-op without a HeaderPublisher"); + } +}