Skip to content

Commit

Permalink
Optimize: Introduce multiple disk write for MPU
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 20, 2024
1 parent a75574b commit a229c49
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 13 deletions.
43 changes: 33 additions & 10 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
ListObjectVersionsOutput,
PartInfo,
UploadPartCopyOutput,
UploadPartOutput,
)

from tosfs.consts import (
Expand Down Expand Up @@ -117,8 +116,8 @@ def __init__(
default_fill_cache: bool = True,
default_cache_type: str = "readahead",
multipart_staging_dirs: str = tempfile.mkdtemp(),
multipart_size: int = 8388608,
multipart_thread_pool_size: int = 96,
multipart_size: int = 4096,
multipart_thread_pool_size: int = max(2, os.cpu_count() or 1),
multipart_staging_buffer_size: int = 4096,
multipart_threshold: int = 10485760,
**kwargs: Any,
Expand Down Expand Up @@ -165,6 +164,25 @@ def __init__(
Whether to fill the cache (default is True).
default_cache_type : str, optional
The default cache type (default is 'readahead').
multipart_staging_dirs : str, optional
The staging directories for multipart uploads (default is a temporary
directory). Separate the staging dirs with comma if there are many
staging dir paths.
multipart_size : int, optional
The max byte size which will buffer the staging data in-memory before
flushing to the staging file. Dramatically decreases random writes to the
local staging disk when writing many small files. (default is 4096).
multipart_thread_pool_size : int, optional
The size of the thread pool for multipart uploads (default is
max(2, os.cpu_count() or 1)).
multipart_staging_buffer_size : int, optional
The size of the staging buffer used when writing data to the local
staging file. (default is 4096).
multipart_threshold : int, optional
The threshold that controls whether to enable multipart upload when
writing data to the given object storage: if the size of the data being
written is less than the threshold, the data is written via a simple put
instead of a multipart upload. (default is 10 MB).
kwargs : Any, optional
Additional arguments.
Expand Down Expand Up @@ -192,7 +210,9 @@ def __init__(
self.default_cache_type = default_cache_type
self.max_retry_num = max_retry_num

self.multipart_staging_dirs = [d.strip() for d in multipart_staging_dirs.split(",")]
self.multipart_staging_dirs = [
d.strip() for d in multipart_staging_dirs.split(",")
]
self.multipart_size = multipart_size
self.multipart_thread_pool_size = multipart_thread_pool_size
self.multipart_staging_buffer_size = multipart_staging_buffer_size
Expand Down Expand Up @@ -2012,7 +2032,7 @@ def __init__(
self.mode = mode
self.autocommit = autocommit
self.mpu: CreateMultipartUploadOutput = None
self.parts: Optional[list] = None
self.parts: list = []
self.append_block = False
self.buffer: Optional[io.BytesIO] = io.BytesIO()

Expand All @@ -2022,7 +2042,7 @@ def __init__(
self.staging_buffer_size = fs.multipart_staging_buffer_size
self.multipart_threshold = fs.multipart_threshold
self.executor = ThreadPoolExecutor(max_workers=self.thread_pool_size)
self.staging_files = []
self.staging_files: list[str] = []

if "a" in mode and fs.exists(path):
head = retryable_func_executor(
Expand All @@ -2045,7 +2065,6 @@ def _initiate_upload(self) -> None:
return
logger.debug("Initiate upload for %s", self)


def _upload_chunk(self, final: bool = False) -> bool:
"""Write one part of a multi-block file upload.
Expand Down Expand Up @@ -2078,7 +2097,9 @@ def _upload_chunk(self, final: bool = False) -> bool:
self.parts = []

self.mpu = retryable_func_executor(
lambda: self.fs.tos_client.create_multipart_upload(self.bucket, self.key),
lambda: self.fs.tos_client.create_multipart_upload(
self.bucket, self.key
),
max_retry_num=self.fs.max_retry_num,
)

Expand Down Expand Up @@ -2117,12 +2138,14 @@ def _upload_multiple_chunks(self, bucket: str, key: str) -> None:
tmp.write(chunk)
self.staging_files.append(tmp.name)

def _upload_staged_files(self):
def _upload_staged_files(self) -> None:
futures = []
for i, staging_file in enumerate(self.staging_files):
part_number = i + 1
futures.append(
self.executor.submit(self._upload_part_from_file, staging_file, part_number)
self.executor.submit(
self._upload_part_from_file, staging_file, part_number
)
)

for future in futures:
Expand Down
8 changes: 5 additions & 3 deletions tosfs/stability.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@

from tosfs.exceptions import TosfsError

CONFLICT_CODE = "409"

TOS_SERVER_RETRYABLE_STATUS_CODES = {
"409", # CONFLICT
CONFLICT_CODE, # CONFLICT
"429", # TOO_MANY_REQUESTS
"500", # INTERNAL_SERVER_ERROR
}
Expand Down Expand Up @@ -115,8 +117,8 @@ def _is_retryable_tos_server_exception(e: TosError) -> bool:
if not isinstance(e, TosServerError):
return False

# 409 is a conflict error, but not all conflict errors are retryable
if e.status_code == 409:
# not all conflict errors are retryable
if e.status_code == CONFLICT_CODE:
return e.code not in TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES

return e.status_code in TOS_SERVER_RETRYABLE_STATUS_CODES
Expand Down

0 comments on commit a229c49

Please sign in to comment.