Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Install (with Celery + Django adapters)
run: |
python -m pip install --upgrade pip
pip install -e ".[dev,celery,django]"
pip install -e ".[dev,celery,django,otel]" opentelemetry-sdk

- name: Run tests
run: pytest
Expand All @@ -44,7 +44,7 @@ jobs:
- name: Install (dev + all adapters for type context)
run: |
python -m pip install --upgrade pip
pip install -e ".[dev,celery,django,redis,amqp,sqs,azureservicebus,artemis]"
pip install -e ".[dev,celery,django,redis,amqp,sqs,azureservicebus,artemis,otel]" opentelemetry-sdk
- name: Ruff
run: ruff check src tests
- name: Mypy
Expand Down Expand Up @@ -89,7 +89,7 @@ jobs:
- name: Install (all adapters — full coverage with brokers)
run: |
python -m pip install --upgrade pip
pip install -e ".[redis,amqp,sqs,azureservicebus,artemis,celery,django,dev]"
pip install -e ".[redis,amqp,sqs,azureservicebus,artemis,celery,django,dev,otel]" opentelemetry-sdk

- name: Wait for ElasticMQ
run: |
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ azureservicebus = ["azure-servicebus>=7.11", "azure-identity>=1.15"]
pulsar = ["pulsar-client>=3.4"]
kafka = ["confluent-kafka>=2.3"]
artemis = ["python-qpid-proton>=0.39"]
otel = ["opentelemetry-api>=1.20"]
celery = ["celery>=5"]
django = ["django>=4.2"]
dev = ["pytest>=7", "pytest-cov>=4", "mypy>=1.8", "ruff>=0.5"]
Expand Down
176 changes: 176 additions & 0 deletions src/babelqueue/otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
"""Optional OpenTelemetry tracing (ADR-0025): produce/consume spans correlated across hops.

The Python mirror of the Go ``babelqueue-go/otel`` module. It emits a CONSUMER span per
handled message and a PRODUCER span per publish, correlating them across every hop and SDK
through the envelope's ``trace_id`` — a UUID, which maps 1:1 to a 128-bit OTel trace id. The
wire envelope is untouched (GR-1) and the core never imports OpenTelemetry: this module is
only importable with the ``[otel]`` extra (``pip install babelqueue[otel]``), exactly like the
optional transport drivers.

from opentelemetry import trace
from babelqueue import BabelQueue, otel

tracer = trace.get_tracer("orders")
app = BabelQueue("redis://localhost:6379/0", queue="orders")
app.register("urn:babel:orders:created", otel.wrap_handler(tracer, on_order_created))
# producer side:
otel.publish(tracer, app, "urn:babel:orders:created", {"order_id": 1042})

Every hop that shares a ``trace_id`` shares one OTel trace. Exact cross-hop *span*
parent-child linkage (W3C ``traceparent`` as a transport header) is a documented follow-up.
"""

from __future__ import annotations

import hashlib
import inspect
from typing import Any, Callable, Mapping, Optional

from opentelemetry.context import Context
from opentelemetry.trace import (
NonRecordingSpan,
SpanContext,
SpanKind,
TraceFlags,
Tracer,
set_span_in_context,
)

# Shape of a consume handler: invoked with positional arguments, returns nothing.
Handler = Callable[..., None]

# Value reported as the ``messaging.system`` attribute on every span this module emits.
_SYSTEM = "babelqueue"
# Bit mask that keeps only the low 128 bits of an integer (the width of an OTel trace id).
_MASK_128 = (1 << 128) - 1


def trace_id_of(trace_id: str) -> int:
    """Turn an envelope ``trace_id`` string into a deterministic 128-bit OTel trace id.

    When the string parses as a UUID, its raw bytes (big-endian) are the trace id, which
    makes this the inverse of :func:`uuid_of`. Anything else — including the all-zero
    UUID, since zero is OTel's invalid trace id — is hashed with SHA-256 and the first
    16 bytes of the digest are used instead. The returned value is never zero.
    """
    uuid_raw = _uuid_bytes(trace_id)
    candidate = 0 if uuid_raw is None else int.from_bytes(uuid_raw, "big")
    if candidate:
        return candidate

    # Not a UUID (or the nil UUID): fall back to a stable hash of the text.
    hashed = hashlib.sha256(trace_id.encode("utf-8")).digest()
    fallback = int.from_bytes(hashed[:16], "big")
    return fallback if fallback else 1


