From d8a3021d72037388378d7b0b096924158bc5a089 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Wed, 1 Jul 2026 22:20:01 +0200 Subject: [PATCH 1/6] feat: emit Retry-After host back-off and block the URLFrontier queue (#867, #784) On an HTTP 429 or 503 response, FetcherBolt parses the Retry-After header (delta-seconds or IMF-fixdate), caps it via the new fetcher.max.retry.after setting (default 24h) and emits (key, blockUntil) on the new "hostinfo" stream. The key is derived with URLPartitioner from partition.url.mode so it matches the URLFrontier queue key, which may differ from the fetcher's internal queue mode. The new HostBlockBolt (external/urlfrontier) consumes the stream and calls blockQueueUntil, so the frontier stops handing out URLs for the host until the requested time. The fetch pipeline itself never delays or holds tuples. --- .../org/apache/stormcrawler/Constants.java | 2 + .../apache/stormcrawler/bolt/FetcherBolt.java | 131 ++++++++++++++ core/src/main/resources/crawler-default.yaml | 9 + .../bolt/FetcherBoltRetryAfterParserTest.java | 141 +++++++++++++++ .../stormcrawler/bolt/FetcherBoltTest.java | 126 ++++++++++++++ .../urlfrontier/HostBlockBolt.java | 117 +++++++++++++ .../urlfrontier/HostBlockBoltTest.java | 162 ++++++++++++++++++ 7 files changed, 688 insertions(+) create mode 100644 core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltRetryAfterParserTest.java create mode 100644 external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java create mode 100644 external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java diff --git a/core/src/main/java/org/apache/stormcrawler/Constants.java b/core/src/main/java/org/apache/stormcrawler/Constants.java index 7e5ef8639..411b7bb06 100644 --- a/core/src/main/java/org/apache/stormcrawler/Constants.java +++ b/core/src/main/java/org/apache/stormcrawler/Constants.java @@ -31,6 +31,8 @@ public class Constants { public static final String 
StatusStreamName = "status"; + public static final String HOST_INFO_STREAM_NAME = "hostinfo"; + public static final String DELETION_STREAM_NAME = "deletion"; public static final String AllowRedirParamName = "redirections.allowed"; diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java index 2439bd76c..bde8eab43 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java @@ -24,7 +24,10 @@ import java.net.MalformedURLException; import java.net.URL; import java.net.UnknownHostException; +import java.time.DateTimeException; import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -67,6 +70,7 @@ import org.apache.stormcrawler.protocol.ProtocolResponse; import org.apache.stormcrawler.protocol.RobotRules; import org.apache.stormcrawler.util.ConfUtils; +import org.apache.stormcrawler.util.URLPartitioner; import org.apache.stormcrawler.util.URLUtil; import org.slf4j.LoggerFactory; @@ -94,6 +98,38 @@ public class FetcherBolt extends StatusEmitterBolt { */ public static final String FETCH_TIMEOUT_PARAM_KEY = "fetcher.thread.timeout"; + /** + * Maximum delay in seconds honoured when a server requests a back-off via the Retry-After + * HTTP response header. The parsed delay (capped to this value) is emitted on the {@link + * Constants#HOST_INFO_STREAM_NAME} stream as {@code blockUntil} for a host-aware consumer to + * act on; the fetcher itself does not delay the fetch. A value of {@code -1} disables the cap + * and emits the delay as-is; the default caps at 86400 (24h). + */ + public static final String MAX_RETRY_AFTER_PARAM_KEY = "fetcher.max.retry.after"; + + /** Name of the Retry-After HTTP header, lower-cased as stored by the protocol layer. 
*/
+    private static final String RETRY_AFTER_KEY = "retry-after";
+
+    /**
+     * Formatter for the HTTP-date form of the Retry-After header, e.g. {@code Wed, 21 Oct 2015
+     * 07:28:00 GMT}. {@link DateTimeFormatter} is immutable and thread-safe, unlike {@code
+     * SimpleDateFormat}, which matters here as it is shared across fetcher threads.
+     */
+    private static final DateTimeFormatter HTTP_DATE_FORMATTER =
+            DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT)
+                    .withZone(ZoneOffset.UTC);
+
+    /** Pre-compiled pattern for the numeric (delta-seconds) form of the Retry-After header. */
+    private static final Pattern RETRY_AFTER_SECONDS = Pattern.compile("[0-9]+");
+
+    /**
+     * Upper bound on the length of a Retry-After header value we will attempt to parse. A
+     * delta-seconds value is at most 19 digits and an HTTP date ~29 chars; anything longer is
+     * rejected to bound the work done on this attacker-controlled value.
+     */
+    private static final int MAX_RETRY_AFTER_VALUE_LENGTH = 64;
+
     /** Key name of the custom crawl delay for a queue that may be present in the metadata. */
     private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay";

@@ -116,6 +152,14 @@ public class FetcherBolt extends StatusEmitterBolt {

     private ProtocolFactory protocolFactory;

+    /**
+     * Derives the host-info back-off key the same way the frontier keys its queues (via {@code
+     * partition.url.mode}), so a {@code blockUntil} signal targets the right queue. Configured once
+     * and shared across fetcher threads; {@link URLPartitioner#getPartition} holds no mutable
+     * state.
+     */
+    private URLPartitioner partitioner;
+
     private int taskId = -1;

     boolean sitemapsAutoDiscovery = false;
@@ -483,6 +527,49 @@ public synchronized FetchItem getFetchItem() {
         }
     }

+    /**
+     * Parses the value of a Retry-After HTTP response header (RFC 9110, section 10.2.3) into a
+     * delay expressed in milliseconds, relative to now. The value is either a number of seconds
+     * (e.g. {@code 120}) or an HTTP date (e.g. {@code Wed, 21 Oct 2015 07:28:00 GMT}). Never
+     * throws: a date too far in the future to be expressed as epoch milliseconds is malformed.
+     *
+     * @param retryAfter raw header value, may be {@code null}
+     * @return the delay in milliseconds (possibly {@code 0}), or {@code -1} if the header is
+     *     absent, malformed, in the past or implausibly long.
+     */
+    static long parseRetryAfterDelay(String retryAfter) {
+        if (StringUtils.isBlank(retryAfter)) {
+            return -1;
+        }
+        retryAfter = retryAfter.trim();
+        // a valid Retry-After is either a delta-seconds value or an HTTP date;
+        // reject anything implausibly long to bound the work done on this
+        // attacker-controlled header value
+        if (retryAfter.length() > MAX_RETRY_AFTER_VALUE_LENGTH) {
+            return -1;
+        }
+        // delay expressed as a number of seconds
+        if (RETRY_AFTER_SECONDS.matcher(retryAfter).matches()) {
+            try {
+                return Math.multiplyExact(Long.parseLong(retryAfter), 1000L);
+            } catch (NumberFormatException | ArithmeticException e) {
+                // value too large to fit in a long, or its conversion to ms
+                // overflows - ignore
+                return -1;
+            }
+        }
+        // delay expressed as an HTTP date (IMF-fixdate form only); "yyyy" also accepts a
+        // signed year of more than 4 digits, for which toEpochMilli() overflows a long
+        try {
+            Instant date = Instant.from(HTTP_DATE_FORMATTER.parse(retryAfter));
+            long delay = date.toEpochMilli() - System.currentTimeMillis();
+            return delay > 0 ? delay : -1;
+        } catch (DateTimeException | ArithmeticException e) {
+            LOG.debug("Invalid Retry-After header value: {}", retryAfter);
+            return -1;
+        }
+    }
+
     /** This class picks items from queues and fetches the pages. */
     private class FetcherThread extends Thread {

@@ -501,6 +588,9 @@ private class FetcherThread extends Thread {
         /** Hard timeout in seconds for a single protocol fetch. -1 means disabled. */
         private long fetchTimeout = -1;

+        /** Upper bound in ms for the recorded Retry-After delay; -1 means no cap. */
+        private long maxRetryAfter = -1;
+
         /**
          * Single-thread executor used to run the protocol call so that it can be interrupted via
          * {@link Future#cancel(boolean)} when the bolt-level timeout fires.
@@ -521,6 +611,8 @@ public FetcherThread(Config conf, int num) { this.threadNum = num; timeoutInQueues = ConfUtils.getLong(conf, QUEUED_TIMEOUT_PARAM_KEY, timeoutInQueues); fetchTimeout = ConfUtils.getLong(conf, FETCH_TIMEOUT_PARAM_KEY, fetchTimeout); + long maxRetryAfterSecs = ConfUtils.getLong(conf, MAX_RETRY_AFTER_PARAM_KEY, -1L); + maxRetryAfter = maxRetryAfterSecs < 0 ? -1L : maxRetryAfterSecs * 1000L; protocolMetadataPrefix = ConfUtils.getString( conf, @@ -801,6 +893,41 @@ public void run() { mergedMetadata.setValue("fetch.timeInQueues", Long.toString(timeInQueues)); + // report a server-requested back-off (Retry-After, e.g. on a + // 429 or 503) on the host-info stream so a host-aware consumer + // can block the host. The fetch itself is not delayed here. + // See #867 / #784. + // only on a rate-limit (429) or unavailable (503) response does + // Retry-After signal a host back-off worth acting on + int statusCode = response.getStatusCode(); + if (statusCode == 429 || statusCode == 503) { + long retryAfter = + parseRetryAfterDelay( + response.getMetadata().getFirstValue(RETRY_AFTER_KEY)); + if (retryAfter > 0 && maxRetryAfter >= 0 && retryAfter > maxRetryAfter) { + LOG.debug( + "Capping Retry-After for {} from {} to {} ms", + fit.url, + retryAfter, + maxRetryAfter); + retryAfter = maxRetryAfter; + } + if (retryAfter > 0) { + long blockUntilSecs = (System.currentTimeMillis() + retryAfter) / 1000L; + // key by the frontier's partition mode (partition.url.mode) + // so the consumer blocks the matching frontier queue. + // fit.queueId uses fetcher.queue.mode, which may differ. 
+ String hostKey = partitioner.getPartition(fit.url, mergedMetadata); + if (hostKey == null) { + hostKey = fit.queueId; + } + collector.emit( + Constants.HOST_INFO_STREAM_NAME, + fit.tuple, + new Values(hostKey, blockUntilSecs)); + } + } + // determine the status based on the status code final Status status = Status.fromHTTPCode(response.getStatusCode()); @@ -962,6 +1089,9 @@ public void prepare( this.fetchQueues = new FetchItemQueues(conf); + this.partitioner = new URLPartitioner(); + this.partitioner.configure(conf); + this.taskId = context.getThisTaskId(); int threadCount = ConfUtils.getInt(conf, "fetcher.threads.number", 10); @@ -1008,6 +1138,7 @@ public void prepare( public void declareOutputFields(OutputFieldsDeclarer declarer) { super.declareOutputFields(declarer); declarer.declare(new Fields("url", "content", "metadata")); + declarer.declareStream(Constants.HOST_INFO_STREAM_NAME, new Fields("key", "blockUntil")); } @Override diff --git a/core/src/main/resources/crawler-default.yaml b/core/src/main/resources/crawler-default.yaml index d45cbc6b6..44e567f83 100644 --- a/core/src/main/resources/crawler-default.yaml +++ b/core/src/main/resources/crawler-default.yaml @@ -52,6 +52,15 @@ config: # use the delay specified in the robots.txt fetcher.server.delay.force: false + # max. delay in seconds honoured when a server requests a back-off via the + # Retry-After HTTP response header (e.g. on a 429 or 503). The parsed delay is + # emitted on the "hostinfo" stream (key, blockUntil-in-epoch-seconds) for a + # host-aware consumer to act on - the fetch itself is not delayed here. + # (-1) honour the delay requested by the server as-is, with no upper bound + # (>=0) cap the delay to this many seconds. The default of 24h bounds a + # misbehaving or hostile server from pinning a host queue for too long. 
+ fetcher.max.retry.after: 86400 + # time bucket to use for the metrics sent by the Fetcher fetcher.metrics.time.bucket.secs: 10 diff --git a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltRetryAfterParserTest.java b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltRetryAfterParserTest.java new file mode 100644 index 000000000..ffc674e98 --- /dev/null +++ b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltRetryAfterParserTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.bolt; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.Random; +import org.junit.jupiter.api.Test; + +/** Unit and randomised (fuzz) coverage for {@link FetcherBolt#parseRetryAfterDelay(String)}. 
*/ +class FetcherBoltRetryAfterParserTest { + + private static final DateTimeFormatter HTTP_DATE = + DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT) + .withZone(ZoneOffset.UTC); + + @Test + void parseRetryAfterSeconds() { + assertEquals(120_000L, FetcherBolt.parseRetryAfterDelay("120")); + // surrounding whitespace is tolerated + assertEquals(120_000L, FetcherBolt.parseRetryAfterDelay(" 120 ")); + // zero is a valid (if pointless) delay and is not treated as "absent" + assertEquals(0L, FetcherBolt.parseRetryAfterDelay("0")); + } + + @Test + void parseRetryAfterFutureDate() { + Instant future = Instant.now().plusSeconds(300); + long delay = FetcherBolt.parseRetryAfterDelay(HTTP_DATE.format(future)); + // allow a small tolerance for the time elapsed during the call + assertTrue(delay > 250_000 && delay <= 300_000, "delay was " + delay); + } + + @Test + void parseRetryAfterPastDateReturnsMinusOne() { + Instant past = Instant.now().minusSeconds(300); + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(HTTP_DATE.format(past))); + } + + @Test + void parseRetryAfterBlankOrMalformedReturnsMinusOne() { + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(null)); + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("")); + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(" ")); + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("not-a-date")); + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("12.5")); + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("-5")); + } + + @Test + void parseRetryAfterOverlongValueReturnsMinusOne() { + // an implausibly long header value is rejected without further parsing + String tooLong = "1".repeat(65); + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(tooLong)); + } + + @Test + void parseRetryAfterParseOverflowReturnsMinusOne() { + // too large to fit in a long at all + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("999999999999999999999999999")); + } + + @Test + void 
parseRetryAfterMillisecondOverflowReturnsMinusOne() { + // parses as a valid long, but * 1000 overflows a long (regression: was a + // silent overflow yielding a negative delay) + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("3687950425865536959")); + assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(Long.toString(Long.MAX_VALUE))); + } + + @Test + void parseRetryAfterRandomSecondsRoundTrip() { + Random rnd = new Random(7); + for (int i = 0; i < 1_000; i++) { + long secs = rnd.nextInt(1_000_000); + assertEquals(secs * 1000L, FetcherBolt.parseRetryAfterDelay(Long.toString(secs))); + } + } + + @Test + void parseRetryAfterNeverThrowsForRandomInput() { + Random rnd = new Random(42); + for (int i = 0; i < 10_000; i++) { + String input = randomRetryAfterValue(rnd); + long result = FetcherBolt.parseRetryAfterDelay(input); + // contract: never throws, never returns anything below -1 + assertTrue(result >= -1L, "value " + result + " for input <" + input + ">"); + } + } + + /** + * Produces a wide spectrum of inputs: digit runs (including values that overflow a long or + * overflow when converted to ms), real HTTP dates (past and future), blanks, random printable + * garbage and nulls. + */ + private static String randomRetryAfterValue(Random rnd) { + switch (rnd.nextInt(5)) { + case 0: + int len = 1 + rnd.nextInt(40); + StringBuilder digits = new StringBuilder(); + for (int i = 0; i < len; i++) { + digits.append((char) ('0' + rnd.nextInt(10))); + } + return digits.toString(); + case 1: + long offset = (rnd.nextBoolean() ? 
1L : -1L) * rnd.nextInt(1_000_000); + return HTTP_DATE.format(Instant.now().plusSeconds(offset)); + case 2: + return ""; + case 3: + int glen = rnd.nextInt(30); + StringBuilder garbage = new StringBuilder(); + for (int i = 0; i < glen; i++) { + garbage.append((char) (32 + rnd.nextInt(95))); + } + return garbage.toString(); + default: + return null; + } + } +} diff --git a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java index 8019bcf34..69ea6b957 100644 --- a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java +++ b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java @@ -17,7 +17,31 @@ package org.apache.stormcrawler.bolt; +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; +import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.Constants; +import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.TestOutputCollector; +import org.apache.stormcrawler.TestUtil; +import org.apache.stormcrawler.util.URLPartitioner; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; public class FetcherBoltTest extends 
AbstractFetcherBoltTest {

@@ -25,4 +49,106 @@ public class FetcherBoltTest extends AbstractFetcherBoltTest {
     void setUpContext() throws Exception {
         bolt = new FetcherBolt();
     }
+
+    // --- a rate-limited server emits a host back-off signal on the host-info stream ---
+
+    @Test
+    void retryAfterEmitsHostInfo(WireMockRuntimeInfo wmRuntimeInfo) {
+        stubFor(
+                get(urlMatching(".+"))
+                        .willReturn(aResponse().withStatus(429).withHeader("Retry-After", "120")));
+
+        TestOutputCollector output = runFetch(wmRuntimeInfo, new HashMap<>());
+
+        List<List<Object>> hostInfo = output.getEmitted(Constants.HOST_INFO_STREAM_NAME);
+        assertEquals(1, hostInfo.size());
+        // key is the lower-cased host of the fetched URL
+        assertEquals("localhost", hostInfo.get(0).get(0));
+        long blockUntil = (Long) hostInfo.get(0).get(1);
+        long nowSecs = System.currentTimeMillis() / 1000L;
+        // ~120s in the future, with a few seconds of slack
+        assertTrue(
+                blockUntil >= nowSecs + 110 && blockUntil <= nowSecs + 125,
+                "blockUntil was " + blockUntil + " now " + nowSecs);
+    }
+
+    @Test
+    void retryAfterKeyUsesFrontierPartitionMode(WireMockRuntimeInfo wmRuntimeInfo) {
+        stubFor(
+                get(urlMatching(".+"))
+                        .willReturn(aResponse().withStatus(429).withHeader("Retry-After", "120")));
+        // the frontier partitions its queues by partition.url.mode; the emitted
+        // host-info key must match that queue key so blockQueueUntil hits the
+        // right queue - not the fetcher's own queue mode (fetcher.queue.mode,
+        // byHost by default). Set the two to different modes to prove it.
+        Map<String, Object> extra = new HashMap<>();
+        extra.put(Constants.PARTITION_MODEParamName, "byIP");
+
+        TestOutputCollector output = runFetch(wmRuntimeInfo, extra);
+
+        List<List<Object>> hostInfo = output.getEmitted(Constants.HOST_INFO_STREAM_NAME);
+        assertEquals(1, hostInfo.size());
+        String url = "http://localhost:" + wmRuntimeInfo.getHttpPort() + "/";
+        // expected key is what the frontier used to create the queue: the
+        // partition key under partition.url.mode (here the resolved IP)
+        String expectedKey = URLPartitioner.getPartition(url, new Metadata(), "byIP");
+        assertEquals(expectedKey, hostInfo.get(0).get(0));
+    }
+
+    @Test
+    void retryAfterCappedByConfig(WireMockRuntimeInfo wmRuntimeInfo) {
+        stubFor(
+                get(urlMatching(".+"))
+                        .willReturn(aResponse().withStatus(429).withHeader("Retry-After", "3600")));
+        Map<String, Object> extra = new HashMap<>();
+        extra.put("fetcher.max.retry.after", 60L);
+
+        TestOutputCollector output = runFetch(wmRuntimeInfo, extra);
+
+        List<List<Object>> hostInfo = output.getEmitted(Constants.HOST_INFO_STREAM_NAME);
+        assertEquals(1, hostInfo.size());
+        long blockUntil = (Long) hostInfo.get(0).get(1);
+        long nowSecs = System.currentTimeMillis() / 1000L;
+        // capped to 60s, not 3600
+        assertTrue(blockUntil <= nowSecs + 65, "blockUntil was " + blockUntil);
+    }
+
+    @Test
+    void noHostInfoWhenHeaderAbsent(WireMockRuntimeInfo wmRuntimeInfo) {
+        stubFor(get(urlMatching(".+")).willReturn(aResponse().withStatus(429)));
+
+        TestOutputCollector output = runFetch(wmRuntimeInfo, new HashMap<>());
+
+        assertTrue(output.getEmitted(Constants.HOST_INFO_STREAM_NAME).isEmpty());
+    }
+
+    /** Fetches a single URL against the WireMock server and returns the collector. */
+    private TestOutputCollector runFetch(
+            WireMockRuntimeInfo wmRuntimeInfo, Map<String, Object> extraConfig) {
+        // allow robots.txt explicitly: the tests use a catch-all 429 stub, and a
+        // 429/error on robots.txt makes crawler-commons treat the host as
+        // "deny all", so the page would never be fetched.
Higher priority than
+        // the catch-all so it always wins for /robots.txt.
+        stubFor(
+                get(urlPathEqualTo("/robots.txt"))
+                        .atPriority(1)
+                        .willReturn(aResponse().withStatus(404)));
+
+        TestOutputCollector output = new TestOutputCollector();
+        Map<String, Object> config = new HashMap<>();
+        config.put("http.agent.name", "this_is_only_a_test");
+        config.putAll(extraConfig);
+        bolt.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(output));
+
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getSourceComponent()).thenReturn("source");
+        when(tuple.getStringByField("url"))
+                .thenReturn("http://localhost:" + wmRuntimeInfo.getHttpPort() + "/");
+        when(tuple.getValueByField("metadata")).thenReturn(null);
+        bolt.execute(tuple);
+
+        await().atMost(30, TimeUnit.SECONDS)
+                .until(() -> !output.getEmitted(Constants.StatusStreamName).isEmpty());
+        return output;
+    }
 }
diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java
new file mode 100644
index 000000000..d882d8a1a
--- /dev/null
+++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.urlfrontier;
+
+import crawlercommons.urlfrontier.CrawlID;
+import crawlercommons.urlfrontier.URLFrontierGrpc;
+import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierStub;
+import crawlercommons.urlfrontier.Urlfrontier.BlockQueueParams;
+import crawlercommons.urlfrontier.Urlfrontier.Empty;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.util.ConfUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumes host back-off signals emitted by the {@code FetcherBolt} on the {@code hostinfo} stream
+ * and blocks the corresponding queue in URLFrontier until the requested time, via {@code
+ * blockQueueUntil}. See issues #867 and #784. Wire it with a fields grouping on {@code "key"} from
+ * the FetcherBolt's {@code hostinfo} stream. The {@code "key"} is the frontier queue key derived
+ * from {@code partition.url.mode} (the same setting the frontier uses to assign queues), so the
+ * block targets the matching queue. Connects via {@code urlfrontier.host} / {@code
+ * urlfrontier.port}; multi-node address resolution is not supported.
+ *
+ * <p>The block is fire-and-forget: a failed call is logged but the tuple is acked anyway. A missed
+ * block means the host is fetched once more and the next 429 or 503 re-emits the signal.
+ */
+public class HostBlockBolt extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HostBlockBolt.class);
+
+    private static final StreamObserver<Empty> NOOP_OBSERVER =
+            new StreamObserver<>() {
+                @Override
+                public void onNext(Empty value) {}
+
+                @Override
+                public void onError(Throwable t) {
+                    LOG.warn("blockQueueUntil failed", t);
+                }
+
+                @Override
+                public void onCompleted() {}
+            };
+
+    private OutputCollector collector;
+    private ManagedChannel channel;
+    private URLFrontierStub frontier;
+    private String globalCrawlID;
+
+    @Override
+    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector c) {
+        this.collector = c;
+        this.globalCrawlID =
+                ConfUtils.getString(conf, Constants.URLFRONTIER_CRAWL_ID_KEY, CrawlID.DEFAULT);
+        // build channel + stub as in Spout.prepare (host/port only)
+        this.channel =
+                ManagedChannelUtil.createChannel(
+                        ConfUtils.getString(
+                                conf,
+                                Constants.URLFRONTIER_HOST_KEY,
+                                Constants.URLFRONTIER_DEFAULT_HOST),
+                        ConfUtils.getInt(
+                                conf,
+                                Constants.URLFRONTIER_PORT_KEY,
+                                Constants.URLFRONTIER_DEFAULT_PORT));
+        this.frontier = URLFrontierGrpc.newStub(channel).withWaitForReady();
+    }
+
+    @Override
+    public void execute(Tuple t) {
+        String key = t.getStringByField("key");
+        long blockUntil = t.getLongByField("blockUntil");
+        BlockQueueParams params =
+                BlockQueueParams.newBuilder()
+                        .setKey(key)
+                        .setCrawlID(globalCrawlID)
+                        .setTime(blockUntil)
+                        .setLocal(false)
+                        .build();
+        frontier.blockQueueUntil(params, NOOP_OBSERVER);
+        collector.ack(t);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // terminal bolt
+    }
+
+    @Override
+    public void cleanup() {
+        if (channel != null) {
+            channel.shutdown();
+        }
+    }
+}
diff --git
a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java new file mode 100644 index 000000000..a3e87d16c --- /dev/null +++ b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.urlfrontier; + +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import crawlercommons.urlfrontier.CrawlID; +import crawlercommons.urlfrontier.URLFrontierGrpc; +import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierBlockingStub; +import crawlercommons.urlfrontier.Urlfrontier.BlockQueueParams; +import crawlercommons.urlfrontier.Urlfrontier.GetParams; +import crawlercommons.urlfrontier.Urlfrontier.URLInfo; +import io.grpc.ManagedChannel; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.TestOutputCollector; +import org.apache.stormcrawler.TestUtil; +import org.apache.stormcrawler.persistence.Status; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers(disabledWithoutDocker = true) +class HostBlockBoltTest { + + private static final String HOST = "example.com"; + + private URLFrontierContainer container; + private ManagedChannel channel; + private URLFrontierBlockingStub blocking; + + @BeforeEach + void before() { + String image = "crawlercommons/url-frontier"; + String version = System.getProperty("urlfrontier-version"); + if (version != null) { + image += ":" + version; + } + container = new URLFrontierContainer(image); + container.start(); + var connection = container.getFrontierConnection(); + channel = ManagedChannelUtil.createChannel(connection.getHost(), connection.getPort()); + blocking = URLFrontierGrpc.newBlockingStub(channel); + } + + @AfterEach + void after() { + if (channel != 
null) { + channel.shutdownNow(); + } + container.close(); + } + + private Map frontierConfig() { + var connection = container.getFrontierConnection(); + Map config = new HashMap<>(); + config.put(Constants.URLFRONTIER_HOST_KEY, connection.getHost()); + config.put(Constants.URLFRONTIER_PORT_KEY, connection.getPort()); + return config; + } + + /** Seeds one DISCOVERED url for {@link #HOST} so the queue exists and is fetchable. */ + private void seedOneUrl() { + StatusUpdaterBolt seeder = new StatusUpdaterBolt(); + TestOutputCollector out = new TestOutputCollector(); + Map config = frontierConfig(); + config.put( + "urlbuffer.class", "org.apache.stormcrawler.persistence.urlbuffer.SimpleURLBuffer"); + config.put("scheduler.class", "org.apache.stormcrawler.persistence.DefaultScheduler"); + config.put("status.updater.cache.spec", "maximumSize=10000,expireAfterAccess=1h"); + config.put("urlfrontier.updater.max.messages", 1); + config.put("urlfrontier.cache.expireafter.sec", 10); + seeder.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(out)); + + String url = "http://" + HOST + "/"; + Tuple tuple = mock(Tuple.class); + when(tuple.getValueByField("status")).thenReturn(Status.DISCOVERED); + when(tuple.getStringByField("url")).thenReturn(url); + when(tuple.getValueByField("metadata")).thenReturn(new Metadata()); + seeder.execute(tuple); + + await().atMost(20, TimeUnit.SECONDS) + .until( + () -> + out.getAckedTuples().stream() + .anyMatch(t -> url.equals(t.getStringByField("url")))); + seeder.cleanup(); + } + + /** Returns the set of queue keys currently handed out by getURLs. 
*/ + private Set<String> keysFromGetURLs() { + GetParams params = + GetParams.newBuilder() + .setCrawlID(CrawlID.DEFAULT) + .setMaxQueues(10) + .setMaxUrlsPerQueue(10) + .build(); + Set<String> keys = new HashSet<>(); + Iterator<URLInfo> it = blocking.getURLs(params); + while (it.hasNext()) { + keys.add(it.next().getKey()); + } + return keys; + } + + @Test + @Timeout(value = 2, unit = TimeUnit.MINUTES) + void blocksHostQueueUntilTime() { + seedOneUrl(); + + long nowSecs = System.currentTimeMillis() / 1000L; + + // block the host for an hour via the bolt under test + HostBlockBolt bolt = new HostBlockBolt(); + bolt.prepare( + frontierConfig(), TestUtil.getMockedTopologyContext(), mock(OutputCollector.class)); + Tuple t = mock(Tuple.class); + when(t.getStringByField("key")).thenReturn(HOST); + when(t.getLongByField("blockUntil")).thenReturn(nowSecs + 3600); + bolt.execute(t); + + // while blocked, getURLs must not return the host + await().atMost(15, TimeUnit.SECONDS).until(() -> !keysFromGetURLs().contains(HOST)); + + // unblock (a past time releases the queue) and the host becomes available again + blocking.blockQueueUntil( + BlockQueueParams.newBuilder() + .setKey(HOST) + .setCrawlID(CrawlID.DEFAULT) + .setTime(nowSecs - 1) + .build()); + await().atMost(15, TimeUnit.SECONDS).until(() -> keysFromGetURLs().contains(HOST)); + + bolt.cleanup(); + } +} From 7dd94f43e4e600319ee7637da87803b60f5b06c5 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Fri, 3 Jul 2026 13:50:56 +0200 Subject: [PATCH 2/6] refactor: extract Retry-After parsing into RetryAfterParser util Moves the parsing logic out of FetcherBolt so that any bolt can reuse it; behaviour and test coverage unchanged.
--- .../stormcrawler/util/RetryAfterParser.java | 101 +++++++++++++ .../util/RetryAfterParserTest.java | 141 ++++++++++++++++++ 2 files changed, 242 insertions(+) create mode 100644 core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java create mode 100644 core/src/test/java/org/apache/stormcrawler/util/RetryAfterParserTest.java diff --git a/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java b/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java new file mode 100644 index 000000000..c51fcb1e3 --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.stormcrawler.util; + +import java.time.DateTimeException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Parses the value of a Retry-After + * HTTP response header, as sent by servers alongside a 429 (Too Many Requests) or 503 (Service + * Unavailable) response to request a back-off. + */ +public final class RetryAfterParser { + + private static final Logger LOG = LoggerFactory.getLogger(RetryAfterParser.class); + + /** + * Formatter for the HTTP-date form of the Retry-After header, e.g. {@code Wed, 21 Oct 2015 + * 07:28:00 GMT}. {@link DateTimeFormatter} is immutable and thread-safe, unlike {@code + * SimpleDateFormat}, which matters as this is shared across threads. + */ + private static final DateTimeFormatter HTTP_DATE_FORMATTER = + DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT) + .withZone(ZoneOffset.UTC); + + /** Pre-compiled matcher for the numeric (delta-seconds) form of the Retry-After header. */ + private static final Pattern RETRY_AFTER_SECONDS = Pattern.compile("[0-9]+"); + + /** + * Upper bound on the length of a Retry-After header value we will attempt to parse. A + * delta-seconds value is at most 19 digits and an HTTP date ~29 chars; anything longer is + * rejected to bound the work done on this attacker-controlled value. + */ + private static final int MAX_RETRY_AFTER_VALUE_LENGTH = 64; + + private RetryAfterParser() {} + + /** + * Parses a Retry-After header value into a delay expressed in milliseconds, relative to now. + * The value is either a number of seconds (e.g. {@code 120}) or an HTTP date (e.g. {@code Wed, + * 21 Oct 2015 07:28:00 GMT}). 
+ * + * @return the delay in milliseconds (possibly {@code 0}), or {@code -1} if the header is + * absent, malformed, in the past or implausibly long. + */ + public static long parseDelay(String retryAfter) { + if (StringUtils.isBlank(retryAfter)) { + return -1; + } + retryAfter = retryAfter.trim(); + // a valid Retry-After is either a delta-seconds value or an HTTP date; + // reject anything implausibly long to bound the work done on this + // attacker-controlled header value + if (retryAfter.length() > MAX_RETRY_AFTER_VALUE_LENGTH) { + return -1; + } + // delay expressed as a number of seconds + if (RETRY_AFTER_SECONDS.matcher(retryAfter).matches()) { + try { + return Math.multiplyExact(Long.parseLong(retryAfter), 1000L); + } catch (NumberFormatException | ArithmeticException e) { + // value too large to fit in a long, or its conversion to ms + // overflows - ignore + return -1; + } + } + // delay expressed as an HTTP date (IMF-fixdate form only, as produced by + // virtually all servers; the legacy RFC 850 / asctime forms are not accepted) + try { + Instant date = Instant.from(HTTP_DATE_FORMATTER.parse(retryAfter)); + long delay = date.toEpochMilli() - System.currentTimeMillis(); + return delay > 0 ? delay : -1; + } catch (DateTimeException e) { + LOG.debug("Invalid Retry-After header value: {}", retryAfter); + return -1; + } + } +} diff --git a/core/src/test/java/org/apache/stormcrawler/util/RetryAfterParserTest.java b/core/src/test/java/org/apache/stormcrawler/util/RetryAfterParserTest.java new file mode 100644 index 000000000..726feb40e --- /dev/null +++ b/core/src/test/java/org/apache/stormcrawler/util/RetryAfterParserTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.Random; +import org.junit.jupiter.api.Test; + +/** Unit and randomised (fuzz) coverage for {@link RetryAfterParser#parseDelay(String)}. 
*/ +class RetryAfterParserTest { + + private static final DateTimeFormatter HTTP_DATE = + DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT) + .withZone(ZoneOffset.UTC); + + @Test + void parseRetryAfterSeconds() { + assertEquals(120_000L, RetryAfterParser.parseDelay("120")); + // surrounding whitespace is tolerated + assertEquals(120_000L, RetryAfterParser.parseDelay(" 120 ")); + // zero is a valid (if pointless) delay and is not treated as "absent" + assertEquals(0L, RetryAfterParser.parseDelay("0")); + } + + @Test + void parseRetryAfterFutureDate() { + Instant future = Instant.now().plusSeconds(300); + long delay = RetryAfterParser.parseDelay(HTTP_DATE.format(future)); + // allow a small tolerance for the time elapsed during the call + assertTrue(delay > 250_000 && delay <= 300_000, "delay was " + delay); + } + + @Test + void parseRetryAfterPastDateReturnsMinusOne() { + Instant past = Instant.now().minusSeconds(300); + assertEquals(-1L, RetryAfterParser.parseDelay(HTTP_DATE.format(past))); + } + + @Test + void parseRetryAfterBlankOrMalformedReturnsMinusOne() { + assertEquals(-1L, RetryAfterParser.parseDelay(null)); + assertEquals(-1L, RetryAfterParser.parseDelay("")); + assertEquals(-1L, RetryAfterParser.parseDelay(" ")); + assertEquals(-1L, RetryAfterParser.parseDelay("not-a-date")); + assertEquals(-1L, RetryAfterParser.parseDelay("12.5")); + assertEquals(-1L, RetryAfterParser.parseDelay("-5")); + } + + @Test + void parseRetryAfterOverlongValueReturnsMinusOne() { + // an implausibly long header value is rejected without further parsing + String tooLong = "1".repeat(65); + assertEquals(-1L, RetryAfterParser.parseDelay(tooLong)); + } + + @Test + void parseRetryAfterParseOverflowReturnsMinusOne() { + // too large to fit in a long at all + assertEquals(-1L, RetryAfterParser.parseDelay("999999999999999999999999999")); + } + + @Test + void parseRetryAfterMillisecondOverflowReturnsMinusOne() { + // parses as a valid long, but * 1000 overflows a 
long (regression: was a + // silent overflow yielding a negative delay) + assertEquals(-1L, RetryAfterParser.parseDelay("3687950425865536959")); + assertEquals(-1L, RetryAfterParser.parseDelay(Long.toString(Long.MAX_VALUE))); + } + + @Test + void parseRetryAfterRandomSecondsRoundTrip() { + Random rnd = new Random(7); + for (int i = 0; i < 1_000; i++) { + long secs = rnd.nextInt(1_000_000); + assertEquals(secs * 1000L, RetryAfterParser.parseDelay(Long.toString(secs))); + } + } + + @Test + void parseRetryAfterNeverThrowsForRandomInput() { + Random rnd = new Random(42); + for (int i = 0; i < 10_000; i++) { + String input = randomRetryAfterValue(rnd); + long result = RetryAfterParser.parseDelay(input); + // contract: never throws, never returns anything below -1 + assertTrue(result >= -1L, "value " + result + " for input <" + input + ">"); + } + } + + /** + * Produces a wide spectrum of inputs: digit runs (including values that overflow a long or + * overflow when converted to ms), real HTTP dates (past and future), blanks, random printable + * garbage and nulls. + */ + private static String randomRetryAfterValue(Random rnd) { + switch (rnd.nextInt(5)) { + case 0: + int len = 1 + rnd.nextInt(40); + StringBuilder digits = new StringBuilder(); + for (int i = 0; i < len; i++) { + digits.append((char) ('0' + rnd.nextInt(10))); + } + return digits.toString(); + case 1: + long offset = (rnd.nextBoolean() ? 
1L : -1L) * rnd.nextInt(1_000_000); + return HTTP_DATE.format(Instant.now().plusSeconds(offset)); + case 2: + return ""; + case 3: + int glen = rnd.nextInt(30); + StringBuilder garbage = new StringBuilder(); + for (int i = 0; i < glen; i++) { + garbage.append((char) (32 + rnd.nextInt(95))); + } + return garbage.toString(); + default: + return null; + } + } +} From d3e33b8fd9c2eebccc3a32b4bd92634cb5ef4851 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Fri, 3 Jul 2026 13:42:16 +0200 Subject: [PATCH 3/6] refactor: drop the hostinfo stream and FetcherBolt Retry-After emission Superseded by the generic queue stream (#1974): the Retry-After header already reaches the status updater in the status metadata, so the back-off signal no longer needs to be produced by the FetcherBolt. FetcherBolt, Constants and crawler-default.yaml go back to their main versions. --- .../org/apache/stormcrawler/Constants.java | 2 - .../apache/stormcrawler/bolt/FetcherBolt.java | 131 ---------------- core/src/main/resources/crawler-default.yaml | 9 -- .../bolt/FetcherBoltRetryAfterParserTest.java | 141 ------------------ .../stormcrawler/bolt/FetcherBoltTest.java | 126 ---------------- 5 files changed, 409 deletions(-) delete mode 100644 core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltRetryAfterParserTest.java diff --git a/core/src/main/java/org/apache/stormcrawler/Constants.java b/core/src/main/java/org/apache/stormcrawler/Constants.java index 069aee252..e06552a6a 100644 --- a/core/src/main/java/org/apache/stormcrawler/Constants.java +++ b/core/src/main/java/org/apache/stormcrawler/Constants.java @@ -33,8 +33,6 @@ public class Constants { public static final String StatusStreamName = "status"; - public static final String HOST_INFO_STREAM_NAME = "hostinfo"; - public static final String DELETION_STREAM_NAME = "deletion"; public static final String AllowRedirParamName = "redirections.allowed"; diff --git a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java 
b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java index bde8eab43..2439bd76c 100644 --- a/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java +++ b/core/src/main/java/org/apache/stormcrawler/bolt/FetcherBolt.java @@ -24,10 +24,7 @@ import java.net.MalformedURLException; import java.net.URL; import java.net.UnknownHostException; -import java.time.DateTimeException; import java.time.Instant; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -70,7 +67,6 @@ import org.apache.stormcrawler.protocol.ProtocolResponse; import org.apache.stormcrawler.protocol.RobotRules; import org.apache.stormcrawler.util.ConfUtils; -import org.apache.stormcrawler.util.URLPartitioner; import org.apache.stormcrawler.util.URLUtil; import org.slf4j.LoggerFactory; @@ -98,38 +94,6 @@ public class FetcherBolt extends StatusEmitterBolt { */ public static final String FETCH_TIMEOUT_PARAM_KEY = "fetcher.thread.timeout"; - /** - * Maximum delay in seconds honoured when a server requests a back-off via the Retry-After - * HTTP response header. The parsed delay (capped to this value) is emitted on the {@link - * Constants#HOST_INFO_STREAM_NAME} stream as {@code blockUntil} for a host-aware consumer to - * act on; the fetcher itself does not delay the fetch. A value of {@code -1} disables the cap - * and emits the delay as-is; the default caps at 86400 (24h). - */ - public static final String MAX_RETRY_AFTER_PARAM_KEY = "fetcher.max.retry.after"; - - /** Name of the Retry-After HTTP header, lower-cased as stored by the protocol layer. */ - private static final String RETRY_AFTER_KEY = "retry-after"; - - /** - * Formatter for the HTTP-date form of the Retry-After header, e.g. {@code Wed, 21 Oct 2015 - * 07:28:00 GMT}. {@link DateTimeFormatter} is immutable and thread-safe, unlike {@code - * SimpleDateFormat}, which matters here as it is shared across fetcher threads. 
- */ - private static final DateTimeFormatter HTTP_DATE_FORMATTER = - DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT) - .withZone(ZoneOffset.UTC); - - /** Pre-compiled matcher for the numeric (delta-seconds) form of the Retry-After header. */ - private static final Pattern RETRY_AFTER_SECONDS = Pattern.compile("[0-9]+"); - - /** - * Upper bound on the length of a Retry-After header value we will attempt to parse. A - * delta-seconds value is at most 19 digits and an HTTP date ~29 chars; anything longer is - * rejected to bound the work done on this attacker-controlled value. - */ - private static final int MAX_RETRY_AFTER_VALUE_LENGTH = 64; - /** Key name of the custom crawl delay for a queue that may be present in the metadata. */ private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay"; @@ -152,14 +116,6 @@ public class FetcherBolt extends StatusEmitterBolt { private ProtocolFactory protocolFactory; - /** - * Derives the host-info back-off key the same way the frontier keys its queues (via {@code - * partition.url.mode}), so a {@code blockUntil} signal targets the right queue. Configured once - * and shared across fetcher threads; {@link URLPartitioner#getPartition} holds no mutable - * state. - */ - private URLPartitioner partitioner; - private int taskId = -1; boolean sitemapsAutoDiscovery = false; @@ -527,49 +483,6 @@ public synchronized FetchItem getFetchItem() { } } - /** - * Parses the value of a Retry-After - * HTTP response header into a delay expressed in milliseconds, relative to now. The value is - * either a number of seconds (e.g. {@code 120}) or an HTTP date (e.g. {@code Wed, 21 Oct 2015 - * 07:28:00 GMT}). - * - * @return the delay in milliseconds (possibly {@code 0}), or {@code -1} if the header is - * absent, malformed, in the past or implausibly long. 
- */ - static long parseRetryAfterDelay(String retryAfter) { - if (StringUtils.isBlank(retryAfter)) { - return -1; - } - retryAfter = retryAfter.trim(); - // a valid Retry-After is either a delta-seconds value or an HTTP date; - // reject anything implausibly long to bound the work done on this - // attacker-controlled header value - if (retryAfter.length() > MAX_RETRY_AFTER_VALUE_LENGTH) { - return -1; - } - // delay expressed as a number of seconds - if (RETRY_AFTER_SECONDS.matcher(retryAfter).matches()) { - try { - return Math.multiplyExact(Long.parseLong(retryAfter), 1000L); - } catch (NumberFormatException | ArithmeticException e) { - // value too large to fit in a long, or its conversion to ms - // overflows - ignore - return -1; - } - } - // delay expressed as an HTTP date (IMF-fixdate form only, as produced by - // virtually all servers; the legacy RFC 850 / asctime forms are not accepted) - try { - Instant date = Instant.from(HTTP_DATE_FORMATTER.parse(retryAfter)); - long delay = date.toEpochMilli() - System.currentTimeMillis(); - return delay > 0 ? delay : -1; - } catch (DateTimeException e) { - LOG.debug("Invalid Retry-After header value: {}", retryAfter); - return -1; - } - } - /** This class picks items from queues and fetches the pages. */ private class FetcherThread extends Thread { @@ -588,9 +501,6 @@ private class FetcherThread extends Thread { /** Hard timeout in seconds for a single protocol fetch. -1 means disabled. */ private long fetchTimeout = -1; - /** Upper bound in ms for the recorded Retry-After delay; -1 means no cap. */ - private long maxRetryAfter = -1; - /** * Single-thread executor used to run the protocol call so that it can be interrupted via * {@link Future#cancel(boolean)} when the bolt-level timeout fires. 
@@ -611,8 +521,6 @@ public FetcherThread(Config conf, int num) { this.threadNum = num; timeoutInQueues = ConfUtils.getLong(conf, QUEUED_TIMEOUT_PARAM_KEY, timeoutInQueues); fetchTimeout = ConfUtils.getLong(conf, FETCH_TIMEOUT_PARAM_KEY, fetchTimeout); - long maxRetryAfterSecs = ConfUtils.getLong(conf, MAX_RETRY_AFTER_PARAM_KEY, -1L); - maxRetryAfter = maxRetryAfterSecs < 0 ? -1L : maxRetryAfterSecs * 1000L; protocolMetadataPrefix = ConfUtils.getString( conf, @@ -893,41 +801,6 @@ public void run() { mergedMetadata.setValue("fetch.timeInQueues", Long.toString(timeInQueues)); - // report a server-requested back-off (Retry-After, e.g. on a - // 429 or 503) on the host-info stream so a host-aware consumer - // can block the host. The fetch itself is not delayed here. - // See #867 / #784. - // only on a rate-limit (429) or unavailable (503) response does - // Retry-After signal a host back-off worth acting on - int statusCode = response.getStatusCode(); - if (statusCode == 429 || statusCode == 503) { - long retryAfter = - parseRetryAfterDelay( - response.getMetadata().getFirstValue(RETRY_AFTER_KEY)); - if (retryAfter > 0 && maxRetryAfter >= 0 && retryAfter > maxRetryAfter) { - LOG.debug( - "Capping Retry-After for {} from {} to {} ms", - fit.url, - retryAfter, - maxRetryAfter); - retryAfter = maxRetryAfter; - } - if (retryAfter > 0) { - long blockUntilSecs = (System.currentTimeMillis() + retryAfter) / 1000L; - // key by the frontier's partition mode (partition.url.mode) - // so the consumer blocks the matching frontier queue. - // fit.queueId uses fetcher.queue.mode, which may differ. 
- String hostKey = partitioner.getPartition(fit.url, mergedMetadata); - if (hostKey == null) { - hostKey = fit.queueId; - } - collector.emit( - Constants.HOST_INFO_STREAM_NAME, - fit.tuple, - new Values(hostKey, blockUntilSecs)); - } - } - // determine the status based on the status code final Status status = Status.fromHTTPCode(response.getStatusCode()); @@ -1089,9 +962,6 @@ public void prepare( this.fetchQueues = new FetchItemQueues(conf); - this.partitioner = new URLPartitioner(); - this.partitioner.configure(conf); - this.taskId = context.getThisTaskId(); int threadCount = ConfUtils.getInt(conf, "fetcher.threads.number", 10); @@ -1138,7 +1008,6 @@ public void prepare( public void declareOutputFields(OutputFieldsDeclarer declarer) { super.declareOutputFields(declarer); declarer.declare(new Fields("url", "content", "metadata")); - declarer.declareStream(Constants.HOST_INFO_STREAM_NAME, new Fields("key", "blockUntil")); } @Override diff --git a/core/src/main/resources/crawler-default.yaml b/core/src/main/resources/crawler-default.yaml index 44e567f83..d45cbc6b6 100644 --- a/core/src/main/resources/crawler-default.yaml +++ b/core/src/main/resources/crawler-default.yaml @@ -52,15 +52,6 @@ config: # use the delay specified in the robots.txt fetcher.server.delay.force: false - # max. delay in seconds honoured when a server requests a back-off via the - # Retry-After HTTP response header (e.g. on a 429 or 503). The parsed delay is - # emitted on the "hostinfo" stream (key, blockUntil-in-epoch-seconds) for a - # host-aware consumer to act on - the fetch itself is not delayed here. - # (-1) honour the delay requested by the server as-is, with no upper bound - # (>=0) cap the delay to this many seconds. The default of 24h bounds a - # misbehaving or hostile server from pinning a host queue for too long. 
- fetcher.max.retry.after: 86400 - # time bucket to use for the metrics sent by the Fetcher fetcher.metrics.time.bucket.secs: 10 diff --git a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltRetryAfterParserTest.java b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltRetryAfterParserTest.java deleted file mode 100644 index ffc674e98..000000000 --- a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltRetryAfterParserTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.stormcrawler.bolt; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; - -import java.time.Instant; -import java.time.ZoneOffset; -import java.time.format.DateTimeFormatter; -import java.util.Locale; -import java.util.Random; -import org.junit.jupiter.api.Test; - -/** Unit and randomised (fuzz) coverage for {@link FetcherBolt#parseRetryAfterDelay(String)}. 
*/ -class FetcherBoltRetryAfterParserTest { - - private static final DateTimeFormatter HTTP_DATE = - DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT) - .withZone(ZoneOffset.UTC); - - @Test - void parseRetryAfterSeconds() { - assertEquals(120_000L, FetcherBolt.parseRetryAfterDelay("120")); - // surrounding whitespace is tolerated - assertEquals(120_000L, FetcherBolt.parseRetryAfterDelay(" 120 ")); - // zero is a valid (if pointless) delay and is not treated as "absent" - assertEquals(0L, FetcherBolt.parseRetryAfterDelay("0")); - } - - @Test - void parseRetryAfterFutureDate() { - Instant future = Instant.now().plusSeconds(300); - long delay = FetcherBolt.parseRetryAfterDelay(HTTP_DATE.format(future)); - // allow a small tolerance for the time elapsed during the call - assertTrue(delay > 250_000 && delay <= 300_000, "delay was " + delay); - } - - @Test - void parseRetryAfterPastDateReturnsMinusOne() { - Instant past = Instant.now().minusSeconds(300); - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(HTTP_DATE.format(past))); - } - - @Test - void parseRetryAfterBlankOrMalformedReturnsMinusOne() { - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(null)); - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("")); - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(" ")); - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("not-a-date")); - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("12.5")); - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("-5")); - } - - @Test - void parseRetryAfterOverlongValueReturnsMinusOne() { - // an implausibly long header value is rejected without further parsing - String tooLong = "1".repeat(65); - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(tooLong)); - } - - @Test - void parseRetryAfterParseOverflowReturnsMinusOne() { - // too large to fit in a long at all - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("999999999999999999999999999")); - } - - @Test - void 
parseRetryAfterMillisecondOverflowReturnsMinusOne() { - // parses as a valid long, but * 1000 overflows a long (regression: was a - // silent overflow yielding a negative delay) - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay("3687950425865536959")); - assertEquals(-1L, FetcherBolt.parseRetryAfterDelay(Long.toString(Long.MAX_VALUE))); - } - - @Test - void parseRetryAfterRandomSecondsRoundTrip() { - Random rnd = new Random(7); - for (int i = 0; i < 1_000; i++) { - long secs = rnd.nextInt(1_000_000); - assertEquals(secs * 1000L, FetcherBolt.parseRetryAfterDelay(Long.toString(secs))); - } - } - - @Test - void parseRetryAfterNeverThrowsForRandomInput() { - Random rnd = new Random(42); - for (int i = 0; i < 10_000; i++) { - String input = randomRetryAfterValue(rnd); - long result = FetcherBolt.parseRetryAfterDelay(input); - // contract: never throws, never returns anything below -1 - assertTrue(result >= -1L, "value " + result + " for input <" + input + ">"); - } - } - - /** - * Produces a wide spectrum of inputs: digit runs (including values that overflow a long or - * overflow when converted to ms), real HTTP dates (past and future), blanks, random printable - * garbage and nulls. - */ - private static String randomRetryAfterValue(Random rnd) { - switch (rnd.nextInt(5)) { - case 0: - int len = 1 + rnd.nextInt(40); - StringBuilder digits = new StringBuilder(); - for (int i = 0; i < len; i++) { - digits.append((char) ('0' + rnd.nextInt(10))); - } - return digits.toString(); - case 1: - long offset = (rnd.nextBoolean() ? 
1L : -1L) * rnd.nextInt(1_000_000); - return HTTP_DATE.format(Instant.now().plusSeconds(offset)); - case 2: - return ""; - case 3: - int glen = rnd.nextInt(30); - StringBuilder garbage = new StringBuilder(); - for (int i = 0; i < glen; i++) { - garbage.append((char) (32 + rnd.nextInt(95))); - } - return garbage.toString(); - default: - return null; - } - } -} diff --git a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java index 69ea6b957..8019bcf34 100644 --- a/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java +++ b/core/src/test/java/org/apache/stormcrawler/bolt/FetcherBoltTest.java @@ -17,31 +17,7 @@ package org.apache.stormcrawler.bolt; -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.get; -import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; -import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo; -import static org.awaitility.Awaitility.await; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import org.apache.storm.task.OutputCollector; -import org.apache.storm.tuple.Tuple; -import org.apache.stormcrawler.Constants; -import org.apache.stormcrawler.Metadata; -import org.apache.stormcrawler.TestOutputCollector; -import org.apache.stormcrawler.TestUtil; -import org.apache.stormcrawler.util.URLPartitioner; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; public class FetcherBoltTest extends 
AbstractFetcherBoltTest { @@ -49,106 +25,4 @@ public class FetcherBoltTest extends AbstractFetcherBoltTest { void setUpContext() throws Exception { bolt = new FetcherBolt(); } - - // --- a rate-limited server emits a host back-off signal on the host-info stream --- - - @Test - void retryAfterEmitsHostInfo(WireMockRuntimeInfo wmRuntimeInfo) { - stubFor( - get(urlMatching(".+")) - .willReturn(aResponse().withStatus(429).withHeader("Retry-After", "120"))); - - TestOutputCollector output = runFetch(wmRuntimeInfo, new HashMap<>()); - - List<List<Object>> hostInfo = output.getEmitted(Constants.HOST_INFO_STREAM_NAME); - assertEquals(1, hostInfo.size()); - // key is the lower-cased host of the fetched URL - assertEquals("localhost", hostInfo.get(0).get(0)); - long blockUntil = (Long) hostInfo.get(0).get(1); - long nowSecs = System.currentTimeMillis() / 1000L; - // ~120s in the future, with a few seconds of slack - assertTrue( - blockUntil >= nowSecs + 110 && blockUntil <= nowSecs + 125, - "blockUntil was " + blockUntil + " now " + nowSecs); - } - - @Test - void retryAfterKeyUsesFrontierPartitionMode(WireMockRuntimeInfo wmRuntimeInfo) { - stubFor( - get(urlMatching(".+")) - .willReturn(aResponse().withStatus(429).withHeader("Retry-After", "120"))); - // the frontier partitions its queues by partition.url.mode; the emitted - // host-info key must match that queue key so blockQueueUntil hits the - // right queue - not the fetcher's own queue mode (fetcher.queue.mode, - // byHost by default). Set the two to different modes to prove it.
- Map<String, Object> extra = new HashMap<>(); - extra.put(Constants.PARTITION_MODEParamName, "byIP"); - - TestOutputCollector output = runFetch(wmRuntimeInfo, extra); - - List<List<Object>> hostInfo = output.getEmitted(Constants.HOST_INFO_STREAM_NAME); - assertEquals(1, hostInfo.size()); - String url = "http://localhost:" + wmRuntimeInfo.getHttpPort() + "/"; - // expected key is what the frontier used to create the queue: the - // partition key under partition.url.mode (here the resolved IP) - String expectedKey = URLPartitioner.getPartition(url, new Metadata(), "byIP"); - assertEquals(expectedKey, hostInfo.get(0).get(0)); - } - - @Test - void retryAfterCappedByConfig(WireMockRuntimeInfo wmRuntimeInfo) { - stubFor( - get(urlMatching(".+")) - .willReturn(aResponse().withStatus(429).withHeader("Retry-After", "3600"))); - Map<String, Object> extra = new HashMap<>(); - extra.put("fetcher.max.retry.after", 60L); - - TestOutputCollector output = runFetch(wmRuntimeInfo, extra); - - List<List<Object>> hostInfo = output.getEmitted(Constants.HOST_INFO_STREAM_NAME); - assertEquals(1, hostInfo.size()); - long blockUntil = (Long) hostInfo.get(0).get(1); - long nowSecs = System.currentTimeMillis() / 1000L; - // capped to 60s, not 3600 - assertTrue(blockUntil <= nowSecs + 65, "blockUntil was " + blockUntil); - } - - @Test - void noHostInfoWhenHeaderAbsent(WireMockRuntimeInfo wmRuntimeInfo) { - stubFor(get(urlMatching(".+")).willReturn(aResponse().withStatus(429))); - - TestOutputCollector output = runFetch(wmRuntimeInfo, new HashMap<>()); - - assertTrue(output.getEmitted(Constants.HOST_INFO_STREAM_NAME).isEmpty()); - } - - /** Fetches a single URL against the WireMock server and returns the collector. */ - private TestOutputCollector runFetch( - WireMockRuntimeInfo wmRuntimeInfo, Map<String, Object> extraConfig) { - // allow robots.txt explicitly: the tests use a catch-all 429 stub, and a - // 429/error on robots.txt makes crawler-commons treat the host as - // "deny all", so the page would never be fetched.
Higher priority than - // the catch-all so it always wins for /robots.txt. - stubFor( - get(urlPathEqualTo("/robots.txt")) - .atPriority(1) - .willReturn(aResponse().withStatus(404))); - - TestOutputCollector output = new TestOutputCollector(); - Map config = new HashMap<>(); - config.put("http.agent.name", "this_is_only_a_test"); - config.putAll(extraConfig); - bolt.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(output)); - - Tuple tuple = mock(Tuple.class); - when(tuple.getSourceComponent()).thenReturn("source"); - when(tuple.getStringByField("url")) - .thenReturn("http://localhost:" + wmRuntimeInfo.getHttpPort() + "/"); - when(tuple.getValueByField("metadata")).thenReturn(null); - bolt.execute(tuple); - - await().atMost(30, TimeUnit.SECONDS) - .until(() -> !output.getEmitted(Constants.StatusStreamName).isEmpty()); - return output; - } } From 8288458fc7454da4a07d18a9c72b58f247e4140d Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Fri, 3 Jul 2026 14:00:07 +0200 Subject: [PATCH 4/6] feat: emit (key, metadata) on the queue stream from the URLFrontier status updater Mirrors the OpenSearch StatusUpdaterBolt behaviour introduced in #1974 so that queue/host-aware bolts can be wired in URLFrontier topologies too. 
--- .../urlfrontier/StatusUpdaterBolt.java | 7 +++++++ .../urlfrontier/StatusUpdaterBoltTest.java | 21 +++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java index 34ba3a4c9..5e5b3ca49 100644 --- a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java +++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java @@ -59,6 +59,7 @@ import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.metrics.CrawlerMetrics; import org.apache.stormcrawler.metrics.ScopedCounter; @@ -378,6 +379,12 @@ public void store( partitionKey = "_DEFAULT_"; } + // send a tuple on the queue stream in case a bolt + // wants to handle it + super.collector.emit( + org.apache.stormcrawler.Constants.QUEUE_STREAM_NAME, + new Values(partitionKey, metadata)); + final Map mdCopy = new HashMap<>(metadata.size()); for (String k : metadata.keySet()) { String[] vals = metadata.getValues(k); diff --git a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java index 32d827e32..d092a8e1f 100644 --- a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java +++ b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java @@ -157,6 +157,27 @@ void canAckSimpleTupleWithMetadata() Assertions.assertEquals(true, isAcked(url, 5)); } + @Test + @Timeout(value = 2, unit = TimeUnit.MINUTES) + void emitsKeyAndMetadataOnQueueStream() { + final var url 
= "https://www.url.net/something"; + final var meta = new Metadata(); + meta.setValue(persistedKey, "somePersistedMetaInfo"); + store(url, Status.DISCOVERED, meta); + Assertions.assertEquals(true, isAcked(url, 5)); + + // a (key, metadata) tuple goes out on the queue stream, as in the + // OpenSearch StatusUpdaterBolt (#1974) + var emitted = output.getEmitted(org.apache.stormcrawler.Constants.QUEUE_STREAM_NAME); + Assertions.assertEquals(1, emitted.size()); + var values = emitted.get(0); + Assertions.assertEquals(2, values.size()); + Assertions.assertEquals("www.url.net", values.get(0)); + Assertions.assertTrue(values.get(1) instanceof Metadata); + Assertions.assertEquals( + "somePersistedMetaInfo", ((Metadata) values.get(1)).getFirstValue(persistedKey)); + } + @Test @Timeout(value = 2, unit = TimeUnit.MINUTES) void exceedingMaxMessagesInFlightAfterFrontierRestart() From 26ba79d28033831afba4c219624f65faaf05eab1 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Fri, 3 Jul 2026 14:05:02 +0200 Subject: [PATCH 5/6] feat: block the URLFrontier queue on Retry-After via the queue stream HostBlockBolt now consumes the (key, metadata) tuples of the generic queue stream: on a 429/503 whose metadata carries a valid Retry-After header it calls blockQueueUntil on the frontier for that queue key. The honoured delay is capped by urlfrontier.max.retry.after (default 24h, -1 to disable). Closes the loop described in #867 / #784 without touching the FetcherBolt. 
--- .../stormcrawler/urlfrontier/Constants.java | 8 ++ .../urlfrontier/HostBlockBolt.java | 98 +++++++++++++--- .../HostBlockBoltDecisionTest.java | 111 ++++++++++++++++++ .../urlfrontier/HostBlockBoltTest.java | 14 ++- 4 files changed, 210 insertions(+), 21 deletions(-) create mode 100644 external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java index d747aafe2..67f5cdf47 100644 --- a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java +++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java @@ -43,4 +43,12 @@ private Constants() {} public static final String URLFRONTIER_UPDATER_MAX_MESSAGES_KEY = "urlfrontier.updater.max.messages"; public static final String URLFRONTIER_CRAWL_ID_KEY = "urlfrontier.crawlid"; + + /** + * Maximum delay in seconds honoured when a server requests a back-off via the Retry-After HTTP + * response header. {@code -1} disables the cap. Defaults to 86400 (24h). 
+ */ + public static final String URLFRONTIER_MAX_RETRY_AFTER_KEY = "urlfrontier.max.retry.after"; + + public static final int URLFRONTIER_MAX_RETRY_AFTER_DEFAULT = 86400; } diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java index d882d8a1a..5908bb71b 100644 --- a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java +++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java @@ -30,18 +30,26 @@ import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.protocol.ProtocolResponse; import org.apache.stormcrawler.util.ConfUtils; +import org.apache.stormcrawler.util.RetryAfterParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Consumes host back-off signals emitted by the {@code FetcherBolt} on the {@code hostinfo} stream - * and blocks the corresponding queue in URLFrontier until the requested time, via {@code - * blockQueueUntil}. See issues #867 and #784. Wire it with a fields grouping on {@code "key"} from - * the FetcherBolt's {@code hostinfo} stream. The {@code "key"} is the frontier queue key derived - * from {@code partition.url.mode} (the same setting the frontier uses to assign queues), so the - * block targets the matching queue. Connects via {@code urlfrontier.host} / {@code - * urlfrontier.port}; multi-node address resolution is not supported. + * Consumes the {@code queue} stream emitted by the status updater and blocks a queue in + * URLFrontier via {@code blockQueueUntil} whenever the tuple's metadata reports a rate-limit + * response (HTTP 429 or 503) carrying a Retry-After + * header. See issues #867 and #784. + * + *

Wire it with a fields grouping on {@code "key"} from the status updater's {@code queue} + * stream. The {@code "key"} is the frontier queue key derived from {@code partition.url.mode} (the + * same setting the frontier uses to assign queues), so the block targets the matching queue. The + * honoured delay is capped by {@code urlfrontier.max.retry.after} (in seconds, default 86400, -1 + * to disable the cap). Connects via {@code urlfrontier.host} / {@code urlfrontier.port}; + * multi-node address resolution is not supported. * *

The block is fire-and-forget: a failed call is logged but the tuple is acked anyway. A missed * block means the host is fetched once more and the next 429 re-emits the signal. @@ -50,6 +58,12 @@ public class HostBlockBolt extends BaseRichBolt { private static final Logger LOG = LoggerFactory.getLogger(HostBlockBolt.class); + /** Metadata key set by the FetcherBolt with the HTTP status code of the fetch. */ + private static final String STATUS_CODE_KEY = "fetch.statusCode"; + + /** Name of the Retry-After HTTP header, lower-cased as stored by the protocol layer. */ + private static final String RETRY_AFTER_HEADER = "retry-after"; + private static final StreamObserver NOOP_OBSERVER = new StreamObserver<>() { @Override @@ -69,11 +83,28 @@ public void onCompleted() {} private URLFrontierStub frontier; private String globalCrawlID; + /** Metadata key holding the Retry-After header, including the protocol prefix. */ + private String retryAfterKey; + + /** Upper bound in ms for the honoured Retry-After delay; -1 means no cap. */ + private long maxRetryAfterMs; + @Override public void prepare(Map conf, TopologyContext context, OutputCollector c) { this.collector = c; this.globalCrawlID = ConfUtils.getString(conf, Constants.URLFRONTIER_CRAWL_ID_KEY, CrawlID.DEFAULT); + // the protocol layer stores response headers in the metadata with this + // prefix; the FetcherBolt merges them into the status metadata + this.retryAfterKey = + ConfUtils.getString(conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, "") + + RETRY_AFTER_HEADER; + long maxRetryAfterSecs = + ConfUtils.getLong( + conf, + Constants.URLFRONTIER_MAX_RETRY_AFTER_KEY, + Constants.URLFRONTIER_MAX_RETRY_AFTER_DEFAULT); + this.maxRetryAfterMs = maxRetryAfterSecs < 0 ? 
-1L : maxRetryAfterSecs * 1000L; // build channel + stub as in Spout.prepare (host/port only) this.channel = ManagedChannelUtil.createChannel( @@ -90,19 +121,52 @@ public void prepare(Map conf, TopologyContext context, OutputCol @Override public void execute(Tuple t) { - String key = t.getStringByField("key"); - long blockUntil = t.getLongByField("blockUntil"); - BlockQueueParams params = - BlockQueueParams.newBuilder() - .setKey(key) - .setCrawlID(globalCrawlID) - .setTime(blockUntil) - .setLocal(false) - .build(); - frontier.blockQueueUntil(params, NOOP_OBSERVER); + final String key = t.getStringByField("key"); + final Metadata metadata = (Metadata) t.getValueByField("metadata"); + final long blockUntil = + blockUntilFor(metadata, retryAfterKey, maxRetryAfterMs, System.currentTimeMillis()); + if (blockUntil > 0) { + LOG.debug("Blocking queue {} until {}", key, blockUntil); + BlockQueueParams params = + BlockQueueParams.newBuilder() + .setKey(key) + .setCrawlID(globalCrawlID) + .setTime(blockUntil) + .setLocal(false) + .build(); + frontier.blockQueueUntil(params, NOOP_OBSERVER); + } collector.ack(t); } + /** + * Decides whether a queue-stream tuple carries a server-requested back-off worth enforcing. + * Only a rate-limit (429) or unavailable (503) response with a valid Retry-After header + * qualifies; the requested delay is capped by {@code maxRetryAfterMs} unless negative. 
+ * + * @return the absolute time to block the queue until, in epoch seconds, or {@code -1} if the + * tuple does not call for a block + */ + static long blockUntilFor(Metadata metadata, String retryAfterKey, long maxRetryAfterMs, long nowMs) { + if (metadata == null) { + return -1L; + } + final String statusCode = metadata.getFirstValue(STATUS_CODE_KEY); + // only on a rate-limit (429) or unavailable (503) response does + // Retry-After signal a host back-off worth acting on + if (!"429".equals(statusCode) && !"503".equals(statusCode)) { + return -1L; + } + long retryAfterMs = RetryAfterParser.parseDelay(metadata.getFirstValue(retryAfterKey)); + if (retryAfterMs <= 0) { + return -1L; + } + if (maxRetryAfterMs >= 0 && retryAfterMs > maxRetryAfterMs) { + retryAfterMs = maxRetryAfterMs; + } + return (nowMs + retryAfterMs) / 1000L; + } + @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // terminal bolt diff --git a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java new file mode 100644 index 000000000..4a8a30883 --- /dev/null +++ b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.urlfrontier; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.stormcrawler.Metadata; +import org.junit.jupiter.api.Test; + +/** + * Unit coverage for {@link HostBlockBolt#blockUntilFor(Metadata, String, long, long)} - the + * decision whether a queue-stream tuple carries a server-requested back-off worth enforcing. + */ +class HostBlockBoltDecisionTest { + + private static final String RETRY_AFTER_KEY = "protocol.retry-after"; + private static final long NOW_MS = 1_700_000_000_000L; + private static final long NO_CAP = -1L; + + private static Metadata md(String statusCode, String retryAfter) { + Metadata md = new Metadata(); + if (statusCode != null) { + md.setValue("fetch.statusCode", statusCode); + } + if (retryAfter != null) { + md.setValue(RETRY_AFTER_KEY, retryAfter); + } + return md; + } + + @Test + void blocksOn429WithRetryAfterSeconds() { + long blockUntil = + HostBlockBolt.blockUntilFor(md("429", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS); + assertEquals((NOW_MS + 120_000L) / 1000L, blockUntil); + } + + @Test + void blocksOn503WithRetryAfterSeconds() { + long blockUntil = + HostBlockBolt.blockUntilFor(md("503", "60"), RETRY_AFTER_KEY, NO_CAP, NOW_MS); + assertEquals((NOW_MS + 60_000L) / 1000L, blockUntil); + } + + @Test + void ignoresOtherStatusCodesEvenWithHeader() { + assertEquals( + -1L, HostBlockBolt.blockUntilFor(md("200", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + assertEquals( + -1L, HostBlockBolt.blockUntilFor(md("301", "120"), RETRY_AFTER_KEY, NO_CAP, 
NOW_MS)); + assertEquals( + -1L, HostBlockBolt.blockUntilFor(md("403", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + } + + @Test + void ignores429WithoutHeader() { + assertEquals( + -1L, HostBlockBolt.blockUntilFor(md("429", null), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + } + + @Test + void ignoresMissingStatusCode() { + assertEquals( + -1L, HostBlockBolt.blockUntilFor(md(null, "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + } + + @Test + void ignoresMalformedHeaderAndZeroDelay() { + assertEquals( + -1L, + HostBlockBolt.blockUntilFor(md("429", "not-a-date"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + // a zero delay is not worth a frontier round-trip + assertEquals( + -1L, HostBlockBolt.blockUntilFor(md("429", "0"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + } + + @Test + void capsDelayAtConfiguredMaximum() { + long capMs = 3_600_000L; // one hour + long blockUntil = + HostBlockBolt.blockUntilFor(md("429", "86400"), RETRY_AFTER_KEY, capMs, NOW_MS); + assertEquals((NOW_MS + capMs) / 1000L, blockUntil); + } + + @Test + void negativeCapMeansUncapped() { + long blockUntil = + HostBlockBolt.blockUntilFor(md("429", "86400"), RETRY_AFTER_KEY, NO_CAP, NOW_MS); + assertEquals((NOW_MS + 86_400_000L) / 1000L, blockUntil); + } + + @Test + void nullMetadataIsIgnored() { + assertEquals(-1L, HostBlockBolt.blockUntilFor(null, RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + } +} diff --git a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java index a3e87d16c..a3540579c 100644 --- a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java +++ b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java @@ -40,6 +40,7 @@ import org.apache.stormcrawler.TestOutputCollector; import org.apache.stormcrawler.TestUtil; import org.apache.stormcrawler.persistence.Status; +import 
org.apache.stormcrawler.protocol.ProtocolResponse; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -136,13 +137,18 @@ void blocksHostQueueUntilTime() { long nowSecs = System.currentTimeMillis() / 1000L; - // block the host for an hour via the bolt under test + // block the host for an hour via the bolt under test: a queue-stream + // tuple whose metadata reports a 429 with a Retry-After of one hour HostBlockBolt bolt = new HostBlockBolt(); - bolt.prepare( - frontierConfig(), TestUtil.getMockedTopologyContext(), mock(OutputCollector.class)); + Map conf = frontierConfig(); + conf.put(ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, "protocol."); + bolt.prepare(conf, TestUtil.getMockedTopologyContext(), mock(OutputCollector.class)); Tuple t = mock(Tuple.class); when(t.getStringByField("key")).thenReturn(HOST); - when(t.getLongByField("blockUntil")).thenReturn(nowSecs + 3600); + Metadata md = new Metadata(); + md.setValue("fetch.statusCode", "429"); + md.setValue("protocol.retry-after", "3600"); + when(t.getValueByField("metadata")).thenReturn(md); bolt.execute(t); // while blocked, getURLs must not return the host From 2a350d190043d586aa206281b471462cb0e16b7d Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Fri, 3 Jul 2026 14:10:42 +0200 Subject: [PATCH 6/6] Reformat --- .../stormcrawler/util/RetryAfterParser.java | 4 ++-- .../stormcrawler/urlfrontier/HostBlockBolt.java | 15 ++++++++------- .../urlfrontier/HostBlockBoltDecisionTest.java | 12 ++++++++---- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java b/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java index c51fcb1e3..8c87e554f 100644 --- a/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java +++ b/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java @@ -29,8 +29,8 @@ /** * Parses the value of a Retry-After - * 
HTTP response header, as sent by servers alongside a 429 (Too Many Requests) or 503 (Service + * href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After">Retry-After HTTP + * response header, as sent by servers alongside a 429 (Too Many Requests) or 503 (Service * Unavailable) response to request a back-off. */ public final class RetryAfterParser { diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java index 5908bb71b..b95cb3c45 100644 --- a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java +++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java @@ -38,18 +38,18 @@ import org.slf4j.LoggerFactory; /** - * Consumes the {@code queue} stream emitted by the status updater and blocks a queue in - * URLFrontier via {@code blockQueueUntil} whenever the tuple's metadata reports a rate-limit - * response (HTTP 429 or 503) carrying a Retry-After * header. See issues #867 and #784. * *

Wire it with a fields grouping on {@code "key"} from the status updater's {@code queue} * stream. The {@code "key"} is the frontier queue key derived from {@code partition.url.mode} (the * same setting the frontier uses to assign queues), so the block targets the matching queue. The - * honoured delay is capped by {@code urlfrontier.max.retry.after} (in seconds, default 86400, -1 - * to disable the cap). Connects via {@code urlfrontier.host} / {@code urlfrontier.port}; - * multi-node address resolution is not supported. + * honoured delay is capped by {@code urlfrontier.max.retry.after} (in seconds, default 86400, -1 to + * disable the cap). Connects via {@code urlfrontier.host} / {@code urlfrontier.port}; multi-node + * address resolution is not supported. * *

The block is fire-and-forget: a failed call is logged but the tuple is acked anyway. A missed * block means the host is fetched once more and the next 429 re-emits the signal. @@ -147,7 +147,8 @@ public void execute(Tuple t) { * @return the absolute time to block the queue until, in epoch seconds, or {@code -1} if the * tuple does not call for a block */ - static long blockUntilFor(Metadata metadata, String retryAfterKey, long maxRetryAfterMs, long nowMs) { + static long blockUntilFor( + Metadata metadata, String retryAfterKey, long maxRetryAfterMs, long nowMs) { if (metadata == null) { return -1L; } diff --git a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java index 4a8a30883..bee5c9d13 100644 --- a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java +++ b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java @@ -60,11 +60,14 @@ void blocksOn503WithRetryAfterSeconds() { @Test void ignoresOtherStatusCodesEvenWithHeader() { assertEquals( - -1L, HostBlockBolt.blockUntilFor(md("200", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + -1L, + HostBlockBolt.blockUntilFor(md("200", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); assertEquals( - -1L, HostBlockBolt.blockUntilFor(md("301", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + -1L, + HostBlockBolt.blockUntilFor(md("301", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); assertEquals( - -1L, HostBlockBolt.blockUntilFor(md("403", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + -1L, + HostBlockBolt.blockUntilFor(md("403", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); } @Test @@ -83,7 +86,8 @@ void ignoresMissingStatusCode() { void ignoresMalformedHeaderAndZeroDelay() { assertEquals( -1L, - HostBlockBolt.blockUntilFor(md("429", "not-a-date"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + 
HostBlockBolt.blockUntilFor( + md("429", "not-a-date"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); // a zero delay is not worth a frontier round-trip assertEquals( -1L, HostBlockBolt.blockUntilFor(md("429", "0"), RETRY_AFTER_KEY, NO_CAP, NOW_MS));