Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.babelqueue</groupId>
<artifactId>babelqueue-core</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>
<packaging>jar</packaging>

<name>BabelQueue Core</name>
Expand Down
73 changes: 55 additions & 18 deletions src/main/java/com/babelqueue/Redrive.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/**
Expand All @@ -18,18 +19,30 @@
* the {@code otel} module's {@code Sender}. The wire envelope stays frozen (GR-1) and no
* dependency is added.
*
* <p>Safety in v1 is {@code dryRun} + sandbox routing ({@code toQueue}) + {@code select}. The
* <b>Replay-Bypass</b> guard (a {@code bq-replay-bypass} transport header surfaced to handlers
* so a replay can skip external side-effects) is a documented phase two — it touches the
* runtime and every transport, like ADR-0025's {@code traceparent} follow-up.
* <p>Safety is {@code dryRun} + sandbox routing ({@code toQueue}) + {@code select}. The
* {@code bypass} option stamps a {@code bq-replay-bypass} transport header (see {@link Replay})
* so a handler can skip external side-effects on a replay; it takes effect on transports that
* implement {@link HeaderPublisher} (ADR-0027).
*/
public final class Redrive {

private Redrive() {
}

/** A reserved DLQ message: its raw body plus a transport-specific handle used to ack it. */
public record Reserved(String body, Object handle) {
/**
* A reserved DLQ message: its raw body, a transport-specific handle used to ack it, and any
* out-of-band transport headers (e.g. the replay-bypass marker; empty when none).
*/
public record Reserved(String body, Object handle, Map<String, String> headers) {

/** A reserved message with no out-of-band headers. */
public Reserved(String body, Object handle) {
this(body, handle, Map.of());
}

public Reserved {
headers = headers == null ? Map.of() : Map.copyOf(headers);
}
}

/** The minimal transport surface {@link #redrive} needs, implemented over any broker. */
Expand All @@ -45,6 +58,16 @@ public interface Transport {
void ack(Reserved message) throws Exception;
}

/**
* An optional {@link Transport} capability: publish a body together with out-of-band transport
* headers (e.g. the replay-bypass marker), for brokers that carry per-message metadata. A
* transport that does not implement it simply does not propagate headers — {@link #redrive}
* falls back to plain {@link Transport#publish} (ADR-0027).
*/
public interface HeaderPublisher {
void publishWithHeaders(String queue, String body, Map<String, String> headers) throws Exception;
}

/**
* Options for a {@link #redrive} run; immutable, built with the fluent withers from
* {@link #all()}.
Expand All @@ -54,28 +77,34 @@ public interface Transport {
* @param max caps how many messages are pulled from the DLQ (0 = all available)
* @param dryRun inspect and report the plan, restoring every message unchanged
* @param select picks which messages to redrive (unselected are restored unchanged)
* @param bypass stamps the bq-replay-bypass header on each redriven message (see
* {@link Replay}); a no-op unless the transport is a {@link HeaderPublisher}
*/
public record Options(String toQueue, int max, boolean dryRun, Predicate<Envelope> select) {
public record Options(String toQueue, int max, boolean dryRun, Predicate<Envelope> select, boolean bypass) {

/** Redrive every message back to its source queue. */
public static Options all() {
return new Options(null, 0, false, null);
return new Options(null, 0, false, null, false);
}

public Options toQueue(String queue) {
return new Options(queue, max, dryRun, select);
return new Options(queue, max, dryRun, select, bypass);
}

public Options max(int limit) {
return new Options(toQueue, limit, dryRun, select);
return new Options(toQueue, limit, dryRun, select, bypass);
}

public Options dryRun(boolean enabled) {
return new Options(toQueue, max, enabled, select);
return new Options(toQueue, max, enabled, select, bypass);
}

public Options select(Predicate<Envelope> predicate) {
return new Options(toQueue, max, dryRun, predicate);
return new Options(toQueue, max, dryRun, predicate, bypass);
}

public Options bypass(boolean enabled) {
return new Options(toQueue, max, dryRun, select, enabled);
}
}

Expand All @@ -87,7 +116,8 @@ public record Item(
String reason,
String from,
String to,
boolean redriven
boolean redriven,
boolean bypassed
) {
}

Expand Down Expand Up @@ -141,7 +171,7 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro
transport.publish(dlq, msg.body());
transport.ack(msg);
skipped++;
items.add(new Item(null, null, null, null, dlq, null, false));
items.add(new Item(null, null, null, null, dlq, null, false, false));
continue;
}

Expand All @@ -152,7 +182,7 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro
transport.publish(dlq, msg.body());
transport.ack(msg);
skipped++;
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, null, false));
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, null, false, false));
continue;
}

