diff --git a/pyproject.toml b/pyproject.toml index 3bf7446..77c3151 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "babelqueue" -version = "1.9.0" +version = "1.10.0" description = "Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers." readme = "README.md" requires-python = ">=3.9" diff --git a/src/babelqueue/__init__.py b/src/babelqueue/__init__.py index e6e726f..cc03463 100644 --- a/src/babelqueue/__init__.py +++ b/src/babelqueue/__init__.py @@ -11,14 +11,15 @@ from __future__ import annotations -from . import dead_letter, idempotency, redrive +from . import dead_letter, idempotency, redrive, replay from .app import BabelQueue from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec from .contracts import HasTraceId, PolyglotMessage from .idempotency import IdempotencyStore, InMemoryStore from .exceptions import BabelQueueError, UnknownUrnError +from .replay import HEADER_REPLAY_BYPASS, bypass_external_effects, is_replay from .routing import UnknownUrnStrategy -from .transport import InMemoryTransport, ReceivedMessage, Transport +from .transport import HeaderPublisher, InMemoryTransport, ReceivedMessage, Transport __version__ = "1.6.0" @@ -33,11 +34,16 @@ "Transport", "InMemoryTransport", "ReceivedMessage", + "HeaderPublisher", "BabelQueueError", "UnknownUrnError", "dead_letter", "idempotency", "redrive", + "replay", + "is_replay", + "bypass_external_effects", + "HEADER_REPLAY_BYPASS", "IdempotencyStore", "InMemoryStore", "__version__", diff --git a/src/babelqueue/app.py b/src/babelqueue/app.py index 22fbe6e..5fe14c4 100644 --- a/src/babelqueue/app.py +++ b/src/babelqueue/app.py @@ -25,6 +25,7 @@ def on_order_created(data, meta): from . import dead_letter from .codec import EnvelopeCodec from .exceptions import UnknownUrnError +from .replay import HEADER_REPLAY_BYPASS, _replay_scope from .routing import UnknownUrnStrategy from .transport import ReceivedMessage, Transport, make_transport @@ -116,18 +117,19 @@ def consume( def dispatch(self, received: ReceivedMessage) -> None: """Route one reserved message to its handler and acknowledge it.""" - envelope = EnvelopeCodec.decode(received.body) - urn = str(envelope.get("job") or envelope.get("urn") or "") - handler = self._handlers.get(urn) if urn else None - - try: - if handler is None: - self._route_unknown(urn, received, envelope) - return - self._invoke(handler, envelope) - self.transport.ack(received) - except Exception as exc: # noqa: BLE001 - one bad message must not kill the loop - self._retry_or_dead_letter(received, envelope, exc) + with _replay_scope(bool(received.headers.get(HEADER_REPLAY_BYPASS))): + envelope = EnvelopeCodec.decode(received.body) + urn = str(envelope.get("job") or envelope.get("urn") or "") + handler = self._handlers.get(urn) if urn else None + + try: + if handler is None: + self._route_unknown(urn, received, envelope) + return + self._invoke(handler, envelope) + self.transport.ack(received) + except Exception as exc: # noqa: BLE001 - one bad message must not kill the loop + self._retry_or_dead_letter(received, envelope, exc) # -- Internals ---------------------------------------------------------- diff --git a/src/babelqueue/redrive.py b/src/babelqueue/redrive.py index 7fcc848..05fa73b 100644 --- a/src/babelqueue/redrive.py +++ b/src/babelqueue/redrive.py @@ -20,12 +20,13 @@ acknowledged only after a successful re-publish, and an undecodable body is restored, not dropped. -Replay safety today is sandbox routing (``to_queue``) + ``dry_run``. The **Replay-Bypass** -guard — a ``bq-replay-bypass`` transport header surfaced to handlers so a replay can skip -external side-effects (don't re-charge, don't re-email) — is a documented phase two: like the -OpenTelemetry ``traceparent`` follow-up, it carries out-of-band metadata as a transport header -and so touches the runtime + every transport binding. Until then, sandbox routing is the -safe-replay answer. +Replay safety is sandbox routing (``to_queue``) + ``dry_run``, plus the **Replay-Bypass** guard +(``bypass=True``): it stamps a ``bq-replay-bypass`` transport header that the runtime surfaces to +handlers via :func:`babelqueue.is_replay` / :func:`babelqueue.bypass_external_effects`, so a +replay can skip external side-effects that already fired (don't re-charge, don't re-email) — see +:mod:`babelqueue.replay` (ADR-0027). The header rides out of band, so the envelope stays frozen; +it propagates over a real broker only once that broker's transport implements the optional +:class:`~babelqueue.transport.HeaderPublisher` capability (the in-memory transport does today). """ from __future__ import annotations @@ -34,7 +35,8 @@ from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple from .codec import EnvelopeCodec -from .transport import ReceivedMessage, Transport +from .replay import HEADER_REPLAY_BYPASS +from .transport import HeaderPublisher, ReceivedMessage, Transport Envelope = Mapping[str, Any] Select = Callable[[Envelope], bool] @@ -51,6 +53,7 @@ class RedriveItem: from_queue: str to: str # target queue (the plan, even on a dry run; "" when skipped/undecodable) redriven: bool # True only when actually re-published to ``to`` + bypassed: bool = False # True when the bq-replay-bypass header was stamped on the message @dataclass @@ -70,6 +73,7 @@ def redrive( max: int = 0, dry_run: bool = False, select: Optional[Select] = None, + bypass: bool = False, timeout: float = 1.0, ) -> RedriveResult: """Move dead-lettered messages off ``dlq`` and replay them; see the module docstring.""" @@ -125,7 +129,9 @@ def redrive( reset.pop("dead_letter", None) reset["attempts"] = 0 try: - transport.publish(target, EnvelopeCodec.encode(reset)) + item.bypassed = _publish_redriven( + transport, target, EnvelopeCodec.encode(reset), bypass + ) except Exception: transport.publish(dlq, message.body) # restore on a publish failure, then surface transport.ack(message) @@ -150,6 +156,17 @@ def _decoded(body: str) -> Optional[Dict[str, Any]]: return envelope +def _publish_redriven(transport: Transport, queue: str, body: str, bypass: bool) -> bool: + """Re-publish a reset message to ``queue``. When ``bypass`` is set and the transport is a + :class:`~babelqueue.transport.HeaderPublisher`, stamp the ``bq-replay-bypass`` header and + return True; otherwise publish plainly and return False.""" + if bypass and isinstance(transport, HeaderPublisher): + transport.publish_with_headers(queue, body, {HEADER_REPLAY_BYPASS: "1"}) + return True + transport.publish(queue, body) + return False + + def _source_queue_of(envelope: Envelope) -> str: """Default redrive target: ``dead_letter.original_queue``, falling back to ``meta.queue``.""" dead_letter = envelope.get("dead_letter") diff --git a/src/babelqueue/replay.py b/src/babelqueue/replay.py new file mode 100644 index 0000000..d9f06cb --- /dev/null +++ b/src/babelqueue/replay.py @@ -0,0 +1,74 @@ +"""Optional Replay-Bypass guard (ADR-0027): skip external side-effects on a deliberate replay. + +The Python mirror of the Go ``replay.go``. It closes the loop left open by +:mod:`babelqueue.redrive`: a deliberate replay re-runs the handler, and its external +side-effects re-fire (a second charge, a duplicate email). :mod:`babelqueue.idempotency` stops +an *accidental* duplicate; this stops the *intended* reprocess from re-firing effects that +already happened. + +``Redrive`` (with ``bypass=True``) stamps a ``bq-replay-bypass`` transport header on a redriven +message; the runtime surfaces it to the handler via a :class:`contextvars.ContextVar`. A handler +wraps its external, non-idempotent side in :func:`bypass_external_effects` so a replay re-runs the +idempotent core but skips effects that already fired:: + + from babelqueue import is_replay, bypass_external_effects + + @app.handler("urn:babel:orders:created") + def on_order_created(data, meta): + save_order(data) # idempotent core — always runs + bypass_external_effects(lambda: send_email(data)) # external effect — skipped on replay + +The marker rides **out of band** as a transport header, so the frozen envelope is untouched +(GR-1). It propagates over a real broker only once that broker's concrete transport implements +the optional :class:`~babelqueue.transport.HeaderPublisher` capability — a follow-up, like the +broker bindings; the in-memory transport supports it today, so the path is end-to-end testable. +""" + +from __future__ import annotations + +import contextlib +import contextvars +from typing import Callable, Iterator, Optional, TypeVar + +#: The out-of-band transport header :func:`~babelqueue.redrive.redrive` stamps on a replayed +#: message (with ``bypass=True``) and that the runtime surfaces as :func:`is_replay`. +HEADER_REPLAY_BYPASS = "bq-replay-bypass" + +_replay_var: contextvars.ContextVar[bool] = contextvars.ContextVar( + "babelqueue_replay", default=False +) + +T = TypeVar("T") + + +def is_replay() -> bool: + """Whether the message currently being handled was redriven with the replay-bypass marker. + + True means this is a deliberate replay, so external side-effects that already happened should + be skipped. Reads the flag the runtime set on the context from the + :data:`HEADER_REPLAY_BYPASS` transport header. + """ + return _replay_var.get() + + +def bypass_external_effects(fn: Callable[[], T]) -> Optional[T]: + """Run ``fn`` unless the current message is a replay (see :func:`is_replay`), in which case + skip it and return ``None``. + + Wrap the external, non-idempotent side of a handler — sending an email, charging a card, + calling a third party — so a replay re-runs the idempotent core but does not re-fire effects + that already happened. + """ + if is_replay(): + return None + return fn() + + +@contextlib.contextmanager +def _replay_scope(active: bool) -> Iterator[None]: + """Internal: mark the current context as a replay (or not) for the span of one dispatch.""" + token = _replay_var.set(active) + try: + yield + finally: + _replay_var.reset(token) diff --git a/src/babelqueue/transport.py b/src/babelqueue/transport.py index a61ceeb..306ceda 100644 --- a/src/babelqueue/transport.py +++ b/src/babelqueue/transport.py @@ -8,8 +8,8 @@ from abc import ABC, abstractmethod from collections import defaultdict, deque -from dataclasses import dataclass -from typing import Any, Deque, Dict, Optional +from dataclasses import dataclass, field +from typing import Any, Deque, Dict, Optional, Protocol, runtime_checkable from .exceptions import BabelQueueError @@ -21,6 +21,9 @@ class ReceivedMessage: body: str queue: str handle: Any = None + #: Out-of-band transport headers a :class:`HeaderPublisher` carried with the message + #: (e.g. the ``bq-replay-bypass`` marker). Empty for transports that don't surface them. + headers: Dict[str, str] = field(default_factory=dict) class Transport(ABC): @@ -42,20 +45,42 @@ def close(self) -> None: # pragma: no cover - optional """Release any resources (override if needed).""" +@runtime_checkable +class HeaderPublisher(Protocol): + """Optional :class:`Transport` capability: publish a body together with out-of-band + transport headers (e.g. the replay-bypass marker), for brokers that carry per-message + metadata. A transport that does not implement it simply does not propagate headers — + callers fall back to plain :meth:`Transport.publish` (ADR-0027).""" + + def publish_with_headers(self, queue: str, body: str, headers: Dict[str, str]) -> None: + """Append ``body`` to ``queue`` along with out-of-band ``headers``.""" + ... + + class InMemoryTransport(Transport): """In-process transport for tests and broker-free local runs (``memory://``).""" def __init__(self) -> None: + # Bodies and their out-of-band headers are kept in lockstep parallel deques, so the + # body storage layout stays a plain Deque[str]. self._queues: Dict[str, Deque[str]] = defaultdict(deque) + self._headers: Dict[str, Deque[Dict[str, str]]] = defaultdict(deque) def publish(self, queue: str, body: str) -> None: self._queues[queue].append(body) + self._headers[queue].append({}) + + def publish_with_headers(self, queue: str, body: str, headers: Dict[str, str]) -> None: + self._queues[queue].append(body) + self._headers[queue].append(dict(headers)) def pop(self, queue: str, timeout: float = 1.0) -> Optional[ReceivedMessage]: dq = self._queues.get(queue) if not dq: return None - return ReceivedMessage(body=dq.popleft(), queue=queue) + return ReceivedMessage( + body=dq.popleft(), queue=queue, headers=self._headers[queue].popleft() + ) def ack(self, message: ReceivedMessage) -> None: # Already removed on pop; nothing to do. diff --git a/tests/test_redrive.py b/tests/test_redrive.py index 659c6ce..09b6e38 100644 --- a/tests/test_redrive.py +++ b/tests/test_redrive.py @@ -33,6 +33,7 @@ def test_redrive_to_source(): result = redrive(t, "orders.dlq") assert result.redriven == 1 and result.skipped == 0 + assert result.items[0].bypassed is False # bypass is off by default got = _drain(t, "orders") assert len(got) == 1 assert "dead_letter" not in got[0] diff --git a/tests/test_replay.py b/tests/test_replay.py new file mode 100644 index 0000000..44eb229 --- /dev/null +++ b/tests/test_replay.py @@ -0,0 +1,124 @@ +"""Tests for the optional Replay-Bypass guard (ADR-0027).""" + +from babelqueue import ( + BabelQueue, + bypass_external_effects, + dead_letter, + is_replay, +) +from babelqueue.codec import EnvelopeCodec +from babelqueue.redrive import redrive +from babelqueue.replay import HEADER_REPLAY_BYPASS, _replay_scope +from babelqueue.transport import InMemoryTransport, Transport + + +def _dead_letter(transport, dlq, urn, original_queue, data=None): + env = EnvelopeCodec.make(urn, data or {}, queue=original_queue) + annotated = dead_letter.annotate(env, "failed", original_queue, 3, error="boom") + transport.publish(dlq, EnvelopeCodec.encode(annotated)) + return annotated + + +def test_is_replay_default_false(): + assert is_replay() is False + + +def test_bypass_runs_when_not_replay(): + ran = [] + + def effect(): + ran.append("x") + return "did-it" + + assert bypass_external_effects(effect) == "did-it" + assert ran == ["x"] + + +def test_bypass_skips_on_replay_and_scope_resets(): + ran = [] + + def effect(): + ran.append("x") + raise AssertionError("must not run on a replay") + + with _replay_scope(True): + assert is_replay() is True + assert bypass_external_effects(effect) is None + assert ran == [] + assert is_replay() is False # the scope reset the contextvar + + +def test_in_memory_transport_carries_headers(): + t = InMemoryTransport() + t.publish_with_headers("q", "body", {HEADER_REPLAY_BYPASS: "1"}) + msg = t.pop("q", 0) + assert msg is not None and msg.headers.get(HEADER_REPLAY_BYPASS) == "1" + # a plain publish carries no headers + t.publish("q", "plain") + msg2 = t.pop("q", 0) + assert msg2 is not None and msg2.headers == {} + + +def test_redrive_bypass_stamps_header_and_consume_skips_effects(): + t = InMemoryTransport() + _dead_letter(t, "orders.dlq", "urn:babel:orders:created", "orders", {"order_id": 1}) + + result = redrive(t, "orders.dlq", bypass=True) + assert result.redriven == 1 + assert result.items[0].bypassed is True + + # the redriven message on the source queue carries the bypass header + msg = t.pop("orders", 0) + assert msg is not None and msg.headers.get(HEADER_REPLAY_BYPASS) == "1" + + # consume it: the handler sees is_replay and BypassExternalEffects skips the side-effect + emailed = [] + app = BabelQueue(transport=t) + + @app.handler("urn:babel:orders:created") + def on_created(data, meta): + assert is_replay() is True + bypass_external_effects(lambda: emailed.append(data)) + + app.dispatch(msg) + assert emailed == [] # the external side-effect was skipped on the bypassed replay + + +def test_normal_delivery_is_not_a_replay(): + t = InMemoryTransport() + app = BabelQueue(transport=t) + fired = [] + + @app.handler("urn:babel:orders:created") + def on_created(data, meta): + assert is_replay() is False + bypass_external_effects(lambda: fired.append(data)) + + app.publish("urn:babel:orders:created", {"order_id": 9}) + app.consume(max_messages=1, timeout=0) + assert fired == [{"order_id": 9}] # effect runs on a normal (non-replay) delivery + + +class _PlainTransport(Transport): + """A Transport that is deliberately NOT a HeaderPublisher (no publish_with_headers).""" + + def __init__(self): + self._inner = InMemoryTransport() + + def publish(self, queue, body): + self._inner.publish(queue, body) + + def pop(self, queue, timeout=1.0): + return self._inner.pop(queue, timeout) + + def ack(self, message): + self._inner.ack(message) + + +def test_redrive_bypass_without_header_support_falls_back(): + t = _PlainTransport() + _dead_letter(t, "orders.dlq", "urn:babel:orders:created", "orders") + + result = redrive(t, "orders.dlq", bypass=True) + assert result.redriven == 1 + assert result.items[0].bypassed is False # no-op without a HeaderPublisher