Skip to content

Commit

Permalink
Core: Implement read API
Browse files Browse the repository at this point in the history
  • Loading branch information
yanghua committed Sep 3, 2024
1 parent 037788b commit 8bc3975
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 2 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ ignore = [
"D213", # multi-line-summary-second-line
"PLR0913", # Too many arguments in function definition
"SIM105", # Use `contextlib.suppress(IOError)` instead of `try`-`except`-`pass`
"PERF203", # `try`-`except` within a loop incurs performance overhead
]

[tool.ruff.lint.per-file-ignores]
Expand Down
10 changes: 10 additions & 0 deletions tosfs/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,13 @@

# Tos server response codes
TOS_SERVER_RESPONSE_CODE_NOT_FOUND = 404

# Server error codes after which a failed call is attempted again (with a
# back-off delay) by tosfs.utils.retryable_func_wrapper; a server error
# carrying any other code ends the retry loop immediately.
TOS_SERVER_RETRYABLE_ERROR_CODE_SET = {
    "ExceedAccountQPSLimit",
    "ExceedAccountRateLimit",
    "ExceedBucketQPSLimit",
    "ExceedBucketRateLimit",
    "IncompleteBody",
    "InternalError",
    "ServiceUnavailable",
}
22 changes: 21 additions & 1 deletion tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@

from tosfs.consts import TOS_SERVER_RESPONSE_CODE_NOT_FOUND
from tosfs.exceptions import TosfsError
from tosfs.utils import find_bucket_key
from tosfs.utils import find_bucket_key, retryable_func_wrapper

# environment variable names
ENV_NAME_TOSFS_LOGGING_LEVEL = "TOSFS_LOGGING_LEVEL"
Expand Down Expand Up @@ -1556,6 +1556,26 @@ def handle_remainder(
else None
)

def _fetch_range(self, start: int, end: int) -> bytes:
    """Return the bytes of this file for the requested ``start``/``end`` range.

    The object is located by splitting ``self.path`` into bucket, key and
    version id. When ``start`` equals ``end`` no request is sent and an
    empty bytes object is returned. Otherwise a ranged ``get_object`` call
    is issued through ``retryable_func_wrapper`` using the filesystem's
    configured ``retries`` value, and the response body is read in full.
    """
    bucket, key, version_id = self.fs._split_path(self.path)

    if start != end:
        logger.debug("Fetch: %s/%s, %s-%s", bucket, key, start, end)
        # The download is wrapped in a callable so the retry helper can
        # invoke it again after a retryable server error.
        return retryable_func_wrapper(
            lambda: self.fs.tos_client.get_object(
                bucket, key, version_id, range_start=start, range_end=end
            ).read(),
            retries=self.fs.retries,
        )

    # Nothing to download for a zero-length range.
    logger.debug(
        "skip fetch for negative range - bucket=%s,key=%s,start=%d,end=%d",
        bucket,
        key,
        start,
        end,
    )
    return b""

def commit(self) -> None:
"""Complete multipart upload or PUT."""
logger.debug("Commit %s", self)
Expand Down
72 changes: 72 additions & 0 deletions tosfs/tests/test_tosfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,3 +575,75 @@ def test_file_write_mpu(
)

tosfs.rm_file(f"{bucket}/{temporary_workspace}/{file_name}")


def test_file_read(tosfs: TosFileSystem, bucket: str, temporary_workspace: str) -> None:
    """Write a short text file, then read it back in text and binary mode."""
    path = f"{bucket}/{temporary_workspace}/{random_str()}"
    expected = "hello world"

    with tosfs.open(path, "w") as writer:
        writer.write(expected)

    # Text mode returns the original string.
    with tosfs.open(path, "r") as text_reader:
        assert text_reader.read() == expected

    # Binary mode returns bytes that decode to the original string.
    with tosfs.open(path, "rb") as binary_reader:
        assert binary_reader.read().decode() == expected

    tosfs.rm_file(path)


