diff --git a/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java b/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java
new file mode 100644
index 000000000..8c87e554f
--- /dev/null
+++ b/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.util;
+
+import java.time.DateTimeException;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Parses the value of a {@code Retry-After} HTTP response header, as sent by servers alongside a
+ * 429 (Too Many Requests) or 503 (Service Unavailable) response to request a back-off.
+ *
+ * <p>This is a stateless utility class: it only exposes a static method and cannot be
+ * instantiated.
+ */
+public final class RetryAfterParser {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RetryAfterParser.class);
+
+    /**
+     * Formatter for the HTTP-date form of the Retry-After header, e.g. {@code Wed, 21 Oct 2015
+     * 07:28:00 GMT}. {@link DateTimeFormatter} is immutable and thread-safe, unlike {@code
+     * SimpleDateFormat}, which matters as this is shared across threads.
+     */
+    private static final DateTimeFormatter HTTP_DATE_FORMATTER =
+            DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT)
+                    .withZone(ZoneOffset.UTC);
+
+    /** Pre-compiled matcher for the numeric (delta-seconds) form of the Retry-After header. */
+    private static final Pattern RETRY_AFTER_SECONDS = Pattern.compile("[0-9]+");
+
+    /**
+     * Upper bound on the length of a Retry-After header value we will attempt to parse. A
+     * delta-seconds value is at most 19 digits and an HTTP date ~29 chars; anything longer is
+     * rejected to bound the work done on this attacker-controlled value.
+     */
+    private static final int MAX_RETRY_AFTER_VALUE_LENGTH = 64;
+
+    private RetryAfterParser() {}
+
+    /**
+     * Parses a Retry-After header value into a delay expressed in milliseconds, relative to now.
+     * The value is either a number of seconds (e.g. {@code 120}) or an HTTP date (e.g. {@code Wed,
+     * 21 Oct 2015 07:28:00 GMT}).
+     *
+     * <p>This method never throws on malformed or out-of-range input; every such case is reported
+     * through the {@code -1} return value.
+     *
+     * @param retryAfter the raw header value; may be {@code null} or blank
+     * @return the delay in milliseconds (possibly {@code 0}), or {@code -1} if the header is
+     *     absent, malformed, in the past or implausibly long.
+     */
+    public static long parseDelay(String retryAfter) {
+        if (StringUtils.isBlank(retryAfter)) {
+            return -1;
+        }
+        final String value = retryAfter.trim();
+        // a valid Retry-After is either a delta-seconds value or an HTTP date;
+        // reject anything implausibly long to bound the work done on this
+        // attacker-controlled header value
+        if (value.length() > MAX_RETRY_AFTER_VALUE_LENGTH) {
+            return -1;
+        }
+        // delay expressed as a number of seconds
+        if (RETRY_AFTER_SECONDS.matcher(value).matches()) {
+            try {
+                return Math.multiplyExact(Long.parseLong(value), 1000L);
+            } catch (NumberFormatException | ArithmeticException e) {
+                // value too large to fit in a long, or its conversion to ms
+                // overflows - ignore
+                LOG.debug("Retry-After delay out of range: {}", value);
+                return -1;
+            }
+        }
+        // delay expressed as an HTTP date (IMF-fixdate form only, as produced by
+        // virtually all servers; the legacy RFC 850 / asctime forms are not accepted)
+        try {
+            final Instant date = Instant.from(HTTP_DATE_FORMATTER.parse(value));
+            final long delay = date.toEpochMilli() - System.currentTimeMillis();
+            return delay > 0 ? delay : -1;
+        } catch (DateTimeException | ArithmeticException e) {
+            // DateTimeException: the text is not a valid date.
+            // ArithmeticException: the date parsed, but lies so far in the future
+            // (a sign-prefixed year with more than four digits) that
+            // Instant.toEpochMilli() cannot represent it in a long; without this
+            // catch such a header would make the method throw.
+            LOG.debug("Invalid Retry-After header value: {}", value);
+            return -1;
+        }
+    }
+}
diff --git a/core/src/test/java/org/apache/stormcrawler/util/RetryAfterParserTest.java b/core/src/test/java/org/apache/stormcrawler/util/RetryAfterParserTest.java
new file mode 100644
index 000000000..726feb40e
--- /dev/null
+++ b/core/src/test/java/org/apache/stormcrawler/util/RetryAfterParserTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Locale;
+import java.util.Random;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Exercises {@link RetryAfterParser#parseDelay(String)} with hand-picked cases as well as seeded
+ * pseudo-random (fuzz-style) inputs.
+ */
+class RetryAfterParserTest {
+
+    /** Renders an instant in the same HTTP-date layout the parser is expected to accept. */
+    private static final DateTimeFormatter HTTP_DATE =
+            DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT)
+                    .withZone(ZoneOffset.UTC);
+
+    @Test
+    void parseRetryAfterSeconds() {
+        final long plain = RetryAfterParser.parseDelay("120");
+        assertEquals(120_000L, plain);
+        // leading and trailing whitespace must not matter
+        final long padded = RetryAfterParser.parseDelay(" 120 ");
+        assertEquals(120_000L, padded);
+        // "0" is a legitimate delay and must not be confused with a missing header
+        final long zero = RetryAfterParser.parseDelay("0");
+        assertEquals(0L, zero);
+    }
+
+    @Test
+    void parseRetryAfterFutureDate() {
+        final String header = HTTP_DATE.format(Instant.now().plusSeconds(300));
+        final long delay = RetryAfterParser.parseDelay(header);
+        // some time passes between building the header and parsing it, hence the window
+        final boolean withinWindow = delay > 250_000 && delay <= 300_000;
+        assertTrue(withinWindow, "delay was " + delay);
+    }
+
+    @Test
+    void parseRetryAfterPastDateReturnsMinusOne() {
+        final String header = HTTP_DATE.format(Instant.now().minusSeconds(300));
+        assertEquals(-1L, RetryAfterParser.parseDelay(header));
+    }
+
+    @Test
+    void parseRetryAfterBlankOrMalformedReturnsMinusOne() {
+        final String[] rejected = {null, "", " ", "not-a-date", "12.5", "-5"};
+        for (String header : rejected) {
+            assertEquals(-1L, RetryAfterParser.parseDelay(header));
+        }
+    }
+
+    @Test
+    void parseRetryAfterOverlongValueReturnsMinusOne() {
+        // one character over the parser's length limit: refused before any parsing happens
+        final String overlong = "1".repeat(65);
+        assertEquals(-1L, RetryAfterParser.parseDelay(overlong));
+    }
+
+    @Test
+    void parseRetryAfterParseOverflowReturnsMinusOne() {
+        // more digits than a long can hold
+        final String beyondLong = "999999999999999999999999999";
+        assertEquals(-1L, RetryAfterParser.parseDelay(beyondLong));
+    }
+
+    @Test
+    void parseRetryAfterMillisecondOverflowReturnsMinusOne() {
+        // both fit in a long as seconds, but not once multiplied by 1000 (regression:
+        // this used to overflow silently and produce a negative delay)
+        final String[] overflowing = {"3687950425865536959", Long.toString(Long.MAX_VALUE)};
+        for (String header : overflowing) {
+            assertEquals(-1L, RetryAfterParser.parseDelay(header));
+        }
+    }
+
+    @Test
+    void parseRetryAfterRandomSecondsRoundTrip() {
+        final Random random = new Random(7);
+        int remaining = 1_000;
+        while (remaining-- > 0) {
+            final long seconds = random.nextInt(1_000_000);
+            final long expectedMs = seconds * 1000L;
+            assertEquals(expectedMs, RetryAfterParser.parseDelay(Long.toString(seconds)));
+        }
+    }
+
+    @Test
+    void parseRetryAfterNeverThrowsForRandomInput() {
+        final Random random = new Random(42);
+        for (int round = 0; round < 10_000; round++) {
+            final String input = randomRetryAfterValue(random);
+            final long result = RetryAfterParser.parseDelay(input);
+            // the parser must neither throw nor report anything smaller than -1
+            assertTrue(result >= -1L, "value " + result + " for input <" + input + ">");
+        }
+    }
+
+    /**
+     * Draws one input from five equally likely families: a run of 1 to 40 digits (so values that
+     * overflow a long, or overflow once converted to ms, are included), an HTTP date up to a
+     * million seconds either side of now, the empty string, up to 29 printable ASCII characters,
+     * or {@code null}.
+     */
+    private static String randomRetryAfterValue(Random rnd) {
+        final int family = rnd.nextInt(5);
+        if (family == 0) {
+            final int length = 1 + rnd.nextInt(40);
+            final StringBuilder run = new StringBuilder();
+            while (run.length() < length) {
+                run.append((char) ('0' + rnd.nextInt(10)));
+            }
+            return run.toString();
+        }
+        if (family == 1) {
+            final long sign = rnd.nextBoolean() ? 1L : -1L;
+            final long offset = sign * rnd.nextInt(1_000_000);
+            return HTTP_DATE.format(Instant.now().plusSeconds(offset));
+        }
+        if (family == 2) {
+            return "";
+        }
+        if (family == 3) {
+            final int length = rnd.nextInt(30);
+            final StringBuilder noise = new StringBuilder();
+            while (noise.length() < length) {
+                noise.append((char) (32 + rnd.nextInt(95)));
+            }
+            return noise.toString();
+        }
+        return null;
+    }
+}
diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java
index d747aafe2..67f5cdf47 100644
--- a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java
+++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java
@@ -43,4 +43,12 @@ private Constants() {}
public static final String URLFRONTIER_UPDATER_MAX_MESSAGES_KEY =
"urlfrontier.updater.max.messages";
public static final String URLFRONTIER_CRAWL_ID_KEY = "urlfrontier.crawlid";
+
+ /**
+ * Configuration key for the maximum delay, in seconds, honoured when a server requests a
+ * back-off via the Retry-After HTTP response header. Any negative value (conventionally {@code
+ * -1}) disables the cap. Defaults to 86400 (24h).
+ */
+ public static final String URLFRONTIER_MAX_RETRY_AFTER_KEY = "urlfrontier.max.retry.after";
+
+ /** Default for {@link #URLFRONTIER_MAX_RETRY_AFTER_KEY}: 86400 seconds, i.e. 24 hours. */
+ public static final int URLFRONTIER_MAX_RETRY_AFTER_DEFAULT = 86400;
}
diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java
new file mode 100644
index 000000000..b95cb3c45
--- /dev/null
+++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.urlfrontier;
+
+import crawlercommons.urlfrontier.CrawlID;
+import crawlercommons.urlfrontier.URLFrontierGrpc;
+import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierStub;
+import crawlercommons.urlfrontier.Urlfrontier.BlockQueueParams;
+import crawlercommons.urlfrontier.Urlfrontier.Empty;
+import io.grpc.ManagedChannel;
+import io.grpc.stub.StreamObserver;
+import java.util.Map;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.protocol.ProtocolResponse;
+import org.apache.stormcrawler.util.ConfUtils;
+import org.apache.stormcrawler.util.RetryAfterParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumes the {@code queue} stream emitted by the status updater and blocks a queue in URLFrontier
+ * via {@code blockQueueUntil} whenever the tuple's metadata reports a rate-limit response (HTTP 429
+ * or 503) carrying a Retry-After header. See issues #867 and #784.
+ *
+ * <p>Wire it with a fields grouping on {@code "key"} from the status updater's {@code queue}
+ * stream. The {@code "key"} is the frontier queue key derived from {@code partition.url.mode} (the
+ * same setting the frontier uses to assign queues), so the block targets the matching queue. The
+ * honoured delay is capped by {@code urlfrontier.max.retry.after} (in seconds, default 86400, -1 to
+ * disable the cap). Connects via {@code urlfrontier.host} / {@code urlfrontier.port}; multi-node
+ * address resolution is not supported.
+ *
+ * <p>The block is fire-and-forget: a failed call is logged but the tuple is acked anyway. A missed
+ * block means the host is fetched once more and the next 429 re-emits the signal.
+ */
+public class HostBlockBolt extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HostBlockBolt.class);
+
+    /** Metadata key set by the FetcherBolt with the HTTP status code of the fetch. */
+    private static final String STATUS_CODE_KEY = "fetch.statusCode";
+
+    /** Name of the Retry-After HTTP header, lower-cased as stored by the protocol layer. */
+    private static final String RETRY_AFTER_HEADER = "retry-after";
+
+    /** Shared response observer: ignores the (empty) reply and only logs asynchronous failures. */
+    private static final StreamObserver<Empty> NOOP_OBSERVER =
+            new StreamObserver<>() {
+                @Override
+                public void onNext(Empty value) {}
+
+                @Override
+                public void onError(Throwable t) {
+                    LOG.warn("blockQueueUntil failed", t);
+                }
+
+                @Override
+                public void onCompleted() {}
+            };
+
+    private OutputCollector collector;
+    private ManagedChannel channel;
+    private URLFrontierStub frontier;
+    private String globalCrawlID;
+
+    /** Metadata key holding the Retry-After header, including the protocol prefix. */
+    private String retryAfterKey;
+
+    /** Upper bound in ms for the honoured Retry-After delay; -1 means no cap. */
+    private long maxRetryAfterMs;
+
+    /**
+     * Reads the crawl ID, the metadata prefix and the Retry-After cap from the configuration, then
+     * opens the gRPC channel to the frontier.
+     */
+    @Override
+    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector c) {
+        this.collector = c;
+        this.globalCrawlID =
+                ConfUtils.getString(conf, Constants.URLFRONTIER_CRAWL_ID_KEY, CrawlID.DEFAULT);
+        // the protocol layer stores response headers in the metadata with this
+        // prefix; the FetcherBolt merges them into the status metadata
+        this.retryAfterKey =
+                ConfUtils.getString(conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, "")
+                        + RETRY_AFTER_HEADER;
+        final long maxRetryAfterSecs =
+                ConfUtils.getLong(
+                        conf,
+                        Constants.URLFRONTIER_MAX_RETRY_AFTER_KEY,
+                        Constants.URLFRONTIER_MAX_RETRY_AFTER_DEFAULT);
+        this.maxRetryAfterMs = capSecondsToMillis(maxRetryAfterSecs);
+        // build channel + stub as in Spout.prepare (host/port only)
+        this.channel =
+                ManagedChannelUtil.createChannel(
+                        ConfUtils.getString(
+                                conf,
+                                Constants.URLFRONTIER_HOST_KEY,
+                                Constants.URLFRONTIER_DEFAULT_HOST),
+                        ConfUtils.getInt(
+                                conf,
+                                Constants.URLFRONTIER_PORT_KEY,
+                                Constants.URLFRONTIER_DEFAULT_PORT));
+        this.frontier = URLFrontierGrpc.newStub(channel).withWaitForReady();
+    }
+
+    /**
+     * Converts the configured cap from seconds to milliseconds. Any negative value maps to {@code
+     * -1} (no cap); a value too large to be expressed in ms saturates at {@link Long#MAX_VALUE}
+     * instead of wrapping around into a bogus cap.
+     */
+    private static long capSecondsToMillis(long capSecs) {
+        if (capSecs < 0) {
+            return -1L;
+        }
+        return capSecs > Long.MAX_VALUE / 1000L ? Long.MAX_VALUE : capSecs * 1000L;
+    }
+
+    /**
+     * Asks the frontier to block the tuple's queue when its metadata calls for it, then acks the
+     * tuple regardless of the outcome.
+     */
+    @Override
+    public void execute(Tuple t) {
+        final String key = t.getStringByField("key");
+        final Metadata metadata = (Metadata) t.getValueByField("metadata");
+        final long blockUntil =
+                blockUntilFor(metadata, retryAfterKey, maxRetryAfterMs, System.currentTimeMillis());
+        if (blockUntil > 0) {
+            LOG.debug("Blocking queue {} until {}", key, blockUntil);
+            final BlockQueueParams params =
+                    BlockQueueParams.newBuilder()
+                            .setKey(key)
+                            .setCrawlID(globalCrawlID)
+                            .setTime(blockUntil)
+                            .setLocal(false)
+                            .build();
+            try {
+                frontier.blockQueueUntil(params, NOOP_OBSERVER);
+            } catch (RuntimeException e) {
+                // asynchronous failures reach NOOP_OBSERVER; this covers a failure raised
+                // while issuing the call, so that the tuple is still acked below
+                LOG.warn("blockQueueUntil failed", e);
+            }
+        }
+        collector.ack(t);
+    }
+
+    /**
+     * Decides whether a queue-stream tuple carries a server-requested back-off worth enforcing.
+     * Only a rate-limit (429) or unavailable (503) response with a valid Retry-After header
+     * qualifies; the requested delay is capped by {@code maxRetryAfterMs} unless negative.
+     *
+     * @param metadata the tuple metadata; {@code null} never triggers a block
+     * @param retryAfterKey the metadata key under which the Retry-After header is stored
+     * @param maxRetryAfterMs the cap on the honoured delay in ms; a negative value disables it
+     * @param nowMs the current time in epoch milliseconds, used as the base of the block
+     * @return the absolute time to block the queue until, in epoch seconds, or {@code -1} if the
+     *     tuple does not call for a block
+     */
+    static long blockUntilFor(
+            Metadata metadata, String retryAfterKey, long maxRetryAfterMs, long nowMs) {
+        if (metadata == null) {
+            return -1L;
+        }
+        final String statusCode = metadata.getFirstValue(STATUS_CODE_KEY);
+        // only on a rate-limit (429) or unavailable (503) response does
+        // Retry-After signal a host back-off worth acting on
+        if (!"429".equals(statusCode) && !"503".equals(statusCode)) {
+            return -1L;
+        }
+        long retryAfterMs = RetryAfterParser.parseDelay(metadata.getFirstValue(retryAfterKey));
+        if (retryAfterMs <= 0) {
+            return -1L;
+        }
+        if (maxRetryAfterMs >= 0 && retryAfterMs > maxRetryAfterMs) {
+            retryAfterMs = maxRetryAfterMs;
+        }
+        // without a cap the delay can be close to Long.MAX_VALUE; saturate rather than
+        // let the sum wrap to a negative value, which would silently skip the block
+        final boolean overflows = nowMs > 0 && retryAfterMs > Long.MAX_VALUE - nowMs;
+        final long blockUntilMs = overflows ? Long.MAX_VALUE : nowMs + retryAfterMs;
+        return blockUntilMs / 1000L;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // terminal bolt
+    }
+
+    /** Initiates the shutdown of the gRPC channel opened in {@code prepare}, if any. */
+    @Override
+    public void cleanup() {
+        if (channel != null) {
+            channel.shutdown();
+        }
+    }
+}
diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java
index 34ba3a4c9..5e5b3ca49 100644
--- a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java
+++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java
@@ -59,6 +59,7 @@
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
import org.apache.stormcrawler.Metadata;
import org.apache.stormcrawler.metrics.CrawlerMetrics;
import org.apache.stormcrawler.metrics.ScopedCounter;
@@ -378,6 +379,12 @@ public void store(
partitionKey = "_DEFAULT_";
}
+ // send a tuple on the queue stream in case a bolt
+ // wants to handle it
+ super.collector.emit(
+ org.apache.stormcrawler.Constants.QUEUE_STREAM_NAME,
+ new Values(partitionKey, metadata));
+
final Map mdCopy = new HashMap<>(metadata.size());
for (String k : metadata.keySet()) {
String[] vals = metadata.getValues(k);
diff --git a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java
new file mode 100644
index 000000000..bee5c9d13
--- /dev/null
+++ b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.urlfrontier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import org.apache.stormcrawler.Metadata;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Checks {@link HostBlockBolt#blockUntilFor(Metadata, String, long, long)}, i.e. which
+ * combinations of status code, Retry-After header and cap lead to a queue block, and until when.
+ */
+class HostBlockBoltDecisionTest {
+
+    private static final String RETRY_AFTER_KEY = "protocol.retry-after";
+    private static final long NOW_MS = 1_700_000_000_000L;
+    private static final long NO_CAP = -1L;
+
+    /** Builds metadata holding only the values that are not {@code null}. */
+    private static Metadata metadataWith(String statusCode, String retryAfter) {
+        final Metadata metadata = new Metadata();
+        if (statusCode != null) {
+            metadata.setValue("fetch.statusCode", statusCode);
+        }
+        if (retryAfter != null) {
+            metadata.setValue(RETRY_AFTER_KEY, retryAfter);
+        }
+        return metadata;
+    }
+
+    /** Runs the decision at the fixed {@link #NOW_MS} with the given cap. */
+    private static long decide(String statusCode, String retryAfter, long capMs) {
+        return HostBlockBolt.blockUntilFor(
+                metadataWith(statusCode, retryAfter), RETRY_AFTER_KEY, capMs, NOW_MS);
+    }
+
+    /** Epoch seconds reached {@code delayMs} after {@link #NOW_MS}. */
+    private static long secondsAfterNow(long delayMs) {
+        return (NOW_MS + delayMs) / 1000L;
+    }
+
+    @Test
+    void blocksOn429WithRetryAfterSeconds() {
+        assertEquals(secondsAfterNow(120_000L), decide("429", "120", NO_CAP));
+    }
+
+    @Test
+    void blocksOn503WithRetryAfterSeconds() {
+        assertEquals(secondsAfterNow(60_000L), decide("503", "60", NO_CAP));
+    }
+
+    @Test
+    void ignoresOtherStatusCodesEvenWithHeader() {
+        for (String statusCode : new String[] {"200", "301", "403"}) {
+            assertEquals(-1L, decide(statusCode, "120", NO_CAP));
+        }
+    }
+
+    @Test
+    void ignores429WithoutHeader() {
+        assertEquals(-1L, decide("429", null, NO_CAP));
+    }
+
+    @Test
+    void ignoresMissingStatusCode() {
+        assertEquals(-1L, decide(null, "120", NO_CAP));
+    }
+
+    @Test
+    void ignoresMalformedHeaderAndZeroDelay() {
+        assertEquals(-1L, decide("429", "not-a-date", NO_CAP));
+        // blocking for zero seconds would only cost a call to the frontier
+        assertEquals(-1L, decide("429", "0", NO_CAP));
+    }
+
+    @Test
+    void capsDelayAtConfiguredMaximum() {
+        final long oneHourMs = 3_600_000L;
+        assertEquals(secondsAfterNow(oneHourMs), decide("429", "86400", oneHourMs));
+    }
+
+    @Test
+    void negativeCapMeansUncapped() {
+        assertEquals(secondsAfterNow(86_400_000L), decide("429", "86400", NO_CAP));
+    }
+
+    @Test
+    void nullMetadataIsIgnored() {
+        final long decision = HostBlockBolt.blockUntilFor(null, RETRY_AFTER_KEY, NO_CAP, NOW_MS);
+        assertEquals(-1L, decision);
+    }
+}
diff --git a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java
new file mode 100644
index 000000000..a3540579c
--- /dev/null
+++ b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stormcrawler.urlfrontier;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import crawlercommons.urlfrontier.CrawlID;
+import crawlercommons.urlfrontier.URLFrontierGrpc;
+import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierBlockingStub;
+import crawlercommons.urlfrontier.Urlfrontier.BlockQueueParams;
+import crawlercommons.urlfrontier.Urlfrontier.GetParams;
+import crawlercommons.urlfrontier.Urlfrontier.URLInfo;
+import io.grpc.ManagedChannel;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.tuple.Tuple;
+import org.apache.stormcrawler.Metadata;
+import org.apache.stormcrawler.TestOutputCollector;
+import org.apache.stormcrawler.TestUtil;
+import org.apache.stormcrawler.persistence.Status;
+import org.apache.stormcrawler.protocol.ProtocolResponse;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+/**
+ * Integration test running {@link HostBlockBolt} against a URLFrontier instance started in a
+ * container; disabled when Docker is not available.
+ */
+@Testcontainers(disabledWithoutDocker = true)
+class HostBlockBoltTest {
+
+    private static final String HOST = "example.com";
+
+    private URLFrontierContainer container;
+    private ManagedChannel channel;
+    private URLFrontierBlockingStub blocking;
+
+    @BeforeEach
+    void before() {
+        String image = "crawlercommons/url-frontier";
+        String version = System.getProperty("urlfrontier-version");
+        if (version != null) {
+            image += ":" + version;
+        }
+        container = new URLFrontierContainer(image);
+        container.start();
+        var connection = container.getFrontierConnection();
+        channel = ManagedChannelUtil.createChannel(connection.getHost(), connection.getPort());
+        blocking = URLFrontierGrpc.newBlockingStub(channel);
+    }
+
+    @AfterEach
+    void after() {
+        // both may still be null if before() failed part-way through
+        if (channel != null) {
+            channel.shutdownNow();
+        }
+        if (container != null) {
+            container.close();
+        }
+    }
+
+    /** Returns a fresh, mutable configuration pointing at the frontier in the container. */
+    private Map<String, Object> frontierConfig() {
+        var connection = container.getFrontierConnection();
+        Map<String, Object> config = new HashMap<>();
+        config.put(Constants.URLFRONTIER_HOST_KEY, connection.getHost());
+        config.put(Constants.URLFRONTIER_PORT_KEY, connection.getPort());
+        return config;
+    }
+
+    /** Seeds one DISCOVERED url for {@link #HOST} so the queue exists and is fetchable. */
+    private void seedOneUrl() {
+        StatusUpdaterBolt seeder = new StatusUpdaterBolt();
+        TestOutputCollector out = new TestOutputCollector();
+        Map<String, Object> config = frontierConfig();
+        config.put(
+                "urlbuffer.class", "org.apache.stormcrawler.persistence.urlbuffer.SimpleURLBuffer");
+        config.put("scheduler.class", "org.apache.stormcrawler.persistence.DefaultScheduler");
+        config.put("status.updater.cache.spec", "maximumSize=10000,expireAfterAccess=1h");
+        config.put("urlfrontier.updater.max.messages", 1);
+        config.put("urlfrontier.cache.expireafter.sec", 10);
+        seeder.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(out));
+        try {
+            String url = "http://" + HOST + "/";
+            Tuple tuple = mock(Tuple.class);
+            when(tuple.getValueByField("status")).thenReturn(Status.DISCOVERED);
+            when(tuple.getStringByField("url")).thenReturn(url);
+            when(tuple.getValueByField("metadata")).thenReturn(new Metadata());
+            seeder.execute(tuple);
+
+            await().atMost(20, TimeUnit.SECONDS)
+                    .until(
+                            () ->
+                                    out.getAckedTuples().stream()
+                                            .anyMatch(t -> url.equals(t.getStringByField("url"))));
+        } finally {
+            // release the seeder's resources even if the await above times out
+            seeder.cleanup();
+        }
+    }
+
+    /** Returns the set of queue keys currently handed out by getURLs. */
+    private Set<String> keysFromGetURLs() {
+        GetParams params =
+                GetParams.newBuilder()
+                        .setCrawlID(CrawlID.DEFAULT)
+                        .setMaxQueues(10)
+                        .setMaxUrlsPerQueue(10)
+                        .build();
+        Set<String> keys = new HashSet<>();
+        Iterator<URLInfo> it = blocking.getURLs(params);
+        while (it.hasNext()) {
+            keys.add(it.next().getKey());
+        }
+        return keys;
+    }
+
+    @Test
+    @Timeout(value = 2, unit = TimeUnit.MINUTES)
+    void blocksHostQueueUntilTime() {
+        seedOneUrl();
+
+        long nowSecs = System.currentTimeMillis() / 1000L;
+
+        // block the host for an hour via the bolt under test: a queue-stream
+        // tuple whose metadata reports a 429 with a Retry-After of one hour
+        HostBlockBolt bolt = new HostBlockBolt();
+        Map<String, Object> conf = frontierConfig();
+        conf.put(ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, "protocol.");
+        bolt.prepare(conf, TestUtil.getMockedTopologyContext(), mock(OutputCollector.class));
+        try {
+            Tuple t = mock(Tuple.class);
+            when(t.getStringByField("key")).thenReturn(HOST);
+            Metadata md = new Metadata();
+            md.setValue("fetch.statusCode", "429");
+            md.setValue("protocol.retry-after", "3600");
+            when(t.getValueByField("metadata")).thenReturn(md);
+            bolt.execute(t);
+
+            // while blocked, getURLs must not return the host
+            await().atMost(15, TimeUnit.SECONDS).until(() -> !keysFromGetURLs().contains(HOST));
+
+            // unblock (a past time releases the queue) and the host becomes available again
+            blocking.blockQueueUntil(
+                    BlockQueueParams.newBuilder()
+                            .setKey(HOST)
+                            .setCrawlID(CrawlID.DEFAULT)
+                            .setTime(nowSecs - 1)
+                            .build());
+            await().atMost(15, TimeUnit.SECONDS).until(() -> keysFromGetURLs().contains(HOST));
+        } finally {
+            // close the bolt's channel even when one of the awaits above fails
+            bolt.cleanup();
+        }
+    }
+}
diff --git a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java
index 32d827e32..d092a8e1f 100644
--- a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java
+++ b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java
@@ -157,6 +157,27 @@ void canAckSimpleTupleWithMetadata()
Assertions.assertEquals(true, isAcked(url, 5));
}
+    /**
+     * Storing a status must also emit one (key, metadata) tuple on the queue stream, with the
+     * url's host name as key and the stored metadata as payload.
+     */
+    @Test
+    @Timeout(value = 2, unit = TimeUnit.MINUTES)
+    void emitsKeyAndMetadataOnQueueStream() {
+        final String queueStream = org.apache.stormcrawler.Constants.QUEUE_STREAM_NAME;
+        final String storedUrl = "https://www.url.net/something";
+        final String persistedValue = "somePersistedMetaInfo";
+
+        final Metadata stored = new Metadata();
+        stored.setValue(persistedKey, persistedValue);
+        store(storedUrl, Status.DISCOVERED, stored);
+        Assertions.assertEquals(true, isAcked(storedUrl, 5));
+
+        // exactly one tuple is expected on the queue stream, shaped (key, metadata)
+        // like the one emitted by the OpenSearch StatusUpdaterBolt (#1974)
+        var tuples = output.getEmitted(queueStream);
+        Assertions.assertEquals(1, tuples.size());
+        var fields = tuples.get(0);
+        Assertions.assertEquals(2, fields.size());
+        Assertions.assertEquals("www.url.net", fields.get(0));
+        final Object payload = fields.get(1);
+        Assertions.assertTrue(payload instanceof Metadata);
+        Assertions.assertEquals(persistedValue, ((Metadata) payload).getFirstValue(persistedKey));
+    }
+
@Test
@Timeout(value = 2, unit = TimeUnit.MINUTES)
void exceedingMaxMessagesInFlightAfterFrontierRestart()