Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions src/BabelQueue.Core/Idempotency.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
namespace BabelQueue;

/// <summary>
/// A consume handler: processes one decoded <see cref="Envelope"/>. May be async; a
/// thrown/faulted handler leaves the message unacknowledged so the runtime redelivers it
/// (the core is codec-only, so an adapter drives the actual consume loop).
/// </summary>
public delegate Task Handler(Envelope envelope);

/// <summary>
/// A pluggable record of message ids already processed, keyed on the envelope's
/// <c>meta.id</c>. The reference <see cref="InMemoryStore"/> is for tests / single-process
/// consumers; production backends (Redis, a database table) implement the same three
/// methods. "Seen-set" post-success dedupe — not exactly-once, not in-flight locking; a
/// transactional / outbox mode is a documented future direction (ADR-0022).
/// </summary>
public interface IIdempotencyStore
{
/// <summary>Whether this message id has already been processed (remembered).</summary>
bool Seen(string messageId);

/// <summary>Records this message id as processed.</summary>
void Remember(string messageId);

/// <summary>Drops an id from the store (manual eviction; a backend may also expire ids).</summary>
void Forget(string messageId);
}

/// <summary>
/// Process-local, thread-safe <see cref="IIdempotencyStore"/> backed by a set. For tests
/// and single-process consumers; not shared across workers and not persistent — use a
/// Redis- or database-backed store for production fleets.
/// </summary>
public sealed class InMemoryStore : IIdempotencyStore
{
private readonly HashSet<string> _ids = new();
private readonly object _gate = new();

/// <inheritdoc/>
public bool Seen(string messageId)
{
lock (_gate)
{
return _ids.Contains(messageId);
}
}

/// <inheritdoc/>
public void Remember(string messageId)
{
lock (_gate)
{
_ids.Add(messageId);
}
}

/// <inheritdoc/>
public void Forget(string messageId)
{
lock (_gate)
{
_ids.Remove(messageId);
}
}
}

/// <summary>
/// Wraps a <see cref="Handler"/> so a message whose <c>meta.id</c> was already processed
/// successfully is skipped (ADR-0022) — the .NET mirror of the PHP, Go, Python, and Node
/// helpers. A previously-seen id returns early (so an adapter acks it); a thrown/faulted
/// handler leaves the id unmarked so a redelivery runs it again; a message with no usable
/// <c>meta.id</c> runs unchanged.
/// </summary>
public static class Idempotency
{
/// <summary>Returns <paramref name="handler"/> guarded by dedupe on <c>meta.id</c>.</summary>
public static Handler Wrap(IIdempotencyStore store, Handler handler) =>
async envelope =>
{
string? id = envelope.Meta?.Id;

// No usable id → cannot dedupe; run the handler unchanged.
if (string.IsNullOrEmpty(id))
{
await handler(envelope).ConfigureAwait(false);
return;
}

// Already processed on an earlier delivery: return so the adapter acks it.
if (store.Seen(id))
{
return;
}

// First success wins; a throw here leaves the id unmarked → retry/DLQ apply.
await handler(envelope).ConfigureAwait(false);
store.Remember(id);
};
}
16 changes: 16 additions & 0 deletions src/BabelQueue.Core/Schema/ISchemaProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
using System.Collections.Generic;

namespace BabelQueue.Schema;

/// <summary>
/// A source of per-URN <c>data</c> schemas, keyed on the message URN (ADR-0024). Returns the
/// decoded JSON Schema for a URN's <c>data</c> block, or <c>null</c> when none is registered —
/// in which case the caller skips validation (the feature is opt-in). The reference
/// <see cref="MapProvider"/> is in-memory; the I/O-free core ships no file-based provider, so a
/// .NET app reads its babelqueue-registry <c>registry.json</c> and passes the schemas in.
/// </summary>
public interface ISchemaProvider
{
/// <summary>The decoded JSON Schema registered for <paramref name="urn"/>, or null.</summary>
IReadOnlyDictionary<string, object?>? SchemaFor(string urn);
}
26 changes: 26 additions & 0 deletions src/BabelQueue.Core/Schema/InvalidPayloadException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using BabelQueue;

