diff --git a/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java b/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java new file mode 100644 index 000000000..8c87e554f --- /dev/null +++ b/core/src/main/java/org/apache/stormcrawler/util/RetryAfterParser.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.util; + +import java.time.DateTimeException; +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Parses the value of a Retry-After HTTP + * response header, as sent by servers alongside a 429 (Too Many Requests) or 503 (Service + * Unavailable) response to request a back-off. + */ +public final class RetryAfterParser { + + private static final Logger LOG = LoggerFactory.getLogger(RetryAfterParser.class); + + /** + * Formatter for the HTTP-date form of the Retry-After header, e.g. {@code Wed, 21 Oct 2015 + * 07:28:00 GMT}.
{@link DateTimeFormatter} is immutable and thread-safe, unlike {@code + * SimpleDateFormat}, which matters as this is shared across threads. + */ + private static final DateTimeFormatter HTTP_DATE_FORMATTER = + DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT) + .withZone(ZoneOffset.UTC); + + /** Pre-compiled pattern for the numeric (delta-seconds) form of the Retry-After header. */ + private static final Pattern RETRY_AFTER_SECONDS = Pattern.compile("[0-9]+"); + + /** + * Upper bound on the length of a Retry-After header value we will attempt to parse. A + * delta-seconds value is at most 19 digits and an HTTP date ~29 chars; anything longer is + * rejected to bound the work done on this attacker-controlled value. + */ + private static final int MAX_RETRY_AFTER_VALUE_LENGTH = 64; + + private RetryAfterParser() {} + + /** + * Parses a Retry-After header value into a delay expressed in milliseconds, relative to now. + * The value is either a number of seconds (e.g. {@code 120}) or an HTTP date (e.g. {@code Wed, + * 21 Oct 2015 07:28:00 GMT}). + * + * @return the delay in milliseconds (possibly {@code 0}), or {@code -1} if the header is + * absent, malformed, in the past or implausibly long. Never throws.
+ */ + public static long parseDelay(String retryAfter) { + if (StringUtils.isBlank(retryAfter)) { + return -1; + } + retryAfter = retryAfter.trim(); + // a valid Retry-After is either a delta-seconds value or an HTTP date; + // reject anything implausibly long to bound the work done on this + // attacker-controlled header value + if (retryAfter.length() > MAX_RETRY_AFTER_VALUE_LENGTH) { + return -1; + } + // delay expressed as a number of seconds + if (RETRY_AFTER_SECONDS.matcher(retryAfter).matches()) { + try { + return Math.multiplyExact(Long.parseLong(retryAfter), 1000L); + } catch (NumberFormatException | ArithmeticException e) { + // value too large to fit in a long, or its conversion to ms + // overflows - ignore + return -1; + } + } + // delay expressed as an HTTP date (IMF-fixdate only; RFC 850 / asctime forms are not + // accepted); Instant.toEpochMilli() throws ArithmeticException for far-future years + try { + Instant date = Instant.from(HTTP_DATE_FORMATTER.parse(retryAfter)); + long delay = date.toEpochMilli() - System.currentTimeMillis(); + return delay > 0 ? delay : -1; + } catch (DateTimeException | ArithmeticException e) { + LOG.debug("Invalid Retry-After header value: {}", retryAfter); + return -1; + } + } +} diff --git a/core/src/test/java/org/apache/stormcrawler/util/RetryAfterParserTest.java b/core/src/test/java/org/apache/stormcrawler/util/RetryAfterParserTest.java new file mode 100644 index 000000000..726feb40e --- /dev/null +++ b/core/src/test/java/org/apache/stormcrawler/util/RetryAfterParserTest.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Locale; +import java.util.Random; +import org.junit.jupiter.api.Test; + +/** Unit and randomised (fuzz) coverage for {@link RetryAfterParser#parseDelay(String)}. */ +class RetryAfterParserTest { + + private static final DateTimeFormatter HTTP_DATE = + DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss 'GMT'", Locale.ROOT) + .withZone(ZoneOffset.UTC); + + @Test + void parseRetryAfterSeconds() { + assertEquals(120_000L, RetryAfterParser.parseDelay("120")); + // surrounding whitespace is tolerated + assertEquals(120_000L, RetryAfterParser.parseDelay(" 120 ")); + // zero is a valid (if pointless) delay and is not treated as "absent" + assertEquals(0L, RetryAfterParser.parseDelay("0")); + } + + @Test + void parseRetryAfterFutureDate() { + Instant future = Instant.now().plusSeconds(300); + long delay = RetryAfterParser.parseDelay(HTTP_DATE.format(future)); + // tolerance covers HTTP-date truncation to whole seconds and time elapsed in the call + assertTrue(delay > 250_000 && delay <= 300_000, "delay was " + delay); + } + + @Test + void parseRetryAfterPastDateReturnsMinusOne() { + Instant past = Instant.now().minusSeconds(300); + assertEquals(-1L, RetryAfterParser.parseDelay(HTTP_DATE.format(past))); + } + + @Test + void parseRetryAfterBlankOrMalformedReturnsMinusOne() { + assertEquals(-1L,
RetryAfterParser.parseDelay(null)); + assertEquals(-1L, RetryAfterParser.parseDelay("")); + assertEquals(-1L, RetryAfterParser.parseDelay(" ")); + assertEquals(-1L, RetryAfterParser.parseDelay("not-a-date")); + assertEquals(-1L, RetryAfterParser.parseDelay("12.5")); + assertEquals(-1L, RetryAfterParser.parseDelay("-5")); + } + + @Test + void parseRetryAfterOverlongValueReturnsMinusOne() { + // an implausibly long header value is rejected without further parsing + String tooLong = "1".repeat(65); + assertEquals(-1L, RetryAfterParser.parseDelay(tooLong)); + } + + @Test + void parseRetryAfterParseOverflowReturnsMinusOne() { + // too large to fit in a long at all + assertEquals(-1L, RetryAfterParser.parseDelay("999999999999999999999999999")); + } + + @Test + void parseRetryAfterMillisecondOverflowReturnsMinusOne() { + // parses as a valid long, but * 1000 overflows a long (regression: was a + // silent overflow yielding a negative delay) + assertEquals(-1L, RetryAfterParser.parseDelay("3687950425865536959")); + assertEquals(-1L, RetryAfterParser.parseDelay(Long.toString(Long.MAX_VALUE))); + } + + @Test + void parseRetryAfterRandomSecondsRoundTrip() { + Random rnd = new Random(7); + for (int i = 0; i < 1_000; i++) { + long secs = rnd.nextInt(1_000_000); + assertEquals(secs * 1000L, RetryAfterParser.parseDelay(Long.toString(secs))); + } + } + + @Test + void parseRetryAfterNeverThrowsForRandomInput() { + Random rnd = new Random(42); + for (int i = 0; i < 10_000; i++) { + String input = randomRetryAfterValue(rnd); + long result = RetryAfterParser.parseDelay(input); + // contract: never throws, never returns anything below -1 + assertTrue(result >= -1L, "value " + result + " for input <" + input + ">"); + } + } + + /** + * Produces a wide spectrum of inputs: digit runs (including values that overflow a long or + * overflow when converted to ms), real HTTP dates (past and future), the empty string, random + * printable garbage and nulls.
+ */ + private static String randomRetryAfterValue(Random rnd) { + switch (rnd.nextInt(5)) { + case 0: + int len = 1 + rnd.nextInt(40); + StringBuilder digits = new StringBuilder(); + for (int i = 0; i < len; i++) { + digits.append((char) ('0' + rnd.nextInt(10))); + } + return digits.toString(); + case 1: + long offset = (rnd.nextBoolean() ? 1L : -1L) * rnd.nextInt(1_000_000); + return HTTP_DATE.format(Instant.now().plusSeconds(offset)); + case 2: + return ""; + case 3: + int glen = rnd.nextInt(30); + StringBuilder garbage = new StringBuilder(); + for (int i = 0; i < glen; i++) { + garbage.append((char) (32 + rnd.nextInt(95))); + } + return garbage.toString(); + default: + return null; + } + } +} diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java index d747aafe2..67f5cdf47 100644 --- a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java +++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/Constants.java @@ -43,4 +43,12 @@ private Constants() {} public static final String URLFRONTIER_UPDATER_MAX_MESSAGES_KEY = "urlfrontier.updater.max.messages"; public static final String URLFRONTIER_CRAWL_ID_KEY = "urlfrontier.crawlid"; + + /** + * Maximum delay in seconds honoured when a server requests a back-off via the Retry-After HTTP + * response header. A negative value disables the cap. Defaults to 86400 (24h).
+ */ + public static final String URLFRONTIER_MAX_RETRY_AFTER_KEY = "urlfrontier.max.retry.after"; + + public static final int URLFRONTIER_MAX_RETRY_AFTER_DEFAULT = 86400; // 24 hours, in seconds } diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java new file mode 100644 index 000000000..b95cb3c45 --- /dev/null +++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/HostBlockBolt.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.stormcrawler.urlfrontier; + +import crawlercommons.urlfrontier.CrawlID; +import crawlercommons.urlfrontier.URLFrontierGrpc; +import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierStub; +import crawlercommons.urlfrontier.Urlfrontier.BlockQueueParams; +import crawlercommons.urlfrontier.Urlfrontier.Empty; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import java.util.Map; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.protocol.ProtocolResponse; +import org.apache.stormcrawler.util.ConfUtils; +import org.apache.stormcrawler.util.RetryAfterParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Consumes the {@code queue} stream emitted by the status updater and blocks a queue in URLFrontier + * via {@code blockQueueUntil} whenever the tuple's metadata reports a rate-limit response (HTTP 429 + * or 503) carrying a Retry-After + * header. See issues #867 and #784. + * + *
<p>Wire it with a fields grouping on {@code "key"} from the status updater's {@code queue} + * stream. The {@code "key"} is the frontier queue key derived from {@code partition.url.mode} (the + * same setting the frontier uses to assign queues), so the block targets the matching queue. The + * honoured delay is capped by {@code urlfrontier.max.retry.after} (in seconds, default 86400, -1 to + * disable the cap). Connects via {@code urlfrontier.host} / {@code urlfrontier.port}; multi-node + * address resolution is not supported. + * + * <p>
The block is fire-and-forget: a failed call is logged but the tuple is acked anyway. A missed + * block means the host is fetched once more and the next 429/503 re-emits the signal. + */ +public class HostBlockBolt extends BaseRichBolt { + + private static final Logger LOG = LoggerFactory.getLogger(HostBlockBolt.class); + + /** Metadata key set by the FetcherBolt with the HTTP status code of the fetch. */ + private static final String STATUS_CODE_KEY = "fetch.statusCode"; + + /** Name of the Retry-After HTTP header, lower-cased as stored by the protocol layer. */ + private static final String RETRY_AFTER_HEADER = "retry-after"; + + private static final StreamObserver<Empty> NOOP_OBSERVER = + new StreamObserver<>() { + @Override + public void onNext(Empty value) {} + + @Override + public void onError(Throwable t) { + LOG.warn("blockQueueUntil failed", t); + } + + @Override + public void onCompleted() {} + }; + + private OutputCollector collector; + private ManagedChannel channel; + private URLFrontierStub frontier; + private String globalCrawlID; + + /** Metadata key holding the Retry-After header, including the protocol prefix. */ + private String retryAfterKey; + + /** Upper bound in ms for the honoured Retry-After delay; -1 means no cap. */ + private long maxRetryAfterMs; + + @Override + public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector c) { + this.collector = c; + this.globalCrawlID = + ConfUtils.getString(conf, Constants.URLFRONTIER_CRAWL_ID_KEY, CrawlID.DEFAULT); + // the protocol layer stores response headers in the metadata with this + // prefix; the FetcherBolt merges them into the status metadata + this.retryAfterKey = + ConfUtils.getString(conf, ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, "") + + RETRY_AFTER_HEADER; + long maxRetryAfterSecs = + ConfUtils.getLong( + conf, + Constants.URLFRONTIER_MAX_RETRY_AFTER_KEY, + Constants.URLFRONTIER_MAX_RETRY_AFTER_DEFAULT); + this.maxRetryAfterMs = maxRetryAfterSecs < 0 ?
-1L : maxRetryAfterSecs * 1000L; + // build channel + stub as in Spout.prepare (host/port only) + this.channel = + ManagedChannelUtil.createChannel( + ConfUtils.getString( + conf, + Constants.URLFRONTIER_HOST_KEY, + Constants.URLFRONTIER_DEFAULT_HOST), + ConfUtils.getInt( + conf, + Constants.URLFRONTIER_PORT_KEY, + Constants.URLFRONTIER_DEFAULT_PORT)); + this.frontier = URLFrontierGrpc.newStub(channel).withWaitForReady(); + } + + @Override + public void execute(Tuple t) { + final String key = t.getStringByField("key"); + final Metadata metadata = (Metadata) t.getValueByField("metadata"); + final long blockUntil = + blockUntilFor(metadata, retryAfterKey, maxRetryAfterMs, System.currentTimeMillis()); + if (blockUntil > 0) { + LOG.debug("Blocking queue {} until {}", key, blockUntil); + BlockQueueParams params = + BlockQueueParams.newBuilder() + .setKey(key) + .setCrawlID(globalCrawlID) + .setTime(blockUntil) + .setLocal(false) + .build(); + frontier.blockQueueUntil(params, NOOP_OBSERVER); + } + collector.ack(t); + } + + /** + * Decides whether a queue-stream tuple carries a server-requested back-off worth enforcing. + * Only a rate-limit (429) or unavailable (503) response with a valid Retry-After header + * qualifies; the requested delay is capped by {@code maxRetryAfterMs} unless negative.
+ * + * @return the time to block the queue until, in epoch seconds (rounded up), or {@code -1} if + * the tuple does not call for a block (including a zero cap or an overflowing delay) + */ + static long blockUntilFor( + Metadata metadata, String retryAfterKey, long maxRetryAfterMs, long nowMs) { + if (metadata == null) { + return -1L; + } + final String statusCode = metadata.getFirstValue(STATUS_CODE_KEY); + // only on a rate-limit (429) or unavailable (503) response does + // Retry-After signal a host back-off worth acting on + if (!"429".equals(statusCode) && !"503".equals(statusCode)) { + return -1L; + } + long retryAfterMs = RetryAfterParser.parseDelay(metadata.getFirstValue(retryAfterKey)); + if (maxRetryAfterMs >= 0 && retryAfterMs > maxRetryAfterMs) { + retryAfterMs = maxRetryAfterMs; + } + if (retryAfterMs <= 0 || retryAfterMs > Long.MAX_VALUE - nowMs - 999L) { + return -1L; + } + return (nowMs + retryAfterMs + 999L) / 1000L; // round up: never release early + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // terminal bolt + } + + @Override + public void cleanup() { + if (channel != null) { + channel.shutdown(); + } + } +} diff --git a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java index 34ba3a4c9..5e5b3ca49 100644 --- a/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java +++ b/external/urlfrontier/src/main/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBolt.java @@ -59,6 +59,7 @@ import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; import org.apache.stormcrawler.Metadata; import org.apache.stormcrawler.metrics.CrawlerMetrics; import org.apache.stormcrawler.metrics.ScopedCounter; @@ -378,6 +379,12 @@ public void store( partitionKey = "_DEFAULT_"; } + // send a tuple on the queue stream in case a bolt + // wants to handle it (e.g. HostBlockBolt) + super.collector.emit( +
org.apache.stormcrawler.Constants.QUEUE_STREAM_NAME, + new Values(partitionKey, metadata)); + final Map mdCopy = new HashMap<>(metadata.size()); for (String k : metadata.keySet()) { String[] vals = metadata.getValues(k); diff --git a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java new file mode 100644 index 000000000..bee5c9d13 --- /dev/null +++ b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltDecisionTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.stormcrawler.urlfrontier; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import org.apache.stormcrawler.Metadata; +import org.junit.jupiter.api.Test; + +/** + * Unit coverage for {@link HostBlockBolt#blockUntilFor(Metadata, String, long, long)} - the + * decision whether a queue-stream tuple carries a server-requested back-off worth enforcing.
+ */ +class HostBlockBoltDecisionTest { + + private static final String RETRY_AFTER_KEY = "protocol.retry-after"; + private static final long NOW_MS = 1_700_000_000_000L; // fixed "now" in epoch ms + private static final long NO_CAP = -1L; // a negative cap disables capping + + private static Metadata md(String statusCode, String retryAfter) { + Metadata md = new Metadata(); + if (statusCode != null) { + md.setValue("fetch.statusCode", statusCode); + } + if (retryAfter != null) { + md.setValue(RETRY_AFTER_KEY, retryAfter); + } + return md; + } + + @Test + void blocksOn429WithRetryAfterSeconds() { + long blockUntil = + HostBlockBolt.blockUntilFor(md("429", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS); + assertEquals((NOW_MS + 120_000L) / 1000L, blockUntil); + } + + @Test + void blocksOn503WithRetryAfterSeconds() { + long blockUntil = + HostBlockBolt.blockUntilFor(md("503", "60"), RETRY_AFTER_KEY, NO_CAP, NOW_MS); + assertEquals((NOW_MS + 60_000L) / 1000L, blockUntil); + } + + @Test + void ignoresOtherStatusCodesEvenWithHeader() { + assertEquals( + -1L, + HostBlockBolt.blockUntilFor(md("200", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + assertEquals( + -1L, + HostBlockBolt.blockUntilFor(md("301", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + assertEquals( + -1L, + HostBlockBolt.blockUntilFor(md("403", "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + } + + @Test + void ignores429WithoutHeader() { + assertEquals( + -1L, HostBlockBolt.blockUntilFor(md("429", null), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + } + + @Test + void ignoresMissingStatusCode() { + assertEquals( + -1L, HostBlockBolt.blockUntilFor(md(null, "120"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + } + + @Test + void ignoresMalformedHeaderAndZeroDelay() { + assertEquals( + -1L, + HostBlockBolt.blockUntilFor( + md("429", "not-a-date"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + // a zero delay is not worth a frontier round-trip + assertEquals( + -1L, HostBlockBolt.blockUntilFor(md("429", "0"), RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + } + + @Test + void capsDelayAtConfiguredMaximum() { + long capMs
= 3_600_000L; // one hour, well below the 86400 s requested by the header + long blockUntil = + HostBlockBolt.blockUntilFor(md("429", "86400"), RETRY_AFTER_KEY, capMs, NOW_MS); + assertEquals((NOW_MS + capMs) / 1000L, blockUntil); + } + + @Test + void negativeCapMeansUncapped() { + long blockUntil = + HostBlockBolt.blockUntilFor(md("429", "86400"), RETRY_AFTER_KEY, NO_CAP, NOW_MS); + assertEquals((NOW_MS + 86_400_000L) / 1000L, blockUntil); + } + + @Test + void nullMetadataIsIgnored() { + assertEquals(-1L, HostBlockBolt.blockUntilFor(null, RETRY_AFTER_KEY, NO_CAP, NOW_MS)); + } +} diff --git a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java new file mode 100644 index 000000000..a3540579c --- /dev/null +++ b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/HostBlockBoltTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.stormcrawler.urlfrontier; + +import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import crawlercommons.urlfrontier.CrawlID; +import crawlercommons.urlfrontier.URLFrontierGrpc; +import crawlercommons.urlfrontier.URLFrontierGrpc.URLFrontierBlockingStub; +import crawlercommons.urlfrontier.Urlfrontier.BlockQueueParams; +import crawlercommons.urlfrontier.Urlfrontier.GetParams; +import crawlercommons.urlfrontier.Urlfrontier.URLInfo; +import io.grpc.ManagedChannel; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.tuple.Tuple; +import org.apache.stormcrawler.Metadata; +import org.apache.stormcrawler.TestOutputCollector; +import org.apache.stormcrawler.TestUtil; +import org.apache.stormcrawler.persistence.Status; +import org.apache.stormcrawler.protocol.ProtocolResponse; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers(disabledWithoutDocker = true) +class HostBlockBoltTest { + + private static final String HOST = "example.com"; + + private URLFrontierContainer container; + private ManagedChannel channel; + private URLFrontierBlockingStub blocking; + + @BeforeEach + void before() { + String image = "crawlercommons/url-frontier"; + String version = System.getProperty("urlfrontier-version"); + if (version != null) { + image += ":" + version; + } + container = new URLFrontierContainer(image); + container.start(); + var connection = container.getFrontierConnection(); + channel = ManagedChannelUtil.createChannel(connection.getHost(), connection.getPort()); + blocking =
URLFrontierGrpc.newBlockingStub(channel); + } + + @AfterEach + void after() { + if (channel != null) { + channel.shutdownNow(); + } + container.close(); + } + + private Map<String, Object> frontierConfig() { + var connection = container.getFrontierConnection(); + Map<String, Object> config = new HashMap<>(); + config.put(Constants.URLFRONTIER_HOST_KEY, connection.getHost()); + config.put(Constants.URLFRONTIER_PORT_KEY, connection.getPort()); + return config; + } + + /** Seeds one DISCOVERED url for {@link #HOST} so the queue exists and is fetchable. */ + private void seedOneUrl() { + StatusUpdaterBolt seeder = new StatusUpdaterBolt(); + TestOutputCollector out = new TestOutputCollector(); + Map<String, Object> config = frontierConfig(); + config.put( + "urlbuffer.class", "org.apache.stormcrawler.persistence.urlbuffer.SimpleURLBuffer"); + config.put("scheduler.class", "org.apache.stormcrawler.persistence.DefaultScheduler"); + config.put("status.updater.cache.spec", "maximumSize=10000,expireAfterAccess=1h"); + config.put("urlfrontier.updater.max.messages", 1); + config.put("urlfrontier.cache.expireafter.sec", 10); + seeder.prepare(config, TestUtil.getMockedTopologyContext(), new OutputCollector(out)); + + String url = "http://" + HOST + "/"; + Tuple tuple = mock(Tuple.class); + when(tuple.getValueByField("status")).thenReturn(Status.DISCOVERED); + when(tuple.getStringByField("url")).thenReturn(url); + when(tuple.getValueByField("metadata")).thenReturn(new Metadata()); + seeder.execute(tuple); + + await().atMost(20, TimeUnit.SECONDS) + .until( + () -> + out.getAckedTuples().stream() + .anyMatch(t -> url.equals(t.getStringByField("url")))); + seeder.cleanup(); + } + + /** Returns the set of queue keys currently handed out by getURLs.
+ */ + private Set<String> keysFromGetURLs() { + GetParams params = + GetParams.newBuilder() + .setCrawlID(CrawlID.DEFAULT) + .setMaxQueues(10) + .setMaxUrlsPerQueue(10) + .build(); + Set<String> keys = new HashSet<>(); + Iterator<URLInfo> it = blocking.getURLs(params); + while (it.hasNext()) { + keys.add(it.next().getKey()); + } + return keys; + } + + @Test + @Timeout(value = 2, unit = TimeUnit.MINUTES) + void blocksHostQueueUntilTime() { + seedOneUrl(); + + long nowSecs = System.currentTimeMillis() / 1000L; + + // block the host for an hour via the bolt under test: a queue-stream + // tuple whose metadata reports a 429 with a Retry-After of one hour + HostBlockBolt bolt = new HostBlockBolt(); + Map<String, Object> conf = frontierConfig(); + conf.put(ProtocolResponse.PROTOCOL_MD_PREFIX_PARAM, "protocol."); + bolt.prepare(conf, TestUtil.getMockedTopologyContext(), mock(OutputCollector.class)); + Tuple t = mock(Tuple.class); + when(t.getStringByField("key")).thenReturn(HOST); + Metadata md = new Metadata(); + md.setValue("fetch.statusCode", "429"); + md.setValue("protocol.retry-after", "3600"); + when(t.getValueByField("metadata")).thenReturn(md); + bolt.execute(t); + + // while blocked, getURLs must not return the host + await().atMost(15, TimeUnit.SECONDS).until(() -> !keysFromGetURLs().contains(HOST)); + + // unblock (a past time releases the queue) and the host becomes available again + blocking.blockQueueUntil( + BlockQueueParams.newBuilder() + .setKey(HOST) + .setCrawlID(CrawlID.DEFAULT) + .setTime(nowSecs - 1) + .build()); + await().atMost(15, TimeUnit.SECONDS).until(() -> keysFromGetURLs().contains(HOST)); + + bolt.cleanup(); + } +} diff --git a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java index 32d827e32..d092a8e1f 100644 --- a/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java +++
b/external/urlfrontier/src/test/java/org/apache/stormcrawler/urlfrontier/StatusUpdaterBoltTest.java @@ -157,6 +157,27 @@ void canAckSimpleTupleWithMetadata() Assertions.assertEquals(true, isAcked(url, 5)); } + @Test + @Timeout(value = 2, unit = TimeUnit.MINUTES) + void emitsKeyAndMetadataOnQueueStream() { + final var url = "https://www.url.net/something"; + final var meta = new Metadata(); + meta.setValue(persistedKey, "somePersistedMetaInfo"); + store(url, Status.DISCOVERED, meta); + Assertions.assertEquals(true, isAcked(url, 5)); + + // a (key, metadata) tuple goes out on the queue stream, as in the + // OpenSearch StatusUpdaterBolt (#1974) + var emitted = output.getEmitted(org.apache.stormcrawler.Constants.QUEUE_STREAM_NAME); + Assertions.assertEquals(1, emitted.size()); + var values = emitted.get(0); + Assertions.assertEquals(2, values.size()); + Assertions.assertEquals("www.url.net", values.get(0)); + Assertions.assertTrue(values.get(1) instanceof Metadata); + Assertions.assertEquals( + "somePersistedMetaInfo", ((Metadata) values.get(1)).getFirstValue(persistedKey)); + } + @Test @Timeout(value = 2, unit = TimeUnit.MINUTES) void exceedingMaxMessagesInFlightAfterFrontierRestart()