diff --git a/tosfs/core.py b/tosfs/core.py index d1aed62..293c27f 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -38,7 +38,6 @@ ListObjectVersionsOutput, PartInfo, UploadPartCopyOutput, - UploadPartOutput, ) from tosfs.consts import ( @@ -117,8 +116,8 @@ def __init__( default_fill_cache: bool = True, default_cache_type: str = "readahead", multipart_staging_dirs: str = tempfile.mkdtemp(), - multipart_size: int = 8388608, - multipart_thread_pool_size: int = 96, + multipart_size: int = 4096, + multipart_thread_pool_size: int = max(2, os.cpu_count() or 1), multipart_staging_buffer_size: int = 4096, multipart_threshold: int = 10485760, **kwargs: Any, @@ -165,6 +164,25 @@ def __init__( Whether to fill the cache (default is True). default_cache_type : str, optional The default cache type (default is 'readahead'). + multipart_staging_dirs : str, optional + The staging directories for multipart uploads (default is a temporary + directory). Separate the staging dirs with comma if there are many + staging dir paths. + multipart_size : int, optional + The max byte size which will buffer the staging data in-memory before + flushing to the staging file. Decrease the random write in local staging + disk dramatically if writing plenty of small files. (default is 4096). + multipart_thread_pool_size : int, optional + The size of the thread pool for multipart uploads (default is + max(2, os.cpu_count()). + multipart_staging_buffer_size : int, optional + The size of thread pool used for uploading multipart in parallel for the + given object storage. (default is 4096). + multipart_threshold : int, optional + The threshold which control whether enable multipart upload during + writing data to the given object storage, if the write data size is less + than threshold, will write data via simple put instead of multipart upload. + (default is 10 MB). kwargs : Any, optional Additional arguments. @@ -192,7 +210,9 @@ def __init__( self.default_cache_type = default_cache_type self.max_retry_num = max_retry_num - self.multipart_staging_dirs = [d.strip() for d in multipart_staging_dirs.split(",")] + self.multipart_staging_dirs = [ + d.strip() for d in multipart_staging_dirs.split(",") + ] self.multipart_size = multipart_size self.multipart_thread_pool_size = multipart_thread_pool_size self.multipart_staging_buffer_size = multipart_staging_buffer_size @@ -2012,7 +2032,7 @@ def __init__( self.mode = mode self.autocommit = autocommit self.mpu: CreateMultipartUploadOutput = None - self.parts: Optional[list] = None + self.parts: list = [] self.append_block = False self.buffer: Optional[io.BytesIO] = io.BytesIO() @@ -2022,7 +2042,7 @@ def __init__( self.staging_buffer_size = fs.multipart_staging_buffer_size self.multipart_threshold = fs.multipart_threshold self.executor = ThreadPoolExecutor(max_workers=self.thread_pool_size) - self.staging_files = [] + self.staging_files: list[str] = [] if "a" in mode and fs.exists(path): head = retryable_func_executor( @@ -2045,7 +2065,6 @@ def _initiate_upload(self) -> None: return logger.debug("Initiate upload for %s", self) - def _upload_chunk(self, final: bool = False) -> bool: """Write one part of a multi-block file upload. @@ -2078,7 +2097,9 @@ def _upload_chunk(self, final: bool = False) -> bool: self.parts = [] self.mpu = retryable_func_executor( - lambda: self.fs.tos_client.create_multipart_upload(self.bucket, self.key), + lambda: self.fs.tos_client.create_multipart_upload( + self.bucket, self.key + ), max_retry_num=self.fs.max_retry_num, ) @@ -2117,12 +2138,14 @@ def _upload_multiple_chunks(self, bucket: str, key: str) -> None: tmp.write(chunk) self.staging_files.append(tmp.name) - def _upload_staged_files(self): + def _upload_staged_files(self) -> None: futures = [] for i, staging_file in enumerate(self.staging_files): part_number = i + 1 futures.append( - self.executor.submit(self._upload_part_from_file, staging_file, part_number) + self.executor.submit( + self._upload_part_from_file, staging_file, part_number + ) ) for future in futures: diff --git a/tosfs/stability.py b/tosfs/stability.py index b573a3d..6a0e027 100644 --- a/tosfs/stability.py +++ b/tosfs/stability.py @@ -32,8 +32,10 @@ from tosfs.exceptions import TosfsError +CONFLICT_CODE = "409" + TOS_SERVER_RETRYABLE_STATUS_CODES = { - "409", # CONFLICT + CONFLICT_CODE, # CONFLICT "429", # TOO_MANY_REQUESTS "500", # INTERNAL_SERVER_ERROR } @@ -115,8 +117,8 @@ def _is_retryable_tos_server_exception(e: TosError) -> bool: if not isinstance(e, TosServerError): return False - # 409 is a conflict error, but not all conflict errors are retryable - if e.status_code == 409: + # not all conflict errors are retryable + if e.status_code == CONFLICT_CODE: return e.code not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES return e.status_code in TOS_SERVER_RETRYABLE_STATUS_CODES