diff --git a/pyproject.toml b/pyproject.toml index 68b4966..db1a095 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,6 +69,7 @@ ignore = [ "PLR0913", # Too many arguments in function definition "SIM105", # Use `contextlib.suppress(IOError)` instead of `try`-`except`-`pass` "PERF203", # `try`-`except` within a loop incurs performance overhead + "PLR1714", # Consider merging multiple comparisons. Use a `set` if the elements are hashable ] [tool.ruff.lint.per-file-ignores] diff --git a/tosfs/core.py b/tosfs/core.py index bb62302..af16111 100644 --- a/tosfs/core.py +++ b/tosfs/core.py @@ -57,7 +57,7 @@ ) from tosfs.exceptions import TosfsError from tosfs.fsspec_utils import glob_translate -from tosfs.stability import retryable_func_executor +from tosfs.retry import retryable_func_executor from tosfs.utils import find_bucket_key, get_brange logger = logging.getLogger("tosfs") diff --git a/tosfs/stability.py b/tosfs/retry.py similarity index 77% rename from tosfs/stability.py rename to tosfs/retry.py index 9293959..f981bce 100644 --- a/tosfs/stability.py +++ b/tosfs/retry.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""The module contains utility functions for the tosfs stability.""" - +"""The module contains retry utility functions for the tosfs stability.""" +import math import time from typing import Any, Optional, Tuple @@ -33,11 +33,14 @@ from tosfs.exceptions import TosfsError CONFLICT_CODE = "409" +TOO_MANY_REQUESTS_CODE = "429" +SERVICE_UNAVAILABLE = "503" TOS_SERVER_RETRYABLE_STATUS_CODES = { - CONFLICT_CODE, # CONFLICT - "429", # TOO_MANY_REQUESTS - "500", # INTERNAL_SERVER_ERROR + CONFLICT_CODE, + TOO_MANY_REQUESTS_CODE, + "500", # INTERNAL_SERVER_ERROR, + SERVICE_UNAVAILABLE, } TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES = { @@ -68,6 +71,8 @@ } MAX_RETRY_NUM = 20 +SLEEP_BASE_SECONDS = 0.1 +SLEEP_MAX_SECONDS = 60 def retryable_func_executor( @@ -99,7 +104,8 @@ def retryable_func_executor( "Retry TOS request in the %d times, error: %s", attempt, e ) try: - time.sleep(min(1.7**attempt * 0.1, 15)) + sleep_time = _get_sleep_time(e, attempt) + time.sleep(sleep_time) except InterruptedError as ie: raise TosfsError(f"Request {func} interrupted.") from ie else: @@ -132,3 +138,23 @@ def _is_retryable_tos_client_exception(e: TosError) -> bool: return isinstance(e, TosClientError) and any( isinstance(e.cause, excp) for excp in TOS_CLIENT_RETRYABLE_EXCEPTIONS ) + + +def _get_sleep_time(err: TosError, retry_count: int) -> float: + sleep_time = SLEEP_BASE_SECONDS * math.pow(2, retry_count) + sleep_time = min(sleep_time, SLEEP_MAX_SECONDS) + if ( + isinstance(err, TosServerError) + and ( + err.status_code == TOO_MANY_REQUESTS_CODE + or err.status_code == SERVICE_UNAVAILABLE + ) + and "retry-after" in err.headers + ): + try: + sleep_time = max(int(err.headers["retry-after"]), int(sleep_time)) + except Exception as e: + from tosfs.core import logger + + logger.warning("try to parse retry-after from headers error: {}".format(e)) + return sleep_time