diff --git a/src/sparsezoo/utils/download.py b/src/sparsezoo/utils/download.py
index eae9ee68..184eafb7 100644
--- a/src/sparsezoo/utils/download.py
+++ b/src/sparsezoo/utils/download.py
@@ -16,6 +16,7 @@
 import concurrent.futures
 import logging
 import math
+import multiprocessing
 import os
 import re
 import shutil
@@ -92,6 +93,13 @@ def __init__(
         self.chunk_bytes = chunk_bytes
         self.job_queues = Queue()
         self._lock = threading.Lock()
+        self.chunk_folder = self.get_parent_chunk_folder(download_path)
+
+    def get_parent_chunk_folder(self, path: str) -> str:
+        """Get the name of the file that is used as the folder inside chunks"""
+        path = path.split(os.path.sep)[-1]
+        path = path.replace(".", "_")
+        return path
 
     def is_range_header_supported(self) -> bool:
-        """Check if chunck download is supported"""
+        """Check if chunk download is supported"""
@@ -148,9 +156,11 @@ def queue_chunk_download_jobs(self) -> None:
         The jobs need to be executed by a worker or scheduler that processes
         the queued JobQueues.
         """
-        download_jobs: Queue = JobQueue(description="Downloading Chunks")
+        file_name = self.download_path.split(os.path.sep)[-1]
+        download_jobs: Queue = JobQueue(
+            description=f"Downloading Chunks for {file_name}"
+        )
         num_download_jobs = math.ceil(self.file_size / self.chunk_bytes)
-
         for job_id in range(num_download_jobs):
             start_byte = 0 if job_id == 0 else job_id * (self.chunk_bytes) + 1
             end_byte = (
@@ -162,7 +172,7 @@ def queue_chunk_download_jobs(self) -> None:
 
             func_kwargs = {
                 "download_path": self.get_chunk_file_path(
-                    f"{job_id:05d}_{bytes_range}"
+                    os.path.join(self.chunk_folder, f"{job_id:05d}_{bytes_range}")
                 ),
                 "headers": {
                     "Range": bytes_range,
@@ -237,7 +247,7 @@ def queue_jobs(self) -> None:
             )
             self.job_queues.put(job_queue)
 
-    def run(self, num_threads: int = 10) -> None:
+    def run(self, num_threads: int = 1) -> None:
         """
         Executes queued download jobs in parallel using multiple threads.
 
@@ -250,6 +260,9 @@ def run(self, num_threads: int = 10) -> None:
-                file chunks in parallel. Defaults to 10.
+                file chunks in parallel. Defaults to 1.
""" + available_threads = multiprocessing.cpu_count() - threading.active_count() + num_threads = max(available_threads // 2, num_threads) + is_prev_job_queue_success = True while not self.job_queues.empty() and is_prev_job_queue_success: job_queue = self.job_queues.get() @@ -295,23 +308,25 @@ def execute_job_from_queue(self, job_queue: Queue, **kwargs) -> None: with self._lock: job: Job = job_queue.get() success = False + err = "" while not success and job.retries < job.max_retries: try: job.func(**job.func_kwargs, **kwargs) success = True except Exception as _err: + err = _err _LOGGER.debug( f"{job.retries/self.max_retries}: " "Failed running {self.func} with kwargs {job.func_kwargs}" ) - _LOGGER.debug(_err) + _LOGGER.error(_err) job.retries += 1 if job.retries < job.max_retries: job_queue.put(job) if not success: _LOGGER.debug(f"Chunk download failed after {self.max_retries} retries.") - raise ValueError + raise ValueError(err) def download_file( self, @@ -339,7 +354,10 @@ def download_file( """ write_chunk_size = min(CHUNK_BYTES, self.file_size) + _LOGGER.debug("creating ", download_path) + create_parent_dirs(download_path) + response = requests.get( self.url, headers=headers, stream=True, allow_redirects=True ) @@ -359,7 +377,8 @@ def combine_chunks_and_delete(self, download_path: str, progress_bar: tqdm) -> N """ parent_directory = os.path.dirname(download_path) - chunk_directory = os.path.join(parent_directory, "chunks") + chunk_directory = os.path.join(parent_directory, "chunks", self.chunk_folder) + _LOGGER.debug("Combing and deleting ", chunk_directory) pattern = re.compile(r"\d+_bytes=") files = os.listdir(chunk_directory)