Signed-off-by: Julien Nioche <julien@digitalpebble.com>
…e name in injection.flux Signed-off-by: Julien Nioche <julien@digitalpebble.com>
|
Note: Also needs to be ported to |
rzo1
left a comment
Thanks Julien, I like the direction of decoupling the queue bookkeeping from URL storage, and the test coverage for the new bolt is nice. I have a few concerns about the reliability semantics of QueueBolt and the layering of the emit that I think are worth addressing before merging, plus a question about migration for existing crawls.
1. Failed writes are silently lost forever (QueueBolt)
execute() puts the key into knownQueue and acks before the async bulk request completes, and the bolt uses the 2-arg OpenSearchConnection.getConnection(conf, boltType), which installs the no-op BulkProcessor.Listener (empty afterBulk bodies). If OpenSearch is briefly unavailable or the bulk item is rejected when the first tuple for a key arrives, there is no log, no retry, and the cache suppresses every later attempt. That queue is then permanently missing from the index until the worker restarts. Could we use the listener variant of getConnection (as DeletionBolt / IndexerBolt do) and only populate the cache on bulk success?
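To make the suggestion in point 1 concrete, here is a minimal sketch of the bookkeeping, assuming the bulk listener can report a per-key outcome. It deliberately uses only the JDK: `KnownQueuesSketch`, `shouldSend` and `onResult` are hypothetical names for illustration, not the actual QueueBolt or OpenSearch client API.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical stand-in for the bookkeeping inside a queue bolt: a key is
 * only remembered once the asynchronous write has been confirmed.
 */
final class KnownQueuesSketch {
    private final Set<String> confirmed = new HashSet<>();
    private final Set<String> inFlight = new HashSet<>();

    /** Returns true if a write request should be sent for this key. */
    synchronized boolean shouldSend(String key) {
        if (confirmed.contains(key) || inFlight.contains(key)) {
            return false;
        }
        inFlight.add(key);
        return true;
    }

    /** Called from the (assumed) bulk listener once the outcome is known. */
    synchronized void onResult(String key, boolean success) {
        inFlight.remove(key);
        if (success) {
            confirmed.add(key);
        }
        // On failure the key is forgotten, so the next tuple retries it.
    }

    public static void main(String[] args) {
        KnownQueuesSketch cache = new KnownQueuesSketch();
        System.out.println(cache.shouldSend("example.com")); // first sighting
        cache.onResult("example.com", false);                // write failed
        System.out.println(cache.shouldSend("example.com")); // retried
        cache.onResult("example.com", true);                 // write succeeded
        System.out.println(cache.shouldSend("example.com")); // suppressed
    }
}
```

The point of the separate in-flight set is that a failed write leaves no trace in the confirmed set, so a transient outage delays a queue entry instead of losing it until the worker restarts.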
2. create(true) freezes lastUpdated and produces guaranteed-failing requests
Since documents are create-only, lastUpdated never moves past first insertion. Any consumer that uses it to distinguish active from dead queues (one of the use cases in #990) will get wrong answers. In addition, after a worker restart, or eviction from the hardcoded 10k cache (a broad crawl partitioned by host easily exceeds that), every re-seen key is re-sent as a create that OpenSearch rejects with version_conflict_engine_exception, silently because of point 1. I'd suggest plain index/upsert semantics instead of create(true), and making the cache configurable via a cache spec like status.updater.cache.spec.
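The difference between the two write modes in point 2 can be illustrated with a plain map standing in for the queues index. This is only a toy model under that assumption: `create` and `index` here are hypothetical helpers that mimic create-only and index/upsert semantics, not calls into the OpenSearch client.

```java
import java.util.HashMap;
import java.util.Map;

final class CreateVsIndexSketch {
    /** Create-only: the first write wins, later writes for the key are rejected. */
    static boolean create(Map<String, Long> index, String key, long lastUpdated) {
        return index.putIfAbsent(key, lastUpdated) == null;
    }

    /** Index/upsert: every write replaces the stored value. */
    static void index(Map<String, Long> index, String key, long lastUpdated) {
        index.put(key, lastUpdated);
    }

    public static void main(String[] args) {
        Map<String, Long> createOnly = new HashMap<>();
        Map<String, Long> upserted = new HashMap<>();
        // The same key is seen at three successive timestamps.
        for (long t = 1; t <= 3; t++) {
            create(createOnly, "example.com", t);
            index(upserted, "example.com", t);
        }
        System.out.println(createOnly.get("example.com")); // stays at the first timestamp
        System.out.println(upserted.get("example.com"));   // follows the latest timestamp
    }
}
```

With create-only writes, lastUpdated effectively means "first seen"; whether that is acceptable depends on which component is expected to own later updates to the entry.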
3. Stream declared in core, but only the OpenSearch bolt emits
AbstractStatusUpdaterBolt.declareOutputFields now declares the queue stream for all backends (SQL, Solr, urlfrontier, opensearch-java, ...), but only the OpenSearch client StatusUpdaterBolt emits on it. Wiring streamId: queue against any other backend validates and runs, but delivers nothing, with no warning. This diverges from the DELETION_STREAM_NAME pattern, where the abstract base both declares and emits so all backends behave identically. Since the partition key comes from core's own URLPartitioner, nothing OpenSearch-specific prevents emitting from the base class. Alternatively, the declaration could move into the OpenSearch subclass for now. Was the plan to add emits to the other backends in follow-ups?
4. Full Metadata shipped on every status update but never read
With the FIELDS-grouped wiring from the archetype flux, the entire Metadata map is serialized and transferred per status update on the hottest stream of the topology, while QueueBolt only ever calls getStringByField("key"). Emitting just the key would remove that cost. Related: the emit happens before waitAck.addTuple / addToProcessor, so a status write that ultimately fails still leaves a phantom entry in the queues index.
5. Smaller things
- QueueBolt imports org.joda.time.Instant; joda-time is not declared in external/opensearch/pom.xml and only comes in transitively via the OpenSearch server artifact. java.time.Instant.now().toString() is a drop-in replacement and matches what core and the spouts use.
- The constant is named ESIndexNameParamName; siblings in this module use the OS prefix.
- knownQueue is Cache<String, String> storing the docID, but the value is never read; presence-only would do.
- Both queues.mapping files declare a metadata.* dynamic template, but the bolt only indexes key and lastUpdated, so it can never match.
- QueueBoltTest.store() stubs getValueByField("metadata") and builds a Metadata object the bolt never reads.
6. Migration path for existing crawls?
For a mature crawl with millions of URLs already in the status index, the queues index only fills as URLs pass through the status updater again. Quiescent hosts (far-future nextFetchDate, or hosts whose URLs are all in terminal states) may not show up for a long time or ever. Should we provide a backfill utility (e.g. a terms aggregation over the status index routing/hostname field bulk-loaded into queues), or at least document that the queue index is only complete for crawls started fresh?
|
Thanks
yes absolutely
The statusUpdaterBolt just creates the entries if they don't exist. Later on these will get updated either directly by the users or by other components of the topology. We probably don't want to overwrite the entries here.
Other backends might adopt it in due course, let's leave it where it is
The metadata will be used later on
yes good idea
yes good idea
sure, does it matter though?
The metadata will be added later on
Future proofing, mind your own business Claude, you can't understand
This is new functionality; let's get it working first, and if existing crawls need it, then we can look at generating the entries from the status index for existing crawls |
Address review comments on #1974:
- use the BulkProcessor.Listener variant of getConnection and only cache keys once the bulk write succeeded, so failed writes are logged and retried when the key is next seen
- treat version conflicts (entry already exists, e.g. after a worker restart or cache eviction) as success, keeping create-only semantics
- make the known-queues cache configurable via opensearch.queues.cache.spec (default maximumSize=10000)
- replace org.joda.time.Instant with java.time.Instant
- rename ESIndexNameParamName to OSQueuesIndexNameParamName
- presence-only cache values

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
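The conflict handling described in this commit message could be sketched as a small classification step over the per-item HTTP status of a bulk response. This is an illustration only, assuming the listener has access to that status; `BulkOutcomeSketch` and `entryExists` are hypothetical names, and the only fact relied on is that a create against an existing document is rejected with status 409 (version conflict).

```java
final class BulkOutcomeSketch {
    /** Status returned when a create hits a document that already exists. */
    static final int HTTP_CONFLICT = 409;

    /**
     * True when the queue entry is known to exist afterwards, either because
     * this request created it or because it was already there. Only in these
     * cases is it safe to cache the key.
     */
    static boolean entryExists(int httpStatus) {
        boolean succeeded = httpStatus >= 200 && httpStatus < 300;
        return succeeded || httpStatus == HTTP_CONFLICT;
    }

    public static void main(String[] args) {
        System.out.println(entryExists(201)); // created by this request
        System.out.println(entryExists(409)); // already present, treated as success
        System.out.println(entryExists(503)); // unavailable: not cached, retried later
    }
}
```

Treating the conflict as success keeps the create-only semantics while avoiding a stream of rejected requests after a worker restart or cache eviction.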
|
Pushed a commit addressing the points we agreed on: the bolt now uses the listener variant of |
|
thanks @rzo1 LGTM |
Superseded by the generic queue stream (#1974): the Retry-After header already reaches the status updater in the status metadata, so the back-off signal no longer needs to be produced by the FetcherBolt. FetcherBolt, Constants and crawler-default.yaml go back to their main versions.
…tatus updater Mirrors the OpenSearch StatusUpdaterBolt behaviour introduced in #1974 so that queue/host-aware bolts can be wired in URLFrontier topologies too.
See discussion on #990
This PR introduces a queue stream and uses it in the OpenSearch module with a new QueueBolt. This allows decoupling the logic about queues (whether based on hostnames, domain names or anything else) from the storage of the URLs.
See #1973 for related work