org.junit.jupiter
junit-jupiter
diff --git a/src/main/java/com/babelqueue/otel/Sender.java b/src/main/java/com/babelqueue/otel/Sender.java
new file mode 100644
index 0000000..db466bd
--- /dev/null
+++ b/src/main/java/com/babelqueue/otel/Sender.java
@@ -0,0 +1,20 @@
+package com.babelqueue.otel;
+
+import com.babelqueue.Envelope;
+
+/**
+ * Performs the actual transport write for {@link Tracing#publish}. The core is codec-only, so
+ * the producer helper builds the envelope (stamping the active trace's id into its
+ * {@code trace_id}) and hands it to a {@code Sender} that writes it to a broker.
+ */
+@FunctionalInterface
+public interface Sender {
+
+ /**
+ * Sends one already-built envelope to its destination.
+ *
+ * @param envelope the envelope to publish
+ * @throws Exception to signal a failed publish (recorded on the producer span and re-thrown)
+ */
+ void send(Envelope envelope) throws Exception;
+}
diff --git a/src/main/java/com/babelqueue/otel/Tracing.java b/src/main/java/com/babelqueue/otel/Tracing.java
new file mode 100644
index 0000000..aa1e589
--- /dev/null
+++ b/src/main/java/com/babelqueue/otel/Tracing.java
@@ -0,0 +1,222 @@
+package com.babelqueue.otel;
+
+import com.babelqueue.Envelope;
+import com.babelqueue.EnvelopeCodec;
+import com.babelqueue.Meta;
+import com.babelqueue.idempotency.Handler;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanContext;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.TraceFlags;
+import io.opentelemetry.api.trace.TraceState;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Optional OpenTelemetry tracing for a babelqueue producer or consumer (ADR-0025) — the Java
+ * mirror of the Go {@code babelqueue-go/otel}, Python {@code babelqueue.otel} and Node
+ * {@code @babelqueue/core/otel} helpers.
+ *
+ * Produce/consume spans are correlated across every hop and SDK through the envelope's
+ * {@code trace_id} — a UUID, which maps 1:1 to a 32-hex OpenTelemetry trace id ({@link #traceIdOf}
+ * / {@link #uuidOf}). A non-UUID {@code trace_id} is hashed deterministically (SHA-256).
+ *
+ *
+ * - Consumer: {@link #wrapHandler} emits a {@code CONSUMER} span {@code process }
+ * in the {@code trace_id}-derived trace, recording the handler's error/status. It reuses
+ * the shared {@link Handler}, so it composes with {@code Idempotent.wrap} /
+ * {@code SchemaValidation.wrap}.
+ * - Producer: {@link #publish} emits a {@code PRODUCER} span {@code publish } that
+ * carries the active trace's id into the message's {@code trace_id}, then writes it via a
+ * {@link Sender}.
+ *
+ *
+ * Every hop that shares a {@code trace_id} shares one OTel trace. Exact cross-hop span
+ * parent-child linkage (W3C {@code traceparent} as a transport header) is a documented follow-up.
+ */
+public final class Tracing {
+
+ private static final String SYSTEM = "babelqueue";
+ private static final String DEFAULT_QUEUE = "default";
+ private static final String INVALID_TRACE_ID = "00000000000000000000000000000000";
+ private static final String INVALID_SPAN_ID = "0000000000000000";
+ private static final Pattern HEX_32 = Pattern.compile("[0-9a-f]{32}");
+
+ private Tracing() {
+ }
+
+ /**
+ * Maps an envelope {@code trace_id} to a deterministic 32-hex OTel trace id: a UUID maps to
+ * its hex bytes; any other string is hashed (SHA-256, first 16 bytes). Never the all-zero
+ * (invalid) trace id. The inverse of {@link #uuidOf} for the UUID case.
+ *
+ * @param traceId the envelope {@code trace_id}
+ * @return a valid 32-hex OTel trace id
+ */
+ public static String traceIdOf(String traceId) {
+ String hex = normalizeHex(traceId);
+ if (HEX_32.matcher(hex).matches() && !hex.equals(INVALID_TRACE_ID)) {
+ return hex;
+ }
+ return toHex(sha256(traceId == null ? "" : traceId), 16);
+ }
+
+ /**
+ * Formats a 32-hex OTel trace id as a canonical UUID string — the form a producer stamps into
+ * the message's {@code trace_id} so a consumer can recover the same trace id via
+ * {@link #traceIdOf}.
+ *
+ * @param traceIdHex a 32-hex OTel trace id
+ * @return the canonical UUID string
+ */
+ public static String uuidOf(String traceIdHex) {
+ StringBuilder h = new StringBuilder(normalizeHex(traceIdHex));
+ while (h.length() < 32) {
+ h.insert(0, '0');
+ }
+ String s = h.substring(0, 32);
+ return s.substring(0, 8) + "-" + s.substring(8, 12) + "-" + s.substring(12, 16)
+ + "-" + s.substring(16, 20) + "-" + s.substring(20, 32);
+ }
+
+ /**
+ * Wraps a consume handler to emit a {@code CONSUMER} span per message, in the OTel trace
+ * derived from the envelope's {@code trace_id}, recording the handler's error/status. The
+ * handler receives the full {@link Envelope} as before.
+ *
+ * @param tracer the OTel tracer
+ * @param handler the handler to instrument
+ * @return the wrapped handler
+ */
+ public static Handler wrapHandler(Tracer tracer, Handler handler) {
+ return envelope -> {
+ Span span = tracer.spanBuilder("process " + nullToEmpty(envelope.job()))
+ .setSpanKind(SpanKind.CONSUMER)
+ .setParent(parentContext(envelope.traceId()))
+ .setAllAttributes(consumeAttributes(envelope))
+ .startSpan();
+ try (Scope scope = span.makeCurrent()) {
+ handler.handle(envelope);
+ } catch (Exception e) {
+ span.recordException(e);
+ span.setStatus(StatusCode.ERROR, nullToEmpty(e.getMessage()));
+ throw e;
+ } finally {
+ span.end();
+ }
+ };
+ }
+
+ /**
+ * Publishes {@code (urn, data)} to the {@code "default"} queue under a {@code PRODUCER} span.
+ * See {@link #publish(Tracer, String, Map, String, Sender)}.
+ *
+ * @param tracer the OTel tracer
+ * @param urn the message URN
+ * @param data the message data
+ * @param send the transport write
+ * @return the published message id ({@code meta.id})
+ * @throws Exception if {@code send} fails (recorded on the span and re-thrown)
+ */
+ public static String publish(Tracer tracer, String urn, Map data, Sender send)
+ throws Exception {
+ return publish(tracer, urn, data, DEFAULT_QUEUE, send);
+ }
+
+ /**
+ * Emits a {@code PRODUCER} span {@code publish }, carrying the active trace's id into the
+ * built envelope's {@code trace_id} so the downstream consumer recovers the same trace, then
+ * writes the envelope via {@code send}.
+ *
+ * @param tracer the OTel tracer
+ * @param urn the message URN
+ * @param data the message data
+ * @param queue the destination queue
+ * @param send the transport write
+ * @return the published message id ({@code meta.id})
+ * @throws Exception if {@code send} fails (recorded on the span and re-thrown)
+ */
+ public static String publish(
+ Tracer tracer, String urn, Map data, String queue, Sender send)
+ throws Exception {
+ Span span = tracer.spanBuilder("publish " + urn)
+ .setSpanKind(SpanKind.PRODUCER)
+ .setAttribute("messaging.system", SYSTEM)
+ .setAttribute("messaging.operation", "publish")
+ .setAttribute("messaging.destination.name", urn)
+ .startSpan();
+ try (Scope scope = span.makeCurrent()) {
+ String traceId = uuidOf(span.getSpanContext().getTraceId());
+ Envelope envelope = EnvelopeCodec.make(urn, data, queue, traceId);
+ send.send(envelope);
+ String id = envelope.meta().id();
+ span.setAttribute("messaging.message.id", id);
+ return id;
+ } catch (Exception e) {
+ span.recordException(e);
+ span.setStatus(StatusCode.ERROR, nullToEmpty(e.getMessage()));
+ throw e;
+ } finally {
+ span.end();
+ }
+ }
+
+ /** A context carrying a remote parent in the {@code trace_id}-derived trace. */
+ private static Context parentContext(String traceId) {
+ SpanContext sc = SpanContext.createFromRemoteParent(
+ traceIdOf(traceId), spanIdOf(traceId), TraceFlags.getSampled(), TraceState.getDefault());
+ return Context.root().with(Span.wrap(sc));
+ }
+
+ private static Attributes consumeAttributes(Envelope envelope) {
+ Meta meta = envelope.meta();
+ return Attributes.builder()
+ .put("messaging.system", SYSTEM)
+ .put("messaging.operation", "process")
+ .put("messaging.destination.name", meta == null ? "" : nullToEmpty(meta.queue()))
+ .put("messaging.message.id", meta == null ? "" : nullToEmpty(meta.id()))
+ .put("messaging.message.conversation_id", nullToEmpty(envelope.traceId()))
+ .put("messaging.babelqueue.attempts", (long) envelope.attempts())
+ .build();
+ }
+
+ /** Deterministic, non-zero 16-hex span id so the remote parent context is valid. */
+ private static String spanIdOf(String traceId) {
+ String sid = toHex(sha256("babelqueue-span:" + (traceId == null ? "" : traceId)), 8);
+ return sid.equals(INVALID_SPAN_ID) ? "0000000000000001" : sid;
+ }
+
+ private static String normalizeHex(String s) {
+ return s == null ? "" : s.replace("-", "").toLowerCase(Locale.ROOT);
+ }
+
+ private static String nullToEmpty(String s) {
+ return s == null ? "" : s;
+ }
+
+ private static byte[] sha256(String s) {
+ try {
+ return MessageDigest.getInstance("SHA-256").digest(s.getBytes(StandardCharsets.UTF_8));
+ } catch (NoSuchAlgorithmException e) {
+ throw new IllegalStateException("SHA-256 is required but unavailable", e);
+ }
+ }
+
+ private static String toHex(byte[] bytes, int n) {
+ StringBuilder sb = new StringBuilder(n * 2);
+ for (int i = 0; i < n; i++) {
+ sb.append(Character.forDigit((bytes[i] >> 4) & 0xF, 16));
+ sb.append(Character.forDigit(bytes[i] & 0xF, 16));
+ }
+ return sb.toString();
+ }
+}
diff --git a/src/main/java/com/babelqueue/otel/package-info.java b/src/main/java/com/babelqueue/otel/package-info.java
new file mode 100644
index 0000000..6234418
--- /dev/null
+++ b/src/main/java/com/babelqueue/otel/package-info.java
@@ -0,0 +1,14 @@
+/**
+ * Optional OpenTelemetry tracing for babelqueue (ADR-0025) — the Java mirror of the Go
+ * {@code babelqueue-go/otel} module.
+ *
+ * {@link com.babelqueue.otel.Tracing} emits a {@code CONSUMER} span per handled message and
+ * a {@code PRODUCER} span per publish, correlating them across every hop and SDK through the
+ * envelope's {@code trace_id} — a UUID, which maps 1:1 to a 32-hex OpenTelemetry trace id. The
+ * wire envelope is untouched, and {@code io.opentelemetry:opentelemetry-api} is declared as an
+ * optional dependency, so the core stays zero-dependency for users who do not opt in.
+ *
+ *
Every hop that shares a {@code trace_id} shares one OTel trace. Exact cross-hop span
+ * parent-child linkage (W3C {@code traceparent} as a transport header) is a documented follow-up.
+ */
+package com.babelqueue.otel;
diff --git a/src/test/java/com/babelqueue/otel/TracingTest.java b/src/test/java/com/babelqueue/otel/TracingTest.java
new file mode 100644
index 0000000..ae4a4ad
--- /dev/null
+++ b/src/test/java/com/babelqueue/otel/TracingTest.java
@@ -0,0 +1,119 @@
+package com.babelqueue.otel;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import com.babelqueue.Envelope;
+import com.babelqueue.EnvelopeCodec;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.api.trace.Tracer;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import java.util.List;
+import java.util.Map;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class TracingTest {
+
+ private static final String TRACE_ID = "7b3f9c2a-e41d-4f88-9b2a-1c0d5e6f7a8b";
+ private static final String INVALID = "00000000000000000000000000000000";
+
+ private InMemorySpanExporter exporter;
+ private Tracer tracer;
+
+ @BeforeEach
+ void setUp() {
+ exporter = InMemorySpanExporter.create();
+ SdkTracerProvider provider = SdkTracerProvider.builder()
+ .addSpanProcessor(SimpleSpanProcessor.create(exporter))
+ .build();
+ tracer = provider.get("test");
+ }
+
+ private SpanData onlySpan() {
+ List spans = exporter.getFinishedSpanItems();
+ assertEquals(1, spans.size());
+ return spans.get(0);
+ }
+
+ @Test
+ void traceIdRoundTripsAndHashesNonUuid() {
+ String hex = Tracing.traceIdOf(TRACE_ID);
+ assertTrue(hex.matches("[0-9a-f]{32}"));
+ assertEquals(TRACE_ID, Tracing.uuidOf(hex));
+
+ // a non-uuid maps deterministically to a valid, distinct trace id
+ assertEquals(Tracing.traceIdOf("not-a-uuid"), Tracing.traceIdOf("not-a-uuid"));
+ assertNotEquals(hex, Tracing.traceIdOf("not-a-uuid"));
+ assertTrue(Tracing.traceIdOf("zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz").matches("[0-9a-f]{32}"));
+ assertNotEquals(INVALID, Tracing.traceIdOf(null));
+ }
+
+ @Test
+ void wrapHandlerEmitsConsumerSpanInTraceIdTrace() throws Exception {
+ boolean[] called = {false};
+ Envelope env = EnvelopeCodec.make("urn:babel:orders:created", Map.of("order_id", 1), "orders", TRACE_ID);
+
+ Tracing.wrapHandler(tracer, e -> called[0] = true).handle(env);
+
+ assertTrue(called[0]);
+ SpanData span = onlySpan();
+ assertEquals("process urn:babel:orders:created", span.getName());
+ assertEquals(SpanKind.CONSUMER, span.getKind());
+ assertEquals(Tracing.traceIdOf(TRACE_ID), span.getSpanContext().getTraceId());
+ assertEquals(TRACE_ID, span.getAttributes().get(AttributeKey.stringKey("messaging.message.conversation_id")));
+ assertEquals("orders", span.getAttributes().get(AttributeKey.stringKey("messaging.destination.name")));
+ assertEquals(0L, span.getAttributes().get(AttributeKey.longKey("messaging.babelqueue.attempts")));
+ }
+
+ @Test
+ void wrapHandlerRecordsErrorAndRethrows() {
+ Envelope env = EnvelopeCodec.make("urn:babel:orders:created", Map.of(), "orders", TRACE_ID);
+
+ IllegalStateException boom = assertThrows(IllegalStateException.class, () ->
+ Tracing.wrapHandler(tracer, e -> {
+ throw new IllegalStateException("boom");
+ }).handle(env));
+ assertEquals("boom", boom.getMessage());
+
+ SpanData span = onlySpan();
+ assertEquals(StatusCode.ERROR, span.getStatus().getStatusCode());
+ assertFalse(span.getEvents().isEmpty());
+ }
+
+ @Test
+ void publishEmitsProducerSpanAndStampsTraceId() throws Exception {
+ Envelope[] sent = {null};
+
+ String id = Tracing.publish(tracer, "urn:babel:orders:created", Map.of("order_id", 7), e -> sent[0] = e);
+
+ SpanData span = onlySpan();
+ assertEquals(SpanKind.PRODUCER, span.getKind());
+ assertEquals("publish urn:babel:orders:created", span.getName());
+ assertEquals(id, span.getAttributes().get(AttributeKey.stringKey("messaging.message.id")));
+ // the published trace_id encodes the producer span's trace, so a consumer recovers it
+ assertEquals(Tracing.uuidOf(span.getSpanContext().getTraceId()), sent[0].traceId());
+ assertEquals(span.getSpanContext().getTraceId(), Tracing.traceIdOf(sent[0].traceId()));
+ }
+
+ @Test
+ void publishRecordsAFailingSend() {
+ Exception boom = assertThrows(IllegalStateException.class, () ->
+ Tracing.publish(tracer, "urn:babel:orders:created", Map.of(), e -> {
+ throw new IllegalStateException("send failed");
+ }));
+ assertEquals("send failed", boom.getMessage());
+
+ SpanData span = onlySpan();
+ assertEquals(SpanKind.PRODUCER, span.getKind());
+ assertEquals(StatusCode.ERROR, span.getStatus().getStatusCode());
+ }
+}