From a75574bd4b6ed4b059696aa2b6428842d30750ff Mon Sep 17 00:00:00 2001 From: yanghua Date: Fri, 20 Sep 2024 11:54:21 +0800 Subject: [PATCH] Optimize: Introduce multiple disk write for MPU --- tosfs/core.py | 44 +++++++++++++++++++++++--------------------- tosfs/stability.py | 15 ++++++++------- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/tosfs/core.py b/tosfs/core.py index 145c853..d1aed62 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -116,7 +116,7 @@ def __init__( default_block_size: Optional[int] = None, default_fill_cache: bool = True, default_cache_type: str = "readahead", - multipart_staging_dirs: str = "/tmp", + multipart_staging_dirs: str = tempfile.mkdtemp(), multipart_size: int = 8388608, multipart_thread_pool_size: int = 96, multipart_staging_buffer_size: int = 4096, @@ -2044,27 +2044,7 @@ def _initiate_upload(self) -> None: # only happens when closing small file, use on-shot PUT return logger.debug("Initiate upload for %s", self) - self.parts = [] - self.mpu = retryable_func_executor( - lambda: self.fs.tos_client.create_multipart_upload(self.bucket, self.key), - max_retry_num=self.fs.max_retry_num, - ) - - if self.append_block: - # use existing data in key when appending, - # and block is big enough - out = retryable_func_executor( - lambda: self.fs.tos_client.upload_part_copy( - bucket=self.bucket, - key=self.key, - part_number=1, - upload_id=self.mpu.upload_id, - ), - max_retry_num=self.fs.max_retry_num, - ) - - self.parts.append({"PartNumber": out.part_number, "ETag": out.etag}) def _upload_chunk(self, final: bool = False) -> bool: """Write one part of a multi-block file upload. @@ -2095,6 +2075,28 @@ def _upload_chunk(self, final: bool = False) -> bool: # only happens when closing small file, use one-shot PUT pass else: + self.parts = [] + + self.mpu = retryable_func_executor( + lambda: self.fs.tos_client.create_multipart_upload(self.bucket, self.key), + max_retry_num=self.fs.max_retry_num, + ) + + if self.append_block: + # use existing data in key when appending, + # and block is big enough + out = retryable_func_executor( + lambda: self.fs.tos_client.upload_part_copy( + bucket=self.bucket, + key=self.key, + part_number=1, + upload_id=self.mpu.upload_id, + ), + max_retry_num=self.fs.max_retry_num, + ) + + self.parts.append({"PartNumber": out.part_number, "ETag": out.etag}) + self._upload_multiple_chunks(bucket, key) if self.autocommit and final: diff --git a/tosfs/stability.py b/tosfs/stability.py index f8479ad..b573a3d 100644 --- a/tosfs/stability.py +++ b/tosfs/stability.py @@ -112,13 +112,14 @@ def is_retryable_exception(e: TosError) -> bool: def _is_retryable_tos_server_exception(e: TosError) -> bool: - return ( - isinstance(e, TosServerError) - and e.status_code in TOS_SERVER_RETRYABLE_STATUS_CODES - # exclude some special error code under 409(conflict) status code - # let it fast fail - and e.code not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES - ) + if not isinstance(e, TosServerError): + return False + + # 409 is a conflict error, but not all conflict errors are retryable + if e.status_code == 409: + return e.code not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES + + return e.status_code in TOS_SERVER_RETRYABLE_STATUS_CODES def _is_retryable_tos_client_exception(e: TosError) -> bool: