Skip to content

Commit

Permalink
Fixed upload_fileobj handling less than requested bytes returned
Browse files Browse the repository at this point in the history
  • Loading branch information
terricain committed Jun 5, 2024
1 parent 0696d02 commit 72c9dbe
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 13 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
History
=======

13.0.1 (2024-06-05)
-------------------

* Fixed issue with upload_fileobj where uploads would be incomplete if the async file object returned less bytes than
the read requested. This is noticable when passing in async streams like that of `aiohttp`'s response content.

13.0.0 (2024-05-27)
-------------------

Expand Down
32 changes: 19 additions & 13 deletions aioboto3/s3/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,24 @@ async def upload_fileobj(
complete_upload_args = {k: v for k, v in kwargs.items() if k in UploadSubmissionTask.COMPLETE_MULTIPART_ARGS}
Config = Config or S3TransferConfig()

# Read
initial_data = Fileobj.read(Config.multipart_threshold)
if inspect.isawaitable(initial_data):
initial_data = await initial_data
if Processing:
initial_data = Processing(initial_data)
async def fileobj_read(num_bytes: int, process: bool = False) -> bytes:
data = Fileobj.read(num_bytes)
if inspect.isawaitable(data):
data = await data
else:
await asyncio.sleep(0.0) # Yield to the eventloop incase .read() took ages

if process:
data = Processing(data)
return data

# So some streams might return less than Config.multipart_threshold on a read, but that might not be eof
initial_data = b''
while len(initial_data) < Config.multipart_threshold:
new_data = await fileobj_read(Config.multipart_threshold)
if new_data == b'':
break
initial_data += new_data

if len(initial_data) < Config.multipart_threshold:
# Do put_object
Expand Down Expand Up @@ -433,13 +445,7 @@ async def file_reader() -> None:
while len(multipart_payload) < Config.multipart_chunksize:
try:
# Handles if .read() returns anything that can be awaited
data_chunk = Fileobj.read(Config.io_chunksize)
if inspect.isawaitable(data_chunk):
# noinspection PyUnresolvedReferences
data = await data_chunk
else:
data = data_chunk
await asyncio.sleep(0.0) # Yield to the eventloop incase .read() took ages
data = await fileobj_read(Config.io_chunksize, False)
except Exception as err:
# Caught some random exception whilst reading from a file
exception = err
Expand Down

0 comments on commit 72c9dbe

Please sign in to comment.