namespace BabelQueue.Schema;

/// <summary>
/// Raised when a message's <c>data</c> does not match the JSON Schema registered for its URN
/// (ADR-0024). The consumer-side <see cref="SchemaValidation.Wrap"/> throws it so the adapter
/// redelivers (and eventually dead-letters) a poison message; the recommended primary use is
/// producer-side (<see cref="SchemaValidation.Validate"/>) so invalid data never enters the queue.
/// </summary>
public sealed class InvalidPayloadException : BabelQueueException
{
/// <summary>Create the exception for a URN whose data violated its schema.</summary>
public InvalidPayloadException(string urn, string violation)
: base($"Message data for \"{urn}\" does not match its URN schema: {violation}.")
{
Urn = urn;
Violation = violation;
}

/// <summary>The message URN whose schema was violated.</summary>
public string Urn { get; }

/// <summary>The first <c>"&lt;json-pointer&gt;: &lt;reason&gt;"</c> mismatch.</summary>
public string Violation { get; }
}
31 changes: 31 additions & 0 deletions src/BabelQueue.Core/Schema/MapProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System.Collections.Generic;

namespace BabelQueue.Schema;

/// <summary>In-memory <see cref="ISchemaProvider"/>, for tests and for embedding schemas in code.</summary>
public sealed class MapProvider : ISchemaProvider
{
private readonly Dictionary<string, IReadOnlyDictionary<string, object?>> _schemas;

/// <summary>Build a provider from URN to already-decoded JSON Schema maps.</summary>
public MapProvider(IReadOnlyDictionary<string, IReadOnlyDictionary<string, object?>> schemas)
{
_schemas = new Dictionary<string, IReadOnlyDictionary<string, object?>>(schemas);
}

/// <summary>Build a provider from URN to raw JSON Schema strings, parsing each.</summary>
public static MapProvider FromJson(IReadOnlyDictionary<string, string> raw)
{
var schemas = new Dictionary<string, IReadOnlyDictionary<string, object?>>();
foreach (var entry in raw)
{
schemas[entry.Key] = SchemaJson.ParseObject(entry.Value);
}

return new MapProvider(schemas);
}

/// <inheritdoc/>
public IReadOnlyDictionary<string, object?>? SchemaFor(string urn) =>
_schemas.TryGetValue(urn, out var schema) ? schema : null;
}
209 changes: 209 additions & 0 deletions src/BabelQueue.Core/Schema/PayloadValidator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
using System;
using System.Collections.Generic;

namespace BabelQueue.Schema;