Expand All @@ -164,20 +194,27 @@ public static Result redrive(Transport transport, String dlq, Options opts) thro
transport.publish(dlq, msg.body());
transport.ack(msg);
skipped++;
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, false));
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, false, false));
continue;
}

boolean bypassed = false;
try {
transport.publish(target, EnvelopeCodec.encode(reset(env)));
String encoded = EnvelopeCodec.encode(reset(env));
if (opts.bypass() && transport instanceof HeaderPublisher hp) {
hp.publishWithHeaders(target, encoded, Map.of(Replay.HEADER_REPLAY_BYPASS, "1"));
bypassed = true;
} else {
transport.publish(target, encoded);
}
} catch (Exception publishFailure) {
transport.publish(dlq, msg.body());
transport.ack(msg);
throw publishFailure;
}
transport.ack(msg);
redriven++;
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, true));
items.add(new Item(messageId, env.traceId(), env.job(), reason, dlq, target, true, bypassed));
}

return new Result(redriven, skipped, items);
Expand Down
94 changes: 94 additions & 0 deletions src/main/java/com/babelqueue/Replay.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.babelqueue;

import java.util.Map;

/**
* Replay-Bypass — a side-effect guard for DLQ replay (ADR-0027).
*
* <p>A deliberate replay off the dead-letter queue ({@link Redrive}) re-runs the handler, and
* the handler's external side-effects re-fire: a second charge, a duplicate email.
* {@code Idempotent.wrap} stops an <i>accidental</i> duplicate; it does not stop the
* <i>intended</i> reprocess from re-firing effects that already happened. This closes that gap.
*
* <p>The marker that says "this is a replay, skip the external effects" rides <b>out of band</b>
* as the {@link #HEADER_REPLAY_BYPASS} transport header — never in the frozen envelope (GR-1).
* {@link Redrive#redrive} stamps it when its options set {@code bypass} and the transport is a
* {@link Redrive.HeaderPublisher}; a consume adapter, having reserved the message with its
* headers, surfaces the flag for the duration of the handler via {@link #process}:
*
* <pre>{@code
* Redrive.Reserved msg = transport.pop(queue);
* Envelope env = EnvelopeCodec.decode(msg.body());
* Replay.process(msg.headers(), () -> handler.handle(env)); // handler can now query isReplay()
* }</pre>
*
* <p>A handler wraps its external, non-idempotent side in {@link #bypassExternalEffects} so a
* replay re-runs the idempotent core but skips effects that already fired. The Java core is
* codec-only, so this is the core/runtime API + in-memory testing; a concrete broker transport
* carries the header once it implements {@link Redrive.HeaderPublisher} (a follow-up).
*/
public final class Replay {

/**
* The out-of-band transport header {@link Redrive} stamps (with {@code bypass}) on a replayed
* message, and that a consume adapter surfaces to the handler via {@link #process}.
*/
public static final String HEADER_REPLAY_BYPASS = "bq-replay-bypass";

private static final ThreadLocal<Boolean> REPLAY = ThreadLocal.withInitial(() -> Boolean.FALSE);

private Replay() {
}

/** An action that may throw — the external side of a handler. */
@FunctionalInterface
public interface Effect {
void run() throws Exception;
}

/**
* Reports whether the message currently being handled was redriven with the replay-bypass
* marker — i.e. a deliberate replay whose external side-effects should be skipped. Meaningful
* only inside a {@link #process} scope the consumer established from the message's headers.
*
* @return whether the current handling is a bypassed replay
*/
public static boolean isReplay() {
return REPLAY.get();
}

/**
* Runs {@code body} with the replay flag derived from a reserved message's transport headers
* (the presence of {@link #HEADER_REPLAY_BYPASS}), restoring the prior flag afterwards. A
* consume adapter wraps each handler invocation in this.
*
* @param headers the reserved message's out-of-band headers (may be null/empty)
* @param body the handler invocation to run within the replay scope
* @throws Exception whatever {@code body} throws
*/
public static void process(Map<String, String> headers, Effect body) throws Exception {
boolean replay = headers != null && headers.containsKey(HEADER_REPLAY_BYPASS);
boolean previous = REPLAY.get();
REPLAY.set(replay);
try {
body.run();
} finally {
REPLAY.set(previous);
}
}

/**
* Runs {@code effect} unless the current message is a {@linkplain #isReplay replay}, in which
* case it is skipped. Wrap the external, non-idempotent side of a handler — sending an email,
* charging a card, calling a third party — so a replay re-runs the idempotent core but does
* not re-fire effects that already happened.
*
* @param effect the external side-effect to run only when this is not a replay
* @throws Exception whatever {@code effect} throws
*/
public static void bypassExternalEffects(Effect effect) throws Exception {
if (!isReplay()) {
effect.run();
}
}
}
131 changes: 131 additions & 0 deletions src/test/java/com/babelqueue/ReplayTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.babelqueue;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.babelqueue.idempotency.Handler;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import org.junit.jupiter.api.Test;

