Skip to content

Commit

Permalink
fix: rate limit uploads to S3 (#163)
Browse files Browse the repository at this point in the history
* fix: rate limit uploads to S3

* format

* format

* format

* format

* Update deepset_cloud_sdk/_utils/ratelimiter.py

Co-authored-by: Tobias Wochinger <[email protected]>

* refactor and unit tests

* fix tests and format

* use pyrate-limiter instead of custom implementation

* remove unused dep

* remove script in pyproject.toml

* remove unrequired test deps

* fix rate limiting

* format

* update gitignore

---------

Co-authored-by: Tobias Wochinger <[email protected]>
  • Loading branch information
rjanjua and wochinge authored Mar 13, 2024
1 parent 5c3e3a8 commit 81e0068
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
6 changes: 5 additions & 1 deletion deepset_cloud_sdk/_s3/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import aiofiles
import aiohttp
import structlog
from pyrate_limiter import Duration, Limiter, Rate
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from tqdm.asyncio import tqdm

Expand Down Expand Up @@ -67,14 +68,15 @@ def make_safe_file_name(file_name: str) -> str:
class S3:
"""Client for S3 operations related to deepset Cloud uploads."""

def __init__(self, concurrency: int = 120):
def __init__(self, concurrency: int = 120, rate_limit: Rate = Rate(3000, Duration.SECOND)):
    """
    Initialize the client.

    :param concurrency: The number of concurrent upload requests
    :param rate_limit: Maximum rate at which S3 upload requests are issued
        (defaults to 3000 requests per second).
    """
    # Cap concurrent TCP connections and in-flight uploads to the same bound.
    self.connector = aiohttp.TCPConnector(limit=concurrency)
    self.semaphore = asyncio.BoundedSemaphore(concurrency)
    # raise_when_fail=False makes the limiter delay instead of raising when the
    # rate is exceeded; max_delay caps that blocking at one second.
    # NOTE(review): presumably acquisitions that would exceed max_delay proceed
    # unthrottled rather than fail — confirm against pyrate-limiter docs.
    self.limiter = Limiter(rate_limit, raise_when_fail=False, max_delay=Duration.SECOND * 1)

@retry(
retry=retry_if_exception_type(RetryableHttpError),
Expand Down Expand Up @@ -102,6 +104,7 @@ async def _upload_file_with_retries(

file_data = self._build_file_data(content, aws_safe_name, aws_config)
try:
self.limiter.try_acquire("") # rate limit requests
async with client_session.post(
aws_config.url,
data=file_data,
Expand All @@ -115,6 +118,7 @@ async def _upload_file_with_retries(
# for example during automatic redirects. See https://github.com/aio-libs/aiohttp/issues/5577
redirect_url = response.headers["Location"]
file_data = self._build_file_data(content, aws_safe_name, aws_config)
self.limiter.try_acquire("") # rate limit requests
async with client_session.post(
redirect_url,
json=file_data,
Expand Down
4 changes: 3 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies = [
"tabulate==0.9.0",
"tqdm==4.66.1",
"yaspin==2.3.0",
"pyrate-limiter==3.4.1",
]

[project.urls]
Expand Down Expand Up @@ -62,7 +63,8 @@ dependencies = [
"httpx==0.26.0",
"python-dotenv==1.0.0",
"tenacity==8.2.3",
"aiohttp==3.9.1"
"aiohttp==3.9.1",
"pyrate-limiter==3.4.1",
]

[tool.hatch.envs.test.scripts]
Expand Down
13 changes: 13 additions & 0 deletions tests/unit/s3/test_upload.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import time
from pathlib import Path
from unittest.mock import Mock, patch

import aiohttp
import pytest
from pyrate_limiter import Duration, Rate
from tqdm.asyncio import tqdm

from deepset_cloud_sdk._api.upload_sessions import UploadSession
Expand Down Expand Up @@ -93,6 +95,17 @@ async def test_upload_files_without_progress(self, post: Mock, upload_session_re
assert results.failed_upload_count == 0
assert len(results.failed) == 0

async def test_upload_rate(self, post: Mock, upload_session_response: UploadSession) -> None:
    """Uploading 9000 texts at a 3000/s limit should take roughly 3 seconds of wall time."""
    rate = Rate(3000, Duration.SECOND)
    s3 = S3(rate_limit=rate)
    number_of_files_to_upload = 9000
    files = [DeepsetCloudFile(name=f"{i}.txt", text=f"{i}") for i in range(number_of_files_to_upload)]
    start = time.monotonic()
    await s3.upload_texts(upload_session_response, files)
    time_taken = time.monotonic() - start
    # Expected wall time = number of requests / allowed requests per second.
    expected_time_taken = number_of_files_to_upload / rate.limit
    # NOTE(review): the second argument to pytest.approx is a RELATIVE tolerance
    # of 1 (i.e. 100%), so any duration up to ~2x the expected time passes —
    # deliberately loose to avoid flakiness on slow CI runners.
    assert time_taken == pytest.approx(expected_time_taken, 1)

async def test_upload_files_from_path_http_error(self, upload_session_response: UploadSession) -> None:
exception = aiohttp.ClientResponseError(request_info=Mock(), history=Mock(), status=503)
with patch.object(aiohttp.ClientSession, "post", side_effect=exception):
Expand Down

0 comments on commit 81e0068

Please sign in to comment.