Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance retry sleep logic #99

Merged
merged 5 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ ignore = [
"PLR0913", # Too many arguments in function definition
"SIM105", # Use `contextlib.suppress(IOError)` instead of `try`-`except`-`pass`
"PERF203", # `try`-`except` within a loop incurs performance overhead
"PLR1714", # Consider merging multiple comparisons. Use a `set` if the elements are hashable
]

[tool.ruff.lint.per-file-ignores]
Expand Down
2 changes: 1 addition & 1 deletion tosfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
)
from tosfs.exceptions import TosfsError
from tosfs.fsspec_utils import glob_translate
from tosfs.stability import retryable_func_executor
from tosfs.retry import retryable_func_executor
from tosfs.utils import find_bucket_key, get_brange

logger = logging.getLogger("tosfs")
Expand Down
38 changes: 32 additions & 6 deletions tosfs/stability.py → tosfs/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""The module contains utility functions for the tosfs stability."""

"""Retry utilities that improve the stability of tosfs requests."""
import math
import time
from typing import Any, Optional, Tuple

Expand All @@ -33,11 +33,14 @@
from tosfs.exceptions import TosfsError

CONFLICT_CODE = "409"
TOO_MANY_REQUESTS_CODE = "429"
SERVICE_UNAVAILABLE = "503"

TOS_SERVER_RETRYABLE_STATUS_CODES = {
CONFLICT_CODE, # CONFLICT
"429", # TOO_MANY_REQUESTS
"500", # INTERNAL_SERVER_ERROR
CONFLICT_CODE,
TOO_MANY_REQUESTS_CODE,
"500", # INTERNAL_SERVER_ERROR,
SERVICE_UNAVAILABLE,
}

TOS_SERVER_NOT_RETRYABLE_CONFLICT_ERROR_CODES = {
Expand Down Expand Up @@ -68,6 +71,8 @@
}

MAX_RETRY_NUM = 20
SLEEP_BASE_SECONDS = 0.1
SLEEP_MAX_SECONDS = 60


def retryable_func_executor(
Expand Down Expand Up @@ -99,7 +104,8 @@ def retryable_func_executor(
"Retry TOS request in the %d times, error: %s", attempt, e
)
try:
time.sleep(min(1.7**attempt * 0.1, 15))
sleep_time = _get_sleep_time(e, attempt)
time.sleep(sleep_time)
except InterruptedError as ie:
raise TosfsError(f"Request {func} interrupted.") from ie
else:
Expand Down Expand Up @@ -132,3 +138,23 @@ def _is_retryable_tos_client_exception(e: TosError) -> bool:
return isinstance(e, TosClientError) and any(
isinstance(e.cause, excp) for excp in TOS_CLIENT_RETRYABLE_EXCEPTIONS
)


def _get_sleep_time(err: TosError, retry_count: int) -> float:
    """Return the number of seconds to sleep before the next retry.

    The baseline is an exponential backoff of
    ``SLEEP_BASE_SECONDS * 2 ** retry_count`` capped at ``SLEEP_MAX_SECONDS``.
    When ``err`` is a ``TosServerError`` whose status code is
    ``TOO_MANY_REQUESTS_CODE`` or ``SERVICE_UNAVAILABLE`` and its headers carry
    a ``retry-after`` entry, the larger of that value and the baseline is used.

    Args:
        err: The error raised by the failed request.
        retry_count: Number of the current retry attempt; larger values yield
            a longer baseline delay.

    Returns:
        The delay in seconds. Falls back to the baseline backoff when the
        ``retry-after`` header is absent or cannot be parsed as an integer.

    """
    # Function-scope import: only the warning path below needs it.
    import logging

    backoff = min(
        SLEEP_BASE_SECONDS * math.pow(2, retry_count), SLEEP_MAX_SECONDS
    )

    if not isinstance(err, TosServerError):
        return float(backoff)

    # Compare as strings so the check holds whether the SDK reports
    # ``status_code`` as an int or a str.
    # NOTE(review): confirm which type the SDK actually uses.
    throttled_codes = (str(TOO_MANY_REQUESTS_CODE), str(SERVICE_UNAVAILABLE))
    if str(err.status_code) not in throttled_codes:
        return float(backoff)

    if "retry-after" not in err.headers:
        return float(backoff)

    try:
        retry_after = int(err.headers["retry-after"])
    except (TypeError, ValueError) as e:
        # "tosfs" is the logger name tosfs.core uses, so this resolves to the
        # same logger object without importing tosfs.core from here.
        logging.getLogger("tosfs").warning(
            "try to parse retry-after from headers error: %s", e
        )
        return float(backoff)

    # Keep the fractional backoff: truncating it with int() would turn the
    # first few sub-second delays into 0 and allow a zero-second sleep.
    return float(max(retry_after, backoff))