From 79726314997fcc0e4cc81008dd1095ced1852eca Mon Sep 17 00:00:00 2001 From: yanghua Date: Mon, 16 Sep 2024 23:14:49 +0800 Subject: [PATCH] Performance: Reduce request numbers in exists --- tosfs/core.py | 104 +++++++++++++++++++------------------------------- 1 file changed, 39 insertions(+), 65 deletions(-) diff --git a/tosfs/core.py b/tosfs/core.py index 7de2b35..86dd1d3 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -171,6 +171,9 @@ def __init__( socket_timeout=socket_timeout, high_latency_log_threshold=high_latency_log_threshold, credentials_provider=credentials_provider, + enable_crc=False, + enable_verify_ssl=False,  # NOTE(review): disables TLS verify? confirm + disable_encoding_meta=True, ) self.version_aware = version_aware self.default_block_size = ( @@ -1560,10 +1563,31 @@ def exists(self, path: str, **kwargs: Any) -> bool: # if the path is a bucket if not key: return self._exists_bucket(bucket) - elif self.isfile(path): - return self._exists_object(bucket, key, path, version_id) - else: - return self._exists_object(bucket, key.rstrip("/") + "/", path, version_id) + + try: + return retryable_func_executor( + lambda: self.tos_client.head_object(bucket, key) or True, + max_retry_num=self.max_retry_num, + ) + except TosServerError as e: + if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: + try: + return retryable_func_executor( + lambda: self.tos_client.head_object( + bucket, key.rstrip("/") + "/" + ) + or True, + max_retry_num=self.max_retry_num, + ) + except TosServerError as ex: + if ex.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: + return False + else: + raise ex + else: + raise e + except Exception as ex: + raise TosfsError(f"Tosfs failed with unknown error: {ex}") from ex def _exists_bucket(self, bucket: str) -> bool: """Check if a bucket exists in the TOS. 
@@ -1612,60 +1636,6 @@ def _exists_bucket(self, bucket: str) -> bool: except Exception as e: raise TosfsError(f"Tosfs failed with unknown error: {e}") from e - def _exists_object( - self, bucket: str, key: str, path: str, version_id: Optional[str] = None - ) -> bool: - """Check if an object exists in the TOS. - - Parameters - ---------- - bucket : str - The name of the bucket. - key : str - The key of the object. - path : str - The full path of the object. - version_id : str, optional - The version ID of the object (default is None). - - Returns - ------- - bool - True if the object exists, False otherwise. - - Raises - ------ - tos.exceptions.TosClientError - If there is a client error while checking the object. - tos.exceptions.TosServerError - If there is a server error while checking the object. - TosfsError - If there is an unknown error while checking the object. - - Examples - -------- - >>> fs = TosFileSystem() - >>> fs._exists_object("mybucket", "myfile", "tos://mybucket/myfile") - True - >>> fs._exists_object("mybucket", "nonexistentfile", "tos://mybucket/nonexistentfile") - False - - """ - try: - return retryable_func_executor( - lambda: self.tos_client.head_object(bucket, key) or True, - max_retry_num=self.max_retry_num, - ) - except TosClientError as e: - raise e - except TosServerError as e: - if e.status_code == TOS_SERVER_STATUS_CODE_NOT_FOUND: - return False - else: - raise e - except Exception as e: - raise TosfsError(f"Tosfs failed with unknown error: {e}") from e - def _lsbuckets(self) -> List[dict]: """List all buckets in the account. 
@@ -2003,6 +1973,7 @@ def __init__( self.fs = fs self.bucket = bucket self.key = key + self.version_id = path_version_id self.path = path self.mode = mode self.autocommit = autocommit @@ -2163,22 +2134,25 @@ def _call_upload_part( ) def _fetch_range(self, start: int, end: int) -> bytes: - bucket, key, version_id = self.fs._split_path(self.path) if start == end: logger.debug( "skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d", - bucket, - key, + self.bucket, + self.key, start, end, ) return b"" - logger.debug("Fetch: %s/%s, %s-%s", bucket, key, start, end) + logger.debug("Fetch: %s/%s, %s-%s", self.bucket, self.key, start, end) def fetch() -> bytes: - return self.fs.tos_client.get_object( - bucket, key, version_id, range_start=start, range_end=end - ).read() + temp_buffer = io.BytesIO() + for chunk in self.fs.tos_client.get_object( + self.bucket, self.key, self.version_id, range_start=start, range_end=end + ): + temp_buffer.write(chunk) + temp_buffer.seek(0) + return temp_buffer.read() return retryable_func_executor(fetch, max_retry_num=self.fs.max_retry_num)