Skip to content

Commit

Permalink
Optimize: Introduce multiple disk writes for MPU
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 20, 2024
1 parent eb03197 commit b621bae
Showing 1 changed file with 38 additions and 76 deletions.
114 changes: 38 additions & 76 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2102,82 +2102,6 @@ def _upload_chunk(self, final: bool = False) -> bool:

return not final

def _fetch_range(self, start: int, end: int) -> bytes:
    """Fetch a byte range of the object from TOS.

    Degenerate ranges (``start >= end``) return ``b""`` immediately
    without contacting the server. Otherwise the ranged GET response is
    streamed into an in-memory buffer, and the whole fetch is retried
    up to ``self.fs.max_retry_num`` times on failure.

    NOTE(review): assumes the fsspec convention that ``end`` is
    exclusive and maps onto the SDK's ``range_start``/``range_end`` —
    confirm against the TOS client documentation.
    """
    # The original guard was `start == end`, which only skipped empty
    # ranges even though the log message says "negative range"; an
    # inverted range (start > end) fell through to the server as an
    # invalid ranged GET. `>=` short-circuits both degenerate cases.
    if start >= end:
        logger.debug(
            "skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d",
            self.bucket,
            self.key,
            start,
            end,
        )
        return b""
    logger.debug("Fetch: %s/%s, %s-%s", self.bucket, self.key, start, end)

    def fetch() -> bytes:
        # Accumulate the streamed chunks in a BytesIO so retries always
        # start from a fresh, fully-assembled buffer.
        temp_buffer = io.BytesIO()
        for chunk in self.fs.tos_client.get_object(
            self.bucket, self.key, self.version_id, range_start=start, range_end=end
        ):
            temp_buffer.write(chunk)
        temp_buffer.seek(0)
        return temp_buffer.read()

    return retryable_func_executor(fetch, max_retry_num=self.fs.max_retry_num)

# def commit(self) -> None:
# """Complete multipart upload or PUT."""
# logger.debug("Commit %s", self)
# if self.tell() == 0:
# if self.buffer is not None:
# logger.debug("Empty file committed %s", self)
# self._abort_mpu()
# self.fs.touch(self.path, **self.kwargs)
# elif not self.parts:
# if self.buffer is not None:
# logger.debug("One-shot upload of %s", self)
# self.buffer.seek(0)
# data = self.buffer.read()
# write_result = retryable_func_executor(
# lambda: self.fs.tos_client.put_object(
# self.bucket, self.key, content=data
# ),
# max_retry_num=self.fs.max_retry_num,
# )
# else:
# raise RuntimeError
# else:
# logger.debug("Complete multi-part upload for %s ", self)
# write_result = retryable_func_executor(
# lambda: self.fs.tos_client.complete_multipart_upload(
# self.bucket,
# self.key,
# upload_id=self.mpu.upload_id,
# parts=self.parts,
# ),
# max_retry_num=self.fs.max_retry_num,
# )
#
# if self.fs.version_aware:
# self.version_id = write_result.version_id
#
# self.buffer = None

def discard(self) -> None:
    """Close the file without writing anything to the remote store.

    Any in-progress multipart upload is aborted, and the local buffer
    is dropped, leaving this file object unusable for further writes.
    """
    # Cancel the remote MPU (no-op when none is active) ...
    self._abort_mpu()
    # ... then release the local buffer; the file becomes unusable.
    self.buffer = None

def _abort_mpu(self) -> None:
    """Abort the active multipart upload, if one exists.

    Safe to call repeatedly: once aborted, ``self.mpu`` is cleared so
    subsequent calls are no-ops.
    """
    if not self.mpu:
        return
    # Retried, because a dangling MPU would keep accruing storage on
    # the remote side until it is explicitly aborted.
    retryable_func_executor(
        lambda: self.fs.tos_client.abort_multipart_upload(
            self.bucket, self.key, self.mpu.upload_id
        ),
        max_retry_num=self.fs.max_retry_num,
    )
    self.mpu = None

def _upload_multiple_chunks(self, bucket: str, key: str) -> None:
if self.buffer:
self.buffer.seek(0)
Expand Down Expand Up @@ -2230,6 +2154,44 @@ def _upload_part_from_file(self, staging_file: str, part_number: int) -> PartInf
is_completed=None,
)

def _fetch_range(self, start: int, end: int) -> bytes:
    """Fetch a byte range of the object from TOS.

    Degenerate ranges (``start >= end``) return ``b""`` immediately
    without contacting the server. Otherwise the ranged GET response is
    streamed into an in-memory buffer, and the whole fetch is retried
    up to ``self.fs.max_retry_num`` times on failure.

    NOTE(review): assumes the fsspec convention that ``end`` is
    exclusive and maps onto the SDK's ``range_start``/``range_end`` —
    confirm against the TOS client documentation.
    """
    # The original guard was `start == end`, which only skipped empty
    # ranges even though the log message says "negative range"; an
    # inverted range (start > end) fell through to the server as an
    # invalid ranged GET. `>=` short-circuits both degenerate cases.
    if start >= end:
        logger.debug(
            "skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d",
            self.bucket,
            self.key,
            start,
            end,
        )
        return b""
    logger.debug("Fetch: %s/%s, %s-%s", self.bucket, self.key, start, end)

    def fetch() -> bytes:
        # Accumulate the streamed chunks in a BytesIO so retries always
        # start from a fresh, fully-assembled buffer.
        temp_buffer = io.BytesIO()
        for chunk in self.fs.tos_client.get_object(
            self.bucket, self.key, self.version_id, range_start=start, range_end=end
        ):
            temp_buffer.write(chunk)
        temp_buffer.seek(0)
        return temp_buffer.read()

    return retryable_func_executor(fetch, max_retry_num=self.fs.max_retry_num)

def discard(self) -> None:
    """Close the file without writing anything to the remote store.

    Any in-progress multipart upload is aborted, and the local buffer
    is dropped, leaving this file object unusable for further writes.
    """
    # Cancel the remote MPU (no-op when none is active) ...
    self._abort_mpu()
    # ... then release the local buffer; the file becomes unusable.
    self.buffer = None

def _abort_mpu(self) -> None:
    """Abort the active multipart upload, if one exists.

    Safe to call repeatedly: once aborted, ``self.mpu`` is cleared so
    subsequent calls are no-ops.
    """
    if not self.mpu:
        return
    # Retried, because a dangling MPU would keep accruing storage on
    # the remote side until it is explicitly aborted.
    retryable_func_executor(
        lambda: self.fs.tos_client.abort_multipart_upload(
            self.bucket, self.key, self.mpu.upload_id
        ),
        max_retry_num=self.fs.max_retry_num,
    )
    self.mpu = None

def commit(self) -> None:
"""Complete multipart upload or PUT."""
logger.debug("Commit %s", self)
Expand Down

0 comments on commit b621bae

Please sign in to comment.