Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/Exceptions/InvalidPayloadException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

declare(strict_types=1);

namespace BabelQueue\Exceptions;

/**
* A message's `data` block failed validation against the JSON Schema registered for its URN
* (ADR-0024). {@see self::violation()} is the first `"<json-pointer>: <reason>"` mismatch,
* {@see self::urn()} the message URN, and {@see self::data()} the offending payload — for
* logging, branching, or dead-lettering.
*
* Thrown by the consumer-side {@see \BabelQueue\Schema\SchemaValidated::wrap()} so the
* consume runtime redelivers (and eventually dead-letters) a poison message; the recommended
* primary use is producer-side ({@see \BabelQueue\Schema\SchemaValidated::assert()}) so
* invalid data never enters the queue.
*/
final class InvalidPayloadException extends BabelQueueException
{
/**
* @param array<string, mixed> $data
*/
private function __construct(
private readonly string $violation,
private readonly string $urn,
private readonly array $data,
) {
parent::__construct("Message data for [{$urn}] does not match its URN schema: {$violation}.");
}

/**
* @param array<string, mixed> $data
*/
public static function because(string $violation, string $urn, array $data): self
{
return new self($violation, $urn, $data);
}

/** The first `"<json-pointer>: <reason>"` violation. */
public function violation(): string
{
return $this->violation;
}

/** The message URN whose schema was violated. */
public function urn(): string
{
return $this->urn;
}

/**
* The offending `data` payload.
*
* @return array<string, mixed>
*/
public function data(): array
{
return $this->data;
}
}
39 changes: 39 additions & 0 deletions src/Idempotency/IdempotencyStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?php

declare(strict_types=1);

namespace BabelQueue\Idempotency;

/**
* A pluggable record of message ids that have already been processed, keyed on the
* envelope's `meta.id` (the canonical per-message identity — see
* {@see \BabelQueue\Codec\EnvelopeCodec} and message-envelope.md).
*
* The reference {@see InMemoryStore} is for tests / single-process consumers; production
* backends (Redis, a database table, a PSR-16 cache) implement the same three methods.
*
* The contract is **"seen-set" dedupe**: it answers *"was this id processed?"*, not
* *"what did it return"* — queue handlers have no response to replay (unlike an HTTP
* idempotency key). It provides **post-success** dedupe under at-least-once + idempotent
* handlers (error-handling.md §1), **not** exactly-once and **not** in-flight concurrency
* locking. A transactional/outbox mode is a documented future direction (ADR-0022).
*/
interface IdempotencyStore
{
/**
* Has this message id already been processed (remembered)?
*/
public function seen(string $messageId): bool;

/**
* Record this message id as processed. Called only after the handler returns
* normally, so a thrown handler is retried rather than silently swallowed.
*/
public function remember(string $messageId): void;

/**
* Drop a message id from the store (manual eviction; a backend may also expire ids
* on its own TTL).
*/
public function forget(string $messageId): void;
}
56 changes: 56 additions & 0 deletions src/Idempotency/Idempotent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?php

declare(strict_types=1);

namespace BabelQueue\Idempotency;

use BabelQueue\Contracts\ConsumedMessage;

/**
* Wraps a consume handler so a message whose `meta.id` was already processed
* successfully is **skipped** instead of run again — the dependency-free helper for the
* "handlers SHOULD be idempotent" guidance in error-handling.md §1 (ADR-0022).
*
* It composes with the consume runtime's **ack-on-return / redeliver-on-throw** contract
* ({@see \BabelQueue\Consume\Dispatcher}):
*
* $dispatch->on('urn:babel:orders:created', Idempotent::wrap($store, $handler));
*
* - A previously-succeeded id → the wrapper **returns**, so the loop acks it and the
* broker stops redelivering.
* - The handler **throws** → the id is left unmarked and the exception propagates, so
* retry / DLQ (§§5–6) still apply and a later delivery runs the handler again.
* - A message with no usable `meta.id` runs unchanged (fail-open).
*
* This is the PHP mirror of the Go `idempotency.Wrap(store, handler)` helper.
*/
final class Idempotent
{
/**
* @param callable(ConsumedMessage): void $handler
* @return callable(ConsumedMessage): void
*/
public static function wrap(IdempotencyStore $store, callable $handler): callable
{
return static function (ConsumedMessage $message) use ($store, $handler): void {
$meta = $message->getMeta();
$id = isset($meta['id']) && is_string($meta['id']) ? $meta['id'] : '';

// No usable id → cannot dedupe; run the handler unchanged.
if ($id === '') {
$handler($message);

return;
}

// Already processed on an earlier delivery: skip + return so the loop acks it.
if ($store->seen($id)) {
return;
}

// First success wins. A throw here leaves the id unmarked → retry/DLQ apply.
$handler($message);
$store->remember($id);
};
}
}
31 changes: 31 additions & 0 deletions src/Idempotency/InMemoryStore.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

