Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "babelqueue"
version = "1.9.0"
version = "1.10.0"
description = "Polyglot Queues, Simplified — the Python core: the canonical BabelQueue wire-envelope codec, contracts and dead-letter helpers."
readme = "README.md"
requires-python = ">=3.9"
Expand Down
10 changes: 8 additions & 2 deletions src/babelqueue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@

from __future__ import annotations

from . import dead_letter, idempotency, redrive
from . import dead_letter, idempotency, redrive, replay
from .app import BabelQueue
from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec
from .contracts import HasTraceId, PolyglotMessage
from .idempotency import IdempotencyStore, InMemoryStore
from .exceptions import BabelQueueError, UnknownUrnError
from .replay import HEADER_REPLAY_BYPASS, bypass_external_effects, is_replay
from .routing import UnknownUrnStrategy
from .transport import InMemoryTransport, ReceivedMessage, Transport
from .transport import HeaderPublisher, InMemoryTransport, ReceivedMessage, Transport

# Runtime mirror of ``[project].version`` in pyproject.toml; bump both together on release.
__version__ = "1.10.0"

Expand All @@ -33,11 +34,16 @@
"Transport",
"InMemoryTransport",
"ReceivedMessage",
"HeaderPublisher",
"BabelQueueError",
"UnknownUrnError",
"dead_letter",
"idempotency",
"redrive",
"replay",
"is_replay",
"bypass_external_effects",
"HEADER_REPLAY_BYPASS",
"IdempotencyStore",
"InMemoryStore",
"__version__",
Expand Down
26 changes: 14 additions & 12 deletions src/babelqueue/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def on_order_created(data, meta):
from . import dead_letter
from .codec import EnvelopeCodec
from .exceptions import UnknownUrnError
from .replay import HEADER_REPLAY_BYPASS, _replay_scope
from .routing import UnknownUrnStrategy
from .transport import ReceivedMessage, Transport, make_transport

