diff --git a/src/Exceptions/InvalidPayloadException.php b/src/Exceptions/InvalidPayloadException.php new file mode 100644 index 0000000..c7ea5f8 --- /dev/null +++ b/src/Exceptions/InvalidPayloadException.php @@ -0,0 +1,60 @@ +: "` mismatch, + * {@see self::urn()} the message URN, and {@see self::data()} the offending payload — for + * logging, branching, or dead-lettering. + * + * Thrown by the consumer-side {@see \BabelQueue\Schema\SchemaValidated::wrap()} so the + * consume runtime redelivers (and eventually dead-letters) a poison message; the recommended + * primary use is producer-side ({@see \BabelQueue\Schema\SchemaValidated::assert()}) so + * invalid data never enters the queue. + */ +final class InvalidPayloadException extends BabelQueueException +{ + /** + * @param array $data + */ + private function __construct( + private readonly string $violation, + private readonly string $urn, + private readonly array $data, + ) { + parent::__construct("Message data for [{$urn}] does not match its URN schema: {$violation}."); + } + + /** + * @param array $data + */ + public static function because(string $violation, string $urn, array $data): self + { + return new self($violation, $urn, $data); + } + + /** The first `": "` violation. */ + public function violation(): string + { + return $this->violation; + } + + /** The message URN whose schema was violated. */ + public function urn(): string + { + return $this->urn; + } + + /** + * The offending `data` payload. + * + * @return array + */ + public function data(): array + { + return $this->data; + } +} diff --git a/src/Idempotency/IdempotencyStore.php b/src/Idempotency/IdempotencyStore.php new file mode 100644 index 0000000..326650d --- /dev/null +++ b/src/Idempotency/IdempotencyStore.php @@ -0,0 +1,39 @@ +on('urn:babel:orders:created', Idempotent::wrap($store, $handler)); + * + * - A previously-succeeded id → the wrapper **returns**, so the loop acks it and the + * broker stops redelivering. + * - The handler **throws** → the id is left unmarked and the exception propagates, so + * retry / DLQ (§§5–6) still apply and a later delivery runs the handler again. + * - A message with no usable `meta.id` runs unchanged (fail-open). + * + * This is the PHP mirror of the Go `idempotency.Wrap(store, handler)` helper. + */ +final class Idempotent +{ + /** + * @param callable(ConsumedMessage): void $handler + * @return callable(ConsumedMessage): void + */ + public static function wrap(IdempotencyStore $store, callable $handler): callable + { + return static function (ConsumedMessage $message) use ($store, $handler): void { + $meta = $message->getMeta(); + $id = isset($meta['id']) && is_string($meta['id']) ? $meta['id'] : ''; + + // No usable id → cannot dedupe; run the handler unchanged. + if ($id === '') { + $handler($message); + + return; + } + + // Already processed on an earlier delivery: skip + return so the loop acks it. + if ($store->seen($id)) { + return; + } + + // First success wins. A throw here leaves the id unmarked → retry/DLQ apply. + $handler($message); + $store->remember($id); + }; + } +} diff --git a/src/Idempotency/InMemoryStore.php b/src/Idempotency/InMemoryStore.php new file mode 100644 index 0000000..67d0eb8 --- /dev/null +++ b/src/Idempotency/InMemoryStore.php @@ -0,0 +1,31 @@ + */ + private array $seen = []; + + public function seen(string $messageId): bool + { + return isset($this->seen[$messageId]); + } + + public function remember(string $messageId): void + { + $this->seen[$messageId] = true; + } + + public function forget(string $messageId): void + { + unset($this->seen[$messageId]); + } +} diff --git a/src/Schema/DirProvider.php b/src/Schema/DirProvider.php new file mode 100644 index 0000000..95a1c95 --- /dev/null +++ b/src/Schema/DirProvider.php @@ -0,0 +1,87 @@ + urn => schema file path (relative to the manifest dir) */ + private array $files = []; + + /** @var array> lazily-decoded schema cache */ + private array $cache = []; + + public function __construct(string $manifestPath) + { + $raw = @file_get_contents($manifestPath); + if ($raw === false) { + throw new RuntimeException("schema: cannot read registry manifest [{$manifestPath}]."); + } + $manifest = json_decode($raw, true); + if (! is_array($manifest)) { + throw new RuntimeException("schema: invalid registry manifest [{$manifestPath}]."); + } + + $this->dir = \dirname($manifestPath); + + $entries = is_array($manifest['schemas'] ?? null) ? $manifest['schemas'] : []; + foreach ($entries as $entry) { + if (! is_array($entry)) { + continue; + } + $urn = is_string($entry['urn'] ?? null) ? $entry['urn'] : ''; + $file = is_string($entry['schema'] ?? null) ? $entry['schema'] : ''; + if ($urn === '' || $file === '') { + continue; + } + $this->files[$urn] = $file; + } + } + + public function schemaFor(string $urn): ?array + { + if (isset($this->cache[$urn])) { + return $this->cache[$urn]; + } + if (! isset($this->files[$urn])) { + return null; + } + + $file = $this->files[$urn]; + $path = self::isAbsolute($file) ? $file : $this->dir . DIRECTORY_SEPARATOR . $file; + + $raw = @file_get_contents($path); + if ($raw === false) { + throw new RuntimeException("schema: cannot read schema for [{$urn}] ({$file})."); + } + $decoded = json_decode($raw, true); + if (! is_array($decoded)) { + throw new RuntimeException("schema: invalid schema for [{$urn}] ({$file})."); + } + + /** @var array $decoded */ + $this->cache[$urn] = $decoded; + + return $decoded; + } + + private static function isAbsolute(string $path): bool + { + return $path !== '' && ($path[0] === '/' || preg_match('#^[A-Za-z]:[\\\\/]#', $path) === 1); + } +} diff --git a/src/Schema/MapProvider.php b/src/Schema/MapProvider.php new file mode 100644 index 0000000..fba1e47 --- /dev/null +++ b/src/Schema/MapProvider.php @@ -0,0 +1,48 @@ +> $schemas urn => decoded JSON Schema + */ + public function __construct(private readonly array $schemas) + { + } + + /** + * Build a MapProvider from URN => raw JSON Schema strings, decoding each. + * + * @param array $raw + */ + public static function fromJson(array $raw): self + { + $schemas = []; + foreach ($raw as $urn => $json) { + $decoded = json_decode($json, true); + if (! is_array($decoded)) { + throw new RuntimeException("schema: invalid JSON schema for [{$urn}]."); + } + /** @var array $decoded */ + $schemas[$urn] = $decoded; + } + + return new self($schemas); + } + + public function schemaFor(string $urn): ?array + { + $schema = $this->schemas[$urn] ?? null; + + return is_array($schema) ? $schema : null; + } +} diff --git a/src/Schema/PayloadValidator.php b/src/Schema/PayloadValidator.php new file mode 100644 index 0000000..89aff04 --- /dev/null +++ b/src/Schema/PayloadValidator.php @@ -0,0 +1,159 @@ +: "`, or null when it + * conforms. + * + * @param array $schema + */ + public static function check(array $schema, mixed $value, string $path = ''): ?string + { + if (array_key_exists('const', $schema) && $value !== $schema['const']) { + return self::violation($path, 'wrong_const'); + } + if (isset($schema['enum']) && is_array($schema['enum']) && ! in_array($value, $schema['enum'], true)) { + return self::violation($path, 'not_in_enum'); + } + + $type = isset($schema['type']) && is_string($schema['type']) ? $schema['type'] : ''; + + return match ($type) { + 'object' => self::checkObject($schema, $value, $path), + 'array' => self::checkArray($schema, $value, $path), + 'string' => self::checkString($schema, $value, $path), + 'integer' => self::isInteger($value) + ? self::checkMinimum($schema, $value, $path) + : self::violation($path, 'not_an_integer'), + 'number' => (is_int($value) || is_float($value)) + ? self::checkMinimum($schema, $value, $path) + : self::violation($path, 'not_a_number'), + 'boolean' => is_bool($value) ? null : self::violation($path, 'not_a_boolean'), + 'null' => $value === null ? null : self::violation($path, 'not_null'), + default => null, + }; + } + + /** + * @param array $schema + */ + private static function checkObject(array $schema, mixed $value, string $path): ?string + { + if (! is_array($value) || ($value !== [] && array_is_list($value))) { + return self::violation($path, 'not_an_object'); + } + + if (isset($schema['required']) && is_array($schema['required'])) { + foreach (array_filter($schema['required'], 'is_string') as $key) { + if (! array_key_exists($key, $value)) { + return self::violation(self::join($path, $key), 'missing_required'); + } + } + } + + $properties = is_array($schema['properties'] ?? null) ? $schema['properties'] : []; + $additionalAllowed = ! array_key_exists('additionalProperties', $schema) + || $schema['additionalProperties'] !== false; + + foreach ($value as $key => $item) { + $name = (string) $key; + if (isset($properties[$name]) && is_array($properties[$name])) { + /** @var array $propSchema */ + $propSchema = $properties[$name]; + $violation = self::check($propSchema, $item, self::join($path, $name)); + if ($violation !== null) { + return $violation; + } + + continue; + } + if (! $additionalAllowed) { + return self::violation(self::join($path, $name), 'additional_not_allowed'); + } + } + + return null; + } + + /** + * @param array $schema + */ + private static function checkArray(array $schema, mixed $value, string $path): ?string + { + if (! is_array($value) || ($value !== [] && ! array_is_list($value))) { + return self::violation($path, 'not_an_array'); + } + if (! isset($schema['items']) || ! is_array($schema['items'])) { + return null; + } + + /** @var array $items */ + $items = $schema['items']; + foreach ($value as $i => $item) { + $violation = self::check($items, $item, $path . '[' . $i . ']'); + if ($violation !== null) { + return $violation; + } + } + + return null; + } + + /** + * @param array $schema + */ + private static function checkString(array $schema, mixed $value, string $path): ?string + { + if (! is_string($value)) { + return self::violation($path, 'not_a_string'); + } + if (isset($schema['minLength']) && is_numeric($schema['minLength']) && mb_strlen($value) < (int) $schema['minLength']) { + return self::violation($path, 'below_min_length'); + } + + return null; + } + + /** + * @param array $schema + */ + private static function checkMinimum(array $schema, mixed $value, string $path): ?string + { + if (! isset($schema['minimum']) || ! is_numeric($schema['minimum']) || ! is_numeric($value)) { + return null; + } + + return (float) $value < (float) $schema['minimum'] + ? self::violation($path, 'below_minimum') + : null; + } + + private static function isInteger(mixed $value): bool + { + return is_int($value) || (is_float($value) && floor($value) === $value); + } + + private static function violation(string $path, string $reason): string + { + return ($path === '' ? '' : $path) . ': ' . $reason; + } + + private static function join(string $path, string $key): string + { + return $path === '' ? $key : $path . '.' . $key; + } +} diff --git a/src/Schema/SchemaProvider.php b/src/Schema/SchemaProvider.php new file mode 100644 index 0000000..22a388e --- /dev/null +++ b/src/Schema/SchemaProvider.php @@ -0,0 +1,25 @@ +|null + */ + public function schemaFor(string $urn): ?array; +} diff --git a/src/Schema/SchemaValidated.php b/src/Schema/SchemaValidated.php new file mode 100644 index 0000000..04f4d5c --- /dev/null +++ b/src/Schema/SchemaValidated.php @@ -0,0 +1,78 @@ +on('urn:babel:orders:created', SchemaValidated::wrap($provider, $handler)); + * + * Invalid data throws {@see InvalidPayloadException}, so the message redelivers and is + * eventually dead-lettered (a poison message does not become valid on retry — hence + * producer-side validation is preferred). A URN with no schema runs the handler unchanged. + * + * The PHP mirror of the Go `schema.Check` / `schema.Wrap` helpers. + */ +final class SchemaValidated +{ + /** + * The first `data` violation for ($urn, $data), or null when it is valid or when no + * schema is registered for the URN (opt-in). Non-throwing; for producer-side branching. + * + * @param array $data + */ + public static function check(SchemaProvider $provider, string $urn, array $data): ?string + { + $schema = $provider->schemaFor($urn); + if ($schema === null) { + return null; + } + + return PayloadValidator::check($schema, $data); + } + + /** + * Assert that ($urn, $data) matches its registered schema, throwing otherwise. The + * producer-side guard: call it before publishing. + * + * @param array $data + * + * @throws InvalidPayloadException + */ + public static function assert(SchemaProvider $provider, string $urn, array $data): void + { + $violation = self::check($provider, $urn, $data); + if ($violation !== null) { + throw InvalidPayloadException::because($violation, $urn, $data); + } + } + + /** + * Wrap a consume handler so each message's `data` is validated against its URN schema + * before the handler runs. + * + * @param callable(ConsumedMessage): void $handler + * @return callable(ConsumedMessage): void + */ + public static function wrap(SchemaProvider $provider, callable $handler): callable + { + return static function (ConsumedMessage $message) use ($provider, $handler): void { + self::assert($provider, $message->getUrn(), $message->getData()); + $handler($message); + }; + } +} diff --git a/tests/Idempotency/IdempotencyTest.php b/tests/Idempotency/IdempotencyTest.php new file mode 100644 index 0000000..f932d8b --- /dev/null +++ b/tests/Idempotency/IdempotencyTest.php @@ -0,0 +1,162 @@ +message('msg-1')); + + $this->assertSame(1, $calls); + $this->assertTrue($store->seen('msg-1')); + } + + public function test_skips_the_handler_on_a_redelivery_of_the_same_id(): void + { + $store = new InMemoryStore(); + $calls = 0; + $handler = Idempotent::wrap($store, function (ConsumedMessage $m) use (&$calls): void { + $calls++; + }); + + $handler($this->message('msg-1')); + $handler($this->message('msg-1')); // redelivery → skipped + + $this->assertSame(1, $calls); + } + + public function test_runs_the_handler_again_for_a_different_id(): void + { + $store = new InMemoryStore(); + $calls = 0; + $handler = Idempotent::wrap($store, function (ConsumedMessage $m) use (&$calls): void { + $calls++; + }); + + $handler($this->message('msg-1')); + $handler($this->message('msg-2')); + + $this->assertSame(2, $calls); + } + + public function test_does_not_remember_an_id_when_the_handler_throws(): void + { + $store = new InMemoryStore(); + $calls = 0; + $handler = Idempotent::wrap($store, function (ConsumedMessage $m) use (&$calls): void { + $calls++; + throw new RuntimeException('boom'); + }); + + // The throw must propagate so the consumer redelivers (at-least-once). + try { + $handler($this->message('msg-1')); + $this->fail('handler exception should propagate'); + } catch (RuntimeException) { + // expected + } + $this->assertFalse($store->seen('msg-1')); + + // A redelivery now runs the handler again — retry works. + try { + $handler($this->message('msg-1')); + } catch (RuntimeException) { + } + $this->assertSame(2, $calls); + } + + public function test_runs_the_handler_when_the_message_has_no_usable_id(): void + { + $store = new InMemoryStore(); + $calls = 0; + $handler = Idempotent::wrap($store, function (ConsumedMessage $m) use (&$calls): void { + $calls++; + }); + + $handler($this->message('')); // empty id → cannot dedupe → runs + $handler($this->message('')); // still runs + + $this->assertSame(2, $calls); + } + + public function test_forget_removes_a_remembered_id(): void + { + $store = new InMemoryStore(); + $store->remember('msg-1'); + $this->assertTrue($store->seen('msg-1')); + + $store->forget('msg-1'); + $this->assertFalse($store->seen('msg-1')); + } + + private function message(string $id): ConsumedMessage + { + return new FakeMessage([ + 'job' => 'urn:babel:orders:created', + 'trace_id' => 'trace-1', + 'data' => ['order_id' => 7], + 'meta' => ['id' => $id, 'queue' => 'orders', 'lang' => 'php', 'schema_version' => 1], + 'attempts' => 0, + ]); + } +} + +final class FakeMessage implements ConsumedMessage +{ + /** @param array $envelope */ + public function __construct(private array $envelope) + { + } + + public function getUrn(): string + { + return is_string($this->envelope['job'] ?? null) ? $this->envelope['job'] : ''; + } + + public function getTraceId(): string + { + return is_string($this->envelope['trace_id'] ?? null) ? $this->envelope['trace_id'] : ''; + } + + /** @return array */ + public function getData(): array + { + return is_array($this->envelope['data'] ?? null) ? $this->envelope['data'] : []; + } + + /** @return array */ + public function getMeta(): array + { + return is_array($this->envelope['meta'] ?? null) ? $this->envelope['meta'] : []; + } + + public function attempts(): int + { + return is_int($this->envelope['attempts'] ?? null) ? $this->envelope['attempts'] : 0; + } + + /** @return array */ + public function envelope(): array + { + return $this->envelope; + } +} diff --git a/tests/Schema/PayloadConformanceTest.php b/tests/Schema/PayloadConformanceTest.php new file mode 100644 index 0000000..93d1f95 --- /dev/null +++ b/tests/Schema/PayloadConformanceTest.php @@ -0,0 +1,44 @@ + $manifest */ + $manifest = (array) json_decode((string) file_get_contents($path), true, 512, JSON_THROW_ON_ERROR); + $section = $manifest['payload_schema'] ?? null; + if (! is_array($section) || ! is_array($section['schema'] ?? null) || ! is_array($section['cases'] ?? null)) { + self::markTestSkipped('manifest has no payload_schema section'); + } + + /** @var array $schema */ + $schema = $section['schema']; + /** @var array $cases */ + $cases = $section['cases']; + self::assertNotEmpty($cases); + + foreach ($cases as $case) { + self::assertIsArray($case); + /** @var array $data */ + $data = is_array($case['data'] ?? null) ? $case['data'] : []; + $valid = PayloadValidator::check($schema, $data) === null; + self::assertSame((bool) $case['valid'], $valid, 'case ' . (string) $case['name']); + } + } +} diff --git a/tests/Schema/PayloadValidatorTest.php b/tests/Schema/PayloadValidatorTest.php new file mode 100644 index 0000000..3afe05d --- /dev/null +++ b/tests/Schema/PayloadValidatorTest.php @@ -0,0 +1,72 @@ + 7])); + self::assertNotNull(PayloadValidator::check($schema, [])); + self::assertNotNull(PayloadValidator::check($schema, ['order_id' => 'x'])); + self::assertNotNull(PayloadValidator::check($schema, ['order_id' => 7, 'extra' => 1])); + self::assertNotNull(PayloadValidator::check($schema, ['order_id' => 7, 'note' => ''])); + } + + public function test_enum_minimum_and_array_items(): void + { + $schema = (array) json_decode( + '{"type":"object","properties":{"status":{"enum":["new","paid"]},"qty":{"type":"integer","minimum":1},"tags":{"type":"array","items":{"type":"string"}}}}', + true + ); + + self::assertNull(PayloadValidator::check($schema, ['status' => 'paid', 'qty' => 2, 'tags' => ['a', 'b']])); + self::assertNotNull(PayloadValidator::check($schema, ['status' => 'cancelled'])); + self::assertNotNull(PayloadValidator::check($schema, ['qty' => 0])); + self::assertNotNull(PayloadValidator::check($schema, ['tags' => ['a', 1]])); + } + + #[DataProvider('scalarCases')] + public function test_scalar_types(string $schemaJson, mixed $value, bool $valid): void + { + $schema = (array) json_decode($schemaJson, true); + $violation = PayloadValidator::check($schema, $value); + self::assertSame($valid, $violation === null, (string) $violation); + } + + /** + * @return array + */ + public static function scalarCases(): array + { + return [ + ['{"type":"boolean"}', true, true], + ['{"type":"boolean"}', 'x', false], + ['{"type":"null"}', null, true], + ['{"type":"null"}', 1, false], + ['{"type":"number","minimum":0.5}', 0.6, true], + ['{"type":"number","minimum":0.5}', 0.4, false], + ['{"type":"number"}', 'x', false], + ['{"type":"string"}', 5, false], + ['{"type":"integer"}', 1.0, true], + ['{"type":"integer"}', 1.5, false], + ['{"const":"v1"}', 'v1', true], + ['{"const":"v1"}', 'v2', false], + ]; + } +} diff --git a/tests/Schema/ProviderTest.php b/tests/Schema/ProviderTest.php new file mode 100644 index 0000000..4fc634c --- /dev/null +++ b/tests/Schema/ProviderTest.php @@ -0,0 +1,76 @@ + '{"type":"object","required":["order_id"]}', + ]); + + self::assertNotNull($p->schemaFor('urn:babel:orders:created')); + self::assertNull($p->schemaFor('urn:babel:unknown')); + } + + public function test_map_provider_from_json_rejects_invalid(): void + { + $this->expectException(RuntimeException::class); + MapProvider::fromJson(['u' => 'not json']); + } + + public function test_dir_provider_reads_registry_lazily(): void + { + $dir = $this->tempDir(); + mkdir($dir . '/schemas', 0o777, true); + file_put_contents($dir . '/schemas/orders.json', '{"type":"object","required":["order_id"],"properties":{"order_id":{"type":"integer"}}}'); + // the empty-urn entry is ignored on load + file_put_contents($dir . '/registry.json', '{"schemas":[{"urn":"urn:babel:orders:created","schema":"schemas/orders.json"},{"urn":"","schema":"x"}]}'); + + $p = new DirProvider($dir . '/registry.json'); + // call twice: the second hits the cache + self::assertNotNull($p->schemaFor('urn:babel:orders:created')); + self::assertNotNull($p->schemaFor('urn:babel:orders:created')); + self::assertNull($p->schemaFor('urn:babel:unknown')); + } + + public function test_dir_provider_missing_manifest_throws(): void + { + $this->expectException(RuntimeException::class); + new DirProvider($this->tempDir() . '/nope.json'); + } + + public function test_dir_provider_invalid_manifest_throws(): void + { + $dir = $this->tempDir(); + mkdir($dir, 0o777, true); + file_put_contents($dir . '/registry.json', 'not json'); + + $this->expectException(RuntimeException::class); + new DirProvider($dir . '/registry.json'); + } + + public function test_dir_provider_missing_schema_file_throws(): void + { + $dir = $this->tempDir(); + mkdir($dir, 0o777, true); + file_put_contents($dir . '/registry.json', '{"schemas":[{"urn":"u","schema":"missing.json"}]}'); + + $p = new DirProvider($dir . '/registry.json'); + $this->expectException(RuntimeException::class); + $p->schemaFor('u'); + } + + private function tempDir(): string + { + return sys_get_temp_dir() . '/bqschema_' . bin2hex(random_bytes(6)); + } +} diff --git a/tests/Schema/SchemaValidatedTest.php b/tests/Schema/SchemaValidatedTest.php new file mode 100644 index 0000000..2de677a --- /dev/null +++ b/tests/Schema/SchemaValidatedTest.php @@ -0,0 +1,147 @@ + '{"type":"object","required":["order_id"],"properties":{"order_id":{"type":"integer"}},"additionalProperties":false}', + ]); + } + + public function test_check_valid_invalid_and_unregistered(): void + { + $p = $this->provider(); + self::assertNull(SchemaValidated::check($p, 'urn:babel:orders:created', ['order_id' => 1])); + self::assertNull(SchemaValidated::check($p, 'urn:babel:unknown', ['x' => 1])); // opt-in + self::assertNotNull(SchemaValidated::check($p, 'urn:babel:orders:created', [])); + } + + public function test_assert_throws_on_invalid(): void + { + $this->expectException(InvalidPayloadException::class); + SchemaValidated::assert($this->provider(), 'urn:babel:orders:created', ['order_id' => 'x']); + } + + public function test_assert_passes_valid_and_unregistered(): void + { + $p = $this->provider(); + SchemaValidated::assert($p, 'urn:babel:orders:created', ['order_id' => 1]); + SchemaValidated::assert($p, 'urn:babel:unknown', ['anything' => true]); + $this->expectNotToPerformAssertions(); + } + + public function test_wrap_runs_handler_on_valid_data(): void + { + $calls = 0; + $handler = SchemaValidated::wrap($this->provider(), function (ConsumedMessage $m) use (&$calls): void { + $calls++; + }); + + $handler($this->message('urn:babel:orders:created', ['order_id' => 1])); + + self::assertSame(1, $calls); + } + + public function test_wrap_throws_and_skips_handler_on_invalid_data(): void + { + $calls = 0; + $handler = SchemaValidated::wrap($this->provider(), function (ConsumedMessage $m) use (&$calls): void { + $calls++; + }); + + try { + $handler($this->message('urn:babel:orders:created', [])); // missing order_id + self::fail('expected an InvalidPayloadException'); + } catch (InvalidPayloadException $e) { + self::assertSame('urn:babel:orders:created', $e->urn()); + } + + self::assertSame(0, $calls); + } + + public function test_wrap_runs_handler_for_an_unregistered_urn(): void + { + $calls = 0; + $handler = SchemaValidated::wrap($this->provider(), function (ConsumedMessage $m) use (&$calls): void { + $calls++; + }); + + $handler($this->message('urn:babel:unknown', ['anything' => true])); + + self::assertSame(1, $calls); + } + + /** + * @param array $data + */ + private function message(string $urn, array $data): ConsumedMessage + { + return new FakeMessage([ + 'job' => $urn, + 'trace_id' => 'trace-1', + 'data' => $data, + 'meta' => ['id' => 'm1', 'queue' => 'orders', 'lang' => 'php', 'schema_version' => 1], + 'attempts' => 0, + ]); + } +} + +/** + * Minimal {@see ConsumedMessage} test double (mirrors the one in the idempotency tests). + */ +final class FakeMessage implements ConsumedMessage +{ + /** @param array $envelope */ + public function __construct(private array $envelope) + { + } + + public function getUrn(): string + { + return is_string($this->envelope['job'] ?? null) ? $this->envelope['job'] : ''; + } + + public function getTraceId(): string + { + return is_string($this->envelope['trace_id'] ?? null) ? $this->envelope['trace_id'] : ''; + } + + /** @return array */ + public function getData(): array + { + return is_array($this->envelope['data'] ?? null) ? $this->envelope['data'] : []; + } + + /** @return array */ + public function getMeta(): array + { + return is_array($this->envelope['meta'] ?? null) ? $this->envelope['meta'] : []; + } + + public function attempts(): int + { + return is_int($this->envelope['attempts'] ?? null) ? $this->envelope['attempts'] : 0; + } + + /** @return array */ + public function envelope(): array + { + return $this->envelope; + } +} diff --git a/tests/conformance/manifest.json b/tests/conformance/manifest.json index 78e5c3a..5b2fee4 100644 --- a/tests/conformance/manifest.json +++ b/tests/conformance/manifest.json @@ -226,5 +226,28 @@ "x-attempts": 0 } } + }, + "payload_schema": { + "description": "Per-URN data schema validation (ADR-0024). Each case validates `data` against `schema`; every SDK's optional payload validator (Go schema, PHP BabelQueue\\Schema, Python babelqueue.schema) MUST agree on `valid`. The wire envelope stays frozen — this governs the data block only, and is opt-in (consumers/producers without a registered schema skip it).", + "schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "required": ["order_id"], + "properties": { + "order_id": { "type": "integer", "minimum": 1 }, + "amount": { "type": "number", "minimum": 0 }, + "currency": { "enum": ["USD", "EUR", "TRY"] } + }, + "additionalProperties": false + }, + "cases": [ + { "name": "valid-minimal", "valid": true, "data": { "order_id": 1042 } }, + { "name": "valid-full", "valid": true, "data": { "order_id": 1042, "amount": 99.9, "currency": "USD" } }, + { "name": "invalid-missing-required", "valid": false, "data": { "amount": 10 } }, + { "name": "invalid-wrong-type", "valid": false, "data": { "order_id": "x" } }, + { "name": "invalid-additional-property", "valid": false, "data": { "order_id": 1, "extra": true } }, + { "name": "invalid-enum", "valid": false, "data": { "order_id": 1, "currency": "GBP" } }, + { "name": "invalid-below-minimum", "valid": false, "data": { "order_id": 0 } } + ] } }