diff --git a/src/BabelQueue.Core/Tracing/Telemetry.cs b/src/BabelQueue.Core/Tracing/Telemetry.cs new file mode 100644 index 0000000..16c7e81 --- /dev/null +++ b/src/BabelQueue.Core/Tracing/Telemetry.cs @@ -0,0 +1,186 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; +using System.Security.Cryptography; +using System.Text; +using System.Threading.Tasks; +using BabelQueue; + +namespace BabelQueue.Tracing; + +/// <summary> +/// Optional OpenTelemetry tracing for a babelqueue producer or consumer (ADR-0025) — the .NET +/// mirror of the Go otel, Python babelqueue.otel and Node @babelqueue/core/otel +/// helpers. It emits a CONSUMER <see cref="Activity"/> per handled message and a PRODUCER activity +/// per publish, correlating them across every hop and SDK through the envelope's trace_id — +/// a UUID, which maps 1:1 to a 16-byte <see cref="ActivityTraceId"/>. +/// </summary> +/// <remarks> +/// <para> +/// It is built on <see cref="ActivitySource"/> — the BCL primitive that the +/// OpenTelemetry .NET SDK consumes — so the core takes no OpenTelemetry package dependency. +/// To export, a consumer wires an OpenTelemetry TracerProvider that listens to the source +/// named <see cref="ActivitySourceName"/> (e.g. builder.AddSource("BabelQueue")); with no +/// listener the helpers are nearly free and emit nothing. The wire envelope is untouched (GR-1). +/// </para> +/// <para> +/// Honest limit: every hop that shares a trace_id shares one trace, but exact cross-hop +/// span parent-child linkage (propagating a W3C traceparent as a transport header) +/// is a documented follow-up; this maps trace_id to the trace id only. +/// </para> +/// </remarks> +public static class Telemetry +{ + /// <summary> + /// The name of the <see cref="ActivitySource"/> these helpers emit on. + /// Add it to an OpenTelemetry TracerProvider (AddSource("BabelQueue")) to export. 
+ /// </summary> + public const string ActivitySourceName = "BabelQueue"; + + private const string System = "babelqueue"; // NOTE(review): this name hides the System namespace inside Telemetry; a qualified System.* name here would need global::System. + private const string ZeroTraceId = "00000000000000000000000000000000"; + + private static readonly ActivitySource Source = new(ActivitySourceName); + + /// <summary> + /// Maps an envelope trace_id to a deterministic <see cref="ActivityTraceId"/>: a UUID (32 hex digits once dashes are removed, case-insensitive, not all zero) + /// maps to its 16 bytes; any other string — null is treated as empty — is hashed (SHA-256 of the UTF-8 bytes, first 16 bytes). The inverse + /// of <see cref="UuidOf"/> for the UUID case. + /// </summary> + public static ActivityTraceId TraceIdOf(string traceId) + { + string hex = (traceId ?? string.Empty).Replace("-", string.Empty, StringComparison.Ordinal).ToLowerInvariant(); + if (hex.Length == 32 && IsHex(hex) && hex != ZeroTraceId) + { + return ActivityTraceId.CreateFromString(hex.AsSpan()); + } + + byte[] digest = SHA256.HashData(Encoding.UTF8.GetBytes(traceId ?? string.Empty)); + return ActivityTraceId.CreateFromBytes(digest.AsSpan(0, 16)); + } + + /// <summary> + /// Formats an <see cref="ActivityTraceId"/> as a canonical UUID string — the form a producer + /// stamps into the message's trace_id so a consumer can recover the same trace id via + /// <see cref="TraceIdOf"/>. The 32 hex digits are grouped 8-4-4-4-12. + /// </summary> + public static string UuidOf(ActivityTraceId traceId) + { + string h = traceId.ToHexString(); + return string.Create(CultureInfo.InvariantCulture, $"{h[..8]}-{h[8..12]}-{h[12..16]}-{h[16..20]}-{h[20..32]}"); + } + + /// <summary> + /// Returns <paramref name="handler"/> wrapped to emit a CONSUMER activity <c>process &lt;urn&gt;</c> + /// in the trace derived from the envelope's trace_id, recording the handler's error/status. + /// The handler still receives the full <see cref="Envelope"/>; a throw is recorded and re-thrown so + /// the runtime's retry / dead-letter path still applies. A non-empty trace_id yields a remote parent context whose span id is hashed from that trace_id; an empty one passes a default parent context. + /// </summary> + public static Handler Wrap(Handler handler) => // NOTE(review): handler is not validated here (PublishAsync guards send with ThrowIfNull); a null handler would only fail when the returned wrapper runs — confirm whether an eager guard is wanted. + async envelope => + { + string traceId = envelope.TraceId ?? string.Empty; + string urn = EnvelopeCodec.Urn(envelope); + + ActivityContext parent = traceId.Length == 0 + ? default + : new ActivityContext(TraceIdOf(traceId), SpanIdOf(traceId), ActivityTraceFlags.Recorded, isRemote: true); + + using Activity? 
activity = Source.StartActivity($"process {urn}", ActivityKind.Consumer, parent); + ApplyConsumeTags(activity, envelope, traceId); + + try + { + await handler(envelope).ConfigureAwait(false); + } + catch (Exception ex) + { + RecordError(activity, ex); + throw; + } + }; + + /// <summary> + /// Runs a publish under a PRODUCER activity <c>publish &lt;urn&gt;</c>, carrying the active trace's + /// id into the built envelope's trace_id so the downstream consumer recovers the same trace. When no activity is started, a fresh GUID is used as the trace_id instead. + /// <paramref name="send"/> performs the real transport write and its result is returned; an exception from building or sending the envelope is recorded on the activity and re-thrown. + /// </summary> + public static async Task PublishAsync( + string urn, + IReadOnlyDictionary? data, + Func> send, + string queue = "default") + { + ArgumentNullException.ThrowIfNull(send); + + using Activity? activity = Source.StartActivity($"publish {urn}", ActivityKind.Producer); + activity?.SetTag("messaging.system", System); + activity?.SetTag("messaging.operation", "publish"); + activity?.SetTag("messaging.destination.name", urn); // NOTE(review): tags the URN, whereas ApplyConsumeTags puts the queue under this same key — confirm the asymmetry is intended. + + try + { + string traceId = activity is not null ? UuidOf(activity.TraceId) : Guid.NewGuid().ToString(); + Envelope envelope = EnvelopeCodec.Make(urn, data, queue, traceId); + TResult result = await send(envelope).ConfigureAwait(false); + activity?.SetTag("messaging.message.id", envelope.Meta?.Id); // set only after send succeeds, so a failed publish carries no message id tag + return result; + } + catch (Exception ex) + { + RecordError(activity, ex); + throw; + } + } + + private static void ApplyConsumeTags(Activity? activity, Envelope envelope, string traceId) // Stamps the consumer-side messaging.* tags; does nothing when no activity was started. + { + if (activity is null) + { + return; + } + + activity.SetTag("messaging.system", System); + activity.SetTag("messaging.operation", "process"); + activity.SetTag("messaging.destination.name", envelope.Meta?.Queue ?? string.Empty); + activity.SetTag("messaging.message.id", envelope.Meta?.Id ?? string.Empty); + activity.SetTag("messaging.message.conversation_id", traceId); + activity.SetTag("messaging.babelqueue.attempts", envelope.Attempts); + } + + private static void RecordError(Activity? 
activity, Exception ex) + { + if (activity is null) + { + return; + } + + activity.SetStatus(ActivityStatusCode.Error, ex.Message); + activity.AddEvent(new ActivityEvent("exception", tags: new ActivityTagsCollection + { + { "exception.type", ex.GetType().FullName }, + { "exception.message", ex.Message }, + { "exception.stacktrace", ex.ToString() }, + })); + } + + private static ActivitySpanId SpanIdOf(string traceId) + { + byte[] digest = SHA256.HashData(Encoding.UTF8.GetBytes("babelqueue-span:" + traceId)); + return ActivitySpanId.CreateFromBytes(digest.AsSpan(0, 8)); + } + + private static bool IsHex(string value) + { + foreach (char c in value) + { + if (!Uri.IsHexDigit(c)) + { + return false; + } + } + + return true; + } +} diff --git a/tests/BabelQueue.Core.Tests/TelemetryTests.cs b/tests/BabelQueue.Core.Tests/TelemetryTests.cs new file mode 100644 index 0000000..8a3f315 --- /dev/null +++ b/tests/BabelQueue.Core.Tests/TelemetryTests.cs @@ -0,0 +1,136 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading.Tasks; +using BabelQueue; +using BabelQueue.Tracing; +using Xunit; + +namespace BabelQueue.Tests; + +public sealed class TelemetryTests : IDisposable +{ + private const string TraceId = "7b3f9c2a-e41d-4f88-9b2a-1c0d5e6f7a8b"; + + private readonly List _activities = new(); + private readonly ActivityListener _listener; + + public TelemetryTests() + { + _listener = new ActivityListener + { + ShouldListenTo = source => source.Name == Telemetry.ActivitySourceName, + Sample = static (ref ActivityCreationOptions _) => ActivitySamplingResult.AllData, + ActivityStopped = activity => _activities.Add(activity), + }; + ActivitySource.AddActivityListener(_listener); + } + + public void Dispose() => _listener.Dispose(); + + private static Envelope MakeEnvelope(int attempts = 0) => + EnvelopeCodec.Make( + "urn:babel:orders:created", + new Dictionary { ["order_id"] = 1 }, + "orders", + TraceId) with + { Attempts = attempts }; 
+ + [Fact] + public void TraceIdRoundTripsThroughUuid() + { + ActivityTraceId tid = Telemetry.TraceIdOf(TraceId); + Assert.Equal(TraceId, Telemetry.UuidOf(tid)); + + // a non-uuid trace_id maps deterministically to a valid, distinct trace id + Assert.Equal(Telemetry.TraceIdOf("not-a-uuid"), Telemetry.TraceIdOf("not-a-uuid")); + Assert.NotEqual(tid, Telemetry.TraceIdOf("not-a-uuid")); + + // 32 chars but not hex → not a UUID, so it is hashed (non-zero) + Assert.NotEqual(default, Telemetry.TraceIdOf(new string('z', 32))); + } + + [Fact] + public async Task WrapEmitsConsumerActivityInTheTraceIdTrace() + { + bool called = false; + Handler handler = Telemetry.Wrap(_ => + { + called = true; + return Task.CompletedTask; + }); + + await handler(MakeEnvelope(attempts: 2)); + + Assert.True(called); + Activity activity = Assert.Single(_activities); + Assert.Equal("process urn:babel:orders:created", activity.DisplayName); + Assert.Equal(ActivityKind.Consumer, activity.Kind); + Assert.Equal(Telemetry.TraceIdOf(TraceId), activity.TraceId); + Assert.Equal(TraceId, activity.GetTagItem("messaging.message.conversation_id")); + Assert.Equal("orders", activity.GetTagItem("messaging.destination.name")); + Assert.Equal(2, activity.GetTagItem("messaging.babelqueue.attempts")); + } + + [Fact] + public async Task WrapToleratesAnEnvelopeMissingOptionalMeta() + { + var partial = new Envelope("urn:babel:orders:created", TraceId, null, null, 0, null); + Handler handler = Telemetry.Wrap(_ => Task.CompletedTask); + + await handler(partial); + + Activity activity = Assert.Single(_activities); + Assert.Equal(string.Empty, activity.GetTagItem("messaging.destination.name")); + Assert.Equal(string.Empty, activity.GetTagItem("messaging.message.id")); + } + + [Fact] + public async Task WrapRecordsHandlerErrorAndRethrows() + { + Handler handler = Telemetry.Wrap(_ => Task.FromException(new InvalidOperationException("boom"))); + + await Assert.ThrowsAsync(() => handler(MakeEnvelope())); + + Activity 
activity = Assert.Single(_activities); + Assert.Equal(ActivityStatusCode.Error, activity.Status); + Assert.Contains(activity.Events, e => e.Name == "exception"); + } + + [Fact] + public async Task PublishStampsTraceIdFromTheProducerActivity() + { + Envelope? sent = null; + string id = await Telemetry.PublishAsync( + "urn:babel:orders:created", + new Dictionary { ["order_id"] = 7 }, + env => + { + sent = env; + return Task.FromResult(env.Meta!.Id!); + }); + + Activity activity = Assert.Single(_activities); + Assert.Equal(ActivityKind.Producer, activity.Kind); + Assert.NotNull(sent); + + // the published trace_id encodes the producer activity's trace, so a consumer recovers it + Assert.Equal(Telemetry.UuidOf(activity.TraceId), sent!.TraceId); + Assert.Equal(activity.TraceId, Telemetry.TraceIdOf(sent.TraceId!)); + Assert.Equal(id, activity.GetTagItem("messaging.message.id")); + } + + [Fact] + public async Task PublishRecordsAFailingSend() + { + await Assert.ThrowsAsync(() => + Telemetry.PublishAsync( + "urn:babel:orders:created", + null, + _ => Task.FromException(new InvalidOperationException("send failed")))); + + Activity activity = Assert.Single(_activities); + Assert.Equal(ActivityKind.Producer, activity.Kind); + Assert.Equal(ActivityStatusCode.Error, activity.Status); + } +}