Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chunk download folder and threads change #469

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions src/sparsezoo/utils/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import concurrent.futures
import logging
import math
import multiprocessing
import os
import re
import shutil
Expand Down Expand Up @@ -92,6 +93,13 @@ def __init__(
self.chunk_bytes = chunk_bytes
self.job_queues = Queue()
self._lock = threading.Lock()
self.chunk_folder = self.get_parent_chunk_folder(download_path)

def get_parent_chunk_folder(self, path: str) -> str:
    """
    Derive the per-file chunk folder name from a download path.

    Takes the final path component (the file name) and replaces dots
    with underscores so it can safely be used as a directory name.

    :param path: full download path of the target file
    :return: sanitized file name used as the chunk subfolder
    """
    file_name = path.split(os.path.sep)[-1]
    return file_name.replace(".", "_")

def is_range_header_supported(self) -> bool:
"""Check if chunck download is supported"""
Expand Down Expand Up @@ -148,9 +156,11 @@ def queue_chunk_download_jobs(self) -> None:
The jobs need to be executed by a worker or scheduler that processes the
queued JobQueues.
"""
download_jobs: Queue = JobQueue(description="Downloading Chunks")
file_name = self.download_path.split(os.path.sep)[-1]
download_jobs: Queue = JobQueue(
description=f"Downloading Chunks for {file_name}"
)
num_download_jobs = math.ceil(self.file_size / self.chunk_bytes)

for job_id in range(num_download_jobs):
start_byte = 0 if job_id == 0 else job_id * (self.chunk_bytes) + 1
end_byte = (
Expand All @@ -162,7 +172,7 @@ def queue_chunk_download_jobs(self) -> None:

func_kwargs = {
"download_path": self.get_chunk_file_path(
f"{job_id:05d}_{bytes_range}"
os.path.join(self.chunk_folder, f"{job_id:05d}_{bytes_range}")
),
"headers": {
"Range": bytes_range,
Expand Down Expand Up @@ -237,7 +247,7 @@ def queue_jobs(self) -> None:
)
self.job_queues.put(job_queue)

def run(self, num_threads: int = 10) -> None:
def run(self, num_threads: int = 1) -> None:
"""
Executes queued download jobs in parallel using multiple threads.

Expand All @@ -250,6 +260,9 @@ def run(self, num_threads: int = 10) -> None:
file chunks in parallel. Defaults to 1; the actual count may be

"""
available_threads = multiprocessing.cpu_count() - threading.active_count()
num_threads = max(available_threads // 2, num_threads)

is_prev_job_queue_success = True
while not self.job_queues.empty() and is_prev_job_queue_success:
job_queue = self.job_queues.get()
Expand Down Expand Up @@ -295,23 +308,25 @@ def execute_job_from_queue(self, job_queue: Queue, **kwargs) -> None:
with self._lock:
job: Job = job_queue.get()
success = False
err = ""
while not success and job.retries < job.max_retries:
try:
job.func(**job.func_kwargs, **kwargs)
success = True
except Exception as _err:
err = _err
_LOGGER.debug(
f"{job.retries/self.max_retries}: "
"Failed running {self.func} with kwargs {job.func_kwargs}"
)
_LOGGER.debug(_err)
_LOGGER.error(_err)
job.retries += 1
if job.retries < job.max_retries:
job_queue.put(job)

if not success:
_LOGGER.debug(f"Chunk download failed after {self.max_retries} retries.")
raise ValueError
raise ValueError(err)

def download_file(
self,
Expand Down Expand Up @@ -339,7 +354,10 @@ def download_file(

"""
write_chunk_size = min(CHUNK_BYTES, self.file_size)
_LOGGER.debug("creating ", download_path)

create_parent_dirs(download_path)

response = requests.get(
self.url, headers=headers, stream=True, allow_redirects=True
)
Expand All @@ -359,7 +377,8 @@ def combine_chunks_and_delete(self, download_path: str, progress_bar: tqdm) -> N

"""
parent_directory = os.path.dirname(download_path)
chunk_directory = os.path.join(parent_directory, "chunks")
chunk_directory = os.path.join(parent_directory, "chunks", self.chunk_folder)
_LOGGER.debug("Combing and deleting ", chunk_directory)

pattern = re.compile(r"\d+_bytes=")
files = os.listdir(chunk_directory)
Expand Down
Loading