From e420e35ab242caee9dbc01afcf394c0ccdee593f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammet=20=C5=9Eafak?= Date: Thu, 18 Jun 2026 13:44:06 +0300 Subject: [PATCH] feat: optional idempotency (ADR-0022) and per-URN schema validation (ADR-0024) Opt-in, dependency-free helpers; wire envelope stays frozen. Includes the vendored payload_schema cross-SDK conformance cases. --- src/BabelQueue.Core/Idempotency.cs | 99 +++++++++ src/BabelQueue.Core/Schema/ISchemaProvider.cs | 16 ++ .../Schema/InvalidPayloadException.cs | 26 +++ src/BabelQueue.Core/Schema/MapProvider.cs | 31 +++ .../Schema/PayloadValidator.cs | 209 ++++++++++++++++++ src/BabelQueue.Core/Schema/SchemaJson.cs | 66 ++++++ .../Schema/SchemaValidation.cs | 53 +++++ .../BabelQueue.Core.Tests/IdempotencyTests.cs | 114 ++++++++++ .../PayloadSchemaConformanceTests.cs | 41 ++++ .../SchemaValidationTests.cs | 104 +++++++++ .../conformance/manifest.json | 23 ++ 11 files changed, 782 insertions(+) create mode 100644 src/BabelQueue.Core/Idempotency.cs create mode 100644 src/BabelQueue.Core/Schema/ISchemaProvider.cs create mode 100644 src/BabelQueue.Core/Schema/InvalidPayloadException.cs create mode 100644 src/BabelQueue.Core/Schema/MapProvider.cs create mode 100644 src/BabelQueue.Core/Schema/PayloadValidator.cs create mode 100644 src/BabelQueue.Core/Schema/SchemaJson.cs create mode 100644 src/BabelQueue.Core/Schema/SchemaValidation.cs create mode 100644 tests/BabelQueue.Core.Tests/IdempotencyTests.cs create mode 100644 tests/BabelQueue.Core.Tests/PayloadSchemaConformanceTests.cs create mode 100644 tests/BabelQueue.Core.Tests/SchemaValidationTests.cs diff --git a/src/BabelQueue.Core/Idempotency.cs b/src/BabelQueue.Core/Idempotency.cs new file mode 100644 index 0000000..3708a74 --- /dev/null +++ b/src/BabelQueue.Core/Idempotency.cs @@ -0,0 +1,99 @@ +namespace BabelQueue; + +/// +/// A consume handler: processes one decoded . May be async; a +/// thrown/faulted handler leaves the message unacknowledged so the runtime redelivers it +/// (the core is codec-only, so an adapter drives the actual consume loop). +/// +public delegate Task Handler(Envelope envelope); + +/// +/// A pluggable record of message ids already processed, keyed on the envelope's +/// meta.id. The reference is for tests / single-process +/// consumers; production backends (Redis, a database table) implement the same three +/// methods. "Seen-set" post-success dedupe — not exactly-once, not in-flight locking; a +/// transactional / outbox mode is a documented future direction (ADR-0022). +/// +public interface IIdempotencyStore +{ + /// Whether this message id has already been processed (remembered). + bool Seen(string messageId); + + /// Records this message id as processed. + void Remember(string messageId); + + /// Drops an id from the store (manual eviction; a backend may also expire ids). + void Forget(string messageId); +} + +/// +/// Process-local, thread-safe backed by a set. For tests +/// and single-process consumers; not shared across workers and not persistent — use a +/// Redis- or database-backed store for production fleets. +/// +public sealed class InMemoryStore : IIdempotencyStore +{ + private readonly HashSet _ids = new(); + private readonly object _gate = new(); + + /// + public bool Seen(string messageId) + { + lock (_gate) + { + return _ids.Contains(messageId); + } + } + + /// + public void Remember(string messageId) + { + lock (_gate) + { + _ids.Add(messageId); + } + } + + /// + public void Forget(string messageId) + { + lock (_gate) + { + _ids.Remove(messageId); + } + } +} + +/// +/// Wraps a so a message whose meta.id was already processed +/// successfully is skipped (ADR-0022) — the .NET mirror of the PHP, Go, Python, and Node +/// helpers. A previously-seen id returns early (so an adapter acks it); a thrown/faulted +/// handler leaves the id unmarked so a redelivery runs it again; a message with no usable +/// meta.id runs unchanged. +/// +public static class Idempotency +{ + /// Returns guarded by dedupe on meta.id. + public static Handler Wrap(IIdempotencyStore store, Handler handler) => + async envelope => + { + string? id = envelope.Meta?.Id; + + // No usable id → cannot dedupe; run the handler unchanged. + if (string.IsNullOrEmpty(id)) + { + await handler(envelope).ConfigureAwait(false); + return; + } + + // Already processed on an earlier delivery: return so the adapter acks it. + if (store.Seen(id)) + { + return; + } + + // First success wins; a throw here leaves the id unmarked → retry/DLQ apply. + await handler(envelope).ConfigureAwait(false); + store.Remember(id); + }; +} diff --git a/src/BabelQueue.Core/Schema/ISchemaProvider.cs b/src/BabelQueue.Core/Schema/ISchemaProvider.cs new file mode 100644 index 0000000..2c0db43 --- /dev/null +++ b/src/BabelQueue.Core/Schema/ISchemaProvider.cs @@ -0,0 +1,16 @@ +using System.Collections.Generic; + +namespace BabelQueue.Schema; + +/// +/// A source of per-URN data schemas, keyed on the message URN (ADR-0024). Returns the +/// decoded JSON Schema for a URN's data block, or null when none is registered — +/// in which case the caller skips validation (the feature is opt-in). The reference +/// is in-memory; the I/O-free core ships no file-based provider, so a +/// .NET app reads its babelqueue-registry registry.json and passes the schemas in. +/// +public interface ISchemaProvider +{ + /// The decoded JSON Schema registered for , or null. + IReadOnlyDictionary? SchemaFor(string urn); +} diff --git a/src/BabelQueue.Core/Schema/InvalidPayloadException.cs b/src/BabelQueue.Core/Schema/InvalidPayloadException.cs new file mode 100644 index 0000000..10a7f00 --- /dev/null +++ b/src/BabelQueue.Core/Schema/InvalidPayloadException.cs @@ -0,0 +1,26 @@ +using BabelQueue; + +namespace BabelQueue.Schema; + +/// +/// Raised when a message's data does not match the JSON Schema registered for its URN +/// (ADR-0024). The consumer-side throws it so the adapter +/// redelivers (and eventually dead-letters) a poison message; the recommended primary use is +/// producer-side () so invalid data never enters the queue. +/// +public sealed class InvalidPayloadException : BabelQueueException +{ + /// Create the exception for a URN whose data violated its schema. + public InvalidPayloadException(string urn, string violation) + : base($"Message data for \"{urn}\" does not match its URN schema: {violation}.") + { + Urn = urn; + Violation = violation; + } + + /// The message URN whose schema was violated. + public string Urn { get; } + + /// The first "<json-pointer>: <reason>" mismatch. + public string Violation { get; } +} diff --git a/src/BabelQueue.Core/Schema/MapProvider.cs b/src/BabelQueue.Core/Schema/MapProvider.cs new file mode 100644 index 0000000..962f559 --- /dev/null +++ b/src/BabelQueue.Core/Schema/MapProvider.cs @@ -0,0 +1,31 @@ +using System.Collections.Generic; + +namespace BabelQueue.Schema; + +/// In-memory , for tests and for embedding schemas in code. +public sealed class MapProvider : ISchemaProvider +{ + private readonly Dictionary> _schemas; + + /// Build a provider from URN to already-decoded JSON Schema maps. + public MapProvider(IReadOnlyDictionary> schemas) + { + _schemas = new Dictionary>(schemas); + } + + /// Build a provider from URN to raw JSON Schema strings, parsing each. + public static MapProvider FromJson(IReadOnlyDictionary raw) + { + var schemas = new Dictionary>(); + foreach (var entry in raw) + { + schemas[entry.Key] = SchemaJson.ParseObject(entry.Value); + } + + return new MapProvider(schemas); + } + + /// + public IReadOnlyDictionary? SchemaFor(string urn) => + _schemas.TryGetValue(urn, out var schema) ? schema : null; +} diff --git a/src/BabelQueue.Core/Schema/PayloadValidator.cs b/src/BabelQueue.Core/Schema/PayloadValidator.cs new file mode 100644 index 0000000..e1093e2 --- /dev/null +++ b/src/BabelQueue.Core/Schema/PayloadValidator.cs @@ -0,0 +1,209 @@ +using System; +using System.Collections.Generic; + +namespace BabelQueue.Schema; + +/// +/// Validates a message's data block against a per-URN JSON Schema (ADR-0024). A +/// hand-rolled subset of Draft-07 (zero dependencies) whose verdicts match the Go, PHP, +/// Python, Node and Java validators and babelqueue-registry's compat linter. Supported +/// keywords: type, required, properties, additionalProperties, +/// items, enum, const, minLength, minimum; unknown keywords +/// are ignored. It works on the materialized object? tree the codec produces. +/// +public static class PayloadValidator +{ + private static readonly IReadOnlyDictionary EmptyProps = + new Dictionary(); + + /// + /// The first violation of against as + /// "<json-pointer>: <reason>", or null when it conforms. + /// + public static string? Validate(IReadOnlyDictionary schema, object? value) => + ValidateNode(schema, value, string.Empty); + + private static string? ValidateNode(IReadOnlyDictionary schema, object? value, string path) + { + if (schema.TryGetValue("const", out var constValue) && !Equal(value, constValue)) + { + return Violation(path, "wrong_const"); + } + + if (schema.TryGetValue("enum", out var enumObj) + && enumObj is IReadOnlyList enumValues + && !Contains(enumValues, value)) + { + return Violation(path, "not_in_enum"); + } + + var type = schema.TryGetValue("type", out var t) && t is string s ? s : string.Empty; + return type switch + { + "object" => CheckObject(schema, value, path), + "array" => CheckArray(schema, value, path), + "string" => CheckString(schema, value, path), + "integer" => IsInteger(value) ? CheckMinimum(schema, value, path) : Violation(path, "not_an_integer"), + "number" => IsNumber(value) ? CheckMinimum(schema, value, path) : Violation(path, "not_a_number"), + "boolean" => value is bool ? null : Violation(path, "not_a_boolean"), + "null" => value is null ? null : Violation(path, "not_null"), + _ => null, + }; + } + + private static string? CheckObject(IReadOnlyDictionary schema, object? value, string path) + { + if (value is not IReadOnlyDictionary obj) + { + return Violation(path, "not_an_object"); + } + + if (schema.TryGetValue("required", out var req) && req is IReadOnlyList required) + { + foreach (var key in required) + { + if (key is string name && !obj.ContainsKey(name)) + { + return Violation(Join(path, name), "missing_required"); + } + } + } + + var properties = schema.TryGetValue("properties", out var p) + && p is IReadOnlyDictionary props + ? props + : EmptyProps; + var additionalAllowed = !(schema.TryGetValue("additionalProperties", out var ap) && ap is false); + + foreach (var member in obj) + { + if (properties.TryGetValue(member.Key, out var ps) && ps is IReadOnlyDictionary propSchema) + { + var found = ValidateNode(propSchema, member.Value, Join(path, member.Key)); + if (found is not null) + { + return found; + } + } + else if (!additionalAllowed) + { + return Violation(Join(path, member.Key), "additional_not_allowed"); + } + } + + return null; + } + + private static string? CheckArray(IReadOnlyDictionary schema, object? value, string path) + { + if (value is not IReadOnlyList list) + { + return Violation(path, "not_an_array"); + } + + if (!(schema.TryGetValue("items", out var it) && it is IReadOnlyDictionary items)) + { + return null; + } + + for (var i = 0; i < list.Count; i++) + { + var found = ValidateNode(items, list[i], path + "[" + i + "]"); + if (found is not null) + { + return found; + } + } + + return null; + } + + private static string? CheckString(IReadOnlyDictionary schema, object? value, string path) + { + if (value is not string str) + { + return Violation(path, "not_a_string"); + } + + if (schema.TryGetValue("minLength", out var ml) && ml is long min && str.Length < min) + { + return Violation(path, "below_min_length"); + } + + return null; + } + + private static string? CheckMinimum(IReadOnlyDictionary schema, object? value, string path) + { + if (schema.TryGetValue("minimum", out var m) + && TryDouble(m, out var min) + && TryDouble(value, out var actual) + && actual < min) + { + return Violation(path, "below_minimum"); + } + + return null; + } + + // JSON numbers materialize to long (integers) or double (fractions); a whole-valued double + // still counts as an integer, matching the other SDKs. bool is not long/double. + private static bool IsInteger(object? value) => value switch + { + long or int => true, + double d => !double.IsInfinity(d) && d == Math.Floor(d), + _ => false, + }; + + private static bool IsNumber(object? value) => value is long or int or double; + + private static bool TryDouble(object? value, out double result) + { + switch (value) + { + case long l: + result = l; + return true; + case int i: + result = i; + return true; + case double d: + result = d; + return true; + default: + result = 0; + return false; + } + } + + // Type-aware equality: a long never equals a double, true never equals 1 — matching the + // strict comparisons in the other SDK validators. + private static bool Equal(object? a, object? b) + { + if (a is null || b is null) + { + return a is null && b is null; + } + + return a.GetType() == b.GetType() && a.Equals(b); + } + + private static bool Contains(IReadOnlyList values, object? value) + { + foreach (var item in values) + { + if (Equal(value, item)) + { + return true; + } + } + + return false; + } + + private static string Violation(string path, string reason) => + (path.Length == 0 ? "" : path) + ": " + reason; + + private static string Join(string path, string key) => + path.Length == 0 ? key : path + "." + key; +} diff --git a/src/BabelQueue.Core/Schema/SchemaJson.cs b/src/BabelQueue.Core/Schema/SchemaJson.cs new file mode 100644 index 0000000..d8a01ab --- /dev/null +++ b/src/BabelQueue.Core/Schema/SchemaJson.cs @@ -0,0 +1,66 @@ +using System.Collections.Generic; +using System.Text.Json; +using BabelQueue; + +namespace BabelQueue.Schema; + +/// +/// Materializes JSON into the same object? tree the envelope codec produces +/// (objects to , arrays to , +/// numbers to long/double, plus string/bool/null), so a parsed schema and a +/// decoded message data compare under the same types. +/// +public static class SchemaJson +{ + /// Parse any JSON document into its materialized object? tree. + public static object? Parse(string json) + { + using var doc = JsonDocument.Parse(json); + return ToValue(doc.RootElement); + } + + /// Parse a JSON object into a materialized map. + public static IReadOnlyDictionary ParseObject(string json) + { + if (Parse(json) is IReadOnlyDictionary map) + { + return map; + } + + throw new BabelQueueException("schema: expected a JSON object"); + } + + private static object? ToValue(JsonElement e) => e.ValueKind switch + { + JsonValueKind.Object => ToDictionary(e), + JsonValueKind.Array => ToList(e), + JsonValueKind.String => e.GetString(), + // Cast keeps long boxed as long (the ternary would otherwise widen both to double). + JsonValueKind.Number => e.TryGetInt64(out var l) ? (object)l : e.GetDouble(), + JsonValueKind.True => true, + JsonValueKind.False => false, + _ => null, + }; + + private static Dictionary ToDictionary(JsonElement e) + { + var result = new Dictionary(); + foreach (var property in e.EnumerateObject()) + { + result[property.Name] = ToValue(property.Value); + } + + return result; + } + + private static List ToList(JsonElement e) + { + var result = new List(); + foreach (var item in e.EnumerateArray()) + { + result.Add(ToValue(item)); + } + + return result; + } +} diff --git a/src/BabelQueue.Core/Schema/SchemaValidation.cs b/src/BabelQueue.Core/Schema/SchemaValidation.cs new file mode 100644 index 0000000..59d01b3 --- /dev/null +++ b/src/BabelQueue.Core/Schema/SchemaValidation.cs @@ -0,0 +1,53 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using BabelQueue; + +namespace BabelQueue.Schema; + +/// +/// Optional per-URN data schema validation for a babelqueue producer or consumer +/// (ADR-0024). A supplies a JSON Schema for a message URN — +/// typically built from a babelqueue-registry registry.json — and the message's +/// data is validated against it. It is opt-in: a URN with no registered schema is never +/// validated. Producer-side, call before publishing (or +/// to branch without throwing); consumer-side, wrap a handler with . The .NET +/// mirror of the Go schema.Check/schema.Wrap helpers. +/// +public static class SchemaValidation +{ + private static readonly IReadOnlyDictionary Empty = new Dictionary(); + + /// + /// The first data violation for /, or + /// null when valid or when no schema is registered for the URN (opt-in). + /// + public static string? Check(ISchemaProvider provider, string urn, IReadOnlyDictionary data) + { + var schema = provider.SchemaFor(urn); + return schema is null ? null : PayloadValidator.Validate(schema, data); + } + + /// + /// Validate / against its registered schema, + /// throwing otherwise. The producer-side guard. + /// + public static void Validate(ISchemaProvider provider, string urn, IReadOnlyDictionary data) + { + var violation = Check(provider, urn, data); + if (violation is not null) + { + throw new InvalidPayloadException(urn, violation); + } + } + + /// + /// Returns wrapped to validate each message's data + /// against its URN's schema before the handler runs (consumer-side safety net). + /// + public static Handler Wrap(ISchemaProvider provider, Handler handler) => + async envelope => + { + Validate(provider, EnvelopeCodec.Urn(envelope), envelope.Data ?? Empty); + await handler(envelope).ConfigureAwait(false); + }; +} diff --git a/tests/BabelQueue.Core.Tests/IdempotencyTests.cs b/tests/BabelQueue.Core.Tests/IdempotencyTests.cs new file mode 100644 index 0000000..6281f04 --- /dev/null +++ b/tests/BabelQueue.Core.Tests/IdempotencyTests.cs @@ -0,0 +1,114 @@ +using BabelQueue; +using Xunit; + +namespace BabelQueue.Tests; + +public class IdempotencyTests +{ + private static Envelope Msg(string? id) => + new( + "urn:babel:orders:created", + "trace-1", + new Dictionary { ["order_id"] = 7L }, + new Meta(id, "orders", "dotnet", 1, 1L), + 0, + null); + + [Fact] + public async Task RunsAndRemembersOnFirstDelivery() + { + var store = new InMemoryStore(); + var calls = 0; + var handler = Idempotency.Wrap(store, _ => + { + calls++; + return Task.CompletedTask; + }); + + await handler(Msg("m1")); + + Assert.Equal(1, calls); + Assert.True(store.Seen("m1")); + } + + [Fact] + public async Task SkipsRedeliveryOfSameId() + { + var store = new InMemoryStore(); + var calls = 0; + var handler = Idempotency.Wrap(store, _ => + { + calls++; + return Task.CompletedTask; + }); + + await handler(Msg("m1")); + await handler(Msg("m1")); // redelivery → skipped + + Assert.Equal(1, calls); + } + + [Fact] + public async Task RunsAgainForADifferentId() + { + var store = new InMemoryStore(); + var calls = 0; + var handler = Idempotency.Wrap(store, _ => + { + calls++; + return Task.CompletedTask; + }); + + await handler(Msg("m1")); + await handler(Msg("m2")); + + Assert.Equal(2, calls); + } + + [Fact] + public async Task DoesNotRememberWhenHandlerThrows() + { + var store = new InMemoryStore(); + var calls = 0; + var handler = Idempotency.Wrap(store, _ => + { + calls++; + return Task.FromException(new InvalidOperationException("boom")); + }); + + await Assert.ThrowsAsync(() => handler(Msg("m1"))); + Assert.False(store.Seen("m1")); + + // A redelivery runs the handler again — retry works. + await Assert.ThrowsAsync(() => handler(Msg("m1"))); + Assert.Equal(2, calls); + } + + [Fact] + public async Task RunsWhenNoUsableId() + { + var store = new InMemoryStore(); + var calls = 0; + var handler = Idempotency.Wrap(store, _ => + { + calls++; + return Task.CompletedTask; + }); + + await handler(Msg("")); // empty id → cannot dedupe → runs + await handler(Msg(null)); // no id at all → runs + + Assert.Equal(2, calls); + } + + [Fact] + public void ForgetRemovesARememberedId() + { + var store = new InMemoryStore(); + store.Remember("m1"); + Assert.True(store.Seen("m1")); + + store.Forget("m1"); + Assert.False(store.Seen("m1")); + } +} diff --git a/tests/BabelQueue.Core.Tests/PayloadSchemaConformanceTests.cs b/tests/BabelQueue.Core.Tests/PayloadSchemaConformanceTests.cs new file mode 100644 index 0000000..2db8f98 --- /dev/null +++ b/tests/BabelQueue.Core.Tests/PayloadSchemaConformanceTests.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using System.IO; +using BabelQueue.Schema; +using Xunit; + +namespace BabelQueue.Tests; + +/// +/// Runs the shared cross-SDK payload-schema cases (ADR-0024) from the vendored conformance +/// suite: this validator must agree with the Go, PHP, Python, Node and Java ones on each +/// case's valid flag. +/// +public class PayloadSchemaConformanceTests +{ + private static string SuitePath(string relative) => + Path.Combine(AppContext.BaseDirectory, "conformance", relative); + + [Fact] + public void PayloadCasesMatchAcrossSdks() + { + var manifest = SchemaJson.Parse(File.ReadAllText(SuitePath("manifest.json"))) + as IReadOnlyDictionary; + Assert.NotNull(manifest); + + Assert.True(manifest!.TryGetValue("payload_schema", out var sectionObj)); + var section = Assert.IsAssignableFrom>(sectionObj); + var schema = Assert.IsAssignableFrom>(section["schema"]); + var cases = Assert.IsAssignableFrom>(section["cases"]); + Assert.NotEmpty(cases); + + foreach (var caseObj in cases) + { + var testCase = Assert.IsAssignableFrom>(caseObj); + var data = Assert.IsAssignableFrom>(testCase["data"]); + var expected = testCase["valid"] is true; + var isValid = PayloadValidator.Validate(schema, data) is null; + Assert.Equal(expected, isValid); + } + } +} diff --git a/tests/BabelQueue.Core.Tests/SchemaValidationTests.cs b/tests/BabelQueue.Core.Tests/SchemaValidationTests.cs new file mode 100644 index 0000000..c7f4c41 --- /dev/null +++ b/tests/BabelQueue.Core.Tests/SchemaValidationTests.cs @@ -0,0 +1,104 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using BabelQueue; +using BabelQueue.Schema; +using Xunit; + +namespace BabelQueue.Tests; + +public class SchemaValidationTests +{ + private const string Orders = + "{\"type\":\"object\",\"required\":[\"order_id\"]," + + "\"properties\":{\"order_id\":{\"type\":\"integer\"}},\"additionalProperties\":false}"; + + private static IReadOnlyDictionary Json(string raw) => SchemaJson.ParseObject(raw); + + private static ISchemaProvider Provider() => + new MapProvider(new Dictionary> + { + ["urn:babel:orders:created"] = Json(Orders), + }); + + [Fact] + public void ValidatorEnforcesObjectRequiredTypesAndAdditional() + { + var s = Json(Orders); + Assert.Null(PayloadValidator.Validate(s, Json("{\"order_id\":7}"))); + Assert.NotNull(PayloadValidator.Validate(s, Json("{}"))); + Assert.NotNull(PayloadValidator.Validate(s, Json("{\"order_id\":\"x\"}"))); + Assert.NotNull(PayloadValidator.Validate(s, Json("{\"order_id\":7,\"extra\":1}"))); + } + + [Fact] + public void ValidatorScalarParity() + { + Assert.Null(PayloadValidator.Validate(Json("{\"type\":\"boolean\"}"), true)); + Assert.NotNull(PayloadValidator.Validate(Json("{\"type\":\"boolean\"}"), "x")); + Assert.Null(PayloadValidator.Validate(Json("{\"type\":\"null\"}"), null)); + Assert.NotNull(PayloadValidator.Validate(Json("{\"type\":\"null\"}"), 1L)); + Assert.Null(PayloadValidator.Validate(Json("{\"type\":\"number\",\"minimum\":0.5}"), 0.6)); + Assert.NotNull(PayloadValidator.Validate(Json("{\"type\":\"number\",\"minimum\":0.5}"), 0.4)); + Assert.NotNull(PayloadValidator.Validate(Json("{\"type\":\"number\"}"), "x")); + Assert.Null(PayloadValidator.Validate(Json("{\"type\":\"integer\"}"), 1L)); + Assert.NotNull(PayloadValidator.Validate(Json("{\"type\":\"integer\"}"), 1.5)); + Assert.NotNull(PayloadValidator.Validate(Json("{\"type\":\"integer\"}"), true)); + } + + [Fact] + public void ValidatorConstStringEnumArray() + { + Assert.Null(PayloadValidator.Validate(Json("{\"const\":\"v1\"}"), "v1")); + Assert.NotNull(PayloadValidator.Validate(Json("{\"const\":\"v1\"}"), "v2")); + Assert.Null(PayloadValidator.Validate(Json("{\"type\":\"string\",\"minLength\":1}"), "a")); + Assert.NotNull(PayloadValidator.Validate(Json("{\"type\":\"string\",\"minLength\":1}"), "")); + Assert.NotNull(PayloadValidator.Validate(Json("{\"type\":\"string\"}"), 5L)); + Assert.Null(PayloadValidator.Validate(Json("{\"enum\":[\"a\",\"b\"]}"), "b")); + Assert.NotNull(PayloadValidator.Validate(Json("{\"enum\":[\"a\",\"b\"]}"), "c")); + Assert.NotNull(PayloadValidator.Validate(Json("{\"type\":\"object\"}"), "x")); + Assert.NotNull(PayloadValidator.Validate(Json("{\"type\":\"array\"}"), "x")); + var arr = Json("{\"type\":\"array\",\"items\":{\"type\":\"string\"}}"); + Assert.Null(PayloadValidator.Validate(arr, new List { "a", "b" })); + Assert.NotNull(PayloadValidator.Validate(arr, new List { "a", 1L })); + } + + [Fact] + public void CheckValidInvalidUnregistered() + { + var p = Provider(); + Assert.Null(SchemaValidation.Check(p, "urn:babel:orders:created", Json("{\"order_id\":1}"))); + Assert.Null(SchemaValidation.Check(p, "urn:babel:unknown", Json("{\"x\":1}"))); + Assert.NotNull(SchemaValidation.Check(p, "urn:babel:orders:created", Json("{}"))); + } + + [Fact] + public void ValidateThrowsWithDetails() + { + var ex = Assert.Throws(() => + SchemaValidation.Validate(Provider(), "urn:babel:orders:created", Json("{\"order_id\":\"x\"}"))); + Assert.Equal("urn:babel:orders:created", ex.Urn); + Assert.NotNull(ex.Violation); + } + + [Fact] + public async Task WrapRunsThrowsAndRunsForUnregistered() + { + var p = Provider(); + var calls = 0; + Handler handler = SchemaValidation.Wrap(p, _ => + { + calls++; + return Task.CompletedTask; + }); + + await handler(EnvelopeCodec.Make("urn:babel:orders:created", Json("{\"order_id\":1}"))); + Assert.Equal(1, calls); + + await Assert.ThrowsAsync(() => + handler(EnvelopeCodec.Make("urn:babel:orders:created", Json("{}")))); + Assert.Equal(1, calls); + + await handler(EnvelopeCodec.Make("urn:babel:unknown", Json("{\"anything\":true}"))); + Assert.Equal(2, calls); + } +} diff --git a/tests/BabelQueue.Core.Tests/conformance/manifest.json b/tests/BabelQueue.Core.Tests/conformance/manifest.json index 78e5c3a..5b2fee4 100644 --- a/tests/BabelQueue.Core.Tests/conformance/manifest.json +++ b/tests/BabelQueue.Core.Tests/conformance/manifest.json @@ -226,5 +226,28 @@ "x-attempts": 0 } } + }, + "payload_schema": { + "description": "Per-URN data schema validation (ADR-0024). Each case validates `data` against `schema`; every SDK's optional payload validator (Go schema, PHP BabelQueue\\Schema, Python babelqueue.schema) MUST agree on `valid`. The wire envelope stays frozen — this governs the data block only, and is opt-in (consumers/producers without a registered schema skip it).", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["order_id"], + "properties": { + "order_id": { "type": "integer", "minimum": 1 }, + "amount": { "type": "number", "minimum": 0 }, + "currency": { "enum": ["USD", "EUR", "TRY"] } + }, + "additionalProperties": false + }, + "cases": [ + { "name": "valid-minimal", "valid": true, "data": { "order_id": 1042 } }, + { "name": "valid-full", "valid": true, "data": { "order_id": 1042, "amount": 99.9, "currency": "USD" } }, + { "name": "invalid-missing-required", "valid": false, "data": { "amount": 10 } }, + { "name": "invalid-wrong-type", "valid": false, "data": { "order_id": "x" } }, + { "name": "invalid-additional-property", "valid": false, "data": { "order_id": 1, "extra": true } }, + { "name": "invalid-enum", "valid": false, "data": { "order_id": 1, "currency": "GBP" } }, + { "name": "invalid-below-minimum", "valid": false, "data": { "order_id": 0 } } + ] } }