Skip to content

Commit

Permalink
Merge branch 'fsspec:master' into accept-ranges-none
Browse files Browse the repository at this point in the history
  • Loading branch information
itcarroll authored Sep 24, 2024
2 parents 22bdf4a + 1f61512 commit 6560375
Show file tree
Hide file tree
Showing 33 changed files with 577 additions and 91 deletions.
19 changes: 6 additions & 13 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,15 @@ jobs:
fetch-depth: 0

- name: Setup conda
uses: mamba-org/setup-micromamba@v1
uses: conda-incubator/setup-miniconda@v3
with:
environment-file: ci/environment-py38.yml
create-args: >-
python=${{ matrix.PY }}
python-version: ${{ matrix.PY }}

- name: Run Tests
shell: bash -l {0}
run: |
pip install s3fs
pip uninstall s3fs
pip install -e .[test_full]
pip install s3fs --no-deps
pytest -v
win:
Expand All @@ -54,17 +50,14 @@ jobs:
fetch-depth: 0

- name: Setup conda
uses: mamba-org/setup-micromamba@v1
uses: conda-incubator/setup-miniconda@v3
with:
environment-file: ci/environment-win.yml

- name: Run Tests
shell: bash -l {0}
run: |
pip install s3fs
pip uninstall s3fs
pip install -e .[test]
pip install s3fs --no-deps
pytest -v
lint:
Expand All @@ -84,7 +77,7 @@ jobs:
# uses: actions/checkout@v4
#
# - name: Setup conda
# uses: mamba-org/setup-micromamba@v1
# uses: conda-incubator/setup-miniconda@v3
# with:
# environment-file: ci/environment-typecheck.yml
#
Expand All @@ -104,7 +97,7 @@ jobs:
fetch-depth: 0

- name: Setup conda
uses: mamba-org/setup-micromamba@v1
uses: conda-incubator/setup-miniconda@v3
with:
environment-file: ci/environment-downstream.yml

Expand Down Expand Up @@ -145,7 +138,7 @@ jobs:
uses: actions/checkout@v4

- name: Setup conda
uses: mamba-org/setup-micromamba@v1
uses: conda-incubator/setup-miniconda@v3
with:
environment-file: ci/environment-friends.yml

Expand Down
2 changes: 1 addition & 1 deletion ci/environment-downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ name: test_env
channels:
- conda-forge
dependencies:
- python=3.9
- python=3.11
- pip:
- git+https://github.com/dask/dask
2 changes: 1 addition & 1 deletion ci/environment-friends.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: test_env
channels:
- conda-forge
dependencies:
- python=3.9
- python=3.12
- pytest
- pytest-asyncio !=0.22.0
- pytest-benchmark
Expand Down
1 change: 1 addition & 0 deletions ci/environment-py38.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ dependencies:
- pip
- git <2.45.0
- py
- s3fs
- pip:
- hadoop-test-cluster
- smbprotocol
4 changes: 3 additions & 1 deletion ci/environment-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@ name: test_env
channels:
- conda-forge
dependencies:
- python=3.9
- python=3.11
- s3fs
- pytest
22 changes: 22 additions & 0 deletions docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
Changelog
=========

2024.9.0
--------

Enhancements