def uuid_of(trace_id_int: int) -> str:
    """Render a 128-bit OTel trace id as a canonical, lower-case UUID string.

    This is the value a producer stamps into the message's ``trace_id`` so that a
    consumer can get the same trace id back through :func:`trace_id_of`. Only the low
    128 bits of ``trace_id_int`` are used.
    """
    hex32 = f"{trace_id_int & _MASK_128:032x}"
    # Canonical 8-4-4-4-12 grouping.
    groups = (hex32[:8], hex32[8:12], hex32[12:16], hex32[16:20], hex32[20:])
    return "-".join(groups)


def _uuid_bytes(s: str) -> Optional[bytes]:
    """Return the 16 raw bytes of a UUID-shaped string, or ``None`` if ``s`` is not one.

    Hyphens are ignored wherever they appear; what remains must be exactly 32
    hexadecimal digits (either case).
    """
    h = s.replace("-", "")
    if len(h) != 32:
        return None
    try:
        raw = bytes.fromhex(h)
    except ValueError:
        return None
    # ``bytes.fromhex`` silently skips ASCII whitespace, so a 32-character string that
    # contains spaces decodes to fewer than 16 bytes. Such a string is not a UUID.
    return raw if len(raw) == 16 else None


def _span_id_of(trace_id: str) -> int:
    """Compute a deterministic, non-zero 64-bit span id for the synthetic remote parent.

    The parent span context must be valid for a child span to inherit its trace id, so
    the id is taken from the first 8 bytes of a SHA-256 digest of the prefixed
    ``trace_id`` and bumped to 1 in the (unlikely) all-zero case.
    """
    prefixed = "babelqueue-span:" + trace_id
    span_id = int.from_bytes(hashlib.sha256(prefixed.encode("utf-8")).digest()[:8], "big")
    return span_id if span_id != 0 else 1


def _parent_context(trace_id: str) -> Context:
    """Build an OTel context holding a synthetic, sampled remote parent span.

    The parent's trace id and span id are both derived from the envelope ``trace_id``,
    so a span started in the returned context lands in that trace.
    """
    otel_trace_id = trace_id_of(trace_id)
    otel_span_id = _span_id_of(trace_id)
    remote_parent = NonRecordingSpan(
        SpanContext(
            trace_id=otel_trace_id,
            span_id=otel_span_id,
            is_remote=True,
            trace_flags=TraceFlags(TraceFlags.SAMPLED),
        )
    )
    return set_span_in_context(remote_parent)


def _wants_envelope(fn: Handler) -> bool:
    """Report whether ``fn`` can receive the full envelope as a third positional argument.

    That is the case when ``fn`` declares ``*args`` or at least three positional
    parameters. Callables whose signature cannot be inspected are treated as not
    wanting the envelope.
    """
    try:
        signature = inspect.signature(fn)
    except (TypeError, ValueError):  # pragma: no cover - builtins / C callables
        return False

    positional_count = 0
    for param in signature.parameters.values():
        if param.kind == inspect.Parameter.VAR_POSITIONAL:
            return True
        if param.kind in (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        ):
            positional_count += 1
    return positional_count >= 3


def wrap_handler(tracer: Tracer, handler: Handler) -> Handler:
    """Wrap a consume handler to emit a CONSUMER span per message, in the OTel trace derived
    from the envelope's ``trace_id``.

    Register it like any handler: ``app.register(urn, wrap_handler(tracer, handler))``. The
    wrapper's ``*args`` signature makes the runtime pass the full envelope (``data, meta,
    envelope``), so it can read ``trace_id``/``job`` even when the inner handler only wants
    ``(data, meta)``. A raising handler records the exception on the span and re-raises, so the
    runtime's retry / dead-letter path still applies.

    Args:
        tracer: The OTel tracer used to start the ``process <urn>`` span.
        handler: The inner handler. It is called with every positional argument when it
            accepts a third positional parameter or ``*args``, otherwise with just
            ``(data, meta)``.

    Returns:
        A handler with a ``*args`` signature that runs ``handler`` inside the span.
    """
    # The inner handler's arity cannot change between messages, so inspect it once here
    # rather than paying for ``inspect.signature`` on every delivery.
    forwards_everything = _wants_envelope(handler)

    # NOTE: ``wrapped`` is deliberately NOT decorated with ``functools.wraps``. That sets
    # ``__wrapped__``, which ``inspect.signature`` follows, so the wrapper would advertise
    # the inner handler's signature instead of ``*args`` and could stop receiving the envelope.
    def wrapped(*args: Any) -> None:
        # Missing or non-mapping positions degrade to empty mappings so that attribute
        # extraction below never fails on an unexpected call shape.
        envelope = args[2] if len(args) > 2 and isinstance(args[2], Mapping) else {}
        meta = args[1] if len(args) > 1 and isinstance(args[1], Mapping) else {}
        trace_id = str(envelope.get("trace_id") or "")
        urn = str(envelope.get("job") or envelope.get("urn") or "")

        attributes: dict[str, Any] = {
            "messaging.system": _SYSTEM,
            "messaging.operation": "process",
            "messaging.destination.name": str(meta.get("queue") or ""),
            "messaging.message.id": str(meta.get("id") or ""),
            "messaging.message.conversation_id": trace_id,
            "messaging.babelqueue.attempts": int(envelope.get("attempts", 0) or 0),
        }
        # Without a trace_id there is nothing to correlate with: ``None`` lets the tracer
        # fall back to whatever context is current.
        context = _parent_context(trace_id) if trace_id else None

        with tracer.start_as_current_span(
            "process " + urn,
            context=context,
            kind=SpanKind.CONSUMER,
            attributes=attributes,
        ):
            if forwards_everything:
                handler(*args)
            else:
                handler(args[0], args[1])

    return wrapped


