Core: Extract hard code numbers #65

Merged · 1 commit · Sep 10, 2024
13 changes: 12 additions & 1 deletion tosfs/consts.py
@@ -27,8 +27,19 @@
"ServiceUnavailable",
}

MANAGED_COPY_THRESHOLD = 5 * 2**30
MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30
MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20

RETRY_NUM = 5
PART_MIN_SIZE = 5 * 2**20
PART_MAX_SIZE = 5 * 2**30

FILE_OPERATION_READ_WRITE_BUFFER_SIZE = 5 * 2**20 # 5MB
PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD = 5 * 2**30 # 5GB
GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE = 2**16 # 64KB
APPEND_OPERATION_SMALL_FILE_THRESHOLD = 5 * 2**20 # 5MB

LS_OPERATION_DEFAULT_MAX_ITEMS = 1000

# environment variable names
ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL"
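
For reviewers skimming the new constants: the powers of two map to the sizes in the trailing comments. A quick sanity check of the arithmetic, illustrative only and not part of this PR:

```python
MIB = 2**20  # 1 MiB
GIB = 2**30  # 1 GiB

assert 5 * 2**20 == 5 * MIB == 5_242_880      # the 5MB thresholds and buffer sizes
assert 5 * 2**30 == 5 * GIB == 5_368_709_120  # the 5GB thresholds and PART_MAX_SIZE
assert 2**16 == 64 * 1024 == 65_536           # the 64KB default read chunk
```

Strictly these are mebibytes/gibibytes; the `# 5MB` / `# 5GB` comments follow the usual object-storage shorthand.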
43 changes: 24 additions & 19 deletions tosfs/core.py
@@ -35,18 +35,22 @@
 )
 
 from tosfs.consts import (
-    MANAGED_COPY_THRESHOLD,
+    APPEND_OPERATION_SMALL_FILE_THRESHOLD,
     ENV_NAME_TOSFS_LOGGING_LEVEL,
+    FILE_OPERATION_READ_WRITE_BUFFER_SIZE,
+    GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE,
+    LS_OPERATION_DEFAULT_MAX_ITEMS,
+    MANAGED_COPY_MAX_THRESHOLD,
+    MANAGED_COPY_MIN_THRESHOLD,
+    PART_MAX_SIZE,
+    PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD,
     RETRY_NUM,
     TOS_SERVER_RESPONSE_CODE_NOT_FOUND,
 )
 from tosfs.exceptions import TosfsError
 from tosfs.fsspec_utils import glob_translate
 from tosfs.utils import find_bucket_key, get_brange, retryable_func_wrapper
 
-# environment variable names
-ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL"
-
 logger = logging.getLogger("tosfs")

@@ -73,7 +77,6 @@ class TosFileSystem(AbstractFileSystem):
"""

protocol = ("tos", "tosfs")
default_block_size = 5 * 2**20

def __init__(
self,
@@ -97,7 +100,9 @@ def __init__(
             credentials_provider=credentials_provider,
         )
         self.version_aware = version_aware
-        self.default_block_size = default_block_size or self.default_block_size
+        self.default_block_size = (
+            default_block_size or FILE_OPERATION_READ_WRITE_BUFFER_SIZE
+        )
         self.default_fill_cache = default_fill_cache
         self.default_cache_type = default_cache_type
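
With the class-level `default_block_size = 5 * 2**20` attribute removed above, the constructor now falls back to the shared constant directly. One behavioral nuance of the `or` fallback, shown as a minimal sketch (the standalone `resolve_block_size` helper is hypothetical, not part of this PR):

```python
FILE_OPERATION_READ_WRITE_BUFFER_SIZE = 5 * 2**20  # 5MB, as in tosfs/consts.py

def resolve_block_size(default_block_size=None):
    # `or` tests truthiness, so 0 (not just None) falls back to the constant.
    return default_block_size or FILE_OPERATION_READ_WRITE_BUFFER_SIZE

assert resolve_block_size() == 5 * 2**20
assert resolve_block_size(2**20) == 2**20
assert resolve_block_size(0) == 5 * 2**20  # 0 is treated as "use the default"
```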

@@ -519,7 +524,7 @@ def put_file(
         self,
         lpath: str,
         rpath: str,
-        chunksize: int = 5 * 2**20,
+        chunksize: int = FILE_OPERATION_READ_WRITE_BUFFER_SIZE,
         **kwargs: Any,
     ) -> None:
         """Put a file from local to TOS.
@@ -578,7 +583,7 @@ def put_file(
         bucket, key, _ = self._split_path(rpath)
 
         with open(lpath, "rb") as f:
-            if size < min(5 * 2**30, 2 * chunksize):
+            if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize):
                 chunk = f.read()
                 self.tos_client.put_object(
                     bucket,
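
The renamed constant makes the branch above easier to read: a file goes up in a single `put_object` call only if it is smaller than both the 5GB threshold and two chunks' worth of data; otherwise it takes the multipart path. A minimal sketch of the decision (the `choose_put_strategy` helper is hypothetical):

```python
PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD = 5 * 2**30  # 5GB, as in tosfs/consts.py

def choose_put_strategy(size: int, chunksize: int) -> str:
    if size < min(PUT_OBJECT_OPERATION_SMALL_FILE_THRESHOLD, 2 * chunksize):
        return "put_object"        # one-shot upload
    return "multipart_upload"      # split into chunksize-sized parts

assert choose_put_strategy(1 * 2**20, chunksize=5 * 2**20) == "put_object"
assert choose_put_strategy(20 * 2**20, chunksize=5 * 2**20) == "multipart_upload"
```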
@@ -642,10 +647,10 @@ def _read_chunks(body: BinaryIO, f: BinaryIO) -> None:
             bytes_read = 0
             while True:
                 try:
-                    chunk = body.read(2**16)
+                    chunk = body.read(GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE)
                 except tos.exceptions.TosClientError as e:
                     failed_reads += 1
-                    if failed_reads >= self.RETRY_NUM:
+                    if failed_reads >= RETRY_NUM:
                         raise e
                     try:
                         body.close()
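
Besides the named chunk size, this hunk switches `self.RETRY_NUM` to the module-level `RETRY_NUM` imported from consts. A self-contained sketch of the bounded-retry read loop, with the TOS-specific exception and stream re-opening swapped out for illustration:

```python
import io

RETRY_NUM = 5
GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE = 2**16  # 64KB

def read_chunks(body: io.BufferedIOBase, out: io.BufferedIOBase) -> int:
    """Copy `body` to `out` in 64KB chunks, tolerating up to RETRY_NUM failed reads."""
    failed_reads = 0
    bytes_read = 0
    while True:
        try:
            chunk = body.read(GET_OBJECT_OPERATION_DEFAULT_READ_CHUNK_SIZE)
        except OSError:  # the real code catches tos.exceptions.TosClientError
            failed_reads += 1
            if failed_reads >= RETRY_NUM:
                raise
            continue  # the real code also closes and re-opens the stream here
        if not chunk:
            return bytes_read
        out.write(chunk)
        bytes_read += len(chunk)

src, dst = io.BytesIO(b"x" * 200_000), io.BytesIO()
assert read_chunks(src, dst) == 200_000  # 3 full 64KB chunks plus a short tail
```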
@@ -848,7 +853,7 @@ def cp_file(
         path1: str,
         path2: str,
         preserve_etag: Optional[bool] = None,
-        managed_copy_threshold: Optional[int] = MANAGED_COPY_THRESHOLD,
+        managed_copy_threshold: Optional[int] = MANAGED_COPY_MAX_THRESHOLD,
         **kwargs: Any,
     ) -> None:
         """Copy file between locations on tos.
@@ -897,11 +902,11 @@ def cp_file(
         if preserve_etag and parts_suffix:
             self._copy_etag_preserved(path1, path2, size, total_parts=int(parts_suffix))
         elif size <= min(
-            MANAGED_COPY_THRESHOLD,
+            MANAGED_COPY_MAX_THRESHOLD,
             (
                 managed_copy_threshold
                 if managed_copy_threshold
-                else MANAGED_COPY_THRESHOLD
+                else MANAGED_COPY_MAX_THRESHOLD
             ),
         ):
             self._copy_basic(path1, path2, **kwargs)
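
With the rename, the size test reads as: cap any caller-supplied threshold at 5GB, and fall back to the cap when none is given. A sketch of the effective threshold (the `effective_copy_threshold` helper is hypothetical, not part of this PR):

```python
from typing import Optional

MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30  # 5GB, as in tosfs/consts.py

def effective_copy_threshold(managed_copy_threshold: Optional[int]) -> int:
    # Truthiness test: None (and also 0) falls back to the 5GB cap.
    return min(
        MANAGED_COPY_MAX_THRESHOLD,
        managed_copy_threshold if managed_copy_threshold else MANAGED_COPY_MAX_THRESHOLD,
    )

assert effective_copy_threshold(None) == 5 * 2**30
assert effective_copy_threshold(10 * 2**30) == 5 * 2**30  # capped at the max
assert effective_copy_threshold(1 * 2**30) == 1 * 2**30   # honored as-is
```

Objects at or below the effective threshold are copied with `_copy_basic`; larger ones go through the multipart `_copy_managed` path.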
@@ -1065,17 +1070,17 @@ def _copy_managed(
         path1: str,
         path2: str,
         size: int,
-        block: int = MANAGED_COPY_THRESHOLD,
+        block: int = MANAGED_COPY_MAX_THRESHOLD,
         **kwargs: Any,
     ) -> None:
         """Copy file between locations on tos as multiple-part.
 
         block: int
             The size of the pieces, must be larger than 5MB and at
-            most MANAGED_COPY_THRESHOLD.
+            most MANAGED_COPY_MAX_THRESHOLD.
             Smaller blocks mean more calls, only useful for testing.
         """
-        if block < 5 * 2**20 or block > MANAGED_COPY_THRESHOLD:
+        if block < MANAGED_COPY_MIN_THRESHOLD or block > MANAGED_COPY_MAX_THRESHOLD:
             raise ValueError("Copy block size must be 5MB<=block<=5GB")
 
         bucket1, key1, version1 = self._split_path(path1)
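
`MANAGED_COPY_MIN_THRESHOLD` now names the 5MB lower bound the guard previously spelled as `5 * 2**20`. The multipart copy then walks the object in `block`-sized ranges; core.py imports `get_brange` from `tosfs.utils` for this, and the sketch below is an assumed equivalent, for illustration only:

```python
MANAGED_COPY_MIN_THRESHOLD = 5 * 2**20  # 5MB
MANAGED_COPY_MAX_THRESHOLD = 5 * 2**30  # 5GB

def brange(size: int, block: int):
    """Yield (start, end) byte ranges of at most `block` bytes covering [0, size)."""
    if block < MANAGED_COPY_MIN_THRESHOLD or block > MANAGED_COPY_MAX_THRESHOLD:
        raise ValueError("Copy block size must be 5MB<=block<=5GB")
    for start in range(0, size, block):
        yield start, min(start + block, size)

parts = list(brange(12 * 2**20, block=5 * 2**20))
assert parts == [(0, 5 * 2**20), (5 * 2**20, 10 * 2**20), (10 * 2**20, 12 * 2**20)]
```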
@@ -1485,7 +1490,7 @@ def _lsbuckets(self) -> List[dict]:
     def _lsdir(
         self,
         path: str,
-        max_items: int = 1000,
+        max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
         delimiter: str = "/",
         prefix: str = "",
         include_self: bool = False,
@@ -1555,7 +1560,7 @@ def _lsdir(
     def _listdir(
         self,
         bucket: str,
-        max_items: int = 1000,
+        max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS,
         delimiter: str = "/",
         prefix: str = "",
         include_self: bool = False,
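
`LS_OPERATION_DEFAULT_MAX_ITEMS = 1000` matches the usual object-store page size of 1,000 keys per list call, so `_lsdir`/`_listdir` page through results rather than fetching everything at once. A generic pagination sketch (the `FakeClient` and its `list` API are hypothetical stand-ins for the TOS client):

```python
from dataclasses import dataclass
from typing import List, Optional

LS_OPERATION_DEFAULT_MAX_ITEMS = 1000

@dataclass
class Page:
    contents: List[str]
    next_token: Optional[str]

class FakeClient:
    def __init__(self, keys: List[str]) -> None:
        self.keys = keys

    def list(self, max_keys: int, token: Optional[str] = None) -> Page:
        start = int(token or 0)
        nxt = start + max_keys
        return Page(self.keys[start:nxt], str(nxt) if nxt < len(self.keys) else None)

def list_all(client: FakeClient, max_items: int = LS_OPERATION_DEFAULT_MAX_ITEMS):
    token = None
    while True:
        page = client.list(max_keys=max_items, token=token)
        yield from page.contents
        if page.next_token is None:
            return
        token = page.next_token

keys = [f"key-{i}" for i in range(2500)]
assert list(list_all(FakeClient(keys))) == keys  # 3 pages: 1000 + 1000 + 500
```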
@@ -1779,7 +1784,7 @@ def __init__(
             head = self.fs.tos_client.head_object(bucket, key)
             loc = head.content_length
 
-            if loc < 5 * 2**20:
+            if loc < APPEND_OPERATION_SMALL_FILE_THRESHOLD:
                 # existing file too small for multi-upload: download
                 self.write(self.fs.cat(self.path))
             else:
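
The append path's threshold is now named as well: an existing object smaller than 5MB is downloaded and replayed through the write buffer, since it is too small to seed a multipart upload (parts must be at least `PART_MIN_SIZE`, i.e. 5MB); larger objects can be continued as multipart. A sketch of the decision (the `append_strategy` helper is hypothetical):

```python
APPEND_OPERATION_SMALL_FILE_THRESHOLD = 5 * 2**20  # 5MB, as in tosfs/consts.py

def append_strategy(existing_size: int) -> str:
    if existing_size < APPEND_OPERATION_SMALL_FILE_THRESHOLD:
        return "download_and_rewrite"  # too small to be a multipart part
    return "seed_multipart"            # continue as a multipart upload

assert append_strategy(1 * 2**20) == "download_and_rewrite"
assert append_strategy(100 * 2**20) == "seed_multipart"
```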