diff --git a/src/babelqueue/__init__.py b/src/babelqueue/__init__.py
index 3163ea1..e6e726f 100644
--- a/src/babelqueue/__init__.py
+++ b/src/babelqueue/__init__.py
@@ -11,7 +11,7 @@
 
 from __future__ import annotations
 
-from . import dead_letter, idempotency
+from . import dead_letter, idempotency, redrive
 from .app import BabelQueue
 from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec
 from .contracts import HasTraceId, PolyglotMessage
@@ -37,6 +37,7 @@
     "UnknownUrnError",
     "dead_letter",
     "idempotency",
+    "redrive",
     "IdempotencyStore",
     "InMemoryStore",
     "__version__",
diff --git a/src/babelqueue/redrive.py b/src/babelqueue/redrive.py
new file mode 100644
index 0000000..7fcc848
--- /dev/null
+++ b/src/babelqueue/redrive.py
@@ -0,0 +1,214 @@
+"""Optional DLQ redrive tooling (ADR-0026): safe replay off the dead-letter queue.
+
+The Python mirror of the Go ``Redrive``. It reads dead-lettered messages off a DLQ and
+re-publishes each to its source queue (its ``dead_letter.original_queue``) or a chosen
+``to_queue``, **reset for reprocessing**: the ``dead_letter`` block is removed and ``attempts``
+reset to 0, while ``job``, ``trace_id``, ``data`` and ``meta`` are preserved verbatim. It is
+the operator-side counterpart to the runtime's dead-letter routing — the contract leaves
+redrive to tooling, and this is that tool.
+
+    from babelqueue import BabelQueue
+    from babelqueue.redrive import redrive
+
+    app = BabelQueue("redis://localhost:6379/0")
+    result = redrive(app.transport, "orders.dlq")  # back to each source
+    result = redrive(app.transport, "orders.dlq", to_queue="sandbox")  # safe sandbox replay
+    plan = redrive(app.transport, "orders.dlq", dry_run=True)  # inspect, change nothing
+
+Messages are drained from the DLQ first and then processed, so restored messages (skipped,
+dry-run, undecodable, or without a resolvable target queue) are never re-encountered in the
+same run; a DLQ message is acknowledged only after a successful re-publish, and an undecodable
+body is restored, not dropped. If replaying a message fails, that message and the untouched
+rest of the drained batch are restored to the DLQ before the error is raised.
+
+Replay safety today is sandbox routing (``to_queue``) + ``dry_run``. The **Replay-Bypass**
+guard — a ``bq-replay-bypass`` transport header surfaced to handlers so a replay can skip
+external side-effects (don't re-charge, don't re-email) — is a documented phase two: like the
+OpenTelemetry ``traceparent`` follow-up, it carries out-of-band metadata as a transport header
+and so touches the runtime + every transport binding. Until then, sandbox routing is the
+safe-replay answer.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple
+
+from .codec import EnvelopeCodec
+from .transport import ReceivedMessage, Transport
+
+Envelope = Mapping[str, Any]
+Select = Callable[[Envelope], bool]
+
+
+@dataclass
+class RedriveItem:
+    """What happened to one message during a redrive run."""
+
+    message_id: str
+    trace_id: str
+    urn: str
+    reason: str
+    from_queue: str
+    to: str  # planned target queue (also on a dry run); "" when none was chosen or resolved
+    redriven: bool  # True only when actually re-published to ``to``
+
+
+@dataclass
+class RedriveResult:
+    """Summary of a redrive run."""
+
+    redriven: int = 0
+    skipped: int = 0
+    items: List[RedriveItem] = field(default_factory=list)
+
+
+def redrive(
+    transport: Transport,
+    dlq: str,
+    *,
+    to_queue: Optional[str] = None,
+    max: int = 0,
+    dry_run: bool = False,
+    select: Optional[Select] = None,
+    timeout: float = 1.0,
+) -> RedriveResult:
+    """Move dead-lettered messages off ``dlq`` and replay them; see the module docstring.
+
+    Args:
+        transport: Transport to pop from, publish to and acknowledge on.
+        dlq: Name of the dead-letter queue to drain.
+        to_queue: Replay every message here instead of its own source queue.
+        max: Upper bound on messages drained; ``0`` drains until ``pop`` returns ``None``.
+        dry_run: Report the planned target per message but restore every message unchanged.
+        select: Optional predicate; messages it rejects are restored unchanged.
+        timeout: Passed to ``transport.pop`` for every pop.
+
+    Returns:
+        A ``RedriveResult`` with one item per drained message and redriven/skipped counts.
+
+    Raises:
+        Exception: Whatever the transport, the codec or ``select`` raised. A message that was
+            not replayed, and the untouched rest of the batch, are restored to ``dlq`` first.
+    """
+    # Drain up to ``max`` messages (or all available) before processing any of them.
+    batch: List[Tuple[ReceivedMessage, Optional[Dict[str, Any]]]] = []
+    while max == 0 or len(batch) < max:
+        message = transport.pop(dlq, timeout)
+        if message is None:
+            break
+        batch.append((message, _decoded(message.body)))
+
+    result = RedriveResult()
+    for index, (message, envelope) in enumerate(batch):
+        try:
+            item = _redrive_one(
+                transport,
+                dlq,
+                message,
+                envelope,
+                to_queue=to_queue,
+                dry_run=dry_run,
+                select=select,
+            )
+        except Exception:
+            # Everything after the failing message is popped but untouched; put it back so a
+            # mid-run failure cannot strand the rest of the drained batch.
+            for pending, _ in batch[index + 1 :]:
+                _restore(transport, dlq, pending)
+            raise
+        result.items.append(item)
+        if item.redriven:
+            result.redriven += 1
+        else:
+            result.skipped += 1
+    return result
+
+
+def _redrive_one(
+    transport: Transport,
+    dlq: str,
+    message: ReceivedMessage,
+    envelope: Optional[Dict[str, Any]],
+    *,
+    to_queue: Optional[str],
+    dry_run: bool,
+    select: Optional[Select],
+) -> RedriveItem:
+    """Replay one drained message, or restore it to ``dlq``, and report which happened."""
+    if envelope is None:
+        _restore(transport, dlq, message)  # restore the poison body; never drop it
+        return RedriveItem("", "", "", "", dlq, "", False)
+
+    try:
+        item = _describe(envelope, dlq)
+        if select is None or select(envelope):
+            item.to = to_queue or _source_queue_of(envelope)
+        # Replay only a selected message with a real target, and never on a dry run. An
+        # empty target would otherwise be published to the queue named "".
+        replay = bool(item.to) and not dry_run
+        if replay:
+            reset = dict(envelope)
+            reset.pop("dead_letter", None)
+            reset["attempts"] = 0
+            transport.publish(item.to, EnvelopeCodec.encode(reset))
+    except Exception:
+        _restore(transport, dlq, message)  # nothing was replayed: restore, then surface
+        raise
+
+    if replay:
+        transport.ack(message)
+        item.redriven = True
+    else:
+        _restore(transport, dlq, message)  # skipped or dry run: restore unchanged
+    return item
+
+
+def _describe(envelope: Envelope, dlq: str) -> RedriveItem:
+    """Build the report entry for a decoded envelope; no target is planned yet."""
+    meta_raw = envelope.get("meta")
+    meta: Mapping[str, Any] = meta_raw if isinstance(meta_raw, Mapping) else {}
+    dl_raw = envelope.get("dead_letter")
+    dead_letter: Mapping[str, Any] = dl_raw if isinstance(dl_raw, Mapping) else {}
+    return RedriveItem(
+        message_id=str(meta.get("id", "")),
+        trace_id=str(envelope.get("trace_id", "")),
+        urn=EnvelopeCodec.urn(envelope),
+        reason=str(dead_letter.get("reason", "")),
+        from_queue=dlq,
+        to="",
+        redriven=False,
+    )
+
+
+def _restore(transport: Transport, dlq: str, message: ReceivedMessage) -> None:
+    """Put a popped message back on ``dlq`` unchanged, then acknowledge the popped copy."""
+    transport.publish(dlq, message.body)
+    transport.ack(message)
+
+
+def _decoded(body: str) -> Optional[Dict[str, Any]]:
+    """Decode a DLQ body, or None when it is not a redrivable envelope.
+
+    ``EnvelopeCodec.decode`` returns ``{}`` for malformed/non-object input; an object with no
+    string ``job`` is likewise not redrivable.
+    """
+    envelope = EnvelopeCodec.decode(body)
+    if not envelope or not isinstance(envelope.get("job"), str):
+        return None
+    return envelope
+
+
+def _source_queue_of(envelope: Envelope) -> str:
+    """Default redrive target: ``dead_letter.original_queue``, falling back to ``meta.queue``."""
+    dead_letter = envelope.get("dead_letter")
+    if isinstance(dead_letter, Mapping):
+        original = dead_letter.get("original_queue")
+        if isinstance(original, str) and original:
+            return original
+    meta = envelope.get("meta")
+    if isinstance(meta, Mapping):
+        queue = meta.get("queue")
+        if isinstance(queue, str):
+            return queue
+    return ""
diff --git a/tests/test_redrive.py b/tests/test_redrive.py
new file mode 100644
index 0000000..659c6ce
--- /dev/null
+++ b/tests/test_redrive.py
@@ -0,0 +1,165 @@
+"""Tests for the optional DLQ redrive tooling (ADR-0026)."""
+
+import pytest
+
+from babelqueue import dead_letter
+from babelqueue.codec import EnvelopeCodec
+from babelqueue.redrive import redrive
+from babelqueue.transport import InMemoryTransport
+
+
+def _dead_letter(transport, dlq, urn, original_queue, data=None):
+    env = EnvelopeCodec.make(urn, data or {}, queue=original_queue)
+    annotated = dead_letter.annotate(env, "failed", original_queue, 3, error="boom")
+    transport.publish(dlq, EnvelopeCodec.encode(annotated))
+    return annotated
+
+
+def _drain(transport, queue):
+    out = []
+    while True:
+        message = transport.pop(queue, 0)
+        if message is None:
+            break
+        out.append(EnvelopeCodec.decode(message.body))
+        transport.ack(message)
+    return out
+
+
+def test_redrive_to_source():
+    t = InMemoryTransport()
+    orig = _dead_letter(t, "orders.dlq", "urn:babel:orders:created", "orders", {"order_id": 1})
+
+    result = redrive(t, "orders.dlq")
+
+    assert result.redriven == 1 and result.skipped == 0
+    got = _drain(t, "orders")
+    assert len(got) == 1
+    assert "dead_letter" not in got[0]
+    assert got[0]["attempts"] == 0
+    assert got[0]["trace_id"] == orig["trace_id"]
+    assert got[0]["data"] == {"order_id": 1}
+    assert EnvelopeCodec.urn(got[0]) == "urn:babel:orders:created"
+    assert _drain(t, "orders.dlq") == []
+
+
+def test_redrive_to_sandbox():
+    t = InMemoryTransport()
+    _dead_letter(t, "orders.dlq", "urn:babel:orders:created", "orders")
+
+    result = redrive(t, "orders.dlq", to_queue="sandbox")
+
+    assert result.redriven == 1
+    assert _drain(t, "orders") == []
+    assert len(_drain(t, "sandbox")) == 1
+
+
+def test_redrive_dry_run():
+    t = InMemoryTransport()
+    _dead_letter(t, "orders.dlq", "urn:babel:orders:created", "orders")
+
+    result = redrive(t, "orders.dlq", dry_run=True)
+
+    assert result.redriven == 0 and result.skipped == 1
+    assert result.items[0].to == "orders"
+    assert result.items[0].redriven is False
+    assert _drain(t, "orders") == []
+    dlq = _drain(t, "orders.dlq")
+    assert len(dlq) == 1 and "dead_letter" in dlq[0]
+
+
+def test_redrive_select():
+    t = InMemoryTransport()
+    _dead_letter(t, "dlq", "urn:babel:orders:created", "orders")
+    _dead_letter(t, "dlq", "urn:babel:emails:welcome", "emails")
+
+    result = redrive(t, "dlq", select=lambda e: EnvelopeCodec.urn(e) == "urn:babel:orders:created")
+
+    assert result.redriven == 1 and result.skipped == 1
+    assert len(_drain(t, "orders")) == 1
+    assert _drain(t, "emails") == []
+    assert len(_drain(t, "dlq")) == 1  # the unselected one is restored
+
+
+def test_redrive_max():
+    t = InMemoryTransport()
+    for _ in range(3):
+        _dead_letter(t, "dlq", "urn:babel:orders:created", "orders")
+
+    result = redrive(t, "dlq", max=2)
+
+    assert result.redriven == 2
+    assert len(_drain(t, "dlq")) == 1  # Max respected
+
+
+def test_redrive_no_dead_letter_falls_back_to_meta_queue():
+    t = InMemoryTransport()
+    # a plain (never dead-lettered) envelope on the DLQ — redrive falls back to meta.queue
+    env = EnvelopeCodec.make("urn:babel:orders:created", {}, queue="orders")
+    t.publish("dlq", EnvelopeCodec.encode(env))
+
+    result = redrive(t, "dlq")
+
+    assert result.redriven == 1
+    assert len(_drain(t, "orders")) == 1
+
+
+class _FailOnTarget(InMemoryTransport):
+    """An in-memory transport that refuses to publish to one queue."""
+
+    def __init__(self, fail_queue):
+        super().__init__()
+        self._fail_queue = fail_queue
+
+    def publish(self, queue, body):
+        if queue == self._fail_queue:
+            raise RuntimeError("publish refused")
+        super().publish(queue, body)
+
+
+def test_redrive_publish_failure_restores():
+    t = _FailOnTarget("orders")
+    _dead_letter(t, "dlq", "urn:babel:orders:created", "orders")
+
+    with pytest.raises(RuntimeError):
+        redrive(t, "dlq")
+
+    assert len(_drain(t, "dlq")) == 1  # restored to the DLQ, not lost
+    assert _drain(t, "orders") == []  # nothing reached the source queue
+
+
+def test_redrive_undecodable_restored():
+    t = InMemoryTransport()
+    t.publish("dlq", "not-json{{{")
+
+    result = redrive(t, "dlq")
+
+    assert result.redriven == 0 and result.skipped == 1
+    message = t.pop("dlq", 0)
+    assert message is not None and message.body == "not-json{{{"
+
+
+def test_redrive_failure_restores_rest_of_batch():
+    t = _FailOnTarget("orders")
+    for _ in range(3):
+        _dead_letter(t, "dlq", "urn:babel:orders:created", "orders")
+
+    with pytest.raises(RuntimeError):
+        redrive(t, "dlq")
+
+    assert len(_drain(t, "dlq")) == 3  # the failed message and the two never reached
+    assert _drain(t, "orders") == []
+
+
+def test_redrive_unresolvable_target_restored():
+    t = InMemoryTransport()
+    # no dead_letter block and an empty meta.queue: there is no queue to replay to
+    env = dict(EnvelopeCodec.make("urn:babel:orders:created", {}, queue="orders"))
+    env["meta"] = {**env["meta"], "queue": ""}
+    t.publish("dlq", EnvelopeCodec.encode(env))
+
+    result = redrive(t, "dlq")
+
+    assert result.redriven == 0 and result.skipped == 1
+    assert result.items[0].to == ""
+    assert len(_drain(t, "dlq")) == 1  # kept on the DLQ, not published to the "" queue