diff --git a/tosfs/core.py b/tosfs/core.py index b7b983e..7102f23 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -18,7 +18,6 @@ import mimetypes import os import tempfile -import time from glob import has_magic from typing import Any, BinaryIO, Collection, Generator, List, Optional, Tuple, Union @@ -48,7 +47,6 @@ MPU_PART_SIZE_THRESHOLD, PART_MAX_SIZE, PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, - RETRY_NUM, TOS_SERVER_STATUS_CODE_NOT_FOUND, TOSFS_LOG_FORMAT, ) @@ -873,53 +871,34 @@ def get_file(self, rpath: str, lpath: str, **kwargs: Any) -> None: bucket, key, version_id = self._split_path(rpath) def _read_chunks(body: BinaryIO, f: BinaryIO) -> None: - failed_reads = 0 bytes_read = 0 while True: - try: - chunk = body.read(GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE) - except TosClientError as e: - failed_reads += 1 - if failed_reads >= RETRY_NUM: - raise e - try: - body.close() - except Exception as e: - logger.error( - "Failed to close the body when calling " - "get_file from %s to %s : %s", - rpath, - lpath, - e, - ) - - time.sleep(min(1.7**failed_reads * 0.1, 15)) - body, _ = self._open_remote_file( - bucket, key, version_id, bytes_read, **kwargs - ) - continue + chunk = body.read(GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE) if not chunk: break bytes_read += len(chunk) f.write(chunk) - body, content_length = self._open_remote_file( - bucket, key, version_id, range_start=0, **kwargs - ) - try: - with open(lpath, "wb") as f: - _read_chunks(body, f) - finally: + def download_file() -> None: + body, content_length = self._open_remote_file( + bucket, key, version_id, range_start=0, **kwargs + ) try: - body.close() - except Exception as e: - logger.error( - "Failed to close the body when calling " - "get_file from %s to %s: %s", - rpath, - lpath, - e, - ) + with open(lpath, "wb") as f: + retryable_func_executor(_read_chunks, args=(body, f)) + finally: + try: + body.close() + except Exception as e: + logger.error( + "Failed to close the body when calling " + "get_file from %s to %s: %s", + rpath, + lpath, + e, + ) + + retryable_func_executor(download_file) def walk( self,