- fewer stat calls in localFS (#1659)
- faster find in ZIP (#1664)

Fixes

- paths without "/" in dirFS (#1638)
- paths with "/" in FTS (#1643, 1644)
- ls in parquet-based nested reference sets, and append (#1645, 1657)
- exception handling for SMB (#1650)


Other

- style (#1640, 1641, 1660)
- docs: xrootd (#1646)
- CI back on miniconda (#1658)

2024.6.1
--------

Expand Down
2 changes: 1 addition & 1 deletion fsspec/asyn.py
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,7 @@ async def flush(self, force=False):
self.offset = 0
try:
await self._initiate_upload()
except: # noqa: E722
except:
self.closed = True
raise

Expand Down
2 changes: 1 addition & 1 deletion fsspec/implementations/arrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def cp_file(self, path1, path2, **kwargs):
with self.open(tmp_fname, "wb") as rstream:
shutil.copyfileobj(lstream, rstream)
self.fs.move(tmp_fname, path2)
except BaseException: # noqa
except BaseException:
with suppress(FileNotFoundError):
self.fs.delete_file(tmp_fname)
raise
Expand Down
16 changes: 8 additions & 8 deletions fsspec/implementations/dbfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def ls(self, path, detail=True, **kwargs):
if e.error_code == "RESOURCE_DOES_NOT_EXIST":
raise FileNotFoundError(e.message) from e

raise e
raise
files = r["files"]
out = [
{
Expand Down Expand Up @@ -125,7 +125,7 @@ def makedirs(self, path, exist_ok=True):
if e.error_code == "RESOURCE_ALREADY_EXISTS":
raise FileExistsError(e.message) from e

raise e
raise
self.invalidate_cache(self._parent(path))

def mkdir(self, path, create_parents=True, **kwargs):
Expand Down Expand Up @@ -171,7 +171,7 @@ def rm(self, path, recursive=False, **kwargs):
# Using the same exception as the os module would use here
raise OSError(e.message) from e

raise e
raise
self.invalidate_cache(self._parent(path))

def mv(
Expand Down Expand Up @@ -216,7 +216,7 @@ def mv(
elif e.error_code == "RESOURCE_ALREADY_EXISTS":
raise FileExistsError(e.message) from e

raise e
raise
self.invalidate_cache(self._parent(source_path))
self.invalidate_cache(self._parent(destination_path))

Expand Down Expand Up @@ -299,7 +299,7 @@ def _create_handle(self, path, overwrite=True):
if e.error_code == "RESOURCE_ALREADY_EXISTS":
raise FileExistsError(e.message) from e

raise e
raise

def _close_handle(self, handle):
"""
Expand All @@ -316,7 +316,7 @@ def _close_handle(self, handle):
if e.error_code == "RESOURCE_DOES_NOT_EXIST":
raise FileNotFoundError(e.message) from e

raise e
raise

def _add_data(self, handle, data):
"""
Expand Down Expand Up @@ -346,7 +346,7 @@ def _add_data(self, handle, data):
elif e.error_code == "MAX_BLOCK_SIZE_EXCEEDED":
raise ValueError(e.message) from e

raise e
raise

def _get_data(self, path, start, end):
"""
Expand Down Expand Up @@ -376,7 +376,7 @@ def _get_data(self, path, start, end):
elif e.error_code in ["INVALID_PARAMETER_VALUE", "MAX_READ_SIZE_EXCEEDED"]:
raise ValueError(e.message) from e

raise e
raise

def invalidate_cache(self, path=None):
if path is None:
Expand Down
20 changes: 15 additions & 5 deletions fsspec/implementations/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import sys
import uuid
import warnings
from ftplib import FTP, Error, error_perm
from ftplib import FTP, FTP_TLS, Error, error_perm
from typing import Any

from ..spec import AbstractBufferedFile, AbstractFileSystem
Expand All @@ -27,6 +27,7 @@ def __init__(
tempdir=None,
timeout=30,
encoding="utf-8",
tls=False,
**kwargs,
):
"""
Expand Down Expand Up @@ -56,28 +57,37 @@ def __init__(
Timeout of the ftp connection in seconds
encoding: str
Encoding to use for directories and filenames in FTP connection
tls: bool
Use FTP-TLS, by default False
"""
super().__init__(**kwargs)
self.host = host
self.port = port
self.tempdir = tempdir or "/tmp"
self.cred = username, password, acct
self.cred = username or "", password or "", acct or ""
self.timeout = timeout
self.encoding = encoding
if block_size is not None:
self.blocksize = block_size
else:
self.blocksize = 2**16
self.tls = tls
self._connect()
if self.tls:
self.ftp.prot_p()

def _connect(self):
if self.tls:
ftp_cls = FTP_TLS
else:
ftp_cls = FTP
if sys.version_info >= (3, 9):
self.ftp = FTP(timeout=self.timeout, encoding=self.encoding)
self.ftp = ftp_cls(timeout=self.timeout, encoding=self.encoding)
elif self.encoding:
warnings.warn("`encoding` not supported for python<3.9, ignoring")
self.ftp = FTP(timeout=self.timeout)
self.ftp = ftp_cls(timeout=self.timeout)
else:
self.ftp = FTP(timeout=self.timeout)
self.ftp = ftp_cls(timeout=self.timeout)
self.ftp.connect(self.host, self.port)
self.ftp.login(*self.cred)

Expand Down
18 changes: 11 additions & 7 deletions fsspec/implementations/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ def info(self, path, **kwargs):
t = "file"
else:
t = "other"

size = out.st_size
if link:
try:
out2 = path.stat(follow_symlinks=True)
size = out2.st_size
except OSError:
size = 0
path = self._strip_protocol(path.path)
else:
# str or path-like
Expand All @@ -87,6 +95,7 @@ def info(self, path, **kwargs):
link = stat.S_ISLNK(out.st_mode)
if link:
out = os.stat(path, follow_symlinks=True)
size = out.st_size
if stat.S_ISDIR(out.st_mode):
t = "directory"
elif stat.S_ISREG(out.st_mode):
Expand All @@ -95,20 +104,15 @@ def info(self, path, **kwargs):
t = "other"
result = {
"name": path,
"size": out.st_size,
"size": size,
"type": t,
"created": out.st_ctime,
"islink": link,
}
for field in ["mode", "uid", "gid", "mtime", "ino", "nlink"]:
result[field] = getattr(out, f"st_{field}")
if result["islink"]:
if link:
result["destination"] = os.readlink(path)
try:
out2 = os.stat(path, follow_symlinks=True)
result["size"] = out2.st_size
except OSError:
result["size"] = 0
return result

def lexists(self, path, **kwargs):
Expand Down
13 changes: 8 additions & 5 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def open_refs(field, record):
path = self.url.format(field=field, record=record)
data = io.BytesIO(self.fs.cat_file(path))
df = self.pd.read_parquet(data, engine="fastparquet")
refs = {c: df[c].values for c in df.columns}
refs = {c: df[c].to_numpy() for c in df.columns}
return refs

self.open_refs = open_refs
Expand Down Expand Up @@ -502,6 +502,7 @@ def flush(self, base_url=None, storage_options=None):
if k != ".zmetadata" and ".z" in k:
self.zmetadata[k] = json.loads(self._items.pop(k))
met = {"metadata": self.zmetadata, "record_size": self.record_size}
self._items.clear()
self._items[".zmetadata"] = json.dumps(met).encode()
self.fs.pipe(
"/".join([base_url or self.out_root, ".zmetadata"]),
Expand Down Expand Up @@ -996,9 +997,11 @@ def _process_gen(self, gens):
out = {}
for gen in gens:
dimension = {
k: v
if isinstance(v, list)
else range(v.get("start", 0), v["stop"], v.get("step", 1))
k: (
v
if isinstance(v, list)
else range(v.get("start", 0), v["stop"], v.get("step", 1))
)
for k, v in gen["dimensions"].items()
}
products = (
Expand Down Expand Up @@ -1085,7 +1088,7 @@ def isdir(self, path): # overwrite auto-sync version
if self.dircache:
return path in self.dircache
elif isinstance(self.references, LazyReferenceMapper):
return path in self.references.listdir("")
return path in self.references.listdir()
else:
# this may be faster than building dircache for single calls, but
# by looping will be slow for many calls; could cache it?
Expand Down
2 changes: 1 addition & 1 deletion fsspec/implementations/smb.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def _connect(self):
else:
# All another ValueError exceptions should be raised, as they are not
# related to network issues.
raise exc
raise
except Exception as exc:
# Save the exception and retry to connect. This except might be dropped
# in the future, once all exceptions suited for retry are identified.
Expand Down
Loading

0 comments on commit 6560375

Please sign in to comment.