def publish(
    tracer: Tracer,
    app: Any,
    urn: str,
    data: Mapping[str, Any],
    *,
    queue: Optional[str] = None,
) -> str:
    """Publish via a PRODUCER span ``publish <urn>``, carrying the active trace's id into the
    message's ``trace_id`` so the downstream consumer recovers the same trace.

    Behaves like ``app.publish`` (returns the message id); ``app`` is any object exposing
    ``publish(urn, data, *, queue=None, trace_id=None) -> str``.

    Args:
        tracer: The OTel tracer used to start the producer span.
        app: The object whose ``publish`` actually sends the message.
        urn: The job URN; also used in the span name and as the destination attribute.
        data: The message payload, passed through unchanged.
        queue: Optional queue override, passed through unchanged.

    Returns:
        The message id returned by ``app.publish``.
    """
    attributes = {
        "messaging.system": _SYSTEM,
        "messaging.operation": "publish",
        "messaging.destination.name": urn,
    }
    with tracer.start_as_current_span(
        "publish " + urn, kind=SpanKind.PRODUCER, attributes=attributes
    ) as span:
        otel_trace_id = span.get_span_context().trace_id
        # A trace id of 0 is OTel's invalid id — what a no-op tracer reports when no SDK
        # is configured and no valid span is active. Stamping it would give every message
        # the same nil-UUID ``trace_id`` and fold unrelated traffic into a single trace, so
        # pass ``None`` (the documented default of ``app.publish``) instead.
        trace_id = uuid_of(otel_trace_id) if otel_trace_id else None
        message_id = app.publish(urn, data, queue=queue, trace_id=trace_id)
        span.set_attribute("messaging.message.id", message_id)
    return message_id
131 changes: 131 additions & 0 deletions tests/test_otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
"""Tests for the optional OpenTelemetry module (ADR-0025).

Skipped when OpenTelemetry is not installed (the ``[otel]`` extra); CI installs it so these
run and count toward coverage.
"""

import pytest

# Guard on the SDK rather than the bare ``opentelemetry`` namespace: the ``[otel]`` extra
# only pulls in ``opentelemetry-api``, and with the API alone the SDK imports below would
# fail collection with an ImportError instead of skipping this module.
pytest.importorskip("opentelemetry.sdk")

from opentelemetry.sdk.trace import TracerProvider  # noqa: E402
from opentelemetry.sdk.trace.export import SimpleSpanProcessor  # noqa: E402
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (  # noqa: E402
    InMemorySpanExporter,
)
from opentelemetry.trace import SpanKind  # noqa: E402

from babelqueue import otel  # noqa: E402

# Canonical-form UUID used as the default envelope ``trace_id`` throughout these tests.
_TRACE_ID = "7b3f9c2a-e41d-4f88-9b2a-1c0d5e6f7a8b"


def _recorder():
    """Return ``(tracer, exporter)`` where every finished span lands in the in-memory exporter."""
    memory_exporter = InMemorySpanExporter()
    tracer_provider = TracerProvider()
    tracer_provider.add_span_processor(SimpleSpanProcessor(memory_exporter))
    test_tracer = tracer_provider.get_tracer("test")
    return test_tracer, memory_exporter


