Skip to content

Queue stream#1974

Merged
jnioche merged 14 commits into
mainfrom
990
Jul 3, 2026
Merged

Queue stream#1974
jnioche merged 14 commits into
mainfrom
990

Conversation

@jnioche

@jnioche jnioche commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

See discussion on #990

This PR introduces a queue stream and uses it in the OpenSearch module with a new QueueBolt.
This allows decoupling the logic about queues (whether based on hostnames, domain names or anything else) from the storage of the URLs.

See #1973 for related work

Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
…e name in injection.flux

Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
@rzo1

rzo1 commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Note: Also needs to be ported to opensearch-java once merged (or design is settled)

@rzo1 rzo1 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Julien, I like the direction of decoupling the queue bookkeeping from URL storage, and the test coverage for the new bolt is nice. I have a few concerns about the reliability semantics of QueueBolt and the layering of the emit that I think are worth addressing before merging, plus a question about migration for existing crawls.

1. Failed writes are silently lost forever (QueueBolt)

execute() puts the key into knownQueue and acks before the async bulk request completes, and the bolt uses the 2-arg OpenSearchConnection.getConnection(conf, boltType), which installs the no-op BulkProcessor.Listener (empty afterBulk bodies). If OpenSearch is briefly unavailable or the bulk item is rejected when the first tuple for a key arrives, there is no log, no retry, and the cache suppresses every later attempt. That queue is then permanently missing from the index until the worker restarts. Could we use the listener variant of getConnection (as DeletionBolt / IndexerBolt do) and only populate the cache on bulk success?

2. create(true) freezes lastUpdated and produces guaranteed-failing requests

Since documents are create-only, lastUpdated never moves past first insertion. Any consumer that uses it to distinguish active from dead queues (one of the use cases in #990) will get wrong answers. In addition, after a worker restart, or eviction from the hardcoded 10k cache (a broad crawl partitioned by host easily exceeds that), every re-seen key is re-sent as a create that OpenSearch rejects with version_conflict_engine_exception, silently because of point 1. I'd suggest plain index/upsert semantics instead of create(true), and making the cache configurable via a cache spec like status.updater.cache.spec.

3. Stream declared in core, but only the OpenSearch bolt emits

AbstractStatusUpdaterBolt.declareOutputFields now declares the queue stream for all backends (SQL, Solr, urlfrontier, opensearch-java, ...), but only the OpenSearch client StatusUpdaterBolt emits on it. Wiring streamId: queue against any other backend validates and runs, but delivers nothing, with no warning. This diverges from the DELETION_STREAM_NAME pattern, where the abstract base both declares and emits so all backends behave identically. Since the partition key comes from core's own URLPartitioner, nothing OpenSearch-specific prevents emitting from the base class. Alternatively, the declaration could move into the OpenSearch subclass for now. Was the plan to add emits to the other backends in follow-ups?

4. Full Metadata shipped on every status update but never read

With the FIELDS-grouped wiring from the archetype flux, the entire Metadata map is serialized and transferred per status update on the hottest stream of the topology, while QueueBolt only ever calls getStringByField("key"). Emitting just the key would remove that cost. Related: the emit happens before waitAck.addTuple / addToProcessor, so a status write that ultimately fails still leaves a phantom entry in the queues index.

5. Smaller things

  • QueueBolt imports org.joda.time.Instant; joda-time is not declared in external/opensearch/pom.xml and only comes in transitively via the OpenSearch server artifact. java.time.Instant.now().toString() is a drop-in replacement and matches what core and the spouts use.
  • The constant is named ESIndexNameParamName; siblings in this module use the OS prefix.
  • knownQueue is Cache<String, String> storing the docID, but the value is never read; presence-only would do.
  • Both queues.mapping files declare a metadata.* dynamic template, but the bolt only indexes key and lastUpdated, so it can never match.
  • QueueBoltTest.store() stubs getValueByField("metadata") and builds a Metadata object the bolt never reads.

6. Migration path for existing crawls?

For a mature crawl with millions of URLs already in the status index, the queues index only fills as URLs pass through the status updater again. Quiescent hosts (far-future nextFetchDate, or hosts whose URLs are all in terminal states) may not show up for a long time or ever. Should we provide a backfill utility (e.g. a terms aggregation over the status index routing/hostname field bulk-loaded into queues), or at least document that the queue index is only complete for crawls started fresh?

@jnioche

jnioche commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

Thanks Claude, I mean @rzo1 ;-)

1. Failed writes are silently lost forever (QueueBolt)

execute() puts the key into knownQueue and acks before the async bulk request completes, and the bolt uses the 2-arg OpenSearchConnection.getConnection(conf, boltType), which installs the no-op BulkProcessor.Listener (empty afterBulk bodies). If OpenSearch is briefly unavailable or the bulk item is rejected when the first tuple for a key arrives, there is no log, no retry, and the cache suppresses every later attempt. That queue is then permanently missing from the index until the worker restarts. Could we use the listener variant of getConnection (as DeletionBolt / IndexerBolt do) and only populate the cache on bulk success?

yes absolutely

2. create(true) freezes lastUpdated and produces guaranteed-failing requests