/// <summary>
/// Validates a message's <c>data</c> block against a per-URN JSON Schema (ADR-0024). A
/// hand-rolled subset of Draft-07 (zero dependencies) whose verdicts match the Go, PHP,
/// Python, Node and Java validators and babelqueue-registry's <c>compat</c> linter. Supported
/// keywords: <c>type</c>, <c>required</c>, <c>properties</c>, <c>additionalProperties</c>,
/// <c>items</c>, <c>enum</c>, <c>const</c>, <c>minLength</c>, <c>minimum</c>; unknown keywords
/// are ignored. It works on the materialized <c>object?</c> tree the codec produces.
/// </summary>
public static class PayloadValidator
{
private static readonly IReadOnlyDictionary<string, object?> EmptyProps =
new Dictionary<string, object?>();

/// <summary>
/// The first violation of <paramref name="value"/> against <paramref name="schema"/> as
/// <c>"&lt;json-pointer&gt;: &lt;reason&gt;"</c>, or <c>null</c> when it conforms.
/// </summary>
public static string? Validate(IReadOnlyDictionary<string, object?> schema, object? value) =>
ValidateNode(schema, value, string.Empty);

private static string? ValidateNode(IReadOnlyDictionary<string, object?> schema, object? value, string path)
{
if (schema.TryGetValue("const", out var constValue) && !Equal(value, constValue))
{
return Violation(path, "wrong_const");
}

if (schema.TryGetValue("enum", out var enumObj)
&& enumObj is IReadOnlyList<object?> enumValues
&& !Contains(enumValues, value))
{
return Violation(path, "not_in_enum");
}

var type = schema.TryGetValue("type", out var t) && t is string s ? s : string.Empty;
return type switch
{
"object" => CheckObject(schema, value, path),
"array" => CheckArray(schema, value, path),
"string" => CheckString(schema, value, path),
"integer" => IsInteger(value) ? CheckMinimum(schema, value, path) : Violation(path, "not_an_integer"),
"number" => IsNumber(value) ? CheckMinimum(schema, value, path) : Violation(path, "not_a_number"),
"boolean" => value is bool ? null : Violation(path, "not_a_boolean"),
"null" => value is null ? null : Violation(path, "not_null"),
_ => null,
};
}

private static string? CheckObject(IReadOnlyDictionary<string, object?> schema, object? value, string path)
{
if (value is not IReadOnlyDictionary<string, object?> obj)
{
return Violation(path, "not_an_object");
}

if (schema.TryGetValue("required", out var req) && req is IReadOnlyList<object?> required)
{
foreach (var key in required)
{
if (key is string name && !obj.ContainsKey(name))
{
return Violation(Join(path, name), "missing_required");
}
}
}

var properties = schema.TryGetValue("properties", out var p)
&& p is IReadOnlyDictionary<string, object?> props
? props
: EmptyProps;
var additionalAllowed = !(schema.TryGetValue("additionalProperties", out var ap) && ap is false);

foreach (var member in obj)
{
if (properties.TryGetValue(member.Key, out var ps) && ps is IReadOnlyDictionary<string, object?> propSchema)
{
var found = ValidateNode(propSchema, member.Value, Join(path, member.Key));
if (found is not null)
{
return found;
}
}
else if (!additionalAllowed)
{
return Violation(Join(path, member.Key), "additional_not_allowed");
}
}

return null;
}

private static string? CheckArray(IReadOnlyDictionary<string, object?> schema, object? value, string path)
{
if (value is not IReadOnlyList<object?> list)
{
return Violation(path, "not_an_array");
}

if (!(schema.TryGetValue("items", out var it) && it is IReadOnlyDictionary<string, object?> items))
{
return null;
}

for (var i = 0; i < list.Count; i++)
{
var found = ValidateNode(items, list[i], path + "[" + i + "]");
if (found is not null)
{
return found;
}
}

return null;
}

private static string? CheckString(IReadOnlyDictionary<string, object?> schema, object? value, string path)
{
if (value is not string str)
{
return Violation(path, "not_a_string");
}

if (schema.TryGetValue("minLength", out var ml) && ml is long min && str.Length < min)
{
return Violation(path, "below_min_length");
}

return null;
}

private static string? CheckMinimum(IReadOnlyDictionary<string, object?> schema, object? value, string path)
{
if (schema.TryGetValue("minimum", out var m)
&& TryDouble(m, out var min)
&& TryDouble(value, out var actual)
&& actual < min)
{
return Violation(path, "below_minimum");
}

return null;
}

// JSON numbers materialize to long (integers) or double (fractions); a whole-valued double
// still counts as an integer, matching the other SDKs. bool is not long/double.
private static bool IsInteger(object? value) => value switch
{
long or int => true,
double d => !double.IsInfinity(d) && d == Math.Floor(d),
_ => false,
};

private static bool IsNumber(object? value) => value is long or int or double;

private static bool TryDouble(object? value, out double result)
{
switch (value)
{
case long l:
result = l;
return true;
case int i:
result = i;
return true;
case double d:
result = d;
return true;
default:
result = 0;
return false;
}
}

// Type-aware equality: a long never equals a double, true never equals 1 — matching the
// strict comparisons in the other SDK validators.
private static bool Equal(object? a, object? b)
{
if (a is null || b is null)
{
return a is null && b is null;
}

return a.GetType() == b.GetType() && a.Equals(b);
}

private static bool Contains(IReadOnlyList<object?> values, object? value)
{
foreach (var item in values)
{
if (Equal(value, item))
{
return true;
}
}

return false;
}

private static string Violation(string path, string reason) =>
(path.Length == 0 ? "<root>" : path) + ": " + reason;

private static string Join(string path, string key) =>
path.Length == 0 ? key : path + "." + key;
}
Loading