def _envelope(trace_id=_TRACE_ID, attempts=0):
    """Build a sample ``orders:created`` envelope with the given trace id and attempt count."""
    payload = {"order_id": 1}
    metadata = {"id": "m1", "queue": "orders"}
    return dict(
        job="urn:babel:orders:created",
        trace_id=trace_id,
        data=payload,
        meta=metadata,
        attempts=attempts,
    )


class FakeApp:
    """Stand-in for the app: ``publish(urn, data, *, queue, trace_id)`` logs the call
    and always answers with the fixed message id ``"msg-123"``."""

    def __init__(self):
        # One dict per publish() call, in call order.
        self.calls = []

    def publish(self, urn, data, *, queue=None, trace_id=None):
        record = dict(urn=urn, data=data, queue=queue, trace_id=trace_id)
        self.calls.append(record)
        return "msg-123"


def test_trace_id_round_trip():
    """A UUID survives ``trace_id_of`` -> ``uuid_of``; non-UUID strings hash to valid ids."""
    from_uuid = otel.trace_id_of(_TRACE_ID)
    assert from_uuid != 0
    assert otel.uuid_of(from_uuid) == _TRACE_ID

    # a non-uuid trace_id maps deterministically to a valid, distinct trace id
    first_hash = otel.trace_id_of("not-a-uuid")
    second_hash = otel.trace_id_of("not-a-uuid")
    assert first_hash == second_hash
    assert first_hash != 0
    assert first_hash != from_uuid

    # 32 chars but not hex → not a UUID, so it is hashed
    not_hex = "z" * 32
    assert otel.trace_id_of(not_hex) != 0


def test_wrap_handler_span_in_trace_with_attrs():
    """The wrapper calls the handler and emits one CONSUMER span in the envelope's trace."""
    tracer, exporter = _recorder()
    invocations = []

    def handler(data, meta, envelope):
        invocations.append(True)

    message = _envelope()
    traced = otel.wrap_handler(tracer, handler)
    traced(message["data"], message["meta"], message)

    assert invocations
    finished = exporter.get_finished_spans()
    assert len(finished) == 1
    recorded = finished[0]
    assert recorded.name == "process urn:babel:orders:created"
    assert recorded.kind == SpanKind.CONSUMER
    assert recorded.context.trace_id == otel.trace_id_of(_TRACE_ID)

    attrs = recorded.attributes
    assert attrs["messaging.message.conversation_id"] == _TRACE_ID
    assert attrs["messaging.message.id"] == "m1"
    assert attrs["messaging.destination.name"] == "orders"


def test_wrap_handler_two_arg_inner():
    """A ``(data, meta)`` handler is still callable through the three-argument wrapper."""
    tracer, exporter = _recorder()
    received = []

    def handler(data, meta):  # only two positional args
        received.append(data)

    message = _envelope()
    traced = otel.wrap_handler(tracer, handler)
    # the runtime passes (data, meta, envelope); the wrapper forwards only what the inner wants
    traced(message["data"], message["meta"], message)

    assert received[-1] == message["data"]
    assert len(exporter.get_finished_spans()) == 1


def test_wrap_handler_records_error():
    """A raising handler propagates its error and leaves an ERROR span with an event."""
    tracer, exporter = _recorder()

    def handler(data, meta, envelope):
        raise ValueError("boom")

    message = _envelope()
    traced = otel.wrap_handler(tracer, handler)
    with pytest.raises(ValueError):
        traced(message["data"], message["meta"], message)

    failed_span = exporter.get_finished_spans()[0]
    assert failed_span.status.status_code.name == "ERROR"
    # the recorded exception
    assert len(failed_span.events) >= 1


def test_publish_stamps_trace_id_from_span():
    """``otel.publish`` emits one PRODUCER span and stamps its trace id on the message."""
    tracer, exporter = _recorder()
    fake_app = FakeApp()

    returned_id = otel.publish(tracer, fake_app, "urn:babel:orders:created", {"order_id": 7})
    assert returned_id == "msg-123"

    finished = exporter.get_finished_spans()
    assert len(finished) == 1
    producer_span = finished[0]
    assert producer_span.kind == SpanKind.PRODUCER
    assert producer_span.attributes["messaging.message.id"] == "msg-123"

    # the published trace_id encodes the producer span's trace, so a consumer recovers it
    span_trace_id = producer_span.context.trace_id
    stamped = fake_app.calls[0]["trace_id"]
    assert stamped == otel.uuid_of(span_trace_id)
    assert otel.trace_id_of(stamped) == span_trace_id
Loading