Skip to content

Commit

Permalink
Optimize: Introduce multiple disk write for MPU
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 20, 2024
1 parent 2230c6d commit a75574b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 28 deletions.
44 changes: 23 additions & 21 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def __init__(
default_block_size: Optional[int] = None,
default_fill_cache: bool = True,
default_cache_type: str = "readahead",
multipart_staging_dirs: str = "/tmp",
multipart_staging_dirs: str = tempfile.mkdtemp(),
multipart_size: int = 8388608,
multipart_thread_pool_size: int = 96,
multipart_staging_buffer_size: int = 4096,
Expand Down Expand Up @@ -2044,27 +2044,7 @@ def _initiate_upload(self) -> None:
# only happens when closing small file, use on-shot PUT
return
logger.debug("Initiate upload for %s", self)
self.parts = []

self.mpu = retryable_func_executor(
lambda: self.fs.tos_client.create_multipart_upload(self.bucket, self.key),
max_retry_num=self.fs.max_retry_num,
)

if self.append_block:
# use existing data in key when appending,
# and block is big enough
out = retryable_func_executor(
lambda: self.fs.tos_client.upload_part_copy(
bucket=self.bucket,
key=self.key,
part_number=1,
upload_id=self.mpu.upload_id,
),
max_retry_num=self.fs.max_retry_num,
)

self.parts.append({"PartNumber": out.part_number, "ETag": out.etag})

def _upload_chunk(self, final: bool = False) -> bool:
"""Write one part of a multi-block file upload.
Expand Down Expand Up @@ -2095,6 +2075,28 @@ def _upload_chunk(self, final: bool = False) -> bool:
# only happens when closing small file, use one-shot PUT
pass
else:
self.parts = []

self.mpu = retryable_func_executor(
lambda: self.fs.tos_client.create_multipart_upload(self.bucket, self.key),
max_retry_num=self.fs.max_retry_num,
)

if self.append_block:
# use existing data in key when appending,
# and block is big enough
out = retryable_func_executor(
lambda: self.fs.tos_client.upload_part_copy(
bucket=self.bucket,
key=self.key,
part_number=1,
upload_id=self.mpu.upload_id,
),
max_retry_num=self.fs.max_retry_num,
)

self.parts.append({"PartNumber": out.part_number, "ETag": out.etag})

self._upload_multiple_chunks(bucket, key)

if self.autocommit and final:
Expand Down
15 changes: 8 additions & 7 deletions tosfs/stability.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,14 @@ def is_retryable_exception(e: TosError) -> bool:


def _is_retryable_tos_server_exception(e: TosError) -> bool:
return (
isinstance(e, TosServerError)
and e.status_code in TOS_SERVER_RETRYABLE_STATUS_CODES
# exclude some special error code under 409(conflict) status code
# let it fast fail
and e.code not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES
)
if not isinstance(e, TosServerError):
return False

# 409 is a conflict error, but not all conflict errors are retryable
if e.status_code == 409:
return e.code not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES

return e.status_code in TOS_SERVER_RETRYABLE_STATUS_CODES


def _is_retryable_tos_client_exception(e: TosError) -> bool:
Expand Down

0 comments on commit a75574b

Please sign in to comment.