Skip to content

Commit

Permalink
Merge pull request apache#832 from sebastian-nagel/NUTCH-3072
Browse files Browse the repository at this point in the history
NUTCH-3072 Fetcher to stop QueueFeeder if aborting with "hung threads"
  • Loading branch information
sebastian-nagel authored Jan 8, 2025
2 parents 74b49e9 + c226162 commit 3b6d2c6
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 35 deletions.
30 changes: 22 additions & 8 deletions src/java/org/apache/nutch/fetcher/FetchItemQueues.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public class FetchItemQueues {
int maxExceptionsPerQueue = -1;
long exceptionsPerQueueDelay = -1;
long exceptionsPerQueueClearAfter = 1800 * 1000L;
boolean feederAlive = true;
volatile boolean feederAlive = true;
volatile boolean timoutReached = false;
Configuration conf;

public static final String QUEUE_MODE_HOST = "byHost";
Expand All @@ -71,7 +72,8 @@ enum QueuingStatus {
SUCCESSFULLY_QUEUED,
ERROR_CREATE_FETCH_ITEM,
ABOVE_EXCEPTION_THRESHOLD,
HIT_BY_TIMELIMIT;
HIT_BY_TIMELIMIT,
HIT_BY_TIMEOUT;
}

public FetchItemQueues(Configuration conf) {
Expand Down Expand Up @@ -105,7 +107,7 @@ public FetchItemQueues(Configuration conf) {

/**
* Check whether queue mode is valid, fall-back to default mode if not.
*
*
* @param queueMode
* queue mode to check
* @return valid queue mode or default
Expand Down Expand Up @@ -258,6 +260,18 @@ public synchronized int checkTimelimit() {
return count;
}

/**
* Signal that the hard timeout is reached because new fetches / requests
* where made during half of the MapReduce task timeout
* (<code>mapreduce.task.timeout</code>, default value: 10 minutes). In order
* to avoid that the task timeout is hit and the fetcher job is failed, we
* stop the fetching now. See also the property
* <code>fetcher.threads.timeout.divisor</code>.
*/
public void setTimeoutReached() {
this.timoutReached = true;
}

// empties the queues (used by fetcher timelimit and throughput threshold)
public synchronized int emptyQueues() {
int count = 0, queuesDropped = 0;
Expand All @@ -282,10 +296,10 @@ public synchronized int emptyQueues() {
/**
* Increment the exception counter of a queue in case of an exception e.g.
* timeout; when higher than a given threshold simply empty the queue.
*
*
* The next fetch is delayed if specified by the param {@code delay} or
* configured by the property {@code fetcher.exceptions.per.queue.delay}.
*
*
* @param queueid
* a queue identifier to locate and check
* @param maxExceptions
Expand All @@ -296,7 +310,7 @@ public synchronized int emptyQueues() {
* in addition to the delay defined for the given queue. If a
* negative value is passed the delay is chosen by
* {@code fetcher.exceptions.per.queue.delay}
*
*
* @return number of purged items
*/
public synchronized int checkExceptionThreshold(String queueid,
Expand Down Expand Up @@ -367,9 +381,9 @@ private int purgeAndBlockQueue(String queueid, FetchItemQueue fiq,
/**
* Increment the exception counter of a queue in case of an exception e.g.
* timeout; when higher than a given threshold simply empty the queue.
*
*
* @see #checkExceptionThreshold(String, int, long)
*
*
* @param queueid
* queue identifier to locate and check
* @return number of purged items
Expand Down
46 changes: 28 additions & 18 deletions src/java/org/apache/nutch/fetcher/Fetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,25 +58,25 @@

/**
* A queue-based fetcher.
*
*
* <p>
* This fetcher uses a well-known model of one producer (a QueueFeeder) and many
* consumers (FetcherThread-s).
*
*
* <p>
* QueueFeeder reads input fetchlists and populates a set of FetchItemQueue-s,
* which hold FetchItem-s that describe the items to be fetched. There are as
* many queues as there are unique hosts, but at any given time the total number
* of fetch items in all queues is less than a fixed number (currently set to a
* multiple of the number of threads).
*
*
* <p>
* As items are consumed from the queues, the QueueFeeder continues to add new
* input items, so that their total count stays fixed (FetcherThread-s may also
* add new items to the queues e.g. as a results of redirection) - until all
* input items are exhausted, at which point the number of items in the queues
* begins to decrease. When this number reaches 0 fetcher will finish.
*
*
* <p>
* This fetcher implementation handles per-host blocking itself, instead of
* delegating this work to protocol-specific plugins. Each per-host queue
Expand All @@ -85,13 +85,13 @@
* list of requests in progress, and the time the last request was finished. As
* FetcherThread-s ask for new items to be fetched, queues may return eligible
* items or null if for "politeness" reasons this host's queue is not yet ready.
*
*
* <p>
* If there are still unfetched items in the queues, but none of the items are
* ready, FetcherThread-s will spin-wait until either some items become
* available, or a timeout is reached (at which point the Fetcher will abort,
* assuming the task is hung).
*
*
* @author Andrzej Bialecki
*/
public class Fetcher extends NutchTool implements Tool {
Expand Down Expand Up @@ -147,7 +147,7 @@ public static class FetcherRun extends
private AtomicInteger activeThreads = new AtomicInteger(0);
private AtomicInteger spinWaiting = new AtomicInteger(0);
private long start = System.currentTimeMillis();
private AtomicLong lastRequestStart = new AtomicLong(start);
private AtomicLong lastRequestStart = new AtomicLong(start);
private AtomicLong bytes = new AtomicLong(0); // total bytes fetched
private AtomicInteger pages = new AtomicInteger(0); // total pages fetched
private AtomicInteger errors = new AtomicInteger(0); // total pages errored
Expand All @@ -157,7 +157,7 @@ public static class FetcherRun extends
private AtomicInteger getActiveThreads() {
return activeThreads;
}

private void reportStatus(Context context, FetchItemQueues fetchQueues, int pagesLastSec, int bytesLastSec)
throws IOException {
StringBuilder status = new StringBuilder();
Expand All @@ -184,13 +184,13 @@ private void reportStatus(Context context, FetchItemQueues fetchQueues, int page
context.setStatus(status.toString());
}

@Override
@Override
public void setup(Mapper<Text, CrawlDatum, Text, NutchWritable>.Context context) {
Configuration conf = context.getConfiguration();
segmentName = conf.get(Nutch.SEGMENT_NAME_KEY);
storingContent = isStoringContent(conf);
parsing = isParsing(conf);
}
}

@Override
public void run(Context innerContext)
Expand Down Expand Up @@ -218,11 +218,6 @@ public void run(Context innerContext)
feeder = new QueueFeeder(innerContext, fetchQueues,
threadCount * queueDepthMultiplier);

// the value of the time limit is either -1 or the time where it should
// finish
long timelimit = conf.getLong("fetcher.timelimit", -1);
if (timelimit != -1)
feeder.setTimeLimit(timelimit);
feeder.start();

int startDelay = conf.getInt("fetcher.threads.start.delay", 10);
Expand Down Expand Up @@ -427,9 +422,12 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
* fetches started during half of the MapReduce task timeout
* (mapreduce.task.timeout, default value: 10 minutes). In order to
* avoid that the task timeout is hit and the fetcher job is failed,
* we stop the fetching now.
* we stop the fetching now. See also the property
* fetcher.threads.timeout.divisor.
*/
if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
LOG.warn("Timeout reached with no new requests since {} seconds.",
timeout);
LOG.warn("Aborting with {} hung threads{}.", activeThreads,
feeder.isAlive() ? " (queue feeder still alive)" : "");
innerContext.getCounter("FetcherStatus", "hungThreads")
Expand All @@ -448,6 +446,18 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
LOG.warn(sb.toString());
}
}

/*
* signal the queue feeder that the timeout is reached and wait
* shortly for it to shut down
*/
fetchQueues.setTimeoutReached();
if (feeder.isAlive()) {
LOG.info(
"Signaled QueueFeeder to stop, waiting 1.5 seconds before exiting.");
Thread.sleep(1500);
}

/*
* log and count queued items dropped from the fetch queues because
* of the timeout
Expand All @@ -469,7 +479,7 @@ else if (bandwidthTargetCheckCounter == bandwidthTargetCheckEveryNSecs) {
}
}

public void fetch(Path segment, int threads) throws IOException,
public void fetch(Path segment, int threads) throws IOException,
InterruptedException, ClassNotFoundException {

checkConfiguration();
Expand Down Expand Up @@ -626,7 +636,7 @@ public Map<String, Object> run(Map<String, Object> args, String crawlId) throws
else {
String segmentDir = crawlId+"/segments";
File segmentsDir = new File(segmentDir);
File[] segmentsList = segmentsDir.listFiles();
File[] segmentsList = segmentsDir.listFiles();
Arrays.sort(segmentsList, (f1, f2) -> {
if(f1.lastModified()>f2.lastModified())
return -1;
Expand Down
27 changes: 18 additions & 9 deletions src/java/org/apache/nutch/fetcher/QueueFeeder.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,10 @@ public class QueueFeeder extends Thread {

private static final Logger LOG = LoggerFactory
.getLogger(MethodHandles.lookup().lookupClass());

private FetcherRun.Context context;
private FetchItemQueues queues;
private int size;
private long timelimit = -1;
private URLFilters urlFilters = null;
private URLNormalizers urlNormalizers = null;
private String urlNormalizerScope = URLNormalizers.SCOPE_DEFAULT;
Expand All @@ -64,10 +63,6 @@ public QueueFeeder(FetcherRun.Context context,
}
}

public void setTimeLimit(long tl) {
timelimit = tl;
}

/** Filter and normalize the url */
private String filterNormalize(String url) {
if (url != null) {
Expand All @@ -90,12 +85,26 @@ public void run() {
int cnt = 0;
int[] queuingStatus = new int[QueuingStatus.values().length];
while (hasMore) {
if (timelimit != -1 && System.currentTimeMillis() >= timelimit) {
if (queues.timelimitExceeded() || queues.timoutReached) {
// enough ... lets' simply read all the entries from the input without
// processing them
if (queues.timoutReached) {
int qstatus = QueuingStatus.HIT_BY_TIMEOUT.ordinal();
if (queuingStatus[qstatus] == 0) {
LOG.info("QueueFeeder stopping, timeout reached.");
}
queuingStatus[qstatus]++;
context.getCounter("FetcherStatus", "hitByTimeout").increment(1);
} else {
int qstatus = QueuingStatus.HIT_BY_TIMELIMIT.ordinal();
if (queuingStatus[qstatus] == 0) {
LOG.info("QueueFeeder stopping, timelimit exceeded.");
}
queuingStatus[qstatus]++;
context.getCounter("FetcherStatus", "hitByTimeLimit").increment(1);
}
try {
hasMore = context.nextKeyValue();
queuingStatus[QueuingStatus.HIT_BY_TIMELIMIT.ordinal()]++;
} catch (IOException e) {
LOG.error("QueueFeeder error reading input, record " + cnt, e);
return;
Expand Down Expand Up @@ -137,7 +146,7 @@ public void run() {
url = new Text(url);
}
CrawlDatum datum = new CrawlDatum();
datum.set((CrawlDatum) context.getCurrentValue());
datum.set(context.getCurrentValue());
QueuingStatus status = queues.addFetchItem(url, datum);
queuingStatus[status.ordinal()]++;
if (status == QueuingStatus.ABOVE_EXCEPTION_THRESHOLD) {
Expand Down

0 comments on commit 3b6d2c6

Please sign in to comment.