From 4739403bbf6a9c19de7194a6c4ff9346f1c19ec5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammet=20=C5=9Eafak?= Date: Thu, 18 Jun 2026 13:44:01 +0300 Subject: [PATCH] feat: optional idempotency (ADR-0022) and per-URN schema validation (ADR-0024) Opt-in, dependency-free helpers; wire envelope stays frozen. Includes the vendored payload_schema cross-SDK conformance cases. --- src/babelqueue/__init__.py | 6 +- src/babelqueue/exceptions.py | 14 ++ src/babelqueue/idempotency.py | 95 +++++++++++ src/babelqueue/schema.py | 261 +++++++++++++++++++++++++++++++ tests/conformance/manifest.json | 23 +++ tests/test_idempotency.py | 89 +++++++++++ tests/test_schema.py | 145 +++++++++++++++++ tests/test_schema_conformance.py | 33 ++++ 8 files changed, 665 insertions(+), 1 deletion(-) create mode 100644 src/babelqueue/idempotency.py create mode 100644 src/babelqueue/schema.py create mode 100644 tests/test_idempotency.py create mode 100644 tests/test_schema.py create mode 100644 tests/test_schema_conformance.py diff --git a/src/babelqueue/__init__.py b/src/babelqueue/__init__.py index 1e9c643..3163ea1 100644 --- a/src/babelqueue/__init__.py +++ b/src/babelqueue/__init__.py @@ -11,10 +11,11 @@ from __future__ import annotations -from . import dead_letter +from . import dead_letter, idempotency from .app import BabelQueue from .codec import SCHEMA_VERSION, SOURCE_LANG, EnvelopeCodec from .contracts import HasTraceId, PolyglotMessage +from .idempotency import IdempotencyStore, InMemoryStore from .exceptions import BabelQueueError, UnknownUrnError from .routing import UnknownUrnStrategy from .transport import InMemoryTransport, ReceivedMessage, Transport @@ -35,5 +36,8 @@ "BabelQueueError", "UnknownUrnError", "dead_letter", + "idempotency", + "IdempotencyStore", + "InMemoryStore", "__version__", ] diff --git a/src/babelqueue/exceptions.py b/src/babelqueue/exceptions.py index 8c9a092..a50e181 100644 --- a/src/babelqueue/exceptions.py +++ b/src/babelqueue/exceptions.py @@ -9,3 +9,17 @@ class BabelQueueError(Exception): class UnknownUrnError(BabelQueueError): """A consumed message carries a URN with no mapped handler (strategy "fail").""" + + +class InvalidPayloadError(BabelQueueError): + """A message's ``data`` does not match the JSON Schema registered for its URN (ADR-0024). + + Raised by the producer-side :func:`babelqueue.schema.validate` and the consumer-side + :func:`babelqueue.schema.wrap`; the latter lets the runtime redeliver (and eventually + dead-letter) a poison message. + """ + + def __init__(self, urn: str, violation: str) -> None: + super().__init__(f"data for {urn!r} does not match its URN schema: {violation}") + self.urn = urn + self.violation = violation diff --git a/src/babelqueue/idempotency.py b/src/babelqueue/idempotency.py new file mode 100644 index 0000000..fd29288 --- /dev/null +++ b/src/babelqueue/idempotency.py @@ -0,0 +1,95 @@ +"""Optional idempotency helper (ADR-0022): dedupe a consume handler on ``meta.id``. + +The Python mirror of the PHP ``BabelQueue\\Idempotency`` and Go ``idempotency`` +helpers. It wraps a handler so a message whose ``meta.id`` was already processed +successfully is skipped instead of run again, composing with the runtime's +ack-on-return / redeliver-on-raise contract:: + + from babelqueue import BabelQueue + from babelqueue.idempotency import InMemoryStore, wrap + + app = BabelQueue("redis://localhost:6379/0", queue="orders") + store = InMemoryStore() + app.register("urn:babel:orders:created", wrap(store, on_order_created)) + +A previously-seen id returns early (the runtime acks it, so the broker stops +redelivering); a raising handler leaves the id unmarked so a redelivery runs it again +(retry / dead-letter still apply); a message with no usable ``meta.id`` runs unchanged. +This is "seen-set" post-success dedupe — not exactly-once and not in-flight concurrency +locking; a transactional / outbox mode is a documented future direction. +""" + +from __future__ import annotations + +import functools +import threading +from typing import Any, Callable, Mapping, Protocol, runtime_checkable + +Handler = Callable[..., None] + + +@runtime_checkable +class IdempotencyStore(Protocol): + """A record of message ids already processed, keyed on ``meta.id``.""" + + def seen(self, message_id: str) -> bool: + """Whether this id has already been processed (remembered).""" + + def remember(self, message_id: str) -> None: + """Record this id as processed.""" + + def forget(self, message_id: str) -> None: + """Drop an id from the store (manual eviction; a backend may also expire ids).""" + + +class InMemoryStore: + """Process-local, thread-safe :class:`IdempotencyStore`. + + For tests and single-process consumers; it is not shared across workers and not + persistent — use a Redis- or database-backed store for production fleets. + """ + + def __init__(self) -> None: + self._seen: set[str] = set() + self._lock = threading.Lock() + + def seen(self, message_id: str) -> bool: + with self._lock: + return message_id in self._seen + + def remember(self, message_id: str) -> None: + with self._lock: + self._seen.add(message_id) + + def forget(self, message_id: str) -> None: + with self._lock: + self._seen.discard(message_id) + + +def wrap(store: IdempotencyStore, handler: Handler) -> Handler: + """Wrap ``handler`` so a message whose ``meta.id`` was already processed is skipped. + + The returned callable keeps ``handler``'s signature (via :func:`functools.wraps`), + so the runtime's introspection still passes it the right number of positional args + (``data, meta`` or ``data, meta, envelope``). + """ + + @functools.wraps(handler) + def wrapped(*args: Any) -> None: + meta = args[1] if len(args) > 1 and isinstance(args[1], Mapping) else {} + message_id = meta.get("id") + + # No usable id → cannot dedupe; run the handler unchanged. + if not isinstance(message_id, str) or message_id == "": + handler(*args) + return + + # Already processed on an earlier delivery: return so the runtime acks it. + if store.seen(message_id): + return + + # First success wins; a raise here leaves the id unmarked → retry/DLQ apply. + handler(*args) + store.remember(message_id) + + return wrapped diff --git a/src/babelqueue/schema.py b/src/babelqueue/schema.py new file mode 100644 index 0000000..5f00a08 --- /dev/null +++ b/src/babelqueue/schema.py @@ -0,0 +1,261 @@ +"""Optional per-URN payload schema validation (ADR-0024). + +The Python mirror of the Go ``schema`` package and PHP ``BabelQueue\\Schema``. A +:class:`SchemaProvider` supplies a JSON Schema for a message URN — typically read from a +babelqueue-registry ``registry.json`` — and the message's ``data`` is validated against it. +It is opt-in: a URN with no registered schema is never validated. + +- **Producer-side (recommended):** call :func:`validate` before publishing so invalid data + never enters the queue, or :func:`check` to branch without raising:: + + from babelqueue.schema import MapProvider, validate + + provider = MapProvider.from_json({"urn:babel:orders:created": ORDERS_SCHEMA_JSON}) + validate(provider, "urn:babel:orders:created", {"order_id": 7}) # raises on mismatch + +- **Consumer-side (safety net):** wrap a handler with :func:`wrap`. Because a Python handler + receives ``data`` (and ``meta``, ``envelope``) positionally rather than a message object, + the URN is passed explicitly — usually the same URN you register under:: + + app.register(URN, wrap(provider, URN, on_order_created)) + + Invalid data raises :class:`~babelqueue.exceptions.InvalidPayloadError`, so the runtime + redelivers (and eventually dead-letters) the poison message; a URN with no schema runs the + handler unchanged. + +The validator is a small subset of JSON Schema (draft-07) whose verdicts match the Go and +PHP validators and babelqueue-registry's ``compat`` linter: ``type``, ``required``, +``properties``, ``additionalProperties``, ``items``, ``enum``, ``const``, ``minLength``, +``minimum``. Unknown keywords are ignored. Zero dependencies (stdlib only). +""" + +from __future__ import annotations + +import functools +import json +import os +import threading +from typing import Any, Callable, Mapping, Optional, Protocol, runtime_checkable + +from .exceptions import InvalidPayloadError + +Handler = Callable[..., None] + + +@runtime_checkable +class SchemaProvider(Protocol): + """A source of per-URN ``data`` schemas, keyed on the message URN.""" + + def schema_for(self, urn: str) -> Optional[Mapping[str, Any]]: + """The JSON Schema registered for ``urn``, or None when none is registered.""" + + +class MapProvider: + """In-memory :class:`SchemaProvider`, for tests and for embedding schemas in code.""" + + def __init__(self, schemas: Mapping[str, Mapping[str, Any]]) -> None: + self._schemas: dict[str, Mapping[str, Any]] = dict(schemas) + + @classmethod + def from_json(cls, raw: Mapping[str, str]) -> "MapProvider": + """Build a provider from URN -> raw JSON Schema strings, decoding each.""" + schemas: dict[str, Mapping[str, Any]] = {} + for urn, body in raw.items(): + decoded = json.loads(body) + if not isinstance(decoded, dict): + raise ValueError(f"schema: invalid JSON schema for {urn!r}") + schemas[urn] = decoded + return cls(schemas) + + def schema_for(self, urn: str) -> Optional[Mapping[str, Any]]: + return self._schemas.get(urn) + + +class DirProvider: + """Reads schemas from a babelqueue-registry manifest (``registry.json``): a list of + ``{urn, schema}`` entries mapping each URN to a schema file for its ``data`` block. The + bridge that makes the registry's governed schemas enforceable at runtime. Schema files + are read and decoded lazily and cached (thread-safe). A URN absent from the manifest + returns None (skip); a URN whose schema file is missing raises (config/IO error).""" + + def __init__(self, manifest_path: str) -> None: + with open(manifest_path, "r", encoding="utf-8") as fh: + manifest = json.load(fh) + if not isinstance(manifest, dict): + raise ValueError(f"schema: invalid registry manifest {manifest_path!r}") + + self._dir = os.path.dirname(manifest_path) + self._files: dict[str, str] = {} + self._cache: dict[str, Mapping[str, Any]] = {} + self._lock = threading.Lock() + + entries = manifest.get("schemas") or [] + if isinstance(entries, list): + for entry in entries: + if not isinstance(entry, Mapping): + continue + urn = entry.get("urn") + file = entry.get("schema") + if isinstance(urn, str) and urn and isinstance(file, str) and file: + self._files[urn] = file + + def schema_for(self, urn: str) -> Optional[Mapping[str, Any]]: + with self._lock: + cached = self._cache.get(urn) + if cached is not None: + return cached + file = self._files.get(urn) + if file is None: + return None + path = file if os.path.isabs(file) else os.path.join(self._dir, file) + with open(path, "r", encoding="utf-8") as fh: + decoded = json.load(fh) + if not isinstance(decoded, dict): + raise ValueError(f"schema: invalid schema for {urn!r} ({file})") + self._cache[urn] = decoded + return decoded + + +def check(provider: SchemaProvider, urn: str, data: Mapping[str, Any]) -> Optional[str]: + """The first ``data`` violation for ``(urn, data)``, or None when it is valid or when no + schema is registered for the URN (opt-in). Non-raising; for producer-side branching.""" + schema = provider.schema_for(urn) + if schema is None: + return None + return validate_schema(schema, dict(data)) + + +def validate(provider: SchemaProvider, urn: str, data: Mapping[str, Any]) -> None: + """Validate ``(urn, data)`` against its registered schema, raising otherwise. The + producer-side guard; call it before publishing. + + :raises InvalidPayloadError: when the data does not match the URN's schema. + """ + violation = check(provider, urn, data) + if violation is not None: + raise InvalidPayloadError(urn, violation) + + +def wrap(provider: SchemaProvider, urn: str, handler: Handler) -> Handler: + """Wrap a consume handler so each message's ``data`` is validated against ``urn``'s + schema before the handler runs. The returned callable keeps ``handler``'s signature (via + :func:`functools.wraps`), so the runtime still passes it the right number of positional + args (``data, meta`` or ``data, meta, envelope``).""" + + @functools.wraps(handler) + def wrapped(*args: Any) -> None: + data = args[0] if args and isinstance(args[0], Mapping) else {} + validate(provider, urn, data) + handler(*args) + + return wrapped + + +def validate_schema(schema: Mapping[str, Any], value: Any, path: str = "") -> Optional[str]: + """The first violation of ``value`` against a (subset) JSON Schema node, or None.""" + if "const" in schema and not _equal(value, schema["const"]): + return _violation(path, "wrong_const") + + enum = schema.get("enum") + if isinstance(enum, list) and not any(_equal(value, item) for item in enum): + return _violation(path, "not_in_enum") + + typ = schema.get("type") + if typ == "object": + return _check_object(schema, value, path) + if typ == "array": + return _check_array(schema, value, path) + if typ == "string": + if not isinstance(value, str): + return _violation(path, "not_a_string") + min_len = schema.get("minLength") + if isinstance(min_len, (int, float)) and len(value) < int(min_len): + return _violation(path, "below_min_length") + return None + if typ == "integer": + if not _is_integer(value): + return _violation(path, "not_an_integer") + return _check_minimum(schema, value, path) + if typ == "number": + if not _is_number(value): + return _violation(path, "not_a_number") + return _check_minimum(schema, value, path) + if typ == "boolean": + return None if isinstance(value, bool) else _violation(path, "not_a_boolean") + if typ == "null": + return None if value is None else _violation(path, "not_null") + return None + + +def _check_object(schema: Mapping[str, Any], value: Any, path: str) -> Optional[str]: + if not isinstance(value, Mapping): + return _violation(path, "not_an_object") + + required = schema.get("required") + if isinstance(required, list): + for key in required: + if isinstance(key, str) and key not in value: + return _violation(_join(path, key), "missing_required") + + properties = schema.get("properties") + properties = properties if isinstance(properties, Mapping) else {} + additional_allowed = schema.get("additionalProperties") is not False + + for key, item in value.items(): + name = str(key) + prop = properties.get(name) + if isinstance(prop, Mapping): + violation = validate_schema(prop, item, _join(path, name)) + if violation is not None: + return violation + continue + if not additional_allowed: + return _violation(_join(path, name), "additional_not_allowed") + + return None + + +def _check_array(schema: Mapping[str, Any], value: Any, path: str) -> Optional[str]: + if not isinstance(value, list): + return _violation(path, "not_an_array") + items = schema.get("items") + if not isinstance(items, Mapping): + return None + for i, item in enumerate(value): + violation = validate_schema(items, item, f"{path}[{i}]") + if violation is not None: + return violation + return None + + +def _check_minimum(schema: Mapping[str, Any], value: Any, path: str) -> Optional[str]: + minimum = schema.get("minimum") + if isinstance(minimum, (int, float)) and not isinstance(minimum, bool) and float(value) < float(minimum): + return _violation(path, "below_minimum") + return None + + +def _is_integer(value: Any) -> bool: + if isinstance(value, bool): + return False + if isinstance(value, int): + return True + return isinstance(value, float) and value.is_integer() + + +def _is_number(value: Any) -> bool: + return isinstance(value, (int, float)) and not isinstance(value, bool) + + +def _equal(a: Any, b: Any) -> bool: + # Type-aware equality so True != 1 and an integer const never matches a float value, + # matching the strict comparisons in the Go and PHP validators. + return type(a) is type(b) and a == b + + +def _violation(path: str, reason: str) -> str: + return f"{path or ''}: {reason}" + + +def _join(path: str, key: str) -> str: + return key if path == "" else f"{path}.{key}" diff --git a/tests/conformance/manifest.json b/tests/conformance/manifest.json index 78e5c3a..5b2fee4 100644 --- a/tests/conformance/manifest.json +++ b/tests/conformance/manifest.json @@ -226,5 +226,28 @@ "x-attempts": 0 } } + }, + "payload_schema": { + "description": "Per-URN data schema validation (ADR-0024). Each case validates `data` against `schema`; every SDK's optional payload validator (Go schema, PHP BabelQueue\\Schema, Python babelqueue.schema) MUST agree on `valid`. The wire envelope stays frozen — this governs the data block only, and is opt-in (consumers/producers without a registered schema skip it).", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["order_id"], + "properties": { + "order_id": { "type": "integer", "minimum": 1 }, + "amount": { "type": "number", "minimum": 0 }, + "currency": { "enum": ["USD", "EUR", "TRY"] } + }, + "additionalProperties": false + }, + "cases": [ + { "name": "valid-minimal", "valid": true, "data": { "order_id": 1042 } }, + { "name": "valid-full", "valid": true, "data": { "order_id": 1042, "amount": 99.9, "currency": "USD" } }, + { "name": "invalid-missing-required", "valid": false, "data": { "amount": 10 } }, + { "name": "invalid-wrong-type", "valid": false, "data": { "order_id": "x" } }, + { "name": "invalid-additional-property", "valid": false, "data": { "order_id": 1, "extra": true } }, + { "name": "invalid-enum", "valid": false, "data": { "order_id": 1, "currency": "GBP" } }, + { "name": "invalid-below-minimum", "valid": false, "data": { "order_id": 0 } } + ] } } diff --git a/tests/test_idempotency.py b/tests/test_idempotency.py new file mode 100644 index 0000000..396f40e --- /dev/null +++ b/tests/test_idempotency.py @@ -0,0 +1,89 @@ +from __future__ import annotations + +import unittest +from typing import Any, Dict, List + +from babelqueue.app import _handler_wants_envelope +from babelqueue.idempotency import InMemoryStore, wrap + + +class IdempotencyTest(unittest.TestCase): + def test_runs_and_remembers_on_first_delivery(self) -> None: + store = InMemoryStore() + calls: List[str] = [] + + def handler(data: Dict[str, Any], meta: Dict[str, Any]) -> None: + calls.append(meta["id"]) + + wrapped = wrap(store, handler) + wrapped({}, {"id": "m1"}) + + self.assertEqual(calls, ["m1"]) + self.assertTrue(store.seen("m1")) + + def test_skips_redelivery_of_same_id(self) -> None: + store = InMemoryStore() + calls: List[int] = [] + + wrapped = wrap(store, lambda data, meta: calls.append(1)) + wrapped({}, {"id": "m1"}) + wrapped({}, {"id": "m1"}) # redelivery → skipped + + self.assertEqual(len(calls), 1) + + def test_runs_again_for_a_different_id(self) -> None: + store = InMemoryStore() + calls: List[int] = [] + + wrapped = wrap(store, lambda data, meta: calls.append(1)) + wrapped({}, {"id": "m1"}) + wrapped({}, {"id": "m2"}) + + self.assertEqual(len(calls), 2) + + def test_does_not_remember_when_handler_raises(self) -> None: + store = InMemoryStore() + calls: List[int] = [] + + def boom(data: Dict[str, Any], meta: Dict[str, Any]) -> None: + calls.append(1) + raise RuntimeError("boom") + + wrapped = wrap(store, boom) + with self.assertRaises(RuntimeError): + wrapped({}, {"id": "m1"}) + self.assertFalse(store.seen("m1")) + + # A redelivery runs the handler again — retry works. + with self.assertRaises(RuntimeError): + wrapped({}, {"id": "m1"}) + self.assertEqual(len(calls), 2) + + def test_runs_when_no_usable_id(self) -> None: + store = InMemoryStore() + calls: List[int] = [] + + wrapped = wrap(store, lambda data, meta: calls.append(1)) + wrapped({}, {"id": ""}) # empty id → cannot dedupe → runs + wrapped({}, {}) # no id at all → runs + + self.assertEqual(len(calls), 2) + + def test_preserves_handler_arity_for_the_runtime(self) -> None: + # functools.wraps keeps inspect.signature transparent, so the runtime still + # passes the wrapped handler the right number of positional args. + store = InMemoryStore() + self.assertTrue(_handler_wants_envelope(wrap(store, lambda d, m, e: None))) + self.assertFalse(_handler_wants_envelope(wrap(store, lambda d, m: None))) + + def test_forget_removes_a_remembered_id(self) -> None: + store = InMemoryStore() + store.remember("m1") + self.assertTrue(store.seen("m1")) + + store.forget("m1") + self.assertFalse(store.seen("m1")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_schema.py b/tests/test_schema.py new file mode 100644 index 0000000..9bc79b5 --- /dev/null +++ b/tests/test_schema.py @@ -0,0 +1,145 @@ +from __future__ import annotations + +import json +import os +import tempfile +import unittest +from typing import List + +from babelqueue.exceptions import InvalidPayloadError +from babelqueue.schema import ( + DirProvider, + MapProvider, + check, + validate, + validate_schema, + wrap, +) + +ORDERS = ( + '{"type":"object","required":["order_id"],' + '"properties":{"order_id":{"type":"integer"}},"additionalProperties":false}' +) + + +class ValidatorTest(unittest.TestCase): + def test_object_required_types_and_additional(self) -> None: + schema = json.loads( + '{"type":"object","required":["order_id"],' + '"properties":{"order_id":{"type":"integer"},"note":{"type":"string","minLength":1}},' + '"additionalProperties":false}' + ) + self.assertIsNone(validate_schema(schema, {"order_id": 7})) + self.assertIsNotNone(validate_schema(schema, {})) + self.assertIsNotNone(validate_schema(schema, {"order_id": "x"})) + self.assertIsNotNone(validate_schema(schema, {"order_id": 7, "extra": 1})) + self.assertIsNotNone(validate_schema(schema, {"order_id": 7, "note": ""})) + + def test_enum_minimum_and_array_items(self) -> None: + schema = json.loads( + '{"type":"object","properties":{"status":{"enum":["new","paid"]},' + '"qty":{"type":"integer","minimum":1},' + '"tags":{"type":"array","items":{"type":"string"}}}}' + ) + self.assertIsNone(validate_schema(schema, {"status": "paid", "qty": 2, "tags": ["a", "b"]})) + self.assertIsNotNone(validate_schema(schema, {"status": "cancelled"})) + self.assertIsNotNone(validate_schema(schema, {"qty": 0})) + self.assertIsNotNone(validate_schema(schema, {"tags": ["a", 1]})) + + def test_scalar_types(self) -> None: + cases = [ + ('{"type":"boolean"}', True, True), + ('{"type":"boolean"}', "x", False), + ('{"type":"null"}', None, True), + ('{"type":"null"}', 1, False), + ('{"type":"number","minimum":0.5}', 0.6, True), + ('{"type":"number","minimum":0.5}', 0.4, False), + ('{"type":"number"}', "x", False), + ('{"type":"string"}', 5, False), + ('{"type":"integer"}', 1.0, True), + ('{"type":"integer"}', 1.5, False), + ('{"type":"integer"}', True, False), # bool is not an integer + ('{"const":"v1"}', "v1", True), + ('{"const":"v1"}', "v2", False), + ] + for src, value, valid in cases: + schema = json.loads(src) + violation = validate_schema(schema, value) + self.assertEqual(valid, violation is None, f"{src} / {value!r}: {violation}") + + +class ProviderTest(unittest.TestCase): + def test_map_provider_from_json(self) -> None: + provider = MapProvider.from_json({"urn:babel:orders:created": ORDERS}) + self.assertIsNotNone(provider.schema_for("urn:babel:orders:created")) + self.assertIsNone(provider.schema_for("urn:babel:unknown")) + + def test_map_provider_invalid_json(self) -> None: + with self.assertRaises(ValueError): + MapProvider.from_json({"u": "not json"}) + + def test_dir_provider_reads_registry_lazily(self) -> None: + with tempfile.TemporaryDirectory() as d: + os.makedirs(os.path.join(d, "schemas")) + with open(os.path.join(d, "schemas", "orders.json"), "w", encoding="utf-8") as fh: + fh.write('{"type":"object","required":["order_id"],"properties":{"order_id":{"type":"integer"}}}') + with open(os.path.join(d, "registry.json"), "w", encoding="utf-8") as fh: + fh.write('{"schemas":[{"urn":"urn:babel:orders:created","schema":"schemas/orders.json"},{"urn":"","schema":"x"}]}') + + provider = DirProvider(os.path.join(d, "registry.json")) + for _ in range(2): # the second call hits the cache + self.assertIsNotNone(provider.schema_for("urn:babel:orders:created")) + self.assertIsNone(provider.schema_for("urn:babel:unknown")) + + def test_dir_provider_missing_manifest(self) -> None: + with self.assertRaises(OSError): + DirProvider(os.path.join(tempfile.gettempdir(), "nope_registry_xyz.json")) + + def test_dir_provider_missing_schema_file(self) -> None: + with tempfile.TemporaryDirectory() as d: + with open(os.path.join(d, "registry.json"), "w", encoding="utf-8") as fh: + fh.write('{"schemas":[{"urn":"u","schema":"missing.json"}]}') + provider = DirProvider(os.path.join(d, "registry.json")) + with self.assertRaises(OSError): + provider.schema_for("u") + + +class FacadeTest(unittest.TestCase): + def _provider(self) -> MapProvider: + return MapProvider.from_json({"urn:babel:orders:created": ORDERS}) + + def test_check_valid_invalid_unregistered(self) -> None: + provider = self._provider() + self.assertIsNone(check(provider, "urn:babel:orders:created", {"order_id": 1})) + self.assertIsNone(check(provider, "urn:babel:unknown", {"x": 1})) # opt-in + self.assertIsNotNone(check(provider, "urn:babel:orders:created", {})) + + def test_validate_raises_on_invalid(self) -> None: + with self.assertRaises(InvalidPayloadError): + validate(self._provider(), "urn:babel:orders:created", {"order_id": "x"}) + + def test_validate_passes_unregistered(self) -> None: + validate(self._provider(), "urn:babel:unknown", {"anything": True}) # no raise + + def test_wrap_runs_handler_on_valid(self) -> None: + calls: List[int] = [] + wrapped = wrap(self._provider(), "urn:babel:orders:created", lambda data, meta: calls.append(1)) + wrapped({"order_id": 1}, {"id": "m1"}) + self.assertEqual(calls, [1]) + + def test_wrap_raises_and_skips_on_invalid(self) -> None: + calls: List[int] = [] + wrapped = wrap(self._provider(), "urn:babel:orders:created", lambda data, meta: calls.append(1)) + with self.assertRaises(InvalidPayloadError): + wrapped({}, {"id": "m1"}) + self.assertEqual(calls, []) + + def test_wrap_runs_handler_for_unregistered_urn(self) -> None: + calls: List[int] = [] + wrapped = wrap(self._provider(), "urn:babel:unknown", lambda data, meta: calls.append(1)) + wrapped({"anything": True}, {"id": "m1"}) + self.assertEqual(calls, [1]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_schema_conformance.py b/tests/test_schema_conformance.py new file mode 100644 index 0000000..4c0a1b8 --- /dev/null +++ b/tests/test_schema_conformance.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +import json +import unittest +from pathlib import Path + +from babelqueue.schema import validate_schema + +MANIFEST = Path(__file__).parent / "conformance" / "manifest.json" + + +class PayloadConformanceTest(unittest.TestCase): + """Runs the shared cross-SDK payload-schema cases (ADR-0024) from the vendored conformance + suite: this validator must agree with the Go and PHP ones on each case's ``valid`` flag.""" + + def test_payload_cases_match_across_sdks(self) -> None: + if not MANIFEST.is_file(): + self.skipTest("vendored conformance suite not present") + manifest = json.loads(MANIFEST.read_text(encoding="utf-8")) + section = manifest.get("payload_schema") + if not isinstance(section, dict): + self.skipTest("manifest has no payload_schema section") + + schema = section["schema"] + cases = section["cases"] + self.assertTrue(cases) + for case in cases: + valid = validate_schema(schema, case["data"]) is None + self.assertEqual(case["valid"], valid, f"case {case['name']}") + + +if __name__ == "__main__": + unittest.main()