From f7f610795c6000f54a0f632cf38bf590d74a06ac Mon Sep 17 00:00:00 2001
From: Zhiyuan Chen
Date: Thu, 2 Jan 2025 16:26:30 +0800
Subject: [PATCH] fix: improve downloader thread exit conditions

- Implement a timeout mechanism for inactive downloads
- Add a max_idle_time parameter to prevent infinite waiting
- Return download success status from the download method

The downloader now exits when:
1. The number of downloads reaches max_num
2. No new downloads arrive within max_idle_time seconds
3. The parser has exited and the task queue is empty

This fixes the issue where downloader threads would hang indefinitely
when fewer images were found than the requested max_num.
---
 icrawler/builtin/baidu.py   | 10 +++++++---
 icrawler/builtin/bing.py    |  8 +++++++-
 icrawler/builtin/flickr.py  |  2 ++
 icrawler/builtin/google.py  | 13 ++++++++-----
 icrawler/builtin/greedy.py  |  8 ++++++--
 icrawler/builtin/urllist.py | 11 +++++++----
 icrawler/crawler.py         |  2 +-
 icrawler/downloader.py      | 34 +++++++++++++++++++++++++---------
 icrawler/parser.py          |  7 +++----
 9 files changed, 66 insertions(+), 29 deletions(-)

diff --git a/icrawler/builtin/baidu.py b/icrawler/builtin/baidu.py
index a593dad..8a748f5 100644
--- a/icrawler/builtin/baidu.py
+++ b/icrawler/builtin/baidu.py
@@ -157,6 +157,7 @@ def crawl(
         max_size=None,
         file_idx_offset=0,
         overwrite=False,
+        max_idle_time=None,
     ):
         if offset + max_num > 1000:
             if offset > 1000:
@@ -170,10 +171,13 @@ def crawl(
                 "been automatically set to %d",
                 1000 - offset,
             )
-            else:
-                pass
         feeder_kwargs = dict(keyword=keyword, offset=offset, max_num=max_num, filters=filters)
         downloader_kwargs = dict(
-            max_num=max_num, min_size=min_size, max_size=max_size, file_idx_offset=file_idx_offset, overwrite=overwrite
+            max_num=max_num,
+            min_size=min_size,
+            max_size=max_size,
+            file_idx_offset=file_idx_offset,
+            overwrite=overwrite,
+            max_idle_time=max_idle_time,
         )
         super().crawl(feeder_kwargs=feeder_kwargs, downloader_kwargs=downloader_kwargs)
diff --git a/icrawler/builtin/bing.py b/icrawler/builtin/bing.py
index 2b39bdf..67c0212 100644
--- a/icrawler/builtin/bing.py
+++ b/icrawler/builtin/bing.py
@@ -145,6 +145,7 @@ def crawl(
         max_size=None,
         file_idx_offset=0,
         overwrite=False,
+        max_idle_time=None,
     ):
         if offset + max_num > 1000:
             if offset > 1000:
@@ -160,6 +161,11 @@ def crawl(
             )
         feeder_kwargs = dict(keyword=keyword, offset=offset, max_num=max_num, filters=filters)
         downloader_kwargs = dict(
-            max_num=max_num, min_size=min_size, max_size=max_size, file_idx_offset=file_idx_offset, overwrite=overwrite
+            max_num=max_num,
+            min_size=min_size,
+            max_size=max_size,
+            file_idx_offset=file_idx_offset,
+            overwrite=overwrite,
+            max_idle_time=max_idle_time,
         )
         super().crawl(feeder_kwargs=feeder_kwargs, downloader_kwargs=downloader_kwargs)
diff --git a/icrawler/builtin/flickr.py b/icrawler/builtin/flickr.py
index 27c9a02..6871aaf 100644
--- a/icrawler/builtin/flickr.py
+++ b/icrawler/builtin/flickr.py
@@ -135,6 +135,7 @@ def crawl(
         max_size=None,
         file_idx_offset=0,
         overwrite=False,
+        max_idle_time=None,
         **kwargs,
     ):
         kwargs["apikey"] = self.apikey
@@ -170,5 +171,6 @@ def crawl(
                 max_size=max_size,
                 file_idx_offset=file_idx_offset,
                 overwrite=overwrite,
+                max_idle_time=max_idle_time,
             ),
         )
diff --git a/icrawler/builtin/google.py b/icrawler/builtin/google.py
index d6cd2b8..074bda3 100644
--- a/icrawler/builtin/google.py
+++ b/icrawler/builtin/google.py
@@ -185,12 +185,11 @@ def crawl(
         language=None,
         file_idx_offset=0,
         overwrite=False,
+        max_idle_time=None,
     ):
         if offset + max_num > 1000:
             if offset > 1000:
-                self.logger.error(
-                    '"Offset" cannot exceed 1000, otherwise you will get ' "duplicated searching results."
-                )
+                self.logger.error("Offset cannot exceed 1000, otherwise you " "will get duplicated searching results.")
                 return
             elif max_num > 1000:
                 max_num = 1000 - offset
@@ -201,9 +200,13 @@ def crawl(
                 "can specify different date ranges.",
                 1000 - offset,
             )
-
         feeder_kwargs = dict(keyword=keyword, offset=offset, max_num=max_num, language=language, filters=filters)
         downloader_kwargs = dict(
-            max_num=max_num, min_size=min_size, max_size=max_size, file_idx_offset=file_idx_offset, overwrite=overwrite
+            max_num=max_num,
+            min_size=min_size,
+            max_size=max_size,
+            file_idx_offset=file_idx_offset,
+            overwrite=overwrite,
+            max_idle_time=max_idle_time,
         )
         super().crawl(feeder_kwargs=feeder_kwargs, downloader_kwargs=downloader_kwargs)
diff --git a/icrawler/builtin/greedy.py b/icrawler/builtin/greedy.py
index 2037bfa..83c2088 100644
--- a/icrawler/builtin/greedy.py
+++ b/icrawler/builtin/greedy.py
@@ -77,7 +77,7 @@ def __init__(
     ):
         super().__init__(feeder_cls, parser_cls, downloader_cls, *args, **kwargs)
 
-    def crawl(self, domains, max_num=0, min_size=None, max_size=None, file_idx_offset=0):
+    def crawl(self, domains, max_num=0, min_size=None, max_size=None, file_idx_offset=0, max_idle_time=None):
         if isinstance(domains, str):
             domains = [domains]
         elif not isinstance(domains, list):
@@ -90,6 +90,10 @@ def crawl(self, domains, max_num=0, min_size=None, max_size=None, file_idx_offse
             feeder_kwargs={"domains": domains},
             parser_kwargs={"domains": domains},
             downloader_kwargs=dict(
-                max_num=max_num, min_size=min_size, max_size=max_size, file_idx_offset=file_idx_offset
+                max_num=max_num,
+                min_size=min_size,
+                max_size=max_size,
+                file_idx_offset=file_idx_offset,
+                max_idle_time=max_idle_time,
             ),
         )
diff --git a/icrawler/builtin/urllist.py b/icrawler/builtin/urllist.py
index 4a5957e..14719b5 100644
--- a/icrawler/builtin/urllist.py
+++ b/icrawler/builtin/urllist.py
@@ -33,7 +33,10 @@ def __init__(
     ):
         super().__init__(feeder_cls, parser_cls, downloader_cls, *args, **kwargs)
 
-    def crawl(self, url_list, max_num=1000, file_idx_offset=0, overwrite=False):
-        feeder_kwargs = dict(url_list=url_list)
-        downloader_kwargs = dict(file_idx_offset=file_idx_offset, max_num=max_num, overwrite=overwrite)
-        super().crawl(feeder_kwargs=feeder_kwargs, downloader_kwargs=downloader_kwargs)
+    def crawl(self, url_list, max_num=1000, file_idx_offset=0, overwrite=False, max_idle_time=None):
+        super().crawl(
+            feeder_kwargs=dict(url_list=url_list),
+            downloader_kwargs=dict(
+                file_idx_offset=file_idx_offset, max_num=max_num, overwrite=overwrite, max_idle_time=max_idle_time
+            ),
+        )
diff --git a/icrawler/crawler.py b/icrawler/crawler.py
index 1791c7e..12f8f1d 100644
--- a/icrawler/crawler.py
+++ b/icrawler/crawler.py
@@ -139,7 +139,7 @@ def set_session(self, headers=None):
                     "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"
                     " AppleWebKit/537.36 (KHTML, like Gecko) "
                     "Chrome/88.0.4324.104 Safari/537.36"
-                )
+                ),
             }
         elif not isinstance(headers, dict):
            raise TypeError('"headers" must be a dict object')
diff --git a/icrawler/downloader.py b/icrawler/downloader.py
index db536f3..f60870c 100644
--- a/icrawler/downloader.py
+++ b/icrawler/downloader.py
@@ -1,4 +1,5 @@
 import queue
+import time
 from io import BytesIO
 from threading import current_thread
 from urllib.parse import urlparse
@@ -163,7 +164,7 @@ def start(self, file_idx_offset=0, *args, **kwargs):
             worker.start()
             self.logger.debug("thread %s started", worker.name)
 
-    def worker_exec(self, max_num, default_ext="", queue_timeout=5, req_timeout=5, **kwargs):
+    def worker_exec(self, max_num, default_ext="", queue_timeout=5, req_timeout=5, max_idle_time=None, **kwargs):
         """Target method of workers.
 
         Get task from ``task_queue`` and then download files and process meta
@@ -171,34 +172,49 @@ def worker_exec(self, max_num, default_ext="", queue_timeout=5, req_timeout=5, *
 
         1. All parser threads have exited and the task_queue is empty.
         2. Downloaded image number has reached required number(max_num).
+        3. No new downloads for max_idle_time seconds.
 
         Args:
+            max_num (int): Maximum number of images to download
             queue_timeout (int): Timeout of getting tasks from ``task_queue``.
             req_timeout (int): Timeout of making requests for downloading pages.
+            max_idle_time (int): Maximum time (in seconds) to wait without receiving new images
             **kwargs: Arguments passed to the :func:`download` method.
         """
         self.max_num = max_num
+        last_download_time = time.time()
+
         while True:
             if self.signal.get("reach_max_num"):
-                self.logger.info(
-                    "downloaded images reach max num, thread %s" " is ready to exit", current_thread().name
-                )
+                self.logger.info("downloaded images reach max num, thread %s is ready to exit", current_thread().name)
                 break
+
+            current_time = time.time()
+            if max_idle_time is not None and current_time - last_download_time > max_idle_time and self.fetched_num > 0:
+                self.logger.info("no new images for %d seconds, thread %s exit", max_idle_time, current_thread().name)
+                break
+
             try:
                 task = self.in_queue.get(timeout=queue_timeout)
             except queue.Empty:
                 if self.signal.get("parser_exited"):
                     self.logger.info("no more download task for thread %s", current_thread().name)
                     break
-                else:
+                elif self.fetched_num == 0:
                     self.logger.info("%s is waiting for new download tasks", current_thread().name)
+                else:
+                    self.logger.info("no more images available, thread %s exit", current_thread().name)
+                    break
             except:
                 self.logger.error("exception in thread %s", current_thread().name)
             else:
-                self.download(task, default_ext, req_timeout, **kwargs)
+                success = self.download(task, default_ext, req_timeout, **kwargs)
+                if success:
+                    last_download_time = time.time()
                 self.process_meta(task)
                 self.in_queue.task_done()
-        self.logger.info(f"thread {current_thread().name} exit")
+
+        self.logger.info("thread %s exit", current_thread().name)
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.logger.info("all downloader threads exited")
@@ -247,5 +263,5 @@ def get_filename(self, task, default_ext):
         file_idx = self.fetched_num + self.file_idx_offset
         return f"{file_idx:06d}.{extension}"
 
-    def worker_exec(self, max_num, default_ext="jpg", queue_timeout=5, req_timeout=5, **kwargs):
-        super().worker_exec(max_num, default_ext, queue_timeout, req_timeout, **kwargs)
+    def worker_exec(self, max_num, default_ext="jpg", queue_timeout=5, req_timeout=5, max_idle_time=None, **kwargs):
+        super().worker_exec(max_num, default_ext, queue_timeout, req_timeout, max_idle_time, **kwargs)
diff --git a/icrawler/parser.py b/icrawler/parser.py
index a3b2a96..7fb845a 100644
--- a/icrawler/parser.py
+++ b/icrawler/parser.py
@@ -68,9 +68,8 @@ def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3, **kwargs):
                 if self.signal.get("feeder_exited"):
                     self.logger.info("no more page urls for thread %s to parse", current_thread().name)
                     break
-                else:
-                    self.logger.info("%s is waiting for new page urls", current_thread().name)
-                    continue
+                self.logger.info("%s is waiting for new page urls", current_thread().name)
+                continue
             except:
                 self.logger.error("exception in thread %s", current_thread().name)
                 continue
@@ -117,5 +116,5 @@ def worker_exec(self, queue_timeout=2, req_timeout=5, max_retry=3, **kwargs):
                     retry -= 1
         self.logger.info(f"thread {current_thread().name} exit")
 
-    def __exit__(self):
+    def __exit__(self, exc_type, exc_val, exc_tb):
         logging.info("all parser threads exited")
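
Usage note (illustrative, not part of the patch): a minimal sketch of how the new
max_idle_time argument is expected to be called, assuming the GoogleImageCrawler
API otherwise unchanged; the keyword, storage directory, and timeout value are
placeholders chosen for the example.

    from icrawler.builtin import GoogleImageCrawler

    # Request up to 200 images, but let downloader threads give up after 30
    # seconds without a successful download instead of hanging when fewer
    # results are actually available.
    crawler = GoogleImageCrawler(storage={"root_dir": "images"})
    crawler.crawl(keyword="cat", max_num=200, max_idle_time=30)

Passing max_idle_time=None (the default) disables the idle timeout; threads then
rely on the other exit conditions listed in the commit message.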