Commit

Merge branch 'main' into pipe_concurrent
martindurant committed Oct 16, 2024
2 parents 8f2e9e9 + dd75a1a commit 540563a
Showing 4 changed files with 105 additions and 35 deletions.
13 changes: 9 additions & 4 deletions .github/workflows/ci.yml
@@ -9,7 +9,13 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
- "3.12"
- "3.13"
aiobotocore-version: [">=2.5.4,<2.6.0", ">=2.7.0,<2.8.0", ">=2.8.0,<2.9.0", "<3.0.0"]

env:
@@ -24,11 +30,10 @@ jobs:
fetch-depth: 0

- name: Setup conda
uses: mamba-org/setup-micromamba@v1
uses: conda-incubator/setup-miniconda@v3
with:
environment-file: ci/env.yaml
create-args: >-
python=${{ matrix.PY }}
python-version: ${{ matrix.python-version }}

- name: Install
shell: bash -l {0}
75 changes: 44 additions & 31 deletions s3fs/core.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import asyncio
import errno
import io
import logging
import mimetypes
import os
@@ -248,6 +249,9 @@ class S3FileSystem(AsyncFileSystem):
this parameter to affect ``pipe()``, ``cat()`` and ``get()``. Increasing this
value will result in higher memory usage during multipart upload operations (by
``max_concurrency * chunksize`` bytes per file).
fixed_upload_size : bool (False)
Use same chunk size for all parts in multipart upload (last part can be smaller).
Cloudflare R2 storage requires fixed_upload_size=True for multipart uploads.
The following parameters are passed on to fsspec:
@@ -296,6 +300,7 @@ def __init__(
asynchronous=False,
loop=None,
max_concurrency=10,
fixed_upload_size: bool = False,
**kwargs,
):
if key and username:
@@ -333,6 +338,7 @@ def __init__(
self.cache_regions = cache_regions
self._s3 = None
self.session = session
self.fixed_upload_size = fixed_upload_size
if max_concurrency < 1:
raise ValueError("max_concurrency must be >= 1")
self.max_concurrency = max_concurrency
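
As a usage sketch, not part of this diff: an S3-compatible store that requires uniform part sizes, such as Cloudflare R2, could be targeted roughly as follows. The endpoint URL, credentials, bucket name and sizes are placeholders chosen for illustration.

import s3fs

# Placeholder endpoint and credentials; fixed_upload_size is the option added in this commit.
fs = s3fs.S3FileSystem(
    key="<access-key>",
    secret="<secret-key>",
    client_kwargs={"endpoint_url": "https://<account-id>.r2.cloudflarestorage.com"},
    fixed_upload_size=True,  # every uploaded part is block_size bytes; only the last may be smaller
)

# Writing more than one block triggers a multipart upload with uniform part sizes.
with fs.open("my-bucket/large-object.bin", "wb", block_size=6_000_000) as f:
    f.write(b"\x00" * 20_000_000)
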
@@ -2337,47 +2343,54 @@ def _upload_chunk(self, final=False):
and final
and self.tell() < self.blocksize
):
# only happens when closing small file, use on-shot PUT
data1 = False
# only happens when closing small file, use one-shot PUT
pass
else:
self.buffer.seek(0)
(data0, data1) = (None, self.buffer.read(self.blocksize))

while data1:
# concurrency here??
(data0, data1) = (data1, self.buffer.read(self.blocksize))
data1_size = len(data1)
def upload_part(part_data: bytes):
if len(part_data) == 0:
return
part = len(self.parts) + 1
logger.debug("Upload chunk %s, %s" % (self, part))

if 0 < data1_size < self.blocksize:
remainder = data0 + data1
remainder_size = self.blocksize + data1_size

if remainder_size <= self.part_max:
(data0, data1) = (remainder, None)
else:
partition = remainder_size // 2
(data0, data1) = (remainder[:partition], remainder[partition:])
out = self._call_s3(
"upload_part",
Bucket=bucket,
PartNumber=part,
UploadId=self.mpu["UploadId"],
Body=part_data,
Key=key,
)

part = len(self.parts) + 1
logger.debug("Upload chunk %s, %s" % (self, part))
part_header = {"PartNumber": part, "ETag": out["ETag"]}
if "ChecksumSHA256" in out:
part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
self.parts.append(part_header)

out = self._call_s3(
"upload_part",
Bucket=bucket,
PartNumber=part,
UploadId=self.mpu["UploadId"],
Body=data0,
Key=key,
)
def n_bytes_left() -> int:
return len(self.buffer.getbuffer()) - self.buffer.tell()

part_header = {"PartNumber": part, "ETag": out["ETag"]}
if "ChecksumSHA256" in out:
part_header["ChecksumSHA256"] = out["ChecksumSHA256"]
self.parts.append(part_header)
min_chunk = 1 if final else self.blocksize
# TODO: concurrent here
if self.fs.fixed_upload_size:
# all chunks have fixed size, exception: last one can be smaller
while n_bytes_left() >= min_chunk:
upload_part(self.buffer.read(self.blocksize))
else:
while n_bytes_left() >= min_chunk:
upload_part(self.buffer.read(self.part_max))

if self.autocommit and final:
self.commit()
return not final
else:
# update 'upload offset'
self.offset += self.buffer.tell()
# create new smaller buffer, seek to file end
self.buffer = io.BytesIO(self.buffer.read())
self.buffer.seek(0, 2)

return False # instruct fsspec.flush to NOT clear self.buffer

def commit(self):
logger.debug("Commit %s" % self)
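
To make the new buffering behaviour concrete, here is a standalone sketch, not s3fs code, of how the two loops in _upload_chunk split one flush into parts. The function name and the part_max value used in the checks are illustrative choices, not values taken from the library.

import io


def split_into_parts(data, blocksize, part_max, fixed_upload_size, final):
    """Return the part sizes one flush would send, mirroring the loops above.

    Bytes that do not reach min_chunk stay in the buffer for the next flush.
    """
    buffer = io.BytesIO(data)
    sizes = []
    # On the final flush everything left is sent; otherwise only full blocks go out.
    min_chunk = 1 if final else blocksize
    # fixed_upload_size reads exactly blocksize bytes per part; otherwise a
    # single part may grow up to part_max bytes.
    read_size = blocksize if fixed_upload_size else part_max

    def n_bytes_left():
        return len(buffer.getbuffer()) - buffer.tell()

    while n_bytes_left() >= min_chunk:
        sizes.append(len(buffer.read(read_size)))
    return sizes


# Mirrors test_upload_parts below: 6_001_000 bytes written with a 6_000_000-byte blocksize.
assert split_into_parts(b" " * 6_001_000, 6_000_000, 5 * 2**30, True, False) == [6_000_000]
assert split_into_parts(b" " * 6_001_000, 6_000_000, 5 * 2**30, False, False) == [6_001_000]
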
50 changes: 50 additions & 0 deletions s3fs/tests/test_s3fs.py
@@ -887,6 +887,9 @@ def test_seek(s3):
with s3.open(a, "wb") as f:
f.write(b"123")

with s3.open(a) as f:
assert f.read() == b"123"

with s3.open(a) as f:
f.seek(1000)
with pytest.raises(ValueError):
@@ -2752,3 +2755,50 @@ def test_bucket_versioning(s3):
assert s3.is_bucket_versioned("maybe_versioned")
s3.make_bucket_versioned("maybe_versioned", False)
assert not s3.is_bucket_versioned("maybe_versioned")


@pytest.fixture()
def s3_fixed_upload_size(s3):
s3_fixed = S3FileSystem(
anon=False,
client_kwargs={"endpoint_url": endpoint_uri},
fixed_upload_size=True,
)
s3_fixed.invalidate_cache()
yield s3_fixed


def test_upload_parts(s3_fixed_upload_size):
with s3_fixed_upload_size.open(a, "wb", block_size=6_000_000) as f:
f.write(b" " * 6_001_000)
assert len(f.buffer.getbuffer()) == 1000
# check we are at the right position
assert f.tell() == 6_001_000
# offset is introduced in fsspec.core, but never used.
# apparently it should keep offset for part that is already uploaded
assert f.offset == 6_000_000
f.write(b" " * 6_001_000)
assert len(f.buffer.getbuffer()) == 2000
assert f.tell() == 2 * 6_001_000
assert f.offset == 2 * 6_000_000

with s3_fixed_upload_size.open(a, "r") as f:
assert len(f.read()) == 6_001_000 * 2


def test_upload_part_with_prime_pads(s3_fixed_upload_size):
block = 6_000_000
pad1, pad2 = 1013, 1019 # prime pad sizes to exclude divisibility
with s3_fixed_upload_size.open(a, "wb", block_size=block) as f:
f.write(b" " * (block + pad1))
assert len(f.buffer.getbuffer()) == pad1
# check we are at the right position
assert f.tell() == block + pad1
assert f.offset == block
f.write(b" " * (block + pad2))
assert len(f.buffer.getbuffer()) == pad1 + pad2
assert f.tell() == 2 * block + pad1 + pad2
assert f.offset == 2 * block

with s3_fixed_upload_size.open(a, "r") as f:
assert len(f.read()) == 2 * block + pad1 + pad2
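
Both tests rest on the same bookkeeping: bytes already uploaded are accounted for in offset while the remainder stays in the buffer, so the buffered byte count plus offset equals tell() after every write. Restating the arithmetic from the tests above:

block, pad1, pad2 = 6_000_000, 1013, 1019

# After the first write: one block-sized part has been uploaded, pad1 bytes stay buffered.
offset, buffered, written = block, pad1, block + pad1
assert offset + buffered == written

# After the second write: another fixed-size part goes out; both pads remain in the buffer.
offset, buffered, written = 2 * block, pad1 + pad2, 2 * block + pad1 + pad2
assert offset + buffered == written
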
2 changes: 2 additions & 0 deletions setup.py
@@ -24,6 +24,8 @@
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
],
description="Convenient Filesystem interface over S3",
url="http://github.com/fsspec/s3fs/",
