Skip to content

Commit

Permalink
Core: Extract hard-coded numbers (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua authored Sep 10, 2024
1 parent 51496f7 commit c6cd336
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 20 deletions.
13 changes: 12 additions & 1 deletion tosfs/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,19 @@
"ServiceUnavailable",
}

# Size thresholds, in bytes: 5 * 2**30 == 5 GiB, 5 * 2**20 == 5 MiB.
# NOTE(review): this name appears superseded by the MAX/MIN pair just below
# (likely the pre-refactor constant left visible in this diff) — confirm no
# remaining references before relying on it.
MANAGED_COPY_THRESHOLD = 5 * 2**30
MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30  # 5 GiB — upper bound for a managed (multipart) copy block
MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20  # 5 MiB — lower bound for a managed (multipart) copy block

RETRY_NUM = 5  # max attempts for retryable operations (e.g. chunked object reads)
PART_MIN_SIZE = 5 * 2**20  # 5 MiB — minimum multipart part size
PART_MAX_SIZE = 5 * 2**30  # 5 GiB — maximum multipart part size

FILE_OPERATION_READ_WRITE_BUFFER_SIZE = 5 * 2**20  # 5 MiB — default read/write buffer (block) size
PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD = 5 * 2**30  # 5 GiB — below this, put_object uploads in one call
GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE = 2**16  # 64 KiB — chunk size when streaming a get_object body
APPEND_OPERATION_SMALL_FILE_THRESHOLD = 5 * 2**20  # 5 MiB — below this, append downloads the existing object

# Default maximum number of entries returned per list (ls) call.
LS_OPERATION_DEFAULT_MAX_ITEMS = 1000

# environment variable names
ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL"
43 changes: 24 additions & 19 deletions tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,22 @@
)

from tosfs.consts import (
MANAGED_COPY_THRESHOLD,
APPEND_OPERATION_SMALL_FILE_THRESHOLD,
ENV_NAME_TOSFS_LOGGING_LEVEL,
FILE_OPERATION_READ_WRITE_BUFFER_SIZE,
GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE,
LS_OPERATION_DEFAULT_MAX_ITEMS,
MANAGED_COPY_MAX_THRESHOLD,
MANAGED_COPY_MIN_THRESHOLD,
PART_MAX_SIZE,
PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD,
RETRY_NUM,
TOS_SERVER_RESPONSE_CODE_NOT_FOUND,
)
from tosfs.exceptions import TosfsError
from tosfs.fsspec_utils import glob_translate
from tosfs.utils import find_bucket_key, get_brange, retryable_func_wrapper

# environment variable names
# NOTE(review): this constant appears to duplicate ENV_NAME_TOSFS_LOGGING_LEVEL
# imported from tosfs.consts above — presumably the pre-refactor definition
# shown as the deleted side of this diff; confirm it is removed in the new file.
ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL"

# Module-level logger shared by the tosfs package.
logger = logging.getLogger("tosfs")


Expand All @@ -73,7 +77,6 @@ class TosFileSystem(AbstractFileSystem):
"""

protocol = ("tos", "tosfs")
default_block_size = 5 * 2**20

def __init__(
self,
Expand All @@ -97,7 +100,9 @@ def __init__(
credentials_provider=credentials_provider,
)
self.version_aware = version_aware
self.default_block_size = default_block_size or self.default_block_size
self.default_block_size = (
default_block_size or FILE_OPERATION_READ_WRITE_BUFFER_SIZE
)
self.default_fill_cache = default_fill_cache
self.default_cache_type = default_cache_type

Expand Down Expand Up @@ -519,7 +524,7 @@ def put_file(
self,
lpath: str,
rpath: str,
chunksize: int = 5 * 2**20,
chunksize: int = FILE_OPERATION_READ_WRITE_BUFFER_SIZE,
**kwargs: Any,
) -> None:
"""Put a file from local to TOS.
Expand Down Expand Up @@ -578,7 +583,7 @@ def put_file(
bucket, key, _ = self._split_path(rpath)

with open(lpath, "rb") as f:
if size < min(5 * 2**30, 2 * chunksize):
if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize):
chunk = f.read()
self.tos_client.put_object(
bucket,
Expand Down Expand Up @@ -642,10 +647,10 @@ def _read_chunks(body: BinaryIO, f: BinaryIO) -> None:
bytes_read = 0
while True:
try:
chunk = body.read(2**16)
chunk = body.read(GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE)
except tos.exceptions.TosClientError as e:
failed_reads += 1
if failed_reads >= self.RETRY_NUM:
if failed_reads >= RETRY_NUM:
raise e
try:
body.close()
Expand Down Expand Up @@ -848,7 +853,7 @@ def cp_file(
path1: str,
path2: str,
preserve_etag: Optional[bool] = None,
managed_copy_threshold: Optional[int] = MANAGED_COPY_THRESHOLD,
managed_copy_threshold: Optional[int] = MANAGED_COPY_MAX_THRESHOLD,
**kwargs: Any,
) -> None:
"""Copy file between locations on tos.
Expand Down Expand Up @@ -897,11 +902,11 @@ def cp_file(
if preserve_etag and parts_suffix:
self._copy_etag_preserved(path1, path2, size, total_parts=int(parts_suffix))
elif size <= min(
MANAGED_COPY_THRESHOLD,
MANAGED_COPY_MAX_THRESHOLD,
(
managed_copy_threshold
if managed_copy_threshold
else MANAGED_COPY_THRESHOLD
else MANAGED_COPY_MAX_THRESHOLD
),
):
self._copy_basic(path1, path2, **kwargs)
Expand Down Expand Up @@ -1065,17 +1070,17 @@ def _copy_managed(
path1: str,
path2: str,
size: int,
block: int = MANAGED_COPY_THRESHOLD,
block: int = MANAGED_COPY_MAX_THRESHOLD,
**kwargs: Any,
) -> None:
"""Copy file between locations on tos as multiple-part.
block: int
The size of the pieces, must be larger than 5MB and at
most MANAGED_COPY_THRESHOLD.
most MANAGED_COPY_MAX_THRESHOLD.
Smaller blocks mean more calls, only useful for testing.
"""
if block < 5 * 2**20 or block > MANAGED_COPY_THRESHOLD:
if block < MANAGED_COPY_MIN_THRESHOLD or block > MANAGED_COPY_MAX_THRESHOLD:
raise ValueError("Copy block size must be 5MB<=block<=5GB")

bucket1, key1, version1 = self._split_path(path1)
Expand Down Expand Up @@ -1485,7 +1490,7 @@ def _lsbuckets(self) -> List[dict]:
def _lsdir(
self,
path: str,
max_items: int = 1000,
max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
delimiter: str = "/",
prefix: str = "",
include_self: bool = False,
Expand Down Expand Up @@ -1555,7 +1560,7 @@ def _lsdir(
def _listdir(
self,
bucket: str,
max_items: int = 1000,
max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
delimiter: str = "/",
prefix: str = "",
include_self: bool = False,
Expand Down Expand Up @@ -1779,7 +1784,7 @@ def __init__(
head = self.fs.tos_client.head_object(bucket, key)
loc = head.content_length

if loc < 5 * 2**20:
if loc < APPEND_OPERATION_SMALL_FILE_THRESHOLD:
# existing file too small for multi-upload: download
self.write(self.fs.cat(self.path))
else:
Expand Down

0 comments on commit c6cd336

Please sign in to comment.