Since documents are create-only, lastUpdated never moves past first insertion. Any consumer that uses it to distinguish active from dead queues (one of the use cases in #990) will get wrong answers. In addition, after a worker restart, or eviction from the hardcoded 10k cache (a broad crawl partitioned by host easily exceeds that), every re-seen key is re-sent as a create that OpenSearch rejects with version_conflict_engine_exception, silently because of point 1. I'd suggest plain index/upsert semantics instead of create(true), and making the cache configurable via a cache spec like status.updater.cache.spec.

The statusUpdaterBolt just creates the entries if they don't exist. Later on these will get updated either directly by the users or by other components of the topology. We probably don't want to overwrite the entries here.

3. Stream declared in core, but only the OpenSearch bolt emits

AbstractStatusUpdaterBolt.declareOutputFields now declares the queue stream for all backends (SQL, Solr, urlfrontier, opensearch-java, ...), but only the OpenSearch client StatusUpdaterBolt emits on it. Wiring streamId: queue against any other backend validates and runs, but delivers nothing, with no warning. This diverges from the DELETION_STREAM_NAME pattern, where the abstract base both declares and emits so all backends behave identically. Since the partition key comes from core's own URLPartitioner, nothing OpenSearch-specific prevents emitting from the base class. Alternatively, the declaration could move into the OpenSearch subclass for now. Was the plan to add emits to the other backends in follow-ups?

Other backends might adopt it in due course, let's leave it where it is

4. Full Metadata shipped on every status update but never read

With the FIELDS-grouped wiring from the archetype flux, the entire Metadata map is serialized and transferred per status update on the hottest stream of the topology, while QueueBolt only ever calls getStringByField("key"). Emitting just the key would remove that cost. Related: the emit happens before waitAck.addTuple / addToProcessor, so a status write that ultimately fails still leaves a phantom entry in the queues index.

The metadata will be used later on

5. Smaller things

  • QueueBolt imports org.joda.time.Instant; joda-time is not declared in external/opensearch/pom.xml and only comes in transitively via the OpenSearch server artifact. java.time.Instant.now().toString() is a drop-in replacement and matches what core and the spouts use.

yes good idea

  • The constant is named ESIndexNameParamName; siblings in this module use the OS prefix.

yes good idea

  • knownQueue is Cache<String, String> storing the docID, but the value is never read; presence-only would do.

sure, does it matter though?

  • Both queues.mapping files declare a metadata.* dynamic template, but the bolt only indexes key and lastUpdated, so it can never match.

The metadata will be added later on

  • QueueBoltTest.store() stubs getValueByField("metadata") and builds a Metadata object the bolt never reads.

Future proofing, mind your own business Claude, you can't understand

6. Migration path for existing crawls?

For a mature crawl with millions of URLs already in the status index, the queues index only fills as URLs pass through the status updater again. Quiescent hosts (far-future nextFetchDate, or hosts whose URLs are all in terminal states) may not show up for a long time or ever. Should we provide a backfill utility (e.g. a terms aggregation over the status index routing/hostname field bulk-loaded into queues), or at least document that the queue index is only complete for crawls started fresh?

This is a new functionality, let's get it work first and if existing crawls need it, tjen we can look at generating the entries from the status index for existing crawls

rzo1 added a commit that referenced this pull request Jul 2, 2026
Address review comments on #1974:
- use the BulkProcessor.Listener variant of getConnection and only
  cache keys once the bulk write succeeded, so failed writes are
  logged and retried when the key is next seen
- treat version conflicts (entry already exists, e.g. after a worker
  restart or cache eviction) as success, keeping create-only semantics
- make the known-queues cache configurable via
  opensearch.queues.cache.spec (default maximumSize=10000)
- replace org.joda.time.Instant with java.time.Instant
- rename ESIndexNameParamName to OSQueuesIndexNameParamName
- presence-only cache values

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
@rzo1

rzo1 commented Jul 2, 2026

Copy link
Copy Markdown
Contributor

Pushed a commit addressing the points we agreed on: the bolt now uses the listener variant of getConnection and only caches keys after the bulk write succeeds (version conflicts after a restart or cache eviction are treated as "already known", keeping the create-only semantics). Also: joda replaced with java.time, constant renamed to the OS prefix, presence-only cache values, and the cache is now configurable via opensearch.queues.cache.spec (same idea as status.updater.cache.spec, default unchanged at maximumSize=10000). Added a test simulating a worker restart to cover the conflict path.

Address review comments on #1974:
- use the BulkProcessor.Listener variant of getConnection and only
  cache keys once the bulk write succeeded, so failed writes are
  logged and retried when the key is next seen
- treat version conflicts (entry already exists, e.g. after a worker
  restart or cache eviction) as success, keeping create-only semantics
- make the known-queues cache configurable via
  opensearch.queues.cache.spec (default maximumSize=10000)
- replace org.joda.time.Instant with java.time.Instant
- rename ESIndexNameParamName to OSQueuesIndexNameParamName
- presence-only cache values
@jnioche

jnioche commented Jul 2, 2026

Copy link
Copy Markdown
Contributor Author

thanks @rzo1 LGTM

@rzo1 rzo1 added this to the 3.6.1 milestone Jul 3, 2026

@dpol1 dpol1 left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@jnioche

jnioche commented Jul 3, 2026

Copy link
Copy Markdown
Contributor Author

Thanks @rzo1 and @dpol1
since this is a new functionality, it warrants at least a minor release. I will create a new milestone for 3.7.0

@jnioche jnioche merged commit ebda177 into main Jul 3, 2026
3 of 4 checks passed
@jnioche jnioche deleted the 990 branch July 3, 2026 07:19
dpol1 added a commit that referenced this pull request Jul 3, 2026
Superseded by the generic queue stream (#1974): the Retry-After header
already reaches the status updater in the status metadata, so the
back-off signal no longer needs to be produced by the FetcherBolt.
FetcherBolt, Constants and crawler-default.yaml go back to their main
versions.
dpol1 added a commit that referenced this pull request Jul 3, 2026
…tatus updater

Mirrors the OpenSearch StatusUpdaterBolt behaviour introduced in #1974 so
that queue/host-aware bolts can be wired in URLFrontier topologies too.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants