diff --git a/conf/nutch-default.xml b/conf/nutch-default.xml index cc0e8d4388..dd00848b7b 100644 --- a/conf/nutch-default.xml +++ b/conf/nutch-default.xml @@ -1503,6 +1503,14 @@ + + indexer.delete + false + Whether the indexer will delete documents that are gone. Gone pages include redirects and duplicates. + See also: 'link.delete.gone'. + + + indexer.indexwriters.file index-writers.xml @@ -1797,6 +1805,15 @@ CAUTION: Set the parser.timeout to -1 or a bigger value than 30, when using this + + parser.delete.failed.parse + false + Boolean value for whether we should delete a page from the index when parsing the page fails. + By default this property is deactivated, because it will delete an existing page from the index, where a + previous fetch produced content that was successfully parsed. + + + parser.store.text true @@ -2507,7 +2524,7 @@ CAUTION: Set the parser.timeout to -1 or a bigger value than 30, when using this link.delete.gone false - Whether to delete gone pages from the web graph. + Whether to delete gone pages from the web graph. Gone pages include redirects and duplicates. diff --git a/src/java/org/apache/nutch/crawl/CrawlDatum.java b/src/java/org/apache/nutch/crawl/CrawlDatum.java index c76fc0f336..14f134089e 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDatum.java +++ b/src/java/org/apache/nutch/crawl/CrawlDatum.java @@ -75,6 +75,8 @@ public class CrawlDatum implements WritableComparable, Cloneable { public static final byte STATUS_DB_DUPLICATE = 0x07; /** Page was marked as orphan, e.g. has no inlinks anymore */ public static final byte STATUS_DB_ORPHAN = 0x08; + /** Page parsing failed */ + public static final byte STATUS_DB_PARSE_FAILED = 0x09; /** Maximum value of DB-related status. */ public static final byte STATUS_DB_MAX = 0x1f; @@ -103,6 +105,8 @@ public class CrawlDatum implements WritableComparable, Cloneable { public static final byte STATUS_LINKED = 0x43; /** Page got metadata from a parser */ public static final byte STATUS_PARSE_META = 0x44; + /** Page parse failed */ + public static final byte STATUS_PARSE_FAILED = 0x45; public static final HashMap statNames = new HashMap<>(); static { @@ -114,6 +118,7 @@ public class CrawlDatum implements WritableComparable, Cloneable { statNames.put(STATUS_DB_NOTMODIFIED, "db_notmodified"); statNames.put(STATUS_DB_DUPLICATE, "db_duplicate"); statNames.put(STATUS_DB_ORPHAN, "db_orphan"); + statNames.put(STATUS_DB_PARSE_FAILED, "db_parse_failed"); statNames.put(STATUS_SIGNATURE, "signature"); statNames.put(STATUS_INJECTED, "injected"); statNames.put(STATUS_LINKED, "linked"); @@ -124,6 +129,7 @@ public class CrawlDatum implements WritableComparable, Cloneable { statNames.put(STATUS_FETCH_GONE, "fetch_gone"); statNames.put(STATUS_FETCH_NOTMODIFIED, "fetch_notmodified"); statNames.put(STATUS_PARSE_META, "parse_metadata"); + statNames.put(STATUS_PARSE_FAILED, "parse_failed"); oldToNew.put(OLD_STATUS_DB_UNFETCHED, STATUS_DB_UNFETCHED); oldToNew.put(OLD_STATUS_DB_FETCHED, STATUS_DB_FETCHED); @@ -144,16 +150,14 @@ public class CrawlDatum implements WritableComparable, Cloneable { private long modifiedTime; private org.apache.hadoop.io.MapWritable metaData; + /** Validate DB Status (ref.: CrawlDbReducer, IndexerReducer) */ public static boolean hasDbStatus(CrawlDatum datum) { - if (datum.status <= STATUS_DB_MAX) - return true; - return false; + return (datum.status <= STATUS_DB_MAX) || CrawlDatum.STATUS_DB_PARSE_FAILED == datum.getStatus(); } - + /** Validate Fetch Status (ref.: CrawlDbReducer, IndexerReducer, SegmentMergerReducer) */ public static boolean hasFetchStatus(CrawlDatum datum) { - if (datum.status > STATUS_DB_MAX && datum.status <= STATUS_FETCH_MAX) - return true; - return false; + return (datum.status > STATUS_DB_MAX && datum.status <= STATUS_FETCH_MAX) + || CrawlDatum.STATUS_PARSE_FAILED == datum.getStatus(); } public CrawlDatum() { diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReducer.java b/src/java/org/apache/nutch/crawl/CrawlDbReducer.java index 3454116575..cd553af174 100644 --- a/src/java/org/apache/nutch/crawl/CrawlDbReducer.java +++ b/src/java/org/apache/nutch/crawl/CrawlDbReducer.java @@ -130,7 +130,7 @@ public void reduce(Text key, Iterable values, } continue; } - + switch (datum.getStatus()) { // collect other info case CrawlDatum.STATUS_LINKED: CrawlDatum link; @@ -233,7 +233,7 @@ public void reduce(Text key, Iterable values, } break; - case CrawlDatum.STATUS_FETCH_SUCCESS: // succesful fetch + case CrawlDatum.STATUS_FETCH_SUCCESS: // successful fetch case CrawlDatum.STATUS_FETCH_REDIR_TEMP: // successful fetch, redirected case CrawlDatum.STATUS_FETCH_REDIR_PERM: case CrawlDatum.STATUS_FETCH_NOTMODIFIED: // successful fetch, notmodified @@ -320,6 +320,14 @@ public void reduce(Text key, Iterable values, } break; + case CrawlDatum.STATUS_PARSE_FAILED: // successful fetch, but parse failed + if (oldSet) + result.setSignature(old.getSignature()); // use old signature + result.setStatus(CrawlDatum.STATUS_DB_PARSE_FAILED); + result = schedule.setPageGoneSchedule(key, result, prevFetchTime, + prevModifiedTime, fetch.getFetchTime()); + break; + case CrawlDatum.STATUS_FETCH_GONE: // permanent failure if (oldSet) result.setSignature(old.getSignature()); // use old signature diff --git a/src/java/org/apache/nutch/fetcher/Fetcher.java b/src/java/org/apache/nutch/fetcher/Fetcher.java index 029d95ff7c..712c7a711e 100644 --- a/src/java/org/apache/nutch/fetcher/Fetcher.java +++ b/src/java/org/apache/nutch/fetcher/Fetcher.java @@ -577,11 +577,11 @@ public void fetch(Path segment, int threads) throws IOException, } catch (InterruptedException | ClassNotFoundException e) { LOG.error(StringUtils.stringifyException(e)); throw e; + } finally { + stopWatch.stop(); + LOG.info("Fetcher: finished, elapsed: {} ms", stopWatch.getTime( + TimeUnit.MILLISECONDS)); } - - stopWatch.stop(); - LOG.info("Fetcher: finished, elapsed: {} ms", stopWatch.getTime( - TimeUnit.MILLISECONDS)); } /** diff --git a/src/java/org/apache/nutch/fetcher/FetcherThread.java b/src/java/org/apache/nutch/fetcher/FetcherThread.java index e3ee092b44..ce451d03a2 100644 --- a/src/java/org/apache/nutch/fetcher/FetcherThread.java +++ b/src/java/org/apache/nutch/fetcher/FetcherThread.java @@ -112,6 +112,7 @@ public class FetcherThread extends Thread { URLNormalizers normalizersForOutlinks; private boolean skipTruncated; + private boolean deleteFailedParse; private boolean halted = false; @@ -181,6 +182,7 @@ public FetcherThread(Configuration conf, AtomicInteger activeThreads, FetchItemQ this.scfilters = new ScoringFilters(conf); this.parseUtil = new ParseUtil(conf); this.skipTruncated = conf.getBoolean(ParseSegment.SKIP_TRUNCATED, true); + this.deleteFailedParse = conf.getBoolean(ParseSegment.DELETE_FAILED_PARSE, false); this.signatureWithoutParsing = conf.getBoolean("fetcher.signature", false); this.protocolFactory = new ProtocolFactory(conf); this.normalizers = new URLNormalizers(conf, URLNormalizers.SCOPE_FETCHER); @@ -221,7 +223,7 @@ public FetcherThread(Configuration conf, AtomicInteger activeThreads, FetchItemQ .getInt("http.robots.503.defer.visits.retries", 3); } - if((activatePublisher=conf.getBoolean("fetcher.publisher", false))) + if ((activatePublisher = conf.getBoolean("fetcher.publisher", false))) this.publisher = new FetcherThreadPublisher(conf); queueMode = conf.get("fetcher.queue.mode", @@ -442,7 +444,7 @@ public void run() { case ProtocolStatus.SUCCESS: // got a page pstatus = output(fit.url, fit.datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS, fit.outlinkDepth); - updateStatus(content.getContent().length); + updateStatus(content.getContent() != null ? content.getContent().length : 0); if (pstatus != null && pstatus.isSuccess() && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) { String newUrl = pstatus.getMessage(); @@ -731,14 +733,18 @@ private ParseStatus output(Text key, CrawlDatum datum, Content content, .calculate(content, new ParseStatus().getEmptyParse(conf)); datum.setSignature(signature); } + + if (parseResult == null && parsing && deleteFailedParse) { + datum.setStatus(CrawlDatum.STATUS_PARSE_FAILED); + status = CrawlDatum.STATUS_PARSE_FAILED; + } } /* * Store status code in content So we can read this value during parsing * (as a separate job) and decide to parse or not. */ - content.getMetadata().add(Nutch.FETCH_STATUS_KEY, - Integer.toString(status)); + content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status)); } try { @@ -756,6 +762,10 @@ private ParseStatus output(Text key, CrawlDatum datum, Content content, LOG.warn("{} {} Error parsing: {}: {}", getName(), Thread.currentThread().getId(), key, parseStatus); parse = parseStatus.getEmptyParse(conf); + if (deleteFailedParse && content != null) { + // forward the failure status in the content + content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(CrawlDatum.STATUS_PARSE_FAILED)); + } } // Calculate page signature. For non-parsing fetchers this will diff --git a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java index 50da12b8a2..14ef3de678 100644 --- a/src/java/org/apache/nutch/indexer/IndexerMapReduce.java +++ b/src/java/org/apache/nutch/indexer/IndexerMapReduce.java @@ -51,6 +51,7 @@ import org.apache.nutch.parse.Parse; import org.apache.nutch.parse.ParseData; import org.apache.nutch.parse.ParseImpl; +import org.apache.nutch.parse.ParseSegment; import org.apache.nutch.parse.ParseText; import org.apache.nutch.protocol.Content; import org.apache.nutch.scoring.ScoringFilterException; @@ -206,6 +207,7 @@ public static class IndexerReducer extends private boolean delete = false; private boolean deleteRobotsNoIndex = false; private boolean deleteSkippedByIndexingFilter = false; + private boolean deleteFailedParse = false; private boolean base64 = false; private IndexingFilters filters; private ScoringFilters scfilters; @@ -226,6 +228,7 @@ public static class IndexerReducer extends private Counter deletedGoneCounter; private Counter deletedRedirectsCounter; private Counter deletedDuplicatesCounter; + private Counter deletedFailedParseCounter; private Counter skippedNotModifiedCounter; private Counter deletedByIndexingFilterCounter; private Counter skippedByIndexingFilterCounter; @@ -244,6 +247,7 @@ public void setup(Reducer.Context c false); deleteSkippedByIndexingFilter = conf.getBoolean(INDEXER_DELETE_SKIPPED, false); + deleteFailedParse = conf.getBoolean(ParseSegment.DELETE_FAILED_PARSE, false); skip = conf.getBoolean(INDEXER_SKIP_NOTMODIFIED, false); base64 = conf.getBoolean(INDEXER_BINARY_AS_BASE64, false); @@ -279,6 +283,8 @@ private void initCounters(Reducer.C NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_DELETED_REDIRECTS_TOTAL); deletedDuplicatesCounter = context.getCounter( NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_DELETED_DUPLICATES_TOTAL); + deletedFailedParseCounter = context.getCounter( + NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_DELETED_FAILED_PARSE_TOTAL); skippedNotModifiedCounter = context.getCounter( NutchMetrics.GROUP_INDEXER, NutchMetrics.INDEXER_SKIPPED_NOT_MODIFIED_TOTAL); deletedByIndexingFilterCounter = context.getCounter( @@ -354,6 +360,15 @@ public void reduce(Text key, Iterable values, } } + // Whether to delete pages where parsing failed + if (deleteFailedParse && fetchDatum != null) { + if (fetchDatum.getStatus() == CrawlDatum.STATUS_PARSE_FAILED + || dbDatum != null && dbDatum.getStatus() == CrawlDatum.STATUS_DB_PARSE_FAILED) { + deletedFailedParseCounter.increment(1); + context.write(key, DELETE_ACTION); + return; + } + } // Whether to delete GONE or REDIRECTS if (delete && fetchDatum != null) { if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE diff --git a/src/java/org/apache/nutch/metadata/Nutch.java b/src/java/org/apache/nutch/metadata/Nutch.java index ce4cf519c3..a229666cf9 100644 --- a/src/java/org/apache/nutch/metadata/Nutch.java +++ b/src/java/org/apache/nutch/metadata/Nutch.java @@ -26,31 +26,31 @@ */ public interface Nutch { - public static final String ORIGINAL_CHAR_ENCODING = "OriginalCharEncoding"; + public static final String ORIGINAL_CHAR_ENCODING = "OriginalCharEncoding"; - public static final String CHAR_ENCODING_FOR_CONVERSION = "CharEncodingForConversion"; + public static final String CHAR_ENCODING_FOR_CONVERSION = "CharEncodingForConversion"; - public static final String SIGNATURE_KEY = "nutch.content.digest"; + public static final String SIGNATURE_KEY = "nutch.content.digest"; - public static final String SEGMENT_NAME_KEY = "nutch.segment.name"; + public static final String SEGMENT_NAME_KEY = "nutch.segment.name"; - public static final String SCORE_KEY = "nutch.crawl.score"; + public static final String SCORE_KEY = "nutch.crawl.score"; - public static final String GENERATE_TIME_KEY = "_ngt_"; + public static final String GENERATE_TIME_KEY = "_ngt_"; - public static final Text WRITABLE_GENERATE_TIME_KEY = new Text( - GENERATE_TIME_KEY); + public static final Text WRITABLE_GENERATE_TIME_KEY = new Text( + GENERATE_TIME_KEY); - public static final Text PROTOCOL_STATUS_CODE_KEY = new Text("nutch.protocol.code"); + public static final Text PROTOCOL_STATUS_CODE_KEY = new Text("nutch.protocol.code"); - public static final String PROTO_STATUS_KEY = "_pst_"; + public static final String PROTO_STATUS_KEY = "_pst_"; - public static final Text WRITABLE_PROTO_STATUS_KEY = new Text( - PROTO_STATUS_KEY); + public static final Text WRITABLE_PROTO_STATUS_KEY = new Text( + PROTO_STATUS_KEY); - public static final String FETCH_TIME_KEY = "_ftk_"; + public static final String FETCH_TIME_KEY = "_ftk_"; - public static final String FETCH_STATUS_KEY = "_fst_"; + public static final String FETCH_STATUS_KEY = "_fst_"; /** * Name to store the robots @@ -58,30 +58,30 @@ public interface Nutch { */ public static final String ROBOTS_METATAG = "robots"; - /** - * Sites may request that search engines don't provide access to cached - * documents. - */ - public static final String CACHING_FORBIDDEN_KEY = "caching.forbidden"; + /** + * Sites may request that search engines don't provide access to cached + * documents. + */ + public static final String CACHING_FORBIDDEN_KEY = "caching.forbidden"; - /** Show both original forbidden content and summaries (default). */ - public static final String CACHING_FORBIDDEN_NONE = "none"; + /** Show both original forbidden content and summaries (default). */ + public static final String CACHING_FORBIDDEN_NONE = "none"; - /** Don't show either original forbidden content or summaries. */ - public static final String CACHING_FORBIDDEN_ALL = "all"; + /** Don't show either original forbidden content or summaries. */ + public static final String CACHING_FORBIDDEN_ALL = "all"; - /** Don't show original forbidden content, but show summaries. */ - public static final String CACHING_FORBIDDEN_CONTENT = "content"; + /** Don't show original forbidden content, but show summaries. */ + public static final String CACHING_FORBIDDEN_CONTENT = "content"; - public static final String REPR_URL_KEY = "_repr_"; + public static final String REPR_URL_KEY = "_repr_"; - public static final Text WRITABLE_REPR_URL_KEY = new Text(REPR_URL_KEY); + public static final Text WRITABLE_REPR_URL_KEY = new Text(REPR_URL_KEY); - /** Used by AdaptiveFetchSchedule to maintain custom fetch interval */ - public static final String FIXED_INTERVAL_KEY = "fixedInterval"; + /** Used by AdaptiveFetchSchedule to maintain custom fetch interval */ + public static final String FIXED_INTERVAL_KEY = "fixedInterval"; - public static final Text WRITABLE_FIXED_INTERVAL_KEY = new Text( - FIXED_INTERVAL_KEY); + public static final Text WRITABLE_FIXED_INTERVAL_KEY = new Text( + FIXED_INTERVAL_KEY); /** For progress of job (programmatic / tooling). */ public static final String STAT_PROGRESS = "progress"; diff --git a/src/java/org/apache/nutch/metrics/NutchMetrics.java b/src/java/org/apache/nutch/metrics/NutchMetrics.java index ef4fe79e57..32767c04e3 100644 --- a/src/java/org/apache/nutch/metrics/NutchMetrics.java +++ b/src/java/org/apache/nutch/metrics/NutchMetrics.java @@ -188,6 +188,9 @@ private NutchMetrics() { /** Documents deleted as duplicates. */ public static final String INDEXER_DELETED_DUPLICATES_TOTAL = "deleted_duplicates_total"; + /** Documents deleted because parsing failed. */ + public static final String INDEXER_DELETED_FAILED_PARSE_TOTAL = "deleted_failed_parse_total"; + /** Documents deleted by indexing filter. */ public static final String INDEXER_DELETED_BY_INDEXING_FILTER_TOTAL = "deleted_by_indexing_filter_total"; diff --git a/src/java/org/apache/nutch/parse/ParseSegment.java b/src/java/org/apache/nutch/parse/ParseSegment.java index f4cb5e17b6..7c6465baff 100644 --- a/src/java/org/apache/nutch/parse/ParseSegment.java +++ b/src/java/org/apache/nutch/parse/ParseSegment.java @@ -68,6 +68,12 @@ public class ParseSegment extends NutchTool implements Tool { public static final String SKIP_TRUNCATED = "parser.skip.truncated"; + /** + * Configuration property to delete documents that failed to be parsed. Also used + * in Fetcher and Indexer. + */ + public static final String DELETE_FAILED_PARSE = "parser.delete.failed.parse"; + public ParseSegment() { this(null); } @@ -83,6 +89,7 @@ public static class ParseSegmentMapper extends private Text newKey = new Text(); private ScoringFilters scfilters; private boolean skipTruncated; + private boolean deleteFailedParse; private LatencyTracker parseLatencyTracker; private ErrorTracker errorTracker; @@ -91,6 +98,7 @@ public void setup(Mapper, Content, Text, ParseImpl>.Contex Configuration conf = context.getConfiguration(); scfilters = new ScoringFilters(conf); skipTruncated = conf.getBoolean(SKIP_TRUNCATED, true); + deleteFailedParse = conf.getBoolean(DELETE_FAILED_PARSE, false); parseLatencyTracker = new LatencyTracker( NutchMetrics.GROUP_PARSER, NutchMetrics.PARSER_LATENCY); // Initialize error tracker with cached counters @@ -119,6 +127,9 @@ public void map(WritableComparable key, Content content, // no fetch status, skip document LOG.debug("Skipping {} as content has no fetch status", key); return; + } else if(deleteFailedParse && Integer.parseInt(fetchStatus) == CrawlDatum.STATUS_PARSE_FAILED) { + LOG.debug("Skipping {} as un-parseable content will be deleted", key); + return; } else if (Integer.parseInt(fetchStatus) != CrawlDatum.STATUS_FETCH_SUCCESS) { // content not fetched successfully, skip document LOG.debug("Skipping {} as content is not fetched successfully", key); @@ -126,6 +137,7 @@ public void map(WritableComparable key, Content content, } if (skipTruncated && isTruncated(content)) { + LOG.debug("Skipping {} as content is truncated", key); return; } diff --git a/src/java/org/apache/nutch/parse/ParseStatus.java b/src/java/org/apache/nutch/parse/ParseStatus.java index 25b8ae1b47..5d4e846800 100644 --- a/src/java/org/apache/nutch/parse/ParseStatus.java +++ b/src/java/org/apache/nutch/parse/ParseStatus.java @@ -188,6 +188,10 @@ public void write(DataOutput out) throws IOException { public boolean isSuccess() { return majorCode == SUCCESS; } + + public boolean isFailed() { + return majorCode == FAILED; + } /** * @return a String representation of the first argument, diff --git a/src/test/org/apache/nutch/crawl/CrawlDBTestUtil.java b/src/test/org/apache/nutch/crawl/CrawlDBTestUtil.java index d11634bbd1..547c373f1f 100644 --- a/src/test/org/apache/nutch/crawl/CrawlDBTestUtil.java +++ b/src/test/org/apache/nutch/crawl/CrawlDBTestUtil.java @@ -500,4 +500,33 @@ private static String probeContentType(java.nio.file.Path file) String ct = Files.probeContentType(file); return ct != null ? ct : "application/octet-stream"; } + + /** + * Creates a new JettyServer with one static root context and the provided resource handler. + * + * @param port + * port to listen to + * @param staticContent + * folder where static content lives + * @param resourceHandler + * resource handler to override the default behavior if needed. + * @return configured Jetty server instance + * @throws UnknownHostException + */ + @NonNull + public static Server getServer(int port, @NonNull String staticContent, ResourceHandler resourceHandler) + throws UnknownHostException { + Server webServer = new Server(); + + ServerConnector listener = new ServerConnector(webServer); + listener.setPort(port); + listener.setHost("127.0.0.1"); + webServer.addConnector(listener); + ContextHandler staticContext = new ContextHandler(); + staticContext.setContextPath("/"); + staticContext.setResourceBase(staticContent); + staticContext.insertHandler(resourceHandler); + webServer.insertHandler(staticContext); + return webServer; + } } diff --git a/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java b/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java index 34743811a1..217039767c 100644 --- a/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java +++ b/src/test/org/apache/nutch/crawl/TestCrawlDbStates.java @@ -21,9 +21,11 @@ import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.util.StringUtils; +import org.apache.nutch.parse.ParseSegment; import org.apache.nutch.scoring.ScoringFilterException; import org.apache.nutch.scoring.ScoringFilters; import org.apache.nutch.util.ReducerContextWrapper; +import org.apache.nutch.util.TimingUtil; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -323,7 +325,149 @@ public void testCrawlDbReducerNotModified() { } } - protected class CrawlTestFetchNotModified extends ContinuousCrawlTestUtil { + + /** + * NUTCH-1245: a fetch_gone should always result in a db_gone. + *

+ * Even in a long-running continuous crawl, when a gone page is re-fetched + * several times over time. + *

+ */ + @Test + public void testCrawlDbReducerPageGoneSchedule1() { + LOG.info("NUTCH-1245: test long running continuous crawl"); + ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestUtil( + STATUS_FETCH_GONE, STATUS_DB_GONE); + try { + if (!crawlUtil.run(20)) { + fail("fetch_gone did not result in a db_gone (NUTCH-1245)"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * NUTCH-1245: a fetch_gone should always result in a db_gone. + *

+ * As some kind of misconfiguration set db.fetch.interval.default to a value + * > (fetchIntervalMax * 1.5). + *

+ */ + @Test + public void testCrawlDbReducerPageGoneSchedule2() { + LOG.info("NUTCH-1245 (misconfiguration): test with db.fetch.interval.default > (1.5 * db.fetch.interval.max)"); + Context context = CrawlDBTestUtil.createContext(); + Configuration conf = context.getConfiguration(); + int fetchIntervalMax = conf.getInt("db.fetch.interval.max", 0); + conf.setInt("db.fetch.interval.default", 3 + (int) (fetchIntervalMax * 1.5)); + ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestUtil(context, + STATUS_FETCH_GONE, STATUS_DB_GONE); + try { + if (!crawlUtil.run(0)) { + fail("fetch_gone did not result in a db_gone (NUTCH-1245)"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + + @Test + public void testCrawlDbReducerParseFailed() { + LOG.info("NUTCH-1732: allow deleting un-parsable documents"); + Context context = CrawlDBTestUtil.createContext(); + Configuration conf = context.getConfiguration(); + conf.setBoolean(ParseSegment.DELETE_FAILED_PARSE, true); + CrawlTestParserFailure crawlUtil = new CrawlTestParserFailure(context); + try { + if (!crawlUtil.run(20)) { + fail("parse failure did not result in a parse_fail (NUTCH-1732)"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * Test whether signatures are reset for "content-less" states (gone, + * redirect, etc.): otherwise, if this state is temporary and the document + * appears again with the old content, it may get marked as not_modified in + * CrawlDb just after the redirect state. In this case we cannot expect + * content in segments. Cf. NUTCH-1422: reset signature for redirects. + */ + // TODO: can only test if solution is done in CrawlDbReducer + @Test + public void testSignatureReset() { + LOG.info("NUTCH-1422 must reset signature for redirects and similar states"); + Context context = CrawlDBTestUtil.createContext(); + Configuration conf = context.getConfiguration(); + for (String sched : schedules) { + LOG.info("Testing reset signature with {}", sched); + conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl." + sched); + ContinuousCrawlTestUtil crawlUtil = new CrawlTestSignatureReset(context); + try { + if (!crawlUtil.run(20)) { + fail("failed: signature not reset"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + /** + * NUTCH-578: a fetch_retry should result in a db_gone if db.fetch.retry.max + * is reached. Retry counter has to be reset appropriately. + */ + @Test + public void testCrawlDbReducerPageRetrySchedule() { + LOG.info("NUTCH-578: test long running continuous crawl with fetch_retry"); + ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestFetchRetry(); + // keep going for long, to "provoke" a retry counter overflow + try { + if (!crawlUtil.run(150)) { + fail("fetch_retry did not result in a db_gone if retry counter > maxRetries (NUTCH-578)"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + /** + * NUTCH-1564 AdaptiveFetchSchedule: sync_delta forces immediate re-fetch for + * documents not modified + *

+ * Problem: documents not modified for a longer time are fetched in every + * cycle because of an error in the SYNC_DELTA calculation of + * {@link AdaptiveFetchSchedule}.
+ * The next fetch time should always be in the future, never in the past. + *

+ */ + @Test + public void testAdaptiveFetchScheduleSyncDelta() { + LOG.info("NUTCH-1564 test SYNC_DELTA calculation of AdaptiveFetchSchedule"); + Context context = CrawlDBTestUtil.createContext(); + Configuration conf = context.getConfiguration(); + conf.setLong("db.fetch.interval.default", 172800); // 2 days + conf.setLong("db.fetch.schedule.adaptive.min_interval", 86400); // 1 day + conf.setLong("db.fetch.schedule.adaptive.max_interval", 604800); // 7 days + conf.setLong("db.fetch.interval.max", 604800); // 7 days + conf.set("db.fetch.schedule.class", + "org.apache.nutch.crawl.AdaptiveFetchSchedule"); + ContinuousCrawlTestUtil crawlUtil = new CrawlTestFetchScheduleNotModifiedFetchTime( + context); + crawlUtil.setInterval(FetchSchedule.SECONDS_PER_DAY / 3); + try { + if (!crawlUtil.run(100)) { + fail("failed: sync_delta calculation with AdaptiveFetchSchedule"); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private class CrawlTestFetchNotModified extends ContinuousCrawlTestUtil { /** time of the current fetch */ protected long currFetchTime; @@ -451,7 +595,7 @@ protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { } } - protected class CrawlTestFetchNotModifiedHttp304 extends + private class CrawlTestFetchNotModifiedHttp304 extends CrawlTestFetchNotModified { CrawlTestFetchNotModifiedHttp304(Context context) { @@ -487,79 +631,6 @@ protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { } } - /** - * NUTCH-1245: a fetch_gone should always result in a db_gone. - *

- * Even in a long-running continuous crawl, when a gone page is re-fetched - * several times over time. - *

- */ - @Test - public void testCrawlDbReducerPageGoneSchedule1() { - LOG.info("NUTCH-1245: test long running continuous crawl"); - ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestUtil( - STATUS_FETCH_GONE, STATUS_DB_GONE); - try { - if (!crawlUtil.run(20)) { - fail("fetch_gone did not result in a db_gone (NUTCH-1245)"); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - /** - * NUTCH-1245: a fetch_gone should always result in a db_gone. - *

- * As some kind of misconfiguration set db.fetch.interval.default to a value - * > (fetchIntervalMax * 1.5). - *

- */ - @Test - public void testCrawlDbReducerPageGoneSchedule2() { - LOG.info("NUTCH-1245 (misconfiguration): test with db.fetch.interval.default > (1.5 * db.fetch.interval.max)"); - Context context = CrawlDBTestUtil.createContext(); - Configuration conf = context.getConfiguration(); - int fetchIntervalMax = conf.getInt("db.fetch.interval.max", 0); - conf.setInt("db.fetch.interval.default", 3 + (int) (fetchIntervalMax * 1.5)); - ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestUtil(context, - STATUS_FETCH_GONE, STATUS_DB_GONE); - try { - if (!crawlUtil.run(0)) { - fail("fetch_gone did not result in a db_gone (NUTCH-1245)"); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - /** - * Test whether signatures are reset for "content-less" states (gone, - * redirect, etc.): otherwise, if this state is temporary and the document - * appears again with the old content, it may get marked as not_modified in - * CrawlDb just after the redirect state. In this case we cannot expect - * content in segments. Cf. NUTCH-1422: reset signature for redirects. - */ - // TODO: can only test if solution is done in CrawlDbReducer - @Test - public void testSignatureReset() { - LOG.info("NUTCH-1422 must reset signature for redirects and similar states"); - Context context = CrawlDBTestUtil.createContext(); - Configuration conf = context.getConfiguration(); - for (String sched : schedules) { - LOG.info("Testing reset signature with {}", sched); - conf.set("db.fetch.schedule.class", "org.apache.nutch.crawl." + sched); - ContinuousCrawlTestUtil crawlUtil = new CrawlTestSignatureReset(context); - try { - if (!crawlUtil.run(20)) { - fail("failed: signature not reset"); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - } - private class CrawlTestSignatureReset extends ContinuousCrawlTestUtil { byte[][] noContentStates = { { STATUS_FETCH_GONE, STATUS_DB_GONE }, @@ -607,4 +678,166 @@ protected boolean check(CrawlDatum result) { } + + private class ContinuousCrawlTestFetchRetry extends ContinuousCrawlTestUtil { + + private int retryMax = 3; + private int totalRetries = 0; + + ContinuousCrawlTestFetchRetry() { + super(); + fetchStatus = STATUS_FETCH_RETRY; + retryMax = context.getConfiguration().getInt("db.fetch.retry.max", retryMax); + } + + @Override + protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { + datum.setStatus(fetchStatus); + datum.setFetchTime(currentTime); + totalRetries++; + return datum; + } + + @Override + protected boolean check(CrawlDatum result) { + if (result.getRetriesSinceFetch() > retryMax) { + LOG.warn("Retry counter > db.fetch.retry.max: {}", result); + } else if (result.getRetriesSinceFetch() == Byte.MAX_VALUE) { + LOG.warn("Retry counter max. value reached (overflow imminent): {}", result); + } else if (result.getRetriesSinceFetch() < 0) { + LOG.error("Retry counter overflow: {}", result); + return false; + } + // use retry counter bound to this class (totalRetries) + // instead of result.getRetriesSinceFetch() because the retry counter + // in CrawlDatum could be reset (eg. NUTCH-578_v5.patch) + if (totalRetries < retryMax) { + if (result.getStatus() == STATUS_DB_UNFETCHED) { + LOG.info("ok: {}", result); + result.getRetriesSinceFetch(); + return true; + } + } else { + if (result.getStatus() == STATUS_DB_GONE) { + LOG.info("ok: {}", result); + return true; + } + } + LOG.warn("wrong: {}", result); + return false; + } + + } + + private class CrawlTestFetchScheduleNotModifiedFetchTime extends + CrawlTestFetchNotModified { + + // time of current fetch + private long fetchTime; + + private long minInterval; + private long maxInterval; + + CrawlTestFetchScheduleNotModifiedFetchTime(Context context) { + super(context); + Configuration conf = context.getConfiguration(); + minInterval = conf.getLong("db.fetch.schedule.adaptive.min_interval", + 86400); // 1 day + maxInterval = conf.getLong("db.fetch.schedule.adaptive.max_interval", + 604800); // 7 days + if (conf.getLong("db.fetch.interval.max", 604800) < maxInterval) { + maxInterval = conf.getLong("db.fetch.interval.max", 604800); + } + } + + @Override + protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { + // remember time of fetching + fetchTime = currentTime; + return super.fetch(datum, currentTime); + } + + @Override + protected boolean check(CrawlDatum result) { + if (result.getStatus() == STATUS_DB_NOTMODIFIED) { + // check only status notmodified here + long secondsUntilNextFetch = (result.getFetchTime() - fetchTime) / 1000L; + if (secondsUntilNextFetch < -1) { + // next fetch time is in the past (more than one second) + LOG.error("Next fetch time is in the past: {}", result); + return false; + } + if (secondsUntilNextFetch < 60) { + // next fetch time is in less than one minute + // (critical: Nutch can hardly be so fast) + LOG.error("Less then one minute until next fetch: {}", result); + } + // Next fetch time should be within min. and max. (tolerance: 60 sec.) + if (secondsUntilNextFetch + 60 < minInterval + || secondsUntilNextFetch - 60 > maxInterval) { + LOG.error( + "Interval until next fetch time ({}) is not within min. and max. interval: {}", + TimingUtil.elapsedTime(fetchTime, result.getFetchTime()), result); + // TODO: is this a failure? + } + } + return true; + } + + } + + private class CrawlTestParserFailure extends ContinuousCrawlTestUtil { + + int counter = 0; + boolean failing = false; + + public CrawlTestParserFailure(Context context) { + super(context); + } + + @Override + protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { + counter++; + if(counter % 2 == 0) { + failing = true; + // STATUS_PARSE_FAILED is normally set in FetcherThread.run from a fetch success + // This test is for CrawlDbReducer, after fetch + datum.setStatus(STATUS_PARSE_FAILED); + LOG.info("expect parse failed"); + } else { + failing = false; + datum.setStatus(STATUS_FETCH_SUCCESS); + LOG.info("expect fetch success"); + } + datum.setFetchTime(currentTime); + return datum; + } + + @Override + protected List parse(CrawlDatum fetchDatum) { + List parseDatums = new ArrayList(0); + if (failing){ + LOG.info("set parse failed"); + parseDatums.add(new CrawlDatum(STATUS_PARSE_FAILED, 0)); + } else { + LOG.info("set signature"); + CrawlDatum signed = new CrawlDatum(STATUS_SIGNATURE, 0); + signed.setSignature(getSignature()); + parseDatums.add(signed); + } + return parseDatums; + } + + @Override + protected boolean check(CrawlDatum result) { + if (failing) { + LOG.info("check parse failed"); + return result.getStatus() == STATUS_DB_PARSE_FAILED; + } else { + LOG.info("check fetched"); + return result.getStatus() == STATUS_DB_FETCHED || result.getStatus() == STATUS_DB_NOTMODIFIED; + } + } + } + } diff --git a/src/test/org/apache/nutch/crawl/TestCrawlDbStatesExtended.java b/src/test/org/apache/nutch/crawl/TestCrawlDbStatesExtended.java deleted file mode 100644 index 2e6ea55af1..0000000000 --- a/src/test/org/apache/nutch/crawl/TestCrawlDbStatesExtended.java +++ /dev/null @@ -1,195 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nutch.crawl; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.Reducer.Context; -import org.apache.nutch.util.TimingUtil; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.lang.invoke.MethodHandles; - -import static org.apache.nutch.crawl.CrawlDatum.*; -import static org.junit.jupiter.api.Assertions.fail; - -public class TestCrawlDbStatesExtended extends TestCrawlDbStates { - - private static final Logger LOG = LoggerFactory - .getLogger(MethodHandles.lookup().lookupClass()); - - /** - * NUTCH-578: a fetch_retry should result in a db_gone if db.fetch.retry.max - * is reached. Retry counter has to be reset appropriately. - */ - @Test - public void testCrawlDbReducerPageRetrySchedule() { - LOG.info("NUTCH-578: test long running continuous crawl with fetch_retry"); - ContinuousCrawlTestUtil crawlUtil = new ContinuousCrawlTestFetchRetry(); - // keep going for long, to "provoke" a retry counter overflow - try { - if (!crawlUtil.run(150)) { - fail("fetch_retry did not result in a db_gone if retry counter > maxRetries (NUTCH-578)"); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - private class ContinuousCrawlTestFetchRetry extends ContinuousCrawlTestUtil { - - private int retryMax = 3; - private int totalRetries = 0; - - ContinuousCrawlTestFetchRetry() { - super(); - fetchStatus = STATUS_FETCH_RETRY; - retryMax = context.getConfiguration().getInt("db.fetch.retry.max", retryMax); - } - - @Override - protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { - datum.setStatus(fetchStatus); - datum.setFetchTime(currentTime); - totalRetries++; - return datum; - } - - @Override - protected boolean check(CrawlDatum result) { - if (result.getRetriesSinceFetch() > retryMax) { - LOG.warn("Retry counter > db.fetch.retry.max: {}", result); - } else if (result.getRetriesSinceFetch() == Byte.MAX_VALUE) { - LOG.warn("Retry counter max. value reached (overflow imminent): {}", result); - } else if (result.getRetriesSinceFetch() < 0) { - LOG.error("Retry counter overflow: {}", result); - return false; - } - // use retry counter bound to this class (totalRetries) - // instead of result.getRetriesSinceFetch() because the retry counter - // in CrawlDatum could be reset (eg. NUTCH-578_v5.patch) - if (totalRetries < retryMax) { - if (result.getStatus() == STATUS_DB_UNFETCHED) { - LOG.info("ok: {}", result); - result.getRetriesSinceFetch(); - return true; - } - } else { - if (result.getStatus() == STATUS_DB_GONE) { - LOG.info("ok: {}", result); - return true; - } - } - LOG.warn("wrong: {}", result); - return false; - } - - } - - /** - * NUTCH-1564 AdaptiveFetchSchedule: sync_delta forces immediate re-fetch for - * documents not modified - *

- * Problem: documents not modified for a longer time are fetched in every - * cycle because of an error in the SYNC_DELTA calculation of - * {@link AdaptiveFetchSchedule}.
- * The next fetch time should always be in the future, never in the past. - *

- */ - @Test - public void testAdaptiveFetchScheduleSyncDelta() { - LOG.info("NUTCH-1564 test SYNC_DELTA calculation of AdaptiveFetchSchedule"); - Context context = CrawlDBTestUtil.createContext(); - Configuration conf = context.getConfiguration(); - conf.setLong("db.fetch.interval.default", 172800); // 2 days - conf.setLong("db.fetch.schedule.adaptive.min_interval", 86400); // 1 day - conf.setLong("db.fetch.schedule.adaptive.max_interval", 604800); // 7 days - conf.setLong("db.fetch.interval.max", 604800); // 7 days - conf.set("db.fetch.schedule.class", - "org.apache.nutch.crawl.AdaptiveFetchSchedule"); - ContinuousCrawlTestUtil crawlUtil = new CrawlTestFetchScheduleNotModifiedFetchTime( - context); - crawlUtil.setInterval(FetchSchedule.SECONDS_PER_DAY / 3); - try { - if (!crawlUtil.run(100)) { - fail("failed: sync_delta calculation with AdaptiveFetchSchedule"); - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - private class CrawlTestFetchScheduleNotModifiedFetchTime extends - CrawlTestFetchNotModified { - - // time of current fetch - private long fetchTime; - - private long minInterval; - private long maxInterval; - - CrawlTestFetchScheduleNotModifiedFetchTime(Context context) { - super(context); - Configuration conf = context.getConfiguration(); - minInterval = conf.getLong("db.fetch.schedule.adaptive.min_interval", - 86400); // 1 day - maxInterval = conf.getLong("db.fetch.schedule.adaptive.max_interval", - 604800); // 7 days - if (conf.getLong("db.fetch.interval.max", 604800) < maxInterval) { - maxInterval = conf.getLong("db.fetch.interval.max", 604800); - } - } - - @Override - protected CrawlDatum fetch(CrawlDatum datum, long currentTime) { - // remember time of fetching - fetchTime = currentTime; - return super.fetch(datum, currentTime); - } - - @Override - protected boolean check(CrawlDatum result) { - if (result.getStatus() == STATUS_DB_NOTMODIFIED) { - // check only status notmodified here - long secondsUntilNextFetch = (result.getFetchTime() - fetchTime) / 1000L; - if (secondsUntilNextFetch < -1) { - // next fetch time is in the past (more than one second) - LOG.error("Next fetch time is in the past: {}", result); - return false; - } - if (secondsUntilNextFetch < 60) { - // next fetch time is in less than one minute - // (critical: Nutch can hardly be so fast) - LOG.error("Less then one minute until next fetch: {}", result); - } - // Next fetch time should be within min. and max. (tolerance: 60 sec.) - if (secondsUntilNextFetch + 60 < minInterval - || secondsUntilNextFetch - 60 > maxInterval) { - LOG.error( - "Interval until next fetch time ({}) is not within min. and max. interval: {}", - TimingUtil.elapsedTime(fetchTime, result.getFetchTime()), result); - // TODO: is this a failure? - } - } - return true; - } - - } - -} diff --git a/src/test/org/apache/nutch/fetcher/TestFetchWithParseFailures.java b/src/test/org/apache/nutch/fetcher/TestFetchWithParseFailures.java new file mode 100644 index 0000000000..0ed666558d --- /dev/null +++ b/src/test/org/apache/nutch/fetcher/TestFetchWithParseFailures.java @@ -0,0 +1,264 @@ +package org.apache.nutch.fetcher; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.nutch.crawl.AbstractFetchSchedule; +import org.apache.nutch.crawl.CrawlDBTestUtil; +import org.apache.nutch.crawl.Generator; +import org.apache.nutch.crawl.Injector; +import org.apache.nutch.metadata.Nutch; +import org.apache.nutch.parse.ParseSegment; +import org.apache.nutch.protocol.Content; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.ResourceHandler; +import org.apache.nutch.crawl.CrawlDatum; +import org.apache.nutch.crawl.CrawlDb; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class TestFetchWithParseFailures { + + private static final Logger LOG = LoggerFactory + .getLogger(MethodHandles.lookup().lookupClass()); + + private static final Path TEST_DIR = new Path("build/test/test-fail-parse"); + + private static final String BASE_FOLDER = "build/test/data/fetch-parse-failure"; + private static final String TEST_FILE = "test.html"; + + private Configuration conf; + private FileSystem fs; + private Path crawldbPath; + private Path segmentsPath; + private Path urlPath; + private Server server; + + private static List files; + private static java.nio.file.Path baseFolderPath; + + private static ExecutorService executor = Executors.newCachedThreadPool(); + private static final AtomicInteger FETCH_COUNT = new AtomicInteger(0); + + + @BeforeEach + public void setUp() throws Exception { + baseFolderPath = java.nio.file.Paths.get(BASE_FOLDER); + + files = java.nio.file.Files.list(baseFolderPath).map(p -> p.getFileName().toString()).filter(n -> !n.equals("robots.txt")).collect(Collectors.toList()); + Collections.sort(files); + LOG.info("TEST FILES : " + files); + + conf = CrawlDBTestUtil.createContext().getConfiguration(); + // Do not include 'parse-tika', because it would parse anything + conf.set("plugin.includes", "protocol-http|urlfilter-regex|parse-html|index-(basic|anchor)|indexer-csv|scoring-opic|urlnormalizer-(pass|regex|basic)"); + conf.setInt("fetcher.threads.fetch", 1); + // force an exception from ParserFactory.getParsers(...): ParserNotFound + conf.setBoolean("mime.type.magic", false); + conf.setBoolean("fetcher.parse", true); + conf.setBoolean("fetcher.store.content", true); + conf.setBoolean(ParseSegment.DELETE_FAILED_PARSE, true); + + fs = FileSystem.get(conf); + fs.delete(TEST_DIR, true); + crawldbPath = new Path(TEST_DIR, "crawldb"); + segmentsPath = new Path(TEST_DIR, "segments"); + urlPath = new Path(TEST_DIR, "urls"); + server = CrawlDBTestUtil.getServer( + conf.getInt("content.server.port", 1234), + BASE_FOLDER, new ParseFailureResourceHandler()); + server.start(); + } + + @AfterEach + public void tearDown() throws Exception { + executor.shutdown(); + server.stop(); + for (int i = 0; i < 5; i++) { + if (!server.isStopped()) { + Thread.sleep(1000); + } + } + fs.delete(TEST_DIR, true); + } + + + @Test + public void testFetchWithParseFailure() throws Exception { + AbstractFetchSchedule schedule = new AbstractFetchSchedule(conf) {}; + + // generate seedlist + ArrayList urls = new ArrayList(); + files.forEach(f -> urls.add("http://127.0.0.1:" + server.getURI().getPort() + "/" + f)); + CrawlDBTestUtil.generateSeedList(fs, urlPath, urls); + + // inject + Injector injector = new Injector(conf); + injector.inject(crawldbPath, urlPath); + + // generate + Generator g1 = new Generator(conf); + Path[] generatedSegment1 = g1.generate(crawldbPath, segmentsPath, 1, + Long.MAX_VALUE, Long.MAX_VALUE, false, false, true, 1, null); + Assertions.assertNotNull(generatedSegment1); + + Map args1 = Map.of("segment", generatedSegment1[0]); + // fetch once + LOG.info("1ST FETCH"); + Fetcher fetcher1 = new Fetcher(conf); + Map result1 = executor.submit(new Callable>(){ + + @Override + public Map call() throws Exception { + try { + return fetcher1.run(args1, "1"); + } catch (Throwable t) { + LOG.error("ERROR!", t); + } finally { + FETCH_COUNT.incrementAndGet(); + } + return null; + }}).get(); + Assertions.assertFalse(result1.isEmpty()); + Assertions.assertEquals("0", result1.get(Nutch.VAL_RESULT)); + Assertions.assertEquals(1, FETCH_COUNT.get()); + + CrawlDb crawlDb = new CrawlDb(conf); + crawlDb.update(crawldbPath, generatedSegment1, true, true, true, true); + + // verify content + LOG.info("1ST VERIFY CONTENT"); + Map fetchedUrls = new HashMap<>(); + Path data = new Path( + new Path(generatedSegment1[0], Content.DIR_NAME), "part-r-00000/data"); + try(SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(data))){ + do { + Text key = new Text(); + Content value = new Content(); + if (!reader.next(key, value)) + break; + // status is stored in Nutch.FETCH_STATUS_KEY: ref. FetcherThread + String status = value.getMetadata().get(Nutch.FETCH_STATUS_KEY); + fetchedUrls.put(key.toString(), status); + } while (true); + } + assertEquals(urls.size(), fetchedUrls.size()); + LOG.info("1ST FETCHED URLS: {}", fetchedUrls); + fetchedUrls.entrySet().forEach(i -> Assertions.assertEquals(i.getValue(), "" + CrawlDatum.STATUS_FETCH_SUCCESS)); + + // force re-fetch and wait a bit + urls.forEach(i -> + schedule.forceRefetch(new Text(i), new CrawlDatum(CrawlDatum.STATUS_DB_UNFETCHED, 1), true)); + long start = System.currentTimeMillis(); + while(System.currentTimeMillis() < start + 1000L); + + // inject and generate again + injector.inject(crawldbPath, urlPath, true, true); + Generator g2 = new Generator(conf); + Path[] generatedSegment2 = g2.generate(crawldbPath, segmentsPath, 1, + Long.MAX_VALUE, Long.MAX_VALUE, false, false, true, 1, null); + Assertions.assertNotNull(generatedSegment2); + + crawlDb.update(crawldbPath, generatedSegment2, true, true, true, true); + + Map args2 = Map.of("segment", generatedSegment2[0]); + // next fetch should generate parse failure + LOG.info("2ND FETCH"); + Fetcher fetcher2 = new Fetcher(conf); + + Map result2 = executor.submit(new Callable>(){ + + @Override + public Map call() throws Exception { + try { + return fetcher2.run(args2, "2"); + } catch (Throwable t) { + LOG.error("ERROR!", t); + } finally { + FETCH_COUNT.incrementAndGet(); + } + return null; + }}).get(); + Assertions.assertFalse(result2.isEmpty()); + Assertions.assertEquals("0", result2.get(Nutch.VAL_RESULT)); + Assertions.assertEquals(2, FETCH_COUNT.get()); + + // verify parsed data + LOG.info("2ND VERIFY CONTENT"); + Path newData = new Path( + new Path(generatedSegment2[0], Content.DIR_NAME), "part-r-00000/data"); + try(SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(newData))){ + do { + Text key = new Text(); + Content value = new Content(); + if (!reader.next(key, value)) + break; + // status is stored in Nutch.FETCH_STATUS_KEY: ref. FetcherThread + String status = value.getMetadata().get(Nutch.FETCH_STATUS_KEY); + fetchedUrls.put(key.toString(), status); + } while (true); + } + assertEquals(urls.size(), fetchedUrls.size()); + LOG.info("2ND FETCHED STATUS: {}", fetchedUrls); + + for (Map.Entry entry : fetchedUrls.entrySet()) { + if(entry.getKey().endsWith(TEST_FILE)) { + Assertions.assertEquals(entry.getValue(), "" + CrawlDatum.STATUS_PARSE_FAILED); + } else { + Assertions.assertEquals(entry.getValue(), "" + CrawlDatum.STATUS_FETCH_SUCCESS); + } + } + } + + public static class ParseFailureResourceHandler extends ResourceHandler { + + public ParseFailureResourceHandler() { + super(); + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + if(FETCH_COUNT.get() == 1 && target.endsWith(TEST_FILE)) { + LOG.info("Set mime type 'application/unknown' and random bytes content"); + // force an exception from ParserFactory.getParsers(...): ParserNotFound + response.setContentType("application/unknown"); + response.setContentLength(1024); + byte[] randomBytes = new byte[1024]; + new Random(123).nextBytes(randomBytes); + response.getOutputStream().write(randomBytes); + + } else { + super.handle(target, baseRequest, request, response); + } + } + } +} diff --git a/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java b/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java index 81dd192063..c49eef3578 100644 --- a/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java +++ b/src/test/org/apache/nutch/indexer/TestIndexerMapReduce.java @@ -26,6 +26,7 @@ import org.apache.nutch.metadata.Nutch; import org.apache.nutch.parse.Outlink; import org.apache.nutch.parse.ParseData; +import org.apache.nutch.parse.ParseSegment; import org.apache.nutch.parse.ParseStatus; import org.apache.nutch.parse.ParseText; import org.apache.nutch.protocol.Content; @@ -46,6 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; /** Test {@link IndexerMapReduce} */ public class TestIndexerMapReduce { @@ -82,17 +84,73 @@ public class TestIndexerMapReduce { htmlMeta.add(Nutch.SIGNATURE_KEY, "123"); } public static ParseText parseText = new ParseText("Test"); + public static ParseText failedParseText = new ParseText("Failed Parse"); public static ParseData parseData = new ParseData(ParseStatus.STATUS_SUCCESS, "Test", new Outlink[] {}, htmlMeta); + public static ParseData failedParseData = new ParseData(ParseStatus.STATUS_FAILURE, + "Failed Parse", new Outlink[] {}, htmlMeta); public static CrawlDatum crawlDatumDbFetched = new CrawlDatum( CrawlDatum.STATUS_DB_FETCHED, 60 * 60 * 24); public static CrawlDatum crawlDatumFetchSuccess = new CrawlDatum( CrawlDatum.STATUS_FETCH_SUCCESS, 60 * 60 * 24); + public static CrawlDatum crawlDatumDbParseFailed = new CrawlDatum( + CrawlDatum.STATUS_DB_PARSE_FAILED, 60 * 60 * 24); + public static CrawlDatum crawlDatumParseFailed = new CrawlDatum( + CrawlDatum.STATUS_PARSE_FAILED, 60 * 60 * 24); private IndexerMapReduce.IndexerReducer reducer = new IndexerMapReduce.IndexerReducer(); private Configuration configuration; + @Test + public void testDeleteParseFailure() { + configuration = NutchConfiguration.create(); + configuration.setBoolean(ParseSegment.DELETE_FAILED_PARSE, true); + + // unrelated issue with "index.jexl.filter", don't use all plugins. Ref: src/test/nutch-site.xml + configuration.set("plugin.includes", "protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|indexer-csv|scoring-opic|urlnormalizer-(pass|regex|basic)"); + + int docToDelete = 0; + int docsToIndex = 0; + int deletedDocs = 0; + int indexedDocs = 0; + for(int i=0; i<5; i++) { + boolean failParse = i % 2 == 0; + ParseData data = null; + ParseText text = null; + CrawlDatum dbStatus = null; + CrawlDatum status = null; + + Content content = new Content(testUrl, testUrl, + testHtmlDoc.getBytes(StandardCharsets.UTF_8), htmlContentType, htmlMeta, + configuration); + + if(failParse) { + data = failedParseData; + text = failedParseText; + dbStatus = crawlDatumDbParseFailed; + status = crawlDatumParseFailed; + docToDelete++; + } else { + data = parseData; + text = parseText; + dbStatus = crawlDatumDbFetched; + status = crawlDatumFetchSuccess; + docsToIndex++; + } + + NutchDocument doc = runIndexer(dbStatus, status, text, data, content); + if(failParse) { + deletedDocs++; + assertNull(doc, "NutchDocument should not be indexed"); + } else { + indexedDocs++; + assertNotNull(doc, "No NutchDocument indexed"); + } + assertEquals(docToDelete, deletedDocs); + assertEquals(docsToIndex, indexedDocs); + } + } /** * Test indexing of base64-encoded binary content. diff --git a/src/testresources/fetch-parse-failure/index.html b/src/testresources/fetch-parse-failure/index.html new file mode 100644 index 0000000000..8dd11bf16d --- /dev/null +++ b/src/testresources/fetch-parse-failure/index.html @@ -0,0 +1,25 @@ + + + + front page + + +This is the front page. +
Test + + \ No newline at end of file diff --git a/src/testresources/fetch-parse-failure/robots.txt b/src/testresources/fetch-parse-failure/robots.txt new file mode 100644 index 0000000000..fc590f9733 --- /dev/null +++ b/src/testresources/fetch-parse-failure/robots.txt @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. \ No newline at end of file diff --git a/src/testresources/fetch-parse-failure/test.html b/src/testresources/fetch-parse-failure/test.html new file mode 100644 index 0000000000..2151060d30 --- /dev/null +++ b/src/testresources/fetch-parse-failure/test.html @@ -0,0 +1,25 @@ + + + + Test + + +This is a test +home + + \ No newline at end of file