declare(strict_types=1);

namespace BabelQueue\Idempotency;

/**
* Process-local {@see IdempotencyStore} backed by an array. Suitable for tests and
* single-process consumers; it is **not** shared across workers and **not** persistent —
* use a Redis- or database-backed store for production fleets.
*/
final class InMemoryStore implements IdempotencyStore
{
/** @var array<string, true> */
private array $seen = [];

public function seen(string $messageId): bool
{
return isset($this->seen[$messageId]);
}

public function remember(string $messageId): void
{
$this->seen[$messageId] = true;
}

public function forget(string $messageId): void
{
unset($this->seen[$messageId]);
}
}
87 changes: 87 additions & 0 deletions src/Schema/DirProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?php

declare(strict_types=1);

namespace BabelQueue\Schema;

use RuntimeException;

/**
* Reads schemas from a babelqueue-registry manifest (`registry.json`): a list of
* `{urn, schema}` entries mapping each URN to a Draft-07 schema file for its `data` block.
* This is the bridge that makes the registry's governed schemas enforceable at runtime.
*
* The manifest is read once in the constructor; schema files are read and decoded lazily and
* cached. A URN that is not in the manifest returns null (skip validation); a URN whose
* schema file is missing or unreadable throws (a configuration/IO error → the consumer
* redelivers until it is fixed). The PHP mirror of the Go `schema.DirProvider`.
*/
final class DirProvider implements SchemaProvider
{
private string $dir;

/** @var array<string, string> urn => schema file path (relative to the manifest dir) */
private array $files = [];

/** @var array<string, array<string, mixed>> lazily-decoded schema cache */
private array $cache = [];

public function __construct(string $manifestPath)
{
$raw = @file_get_contents($manifestPath);
if ($raw === false) {
throw new RuntimeException("schema: cannot read registry manifest [{$manifestPath}].");
}
$manifest = json_decode($raw, true);
if (! is_array($manifest)) {
throw new RuntimeException("schema: invalid registry manifest [{$manifestPath}].");
}

$this->dir = \dirname($manifestPath);

$entries = is_array($manifest['schemas'] ?? null) ? $manifest['schemas'] : [];
foreach ($entries as $entry) {
if (! is_array($entry)) {
continue;
}
$urn = is_string($entry['urn'] ?? null) ? $entry['urn'] : '';
$file = is_string($entry['schema'] ?? null) ? $entry['schema'] : '';
if ($urn === '' || $file === '') {
continue;
}
$this->files[$urn] = $file;
}
}

public function schemaFor(string $urn): ?array
{
if (isset($this->cache[$urn])) {
return $this->cache[$urn];
}
if (! isset($this->files[$urn])) {
return null;
}

$file = $this->files[$urn];
$path = self::isAbsolute($file) ? $file : $this->dir . DIRECTORY_SEPARATOR . $file;

$raw = @file_get_contents($path);
if ($raw === false) {
throw new RuntimeException("schema: cannot read schema for [{$urn}] ({$file}).");
}
$decoded = json_decode($raw, true);
if (! is_array($decoded)) {
throw new RuntimeException("schema: invalid schema for [{$urn}] ({$file}).");
}

/** @var array<string, mixed> $decoded */
$this->cache[$urn] = $decoded;

return $decoded;
}

private static function isAbsolute(string $path): bool
{
return $path !== '' && ($path[0] === '/' || preg_match('#^[A-Za-z]:[\\\\/]#', $path) === 1);
}
}
48 changes: 48 additions & 0 deletions src/Schema/MapProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
<?php

declare(strict_types=1);

namespace BabelQueue\Schema;

use RuntimeException;

/**
* In-memory {@see SchemaProvider}, suitable for tests and for embedding schemas in code. It
* is read-only after construction.
*/
final class MapProvider implements SchemaProvider
{
/**
* @param array<string, array<string, mixed>> $schemas urn => decoded JSON Schema
*/
public function __construct(private readonly array $schemas)
{
}

/**
* Build a MapProvider from URN => raw JSON Schema strings, decoding each.
*
* @param array<string, string> $raw
*/
public static function fromJson(array $raw): self
{
$schemas = [];
foreach ($raw as $urn => $json) {
$decoded = json_decode($json, true);
if (! is_array($decoded)) {
throw new RuntimeException("schema: invalid JSON schema for [{$urn}].");
}
/** @var array<string, mixed> $decoded */
$schemas[$urn] = $decoded;
}

return new self($schemas);
}

public function schemaFor(string $urn): ?array
{
$schema = $this->schemas[$urn] ?? null;

return is_array($schema) ? $schema : null;
}
}
Loading