diff --git a/tosfs/core.py b/tosfs/core.py index 21e1860..9584bb1 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -54,7 +54,7 @@ ) from tosfs.exceptions import TosfsError from tosfs.fsspec_utils import glob_translate -from tosfs.stability import retryable_func +from tosfs.stability import retryable_func_executor from tosfs.utils import find_bucket_key, get_brange logger = logging.getLogger("tosfs") @@ -391,7 +391,7 @@ def rmdir(self, path: str) -> None: if len(self._listdir(bucket, max_items=1, prefix=key.rstrip("/") + "/")) > 0: raise TosfsError(f"Directory {path} is not empty.") - retryable_func( + retryable_func_executor( lambda: self.tos_client.delete_object(bucket, key.rstrip("/") + "/"), max_retry_num=self.max_retry_num, ) @@ -465,7 +465,7 @@ def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None: # here we need to create the parent directory recursively self.mkdir(parent, create_parents=True) - retryable_func( + retryable_func_executor( lambda: self.tos_client.put_object(bucket, key.rstrip("/") + "/"), max_retry_num=self.max_retry_num, ) @@ -474,7 +474,7 @@ def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None: if not self.exists(parent): raise FileNotFoundError(f"Parent directory {parent} does not exist.") else: - retryable_func( + retryable_func_executor( lambda: self.tos_client.put_object(bucket, key.rstrip("/") + "/"), max_retry_num=self.max_retry_num, ) @@ -537,7 +537,7 @@ def touch(self, path: str, truncate: bool = True, **kwargs: Any) -> None: if not truncate and self.exists(path): raise FileExistsError(f"File {path} already exists.") - retryable_func( + retryable_func_executor( lambda: self.tos_client.put_object(bucket, key), max_retry_num=self.max_retry_num, ) @@ -578,7 +578,7 @@ def isdir(self, path: str) -> bool: key = key.rstrip("/") + "/" try: - return retryable_func( + return retryable_func_executor( lambda: self.tos_client.head_object(bucket, key) or True, max_retry_num=self.max_retry_num, ) @@ -614,7 +614,7 @@ def isfile(self, path: str) -> bool: return False try: - return retryable_func( + return retryable_func_executor( lambda: self.tos_client.head_object(bucket, key) or True, max_retry_num=self.max_retry_num, ) @@ -689,7 +689,7 @@ def put_file( with open(lpath, "rb") as f: if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize): chunk = f.read() - retryable_func( + retryable_func_executor( lambda: self.tos_client.put_object( bucket, key, @@ -699,19 +699,19 @@ def put_file( max_retry_num=self.max_retry_num, ) else: - mpu = retryable_func( + mpu = retryable_func_executor( lambda: self.tos_client.create_multipart_upload( bucket, key, content_type=content_type ), max_retry_num=self.max_retry_num, ) - retryable_func( + retryable_func_executor( lambda: self.tos_client.upload_part_from_file( bucket, key, mpu.upload_id, file_path=lpath, part_number=1 ), max_retry_num=self.max_retry_num, ) - retryable_func( + retryable_func_executor( lambda: self.tos_client.complete_multipart_upload( bucket, key, mpu.upload_id, complete_all=True ), @@ -1129,7 +1129,7 @@ def _call_list_objects_type2( continuation_token=continuation_token, ) - resp = retryable_func( + resp = retryable_func_executor( _call_list_objects_type2, args=(continuation_token,), max_retry_num=self.max_retry_num, @@ -1144,7 +1144,7 @@ def _call_list_objects_type2( ] if deleting_objects: - delete_resp = retryable_func( + delete_resp = retryable_func_executor( lambda: self.tos_client.delete_multi_objects( bucket, deleting_objects, quiet=True ), @@ -1164,7 +1164,7 @@ def _copy_basic(self, path1: str, path2: str, **kwargs: Any) -> None: if ver2: raise ValueError("Cannot copy to a versioned file!") - retryable_func( + retryable_func_executor( lambda: self.tos_client.copy_object( bucket=buc2, key=key2, @@ -1185,7 +1185,7 @@ def _copy_etag_preserved( upload_id = None try: - mpu = retryable_func( + mpu = retryable_func_executor( lambda: self.tos_client.create_multipart_upload(bucket2, key2), max_retry_num=self.max_retry_num, ) @@ -1216,7 +1216,7 @@ def _call_upload_part_copy( copy_source_range_end=brange_last, ) - part = retryable_func( + part = retryable_func_executor( _call_upload_part_copy, args=(i, brange_first, brange_last), max_retry_num=self.max_retry_num, @@ -1233,14 +1233,14 @@ def _call_upload_part_copy( ) brange_first += part_size - retryable_func( + retryable_func_executor( lambda: self.tos_client.complete_multipart_upload( bucket2, key2, upload_id, parts ), max_retry_num=self.max_retry_num, ) except Exception as e: - retryable_func( + retryable_func_executor( lambda: self.tos_client.abort_multipart_upload( bucket2, key2, upload_id ), @@ -1272,7 +1272,7 @@ def _copy_managed( upload_id = None try: - mpu = retryable_func( + mpu = retryable_func_executor( lambda: self.tos_client.create_multipart_upload(bucket2, key2), max_retry_num=self.max_retry_num, ) @@ -1293,7 +1293,7 @@ def _call_upload_part_copy( ) out = [ - retryable_func( + retryable_func_executor( _call_upload_part_copy, args=(i, brange_first, brange_last), max_retry_num=self.max_retry_num, @@ -1313,14 +1313,14 @@ def _call_upload_part_copy( for i, o in enumerate(out) ] - retryable_func( + retryable_func_executor( lambda: self.tos_client.complete_multipart_upload( bucket2, key2, upload_id, parts ), max_retry_num=self.max_retry_num, ) except Exception as e: - retryable_func( + retryable_func_executor( lambda: self.tos_client.abort_multipart_upload( bucket2, key2, upload_id ), @@ -1367,7 +1367,7 @@ def _open_remote_file( range_start: int, **kwargs: Any, ) -> Tuple[BinaryIO, int]: - resp = retryable_func( + resp = retryable_func_executor( lambda: self.tos_client.get_object( bucket, key, @@ -1411,7 +1411,7 @@ def _bucket_info(self, bucket: str) -> dict: """ try: - retryable_func( + retryable_func_executor( lambda: self.tos_client.head_bucket(bucket), max_retry_num=self.max_retry_num, ) @@ -1464,7 +1464,7 @@ def _object_info( """ try: - out = retryable_func( + out = retryable_func_executor( lambda: self.tos_client.head_object(bucket, key, version_id=version_id), max_retry_num=self.max_retry_num, ) @@ -1494,7 +1494,7 @@ def _try_dir_info(self, bucket: str, key: str, path: str, fullpath: str) -> dict try: # We check to see if the path is a directory by attempting to list its # contexts. If anything is found, it is indeed a directory - out = retryable_func( + out = retryable_func_executor( lambda: self.tos_client.list_objects_type2( bucket, prefix=key.rstrip("/") + "/" if key else "", @@ -1597,7 +1597,7 @@ def _exists_bucket(self, bucket: str) -> bool: """ try: - retryable_func( + retryable_func_executor( lambda: self.tos_client.head_bucket(bucket), max_retry_num=self.max_retry_num, ) @@ -1652,7 +1652,7 @@ def _exists_object( """ try: - return retryable_func( + return retryable_func_executor( lambda: self.tos_client.head_object(bucket, key) or True, max_retry_num=self.max_retry_num, ) @@ -1692,7 +1692,7 @@ def _lsbuckets(self) -> List[dict]: """ try: - resp = retryable_func( + resp = retryable_func_executor( lambda: self.tos_client.list_buckets(), max_retry_num=self.max_retry_num ) except (TosClientError, TosServerError) as e: @@ -1843,7 +1843,7 @@ def _call_list_object_versions( version_id_marker=version_id_marker, ) - resp = retryable_func( + resp = retryable_func_executor( _call_list_object_versions, args=(key_marker, version_id_marker), max_retry_num=self.max_retry_num, @@ -1872,7 +1872,7 @@ def _call_list_objects_type2( continuation_token=continuation_token, ) - resp = retryable_func( + resp = retryable_func_executor( _call_list_objects_type2, args=(continuation_token,), max_retry_num=self.max_retry_num, @@ -1892,7 +1892,7 @@ def _rm(self, path: str) -> None: key = key.rstrip("/") + "/" try: - retryable_func( + retryable_func_executor( lambda: self.tos_client.delete_object(bucket, key), max_retry_num=self.max_retry_num, ) @@ -2164,7 +2164,7 @@ def fetch() -> bytes: bucket, key, version_id, range_start=start, range_end=end ).read() - return retryable_func(fetch, max_retry_num=self.fs.max_retry_num) + return retryable_func_executor(fetch, max_retry_num=self.fs.max_retry_num) def commit(self) -> None: """Complete multipart upload or PUT.""" diff --git a/tosfs/stability.py b/tosfs/stability.py index 9a01b68..f8479ad 100644 --- a/tosfs/stability.py +++ b/tosfs/stability.py @@ -68,7 +68,7 @@ MAX_RETRY_NUM = 20 -def retryable_func( +def retryable_func_executor( func: Any, *, args: Tuple[Any, ...] = (),