def test_file_read_encdec(
    tosfs: TosFileSystem, bucket: str, temporary_workspace: str
) -> None:
    """Read files back in text mode using several non-default encodings."""
    path = f"{bucket}/{temporary_workspace}/{random_str()}"

    # Phase 1: bytes written as GBK, read back as GBK text.
    text = "你好"
    with tosfs.open(path, "wb") as writer:
        writer.write(text.encode("gbk"))

    with tosfs.open(path, "r", encoding="gbk") as reader:
        assert reader.read() == text

    tosfs.rm_file(path)

    # Phase 2: bytes written as UTF-16-LE, read back as UTF-16-LE text.
    text = "\u00af\\_(\u30c4)_/\u00af"
    with tosfs.open(path, "wb") as writer:
        writer.write(text.encode("utf-16-le"))

    with tosfs.open(path, "r", encoding="utf-16-le") as reader:
        assert reader.read() == text

    tosfs.rm_file(path)

    # Phase 3: text mode on both sides, using the ibm500 codec.
    text = "Hello, World!"
    with tosfs.open(path, "w", encoding="ibm500") as writer:
        writer.write(text)

    with tosfs.open(path, "r", encoding="ibm500") as reader:
        assert reader.read() == text

    tosfs.rm_file(path)


def test_file_readlines(
    tosfs: TosFileSystem, bucket: str, temporary_workspace: str
) -> None:
    """Check ``readlines`` on a two-line file in text and binary mode."""
    path = f"{bucket}/{temporary_workspace}/{random_str()}"
    payload = "hello\nworld"

    with tosfs.open(path, "w") as writer:
        writer.write(payload)

    # The first line keeps its newline; the last line has none.
    with tosfs.open(path, "r") as text_reader:
        assert text_reader.readlines() == ["hello\n", "world"]

    with tosfs.open(path, "rb") as binary_reader:
        assert binary_reader.readlines() == [b"hello\n", b"world"]

    tosfs.rm_file(path)
38 changes: 37 additions & 1 deletion tosfs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
import re
import string
import tempfile
from typing import Tuple
import time
from typing import Any, Optional, Tuple

import tos

from tosfs.consts import TOS_SERVER_RETRYABLE_ERROR_CODE_SET


def random_str(length: int = 5) -> str:
Expand Down Expand Up @@ -84,3 +89,34 @@ def find_bucket_key(tos_path: str) -> Tuple[str, str]:
if len(tos_components) > 1:
tos_key = tos_components[1]
return bucket, tos_key


def retryable_func_wrapper(
    func: Any,
    *,
    args: Tuple[Any, ...] = (),
    kwargs: Optional[Any] = None,
    retries: int = 5,
) -> Any:
    """Call ``func`` and retry it when the server reports a retryable error.

    ``func`` is invoked as ``func(*args, **kwargs)`` up to ``retries`` times.
    A ``tos.exceptions.TosServerError`` whose ``code`` is listed in
    ``TOS_SERVER_RETRYABLE_ERROR_CODE_SET`` triggers another attempt after a
    delay of ``min(1.7**attempt * 0.1, 15)`` seconds. Any other exception,
    or a retryable one on the final attempt, is propagated to the caller.

    Parameters
    ----------
    func:
        The callable to execute.
    args:
        Positional arguments passed to ``func``.
    kwargs:
        Keyword arguments passed to ``func``; ``None`` means no keywords.
    retries:
        Maximum number of attempts; must be at least 1.

    Returns
    -------
    Whatever ``func`` returns on its first successful call.

    Raises
    ------
    ValueError
        If ``retries`` is smaller than 1, so ``func`` was never called.
    Exception
        The last exception raised by ``func`` once no attempt succeeded.

    """
    if kwargs is None:
        kwargs = {}

    for attempt in range(retries):
        try:
            return func(*args, **kwargs)
        except tos.exceptions.TosServerError as e:
            # Imported lazily: tosfs.core imports this module at load time.
            from tosfs.core import logger

            logger.debug("Server error (maybe retryable): %s", e)
            if e.code not in TOS_SERVER_RETRYABLE_ERROR_CODE_SET:
                raise
            if attempt == retries - 1:
                # Attempts exhausted: fail now instead of sleeping first.
                raise
            time.sleep(min(1.7**attempt * 0.1, 15))
        except Exception as e:
            from tosfs.core import logger

            logger.debug("Nonretryable error: %s", e)
            raise

    # Only reachable when the loop body never ran, i.e. ``retries`` < 1.
    raise ValueError(f"retries must be a positive integer, got {retries!r}")

0 comments on commit 8bc3975

Please sign in to comment.