Skip to content

BabelQueue/babelqueue-java-pulsar

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

6 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

BabelQueue — Apache Pulsar (Java)

com.babelqueue:babelqueue-pulsar — an Apache Pulsar transport for BabelQueue, built on pulsar-client and the framework-agnostic babelqueue-core.

A canonical-envelope publisher and a URN-routed consumer, so a Pulsar-based Java service speaks the same wire contract (envelope shape, URN identity, trace propagation) as the .NET, Python, Go and Node SDKs. Implements §5 of the broker-bindings contract.

Install (Maven)

<dependency>
  <groupId>com.babelqueue</groupId>
  <artifactId>babelqueue-pulsar</artifactId>
  <version>1.0.0</version>
</dependency>

It pulls babelqueue-core and org.apache.pulsar:pulsar-client transitively.

Use

PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();

// produce
Producer<byte[]> producer = client.newProducer().topic("orders").create();
String id = PulsarPublisher.create(producer)
    .publish("urn:babel:orders:created", Map.of("order_id", 1042));

// consume (Shared subscription)
Consumer<byte[]> sub = client.newConsumer()
    .topic("orders").subscriptionName("babelqueue")
    .subscriptionType(SubscriptionType.Shared).subscribe();
PulsarConsumer consumer = PulsarConsumer.builder(sub)
    .handler("urn:babel:orders:created", (env, msg) -> {
        // env.data(), env.traceId(), env.attempts() ...
    })
    .onError((err, env, msg) -> err.printStackTrace())
    .build();
consumer.run(); // poll until the thread is interrupted

Delayed delivery: publish(urn, data, traceId, Duration.ofMinutes(5)) → native deliverAfter. The consumer routes purely on the bq-job property, so it never decodes a message it cannot handle.

Contract mapping (§5)

Envelope Apache Pulsar
body message payload (byte-identical across SDKs)
job (URN) property bq-job (consumer routes on this)
trace_id property bq-trace-id
meta.id property bq-message-id
meta.schema_version property bq-schema-version
meta.lang property bq-source-lang
meta.created_at publishTimestamp (mirror; body authoritative)
attempts property bq-attempts (authoritative), cross-checked against getRedeliveryCount()
reserve / ack / retry acknowledge / negativeAcknowledge

Pulsar properties are string→string, so bq-attempts carries the contract attempts and is authoritative. The consumer reconciles to max(bq-attempts, getRedeliveryCount()): getRedeliveryCount() is 0-based (0 on first delivery) so it maps directly to attempts with no −1, and the max never lowers a higher body count — so a republish-driven retry (Go/Python transports increment bq-attempts and re-send) and a native negative-ack redelivery both converge on the same number. A throwing handler negativeAcknowledges the message, so the broker redelivers it (at-least-once); with a native DeadLetterPolicy it eventually moves to the cross-language <queue>.dlq topic. The poll loop never stops on a bad message — observe via onError/onUnknownUrn. The envelope is unchanged (schema_version stays 1); Pulsar is purely additive.

Build & test

mvn verify

The Producer / Consumer / Message / TypedMessageBuilder Pulsar interfaces are mocked with Mockito 5 — no Pulsar, no network. JUnit 5, JaCoCo ≥90% line coverage.

License

MIT

About

Apache Pulsar transport for BabelQueue Java — produce and consume the canonical envelope over Pulsar.

Topics

Resources

License

Code of conduct

Contributing

Security policy

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages