From 5df57bc3754192e7094d4ef3fb7f392ad569805e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Muhammet=20=C5=9Eafak?= Date: Fri, 19 Jun 2026 06:31:02 +0300 Subject: [PATCH] feat(otel): optional OpenTelemetry tracing module (ADR-0025) Mirrors the Go/Python/Node otel modules: a new BabelQueue\Otel\Tracing (open-telemetry/api is an optional 'suggest' dependency, so the core stays dependency-light) emitting produce/consume spans correlated across hops via trace_id<->32-hex OTel TraceId. Tracing::wrap (consumer span, mirroring SchemaValidated/Idempotent::wrap) + Tracing::publish (producer span). Envelope untouched (GR-1); opt-in. phpunit 160 green; phpstan level 9 clean. --- composer.json | 5 +- src/Otel/Tracing.php | 190 +++++++++++++++++++++++++++++ tests/Otel/TracingTest.php | 237 +++++++++++++++++++++++++++++++++++++ 3 files changed, 431 insertions(+), 1 deletion(-) create mode 100644 src/Otel/Tracing.php create mode 100644 tests/Otel/TracingTest.php diff --git a/composer.json b/composer.json index 2c0e613..acd25df 100644 --- a/composer.json +++ b/composer.json @@ -27,6 +27,8 @@ }, "require-dev": { "mockery/mockery": "^1.6", + "open-telemetry/api": "^1.9", + "open-telemetry/sdk": "^1.14", "php-amqplib/php-amqplib": "^3.5", "phpstan/phpstan": "^2.0", "phpunit/phpunit": "^10.5|^11.0", @@ -39,7 +41,8 @@ "aws/aws-sdk-php": "For the framework-less Amazon SQS transport (BabelQueue\\Transport\\SqsTransport).", "stomp-php/stomp-php": "To produce to Apache ActiveMQ Artemis over STOMP (the \u00a77 PHP path) via StompTransport.", "ext-rdkafka": "To produce to Apache Kafka (the \u00a76 PHP path) via KafkaTransport (the php-rdkafka PECL extension over librdkafka; opt-in, relaxes GR-7 for Kafka \u2014 ADR-0019).", - "textalk/websocket": "Pure-PHP WebSocket client to produce to Apache Pulsar (the \u00a75 PHP path) via PulsarTransport over Pulsar's WebSocket API (GR-7 intact \u2014 ADR-0020)." + "textalk/websocket": "Pure-PHP WebSocket client to produce to Apache Pulsar (the \u00a75 PHP path) via PulsarTransport over Pulsar's WebSocket API (GR-7 intact \u2014 ADR-0020).", + "open-telemetry/api": "To emit OpenTelemetry produce/consume spans via BabelQueue\\Otel\\Tracing (opt-in, ADR-0025; also install open-telemetry/sdk to export)." }, "autoload": { "psr-4": { diff --git a/src/Otel/Tracing.php b/src/Otel/Tracing.php new file mode 100644 index 0000000..2fa3741 --- /dev/null +++ b/src/Otel/Tracing.php @@ -0,0 +1,190 @@ +on('urn:babel:orders:created', Tracing::wrap($tracer, $handler)); // consumer + * Tracing::publish($tracer, $transport, 'urn:babel:orders:created', $data); // producer + * + * Every hop that shares a `trace_id` shares one OTel trace. Exact cross-hop *span* + * parent-child linkage (W3C `traceparent` as a transport header) is a documented follow-up. + */ +final class Tracing +{ + private const SYSTEM = 'babelqueue'; + private const INVALID_TRACE_ID = '00000000000000000000000000000000'; + private const INVALID_SPAN_ID = '0000000000000000'; + + /** + * Map an envelope `trace_id` to a deterministic 32-hex OTel trace id: a UUID maps to its + * hex bytes; any other string is hashed (SHA-256, first 16 bytes). The inverse of + * {@see self::uuidOf()} for the UUID case. + */ + public static function traceIdOf(string $traceId): string + { + $hex = strtolower(str_replace('-', '', $traceId)); + if (preg_match('/^[0-9a-f]{32}$/', $hex) === 1 && $hex !== self::INVALID_TRACE_ID) { + return $hex; + } + + return substr(hash('sha256', $traceId), 0, 32); + } + + /** + * Format a 32-hex OTel trace id as a canonical UUID string — the form a producer stamps + * into the message's `trace_id` so a consumer can recover the same trace id via + * {@see self::traceIdOf()}. + */ + public static function uuidOf(string $traceIdHex): string + { + $hex = substr(str_pad(strtolower(str_replace('-', '', $traceIdHex)), 32, '0', STR_PAD_LEFT), 0, 32); + + return sprintf( + '%s-%s-%s-%s-%s', + substr($hex, 0, 8), + substr($hex, 8, 4), + substr($hex, 12, 4), + substr($hex, 16, 4), + substr($hex, 20, 12), + ); + } + + /** + * Wrap a consume handler so each message emits a CONSUMER span `process ` in the OTel + * trace derived from its `trace_id`, recording the handler's error/status. Mirrors the + * {@see \BabelQueue\Schema\SchemaValidated::wrap()} / {@see \BabelQueue\Idempotency\Idempotent::wrap()} + * shape and composes with the consume runtime's ack-on-return / redeliver-on-throw contract. + * + * @param callable(ConsumedMessage): void $handler + * @return callable(ConsumedMessage): void + */ + public static function wrap(TracerInterface $tracer, callable $handler): callable + { + return static function (ConsumedMessage $message) use ($tracer, $handler): void { + $meta = $message->getMeta(); + $span = $tracer->spanBuilder('process ' . $message->getUrn()) + ->setParent(self::parentContext($message->getTraceId())) + ->setSpanKind(SpanKind::KIND_CONSUMER) + ->setAttributes([ + 'messaging.system' => self::SYSTEM, + 'messaging.operation' => 'process', + 'messaging.destination.name' => self::stringMeta($meta, 'queue'), + 'messaging.message.id' => self::stringMeta($meta, 'id'), + 'messaging.message.conversation_id' => $message->getTraceId(), + 'messaging.babelqueue.attempts' => $message->attempts(), + ]) + ->startSpan(); + $scope = $span->activate(); + + try { + $handler($message); + } catch (Throwable $e) { + $span->recordException($e); + $span->setStatus(StatusCode::STATUS_ERROR, $e->getMessage()); + + throw $e; + } finally { + $scope->detach(); + $span->end(); + } + }; + } + + /** + * Publish under a PRODUCER span `publish `, carrying the active trace's id into the + * built envelope's `trace_id` so the downstream consumer recovers the same trace. Encodes + * and publishes through the given {@see Transport}; returns the transport's message id. + * + * @param array $data + */ + public static function publish( + TracerInterface $tracer, + Transport $transport, + string $urn, + array $data = [], + string $queue = 'default', + ): ?string { + $span = $tracer->spanBuilder('publish ' . $urn) + ->setSpanKind(SpanKind::KIND_PRODUCER) + ->setAttributes([ + 'messaging.system' => self::SYSTEM, + 'messaging.operation' => 'publish', + 'messaging.destination.name' => $urn, + ]) + ->startSpan(); + $scope = $span->activate(); + + try { + $traceId = self::uuidOf($span->getContext()->getTraceId()); + $envelope = EnvelopeCodec::make($urn, $data, $queue, $traceId); + $result = $transport->publish(EnvelopeCodec::encode($envelope), $queue); + $span->setAttribute('messaging.message.id', self::stringMeta($envelope['meta'], 'id')); + + return $result; + } catch (Throwable $e) { + $span->recordException($e); + $span->setStatus(StatusCode::STATUS_ERROR, $e->getMessage()); + + throw $e; + } finally { + $scope->detach(); + $span->end(); + } + } + + /** + * A context carrying a remote parent in the `trace_id`-derived trace, so a span started + * from it lands in that trace (cross-hop correlation). The parent span id is derived + * deterministically (and non-zero) so the context is valid. + */ + private static function parentContext(string $traceId): ContextInterface + { + $spanContext = SpanContext::createFromRemoteParent( + self::traceIdOf($traceId), + self::spanIdOf($traceId), + TraceFlags::SAMPLED, + ); + + return Context::getCurrent()->withContextValue(Span::wrap($spanContext)); + } + + private static function spanIdOf(string $traceId): string + { + $spanId = substr(hash('sha256', 'babelqueue-span:' . $traceId), 0, 16); + + return $spanId === self::INVALID_SPAN_ID ? '0000000000000001' : $spanId; + } + + /** + * @param array $meta + */ + private static function stringMeta(array $meta, string $key): string + { + return isset($meta[$key]) && is_string($meta[$key]) ? $meta[$key] : ''; + } +} diff --git a/tests/Otel/TracingTest.php b/tests/Otel/TracingTest.php new file mode 100644 index 0000000..f6a988b --- /dev/null +++ b/tests/Otel/TracingTest.php @@ -0,0 +1,237 @@ + TraceId mapping and + * the consumer/producer spans, verified against an in-memory span exporter. + */ +final class TracingTest extends TestCase +{ + private const TRACE_ID = '7b3f9c2a-e41d-4f88-9b2a-1c0d5e6f7a8b'; + + /** @var ArrayObject */ + private ArrayObject $storage; + + private TracerProvider $provider; + + private TracerInterface $tracer; + + protected function setUp(): void + { + /** @var ArrayObject $storage */ + $storage = new ArrayObject(); + $this->storage = $storage; + $this->provider = new TracerProvider(new SimpleSpanProcessor(new InMemoryExporter($this->storage))); + $this->tracer = $this->provider->getTracer('test'); + } + + protected function tearDown(): void + { + $this->provider->shutdown(); + } + + public function test_trace_id_round_trips_a_uuid(): void + { + $hex = Tracing::traceIdOf(self::TRACE_ID); + self::assertMatchesRegularExpression('/^[0-9a-f]{32}$/', $hex); + self::assertSame(self::TRACE_ID, Tracing::uuidOf($hex)); + } + + /** + * @return array + */ + public static function nonUuidProvider(): array + { + return [ + 'plain string' => ['not-a-uuid'], + '32 non-hex chars' => [str_repeat('z', 32)], + ]; + } + + #[DataProvider('nonUuidProvider')] + public function test_non_uuid_trace_id_is_hashed_to_a_valid_id(string $traceId): void + { + $hex = Tracing::traceIdOf($traceId); + self::assertMatchesRegularExpression('/^[0-9a-f]{32}$/', $hex); + self::assertSame($hex, Tracing::traceIdOf($traceId)); // deterministic + self::assertNotSame(Tracing::traceIdOf(self::TRACE_ID), $hex); + } + + public function test_wrap_emits_a_consumer_span_in_the_trace_id_trace(): void + { + $called = false; + $handler = Tracing::wrap($this->tracer, function (ConsumedMessage $message) use (&$called): void { + $called = true; + }); + + $handler($this->message()); + + self::assertTrue($called); + $span = $this->firstSpan(); + self::assertSame('process urn:babel:orders:created', $span->getName()); + self::assertSame(SpanKind::KIND_CONSUMER, $span->getKind()); + self::assertSame(Tracing::traceIdOf(self::TRACE_ID), $span->getContext()->getTraceId()); + + $attributes = $span->getAttributes()->toArray(); + self::assertSame('babelqueue', $attributes['messaging.system']); + self::assertSame(self::TRACE_ID, $attributes['messaging.message.conversation_id']); + self::assertSame('orders', $attributes['messaging.destination.name']); + self::assertSame('m1', $attributes['messaging.message.id']); + } + + public function test_wrap_records_handler_error_and_rethrows(): void + { + $handler = Tracing::wrap($this->tracer, function (ConsumedMessage $message): void { + throw new RuntimeException('boom'); + }); + + try { + $handler($this->message()); + self::fail('expected a RuntimeException'); + } catch (RuntimeException $e) { + self::assertSame('boom', $e->getMessage()); + } + + $span = $this->firstSpan(); + self::assertSame(StatusCode::STATUS_ERROR, $span->getStatus()->getCode()); + self::assertNotEmpty($span->getEvents()); // the recorded exception + } + + public function test_publish_emits_a_producer_span_and_stamps_trace_id(): void + { + $transport = new RecordingTransport(); + + $id = Tracing::publish($this->tracer, $transport, 'urn:babel:orders:created', ['order_id' => 7]); + + self::assertSame('msg-123', $id); + $span = $this->firstSpan(); + self::assertSame(SpanKind::KIND_PRODUCER, $span->getKind()); + + self::assertNotNull($transport->lastPayload); + $envelope = json_decode($transport->lastPayload, true); + self::assertIsArray($envelope); + self::assertIsArray($envelope['meta'] ?? null); + // the span's message id is the envelope's own id (meta.id), like every other SDK + self::assertSame($envelope['meta']['id'], $span->getAttributes()->toArray()['messaging.message.id'] ?? null); + // the published trace_id encodes the producer span's trace, so a consumer recovers it + self::assertSame(Tracing::uuidOf($span->getContext()->getTraceId()), $envelope['trace_id']); + self::assertSame($span->getContext()->getTraceId(), Tracing::traceIdOf((string) $envelope['trace_id'])); + } + + public function test_publish_records_a_failing_transport_and_rethrows(): void + { + $transport = new class implements Transport { + public function publish(string $payload, ?string $queue = null): ?string + { + throw new RuntimeException('transport down'); + } + }; + + try { + Tracing::publish($this->tracer, $transport, 'urn:babel:orders:created'); + self::fail('expected a RuntimeException'); + } catch (RuntimeException $e) { + self::assertSame('transport down', $e->getMessage()); + } + + $span = $this->firstSpan(); + self::assertSame(SpanKind::KIND_PRODUCER, $span->getKind()); + self::assertSame(StatusCode::STATUS_ERROR, $span->getStatus()->getCode()); + } + + private function firstSpan(): ImmutableSpan + { + $span = $this->storage[0] ?? null; + self::assertInstanceOf(ImmutableSpan::class, $span); + + return $span; + } + + private function message(): ConsumedMessage + { + return new OtelFakeMessage([ + 'job' => 'urn:babel:orders:created', + 'trace_id' => self::TRACE_ID, + 'data' => ['order_id' => 1], + 'meta' => ['id' => 'm1', 'queue' => 'orders', 'lang' => 'php', 'schema_version' => 1], + 'attempts' => 0, + ]); + } +} + +/** + * Records the last published payload so the test can assert the stamped trace_id. + */ +final class RecordingTransport implements Transport +{ + public ?string $lastPayload = null; + + public function publish(string $payload, ?string $queue = null): ?string + { + $this->lastPayload = $payload; + + return 'msg-123'; + } +} + +/** + * Minimal {@see ConsumedMessage} test double. + */ +final class OtelFakeMessage implements ConsumedMessage +{ + /** @param array $envelope */ + public function __construct(private array $envelope) + { + } + + public function getUrn(): string + { + return is_string($this->envelope['job'] ?? null) ? $this->envelope['job'] : ''; + } + + public function getTraceId(): string + { + return is_string($this->envelope['trace_id'] ?? null) ? $this->envelope['trace_id'] : ''; + } + + /** @return array */ + public function getData(): array + { + return is_array($this->envelope['data'] ?? null) ? $this->envelope['data'] : []; + } + + /** @return array */ + public function getMeta(): array + { + return is_array($this->envelope['meta'] ?? null) ? $this->envelope['meta'] : []; + } + + public function attempts(): int + { + return is_int($this->envelope['attempts'] ?? null) ? $this->envelope['attempts'] : 0; + } + + /** @return array */ + public function envelope(): array + { + return $this->envelope; + } +}