Expand Down Expand Up @@ -116,18 +117,19 @@ def consume(

def dispatch(self, received: ReceivedMessage) -> None:
"""Route one reserved message to its handler and acknowledge it."""
envelope = EnvelopeCodec.decode(received.body)
urn = str(envelope.get("job") or envelope.get("urn") or "")
handler = self._handlers.get(urn) if urn else None

try:
if handler is None:
self._route_unknown(urn, received, envelope)
return
self._invoke(handler, envelope)
self.transport.ack(received)
except Exception as exc: # noqa: BLE001 - one bad message must not kill the loop
self._retry_or_dead_letter(received, envelope, exc)
with _replay_scope(bool(received.headers.get(HEADER_REPLAY_BYPASS))):
envelope = EnvelopeCodec.decode(received.body)
urn = str(envelope.get("job") or envelope.get("urn") or "")
handler = self._handlers.get(urn) if urn else None

try:
if handler is None:
self._route_unknown(urn, received, envelope)
return
self._invoke(handler, envelope)
self.transport.ack(received)
except Exception as exc: # noqa: BLE001 - one bad message must not kill the loop
self._retry_or_dead_letter(received, envelope, exc)

# -- Internals ----------------------------------------------------------

Expand Down
33 changes: 25 additions & 8 deletions src/babelqueue/redrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@
acknowledged only after a successful re-publish, and an undecodable body is restored, not
dropped.

Replay safety today is sandbox routing (``to_queue``) + ``dry_run``. The **Replay-Bypass**
guard — a ``bq-replay-bypass`` transport header surfaced to handlers so a replay can skip
external side-effects (don't re-charge, don't re-email) — is a documented phase two: like the
OpenTelemetry ``traceparent`` follow-up, it carries out-of-band metadata as a transport header
and so touches the runtime + every transport binding. Until then, sandbox routing is the
safe-replay answer.
Replay safety is sandbox routing (``to_queue``) + ``dry_run``, plus the **Replay-Bypass** guard
(``bypass=True``): it stamps a ``bq-replay-bypass`` transport header that the runtime surfaces to
handlers via :func:`babelqueue.is_replay` / :func:`babelqueue.bypass_external_effects`, so a
replay can skip external side-effects that already fired (don't re-charge, don't re-email) — see
:mod:`babelqueue.replay` (ADR-0027). The header rides out of band, so the envelope stays frozen;
it propagates over a real broker only once that broker's transport implements the optional
:class:`~babelqueue.transport.HeaderPublisher` capability (the in-memory transport does today).
"""

from __future__ import annotations
Expand All @@ -34,7 +35,8 @@
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple

from .codec import EnvelopeCodec
from .transport import ReceivedMessage, Transport
from .replay import HEADER_REPLAY_BYPASS
from .transport import HeaderPublisher, ReceivedMessage, Transport

Envelope = Mapping[str, Any]
Select = Callable[[Envelope], bool]
Expand All @@ -51,6 +53,7 @@ class RedriveItem:
from_queue: str
to: str # target queue (the plan, even on a dry run; "" when skipped/undecodable)
redriven: bool # True only when actually re-published to ``to``
bypassed: bool = False # True when the bq-replay-bypass header was stamped on the message


@dataclass
Expand All @@ -70,6 +73,7 @@ def redrive(
max: int = 0,
dry_run: bool = False,
select: Optional[Select] = None,
bypass: bool = False,
timeout: float = 1.0,
) -> RedriveResult:
"""Move dead-lettered messages off ``dlq`` and replay them; see the module docstring."""
Expand Down Expand Up @@ -125,7 +129,9 @@ def redrive(
reset.pop("dead_letter", None)
reset["attempts"] = 0
try:
transport.publish(target, EnvelopeCodec.encode(reset))
item.bypassed = _publish_redriven(
transport, target, EnvelopeCodec.encode(reset), bypass
)
except Exception:
transport.publish(dlq, message.body) # restore on a publish failure, then surface
transport.ack(message)
Expand All @@ -150,6 +156,17 @@ def _decoded(body: str) -> Optional[Dict[str, Any]]:
return envelope


def _publish_redriven(transport: Transport, queue: str, body: str, bypass: bool) -> bool:
    """Re-publish one reset message to ``queue`` and report whether it was marked as a replay.

    The ``bq-replay-bypass`` header is stamped only when the caller asked for it (``bypass``)
    *and* the transport is a :class:`~babelqueue.transport.HeaderPublisher`; in every other case
    the body goes out through plain ``transport.publish``. The return value is True exactly when
    the header was stamped, False when the message was published without it.
    """
    stamp_header = bypass and isinstance(transport, HeaderPublisher)
    if not stamp_header:
        # Either bypass was not requested or this transport cannot carry headers.
        transport.publish(queue, body)
        return False
    transport.publish_with_headers(queue, body, {HEADER_REPLAY_BYPASS: "1"})
    return True


def _source_queue_of(envelope: Envelope) -> str:
"""Default redrive target: ``dead_letter.original_queue``, falling back to ``meta.queue``."""
dead_letter = envelope.get("dead_letter")
Expand Down
74 changes: 74 additions & 0 deletions src/babelqueue/replay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""Optional Replay-Bypass guard (ADR-0027): skip external side-effects on a deliberate replay.

The Python mirror of the Go ``replay.go``. It closes the loop left open by
:mod:`babelqueue.redrive`: a deliberate replay re-runs the handler, and its external
side-effects re-fire (a second charge, a duplicate email). :mod:`babelqueue.idempotency` stops
an *accidental* duplicate; this stops the *intended* reprocess from re-firing effects that
already happened.

:func:`babelqueue.redrive.redrive` (with ``bypass=True``) stamps a ``bq-replay-bypass`` transport
header on a redriven message; the runtime surfaces it to the handler via a
:class:`contextvars.ContextVar`. A handler wraps its external, non-idempotent side in
:func:`bypass_external_effects` so a replay re-runs the idempotent core but skips effects that
already fired::

from babelqueue import is_replay, bypass_external_effects

@app.handler("urn:babel:orders:created")
def on_order_created(data, meta):
save_order(data) # idempotent core — always runs
bypass_external_effects(lambda: send_email(data)) # external effect — skipped on replay

The marker rides **out of band** as a transport header, so the frozen envelope is untouched
(GR-1). It propagates over a real broker only once that broker's concrete transport implements
the optional :class:`~babelqueue.transport.HeaderPublisher` capability — a follow-up, like the
broker bindings; the in-memory transport supports it today, so the path is end-to-end testable.
"""

from __future__ import annotations

import contextlib
import contextvars
from typing import Callable, Iterator, Optional, TypeVar

#: The out-of-band transport header :func:`~babelqueue.redrive.redrive` stamps on a replayed
#: message (with ``bypass=True``) and that the runtime surfaces as :func:`is_replay`.
HEADER_REPLAY_BYPASS = "bq-replay-bypass"

# Per-context replay flag: set by ``_replay_scope`` for the span of one dispatch and read by
# ``is_replay``. It defaults to False, so ``is_replay()`` is False wherever no scope is active.
_replay_var: contextvars.ContextVar[bool] = contextvars.ContextVar(
"babelqueue_replay", default=False
)

# Return type of the callable handed to ``bypass_external_effects``.
T = TypeVar("T")


def is_replay() -> bool:
    """Report whether the message being handled right now is a deliberate replay.

    The answer is the per-context flag the runtime derives from the
    :data:`HEADER_REPLAY_BYPASS` transport header. When it is True the message was redriven with
    the replay-bypass marker, so external side-effects that already happened should be skipped.
    """
    replaying = _replay_var.get()
    return replaying


def bypass_external_effects(fn: Callable[[], T]) -> Optional[T]:
    """Call ``fn`` and return its result, except on a replay, where ``fn`` is skipped.

    Intended for the external, non-idempotent part of a handler (sending an email, charging a
    card, calling a third party): the idempotent core re-runs on a replay, but effects that
    already fired are not triggered a second time. On a replay (see :func:`is_replay`) ``fn`` is
    never invoked and ``None`` is returned.
    """
    return None if is_replay() else fn()


@contextlib.contextmanager
def _replay_scope(active: bool) -> Iterator[None]:
    """Internal: flag the current context as a replay (or not) while one dispatch runs."""
    previous = _replay_var.set(active)
    try:
        yield
    finally:
        # Always restore the prior value, even when the dispatch body raised.
        _replay_var.reset(previous)
31 changes: 28 additions & 3 deletions src/babelqueue/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

from abc import ABC, abstractmethod
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Any, Deque, Dict, Optional
from dataclasses import dataclass, field
from typing import Any, Deque, Dict, Optional, Protocol, runtime_checkable

from .exceptions import BabelQueueError

Expand All @@ -21,6 +21,9 @@ class ReceivedMessage:
body: str
queue: str
handle: Any = None
#: Out-of-band transport headers a :class:`HeaderPublisher` carried with the message
#: (e.g. the ``bq-replay-bypass`` marker). Empty for transports that don't surface them.
headers: Dict[str, str] = field(default_factory=dict)


class Transport(ABC):
Expand All @@ -42,20 +45,42 @@ def close(self) -> None: # pragma: no cover - optional
"""Release any resources (override if needed)."""


@runtime_checkable
class HeaderPublisher(Protocol):
    """Opt-in :class:`Transport` capability for brokers that carry per-message metadata.

    An implementing transport can publish a body together with out-of-band transport headers
    (e.g. the replay-bypass marker). A transport without this method simply does not propagate
    headers, and callers fall back to plain :meth:`Transport.publish` (ADR-0027). The protocol
    is runtime-checkable, so callers can probe for the capability with ``isinstance``.
    """

    def publish_with_headers(self, queue: str, body: str, headers: Dict[str, str]) -> None:
        """Append ``body`` to ``queue``, carrying ``headers`` alongside it out of band."""


class InMemoryTransport(Transport):
"""In-process transport for tests and broker-free local runs (``memory://``)."""

def __init__(self) -> None:
# Bodies and their out-of-band headers are kept in lockstep parallel deques, so the
# body storage layout stays a plain Deque[str].
self._queues: Dict[str, Deque[str]] = defaultdict(deque)
self._headers: Dict[str, Deque[Dict[str, str]]] = defaultdict(deque)

def publish(self, queue: str, body: str) -> None:
self._queues[queue].append(body)
self._headers[queue].append({})

def publish_with_headers(self, queue: str, body: str, headers: Dict[str, str]) -> None:
self._queues[queue].append(body)
self._headers[queue].append(dict(headers))

def pop(self, queue: str, timeout: float = 1.0) -> Optional[ReceivedMessage]:
dq = self._queues.get(queue)
if not dq:
return None
return ReceivedMessage(body=dq.popleft(), queue=queue)
return ReceivedMessage(
body=dq.popleft(), queue=queue, headers=self._headers[queue].popleft()
)

def ack(self, message: ReceivedMessage) -> None:
# Already removed on pop; nothing to do.
Expand Down
1 change: 1 addition & 0 deletions tests/test_redrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def test_redrive_to_source():
result = redrive(t, "orders.dlq")

assert result.redriven == 1 and result.skipped == 0
assert result.items[0].bypassed is False # bypass is off by default
got = _drain(t, "orders")
assert len(got) == 1
assert "dead_letter" not in got[0]
Expand Down
Loading
Loading