class ReplayTest {

/** In-memory transport that carries out-of-band headers (a {@link Redrive.HeaderPublisher}). */
private static final class MemoryTransport implements Redrive.Transport, Redrive.HeaderPublisher {
private final Map<String, Deque<Redrive.Reserved>> queues = new HashMap<>();

@Override
public Redrive.Reserved pop(String queue) {
Deque<Redrive.Reserved> dq = queues.get(queue);
return (dq == null || dq.isEmpty()) ? null : dq.pollFirst();
}

@Override
public void publish(String queue, String body) {
queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(new Redrive.Reserved(body, null));
}

@Override
public void publishWithHeaders(String queue, String body, Map<String, String> headers) {
queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(new Redrive.Reserved(body, null, headers));
}

@Override
public void ack(Redrive.Reserved message) {
// pop already removed it
}
}

/** In-memory transport with no header capability (NOT a {@link Redrive.HeaderPublisher}). */
private static final class PlainTransport implements Redrive.Transport {
private final Map<String, Deque<String>> queues = new HashMap<>();

@Override
public Redrive.Reserved pop(String queue) {
Deque<String> dq = queues.get(queue);
return (dq == null || dq.isEmpty()) ? null : new Redrive.Reserved(dq.pollFirst(), null);
}

@Override
public void publish(String queue, String body) {
queues.computeIfAbsent(queue, k -> new ArrayDeque<>()).addLast(body);
}

@Override
public void ack(Redrive.Reserved message) {
// pop already removed it
}
}

private static void deadLettered(Redrive.Transport t, String dlq, String urn, String originalQueue)
throws Exception {
Envelope env = EnvelopeCodec.make(urn, Map.of("order_id", 1), originalQueue, null);
Envelope dl = DeadLetters.annotate(env, "failed", originalQueue);
t.publish(dlq, EnvelopeCodec.encode(dl));
}

@Test
void isReplayDefaultsFalseAndBypassRunsEffect() throws Exception {
assertFalse(Replay.isReplay());
boolean[] ran = {false};
Replay.bypassExternalEffects(() -> ran[0] = true);
assertTrue(ran[0], "effect must run when not a replay");
}

@Test
void processEstablishesReplayScopeAndBypassSkips() throws Exception {
boolean[] ran = {false};
boolean[] sawReplay = {false};
Replay.process(Map.of(Replay.HEADER_REPLAY_BYPASS, "1"), () -> {
sawReplay[0] = Replay.isReplay();
Replay.bypassExternalEffects(() -> ran[0] = true);
});
assertTrue(sawReplay[0], "isReplay must be true inside the process scope");
assertFalse(ran[0], "the effect must be skipped on a replay");
assertFalse(Replay.isReplay(), "the flag must be restored after process");
}

@Test
void processWithoutHeaderIsNotReplay() throws Exception {
boolean[] ran = {false};
Replay.process(Map.of(), () -> Replay.bypassExternalEffects(() -> ran[0] = true));
assertTrue(ran[0], "a message with no replay header is not a replay");
}

@Test
void redriveBypassStampsHeaderAndConsumeSkipsEffect() throws Exception {
MemoryTransport t = new MemoryTransport();
deadLettered(t, "orders.dlq", "urn:babel:orders:created", "orders");

Redrive.Result res = Redrive.redrive(t, "orders.dlq", Redrive.Options.all().bypass(true));
assertEquals(1, res.redriven());
assertTrue(res.items().get(0).bypassed(), "the item must be flagged bypassed");

Redrive.Reserved msg = t.pop("orders");
assertNotNull(msg);
assertEquals("1", msg.headers().get(Replay.HEADER_REPLAY_BYPASS), "redriven message carries the header");

Envelope env = EnvelopeCodec.decode(msg.body());
boolean[] emailed = {false};
Handler handler = e -> {
assertTrue(Replay.isReplay(), "the handler should see this as a replay");
Replay.bypassExternalEffects(() -> emailed[0] = true);
};
Replay.process(msg.headers(), () -> handler.handle(env));
assertFalse(emailed[0], "the external side-effect must be skipped on a bypassed replay");
}

@Test
void bypassWithoutHeaderPublisherFallsBack() throws Exception {
PlainTransport t = new PlainTransport();
deadLettered(t, "dlq", "urn:babel:orders:created", "orders");

Redrive.Result res = Redrive.redrive(t, "dlq", Redrive.Options.all().bypass(true));
assertEquals(1, res.redriven());
assertFalse(res.items().get(0).bypassed(), "bypass must be a no-op without a HeaderPublisher");
}
}
Loading