
Commit

Merge branch 'master' into TCH
martindurant authored Nov 15, 2024
2 parents 529dcd9 + 4d1ec47 commit 689559b
Showing 34 changed files with 641 additions and 142 deletions.
7 changes: 6 additions & 1 deletion .github/workflows/main.yaml
@@ -13,7 +13,12 @@ jobs:
strategy:
fail-fast: false
matrix:
PY: ["3.8", "3.9", "3.10", "3.11", "3.12"]
PY:
- "3.9"
- "3.10"
- "3.11"
- "3.12"
- "3.13"

env:
CIRUN: true
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -5,7 +5,7 @@ exclude: >
repos:

- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v5.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
@@ -14,7 +14,7 @@ repos:
- id: check-yaml
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.4.4
rev: v0.6.9
hooks:
# Run the linter.
- id: ruff
56 changes: 44 additions & 12 deletions docs/source/api.rst
@@ -209,41 +209,73 @@ Built-in Implementations
Other Known Implementations
---------------------------

- `abfs`_ for Azure Blob service
- `adl`_ for Azure DataLake storage

Note that most of these projects are hosted outside of the `fsspec` organisation. Please read their
documentation carefully before using any particular package.

- `abfs`_ for Azure Blob service, with protocol "abfs://"
- `adl`_ for Azure DataLake storage, with protocol "adl://"
- `alluxiofs`_ to access fsspec implemented filesystem with Alluxio distributed cache
- `boxfs`_ for access to Box file storage
- `dropbox`_ for access to dropbox shares
- `boxfs`_ for access to Box file storage, with protocol "box://"
- `csvbase`_ for access to csvbase.com hosted CSV files, with protocol "csvbase://"
- `dropbox`_ for access to dropbox shares, with protocol "dropbox://"
- `dvc`_ to access DVC/Git repository as a filesystem
- `gcsfs`_ for Google Cloud Storage
- `fsspec-encrypted`_ for transparent encryption on top of other fsspec filesystems.
- `gcsfs`_ for Google Cloud Storage, with protocol "gcs://"
- `gdrive`_ to access Google Drive and shares (experimental)
- `git`_ to access Git repositories
- `huggingface_hub`_ to access the Hugging Face Hub filesystem, with protocol "hf://"
- `lakefs`_ for lakeFS data lakes
- `ocifs`_ for access to Oracle Cloud Object Storage
- `hdfs-native`_ to access Hadoop filesystem, with protocol "hdfs://"
- `httpfs-sync`_ to access HTTP(s) files in a synchronous manner to offer an alternative to the aiohttp-based implementation.
- `ipfsspec`_ for the InterPlanetary File System (IPFS), with protocol "ipfs://"
- `irods`_ for access to iRODS servers, with protocol "irods://"
- `lakefs`_ for lakeFS data lakes, with protocol "lakefs://"
- `morefs`_ for `OverlayFileSystem`, `DictFileSystem`, and others
- `ocifs`_ for access to Oracle Cloud Object Storage, with protocol "oci://"
- `ocilake`_ for OCI Data Lake storage
- `ossfs`_ for Alibaba Cloud (Aliyun) Object Storage System (OSS)
- `p9fs`_ for 9P (Plan 9 Filesystem Protocol) servers
- `s3fs`_ for Amazon S3 and other compatible stores
- `PyAthena`_ for S3 access to Amazon Athena, with protocol "s3://" or "s3a://"
- `PyDrive2`_ for Google Drive access
- `s3fs`_ for Amazon S3 and other compatible stores, with protocol "s3://"
- `sshfs`_ for access to SSH servers, with protocol "ssh://" or "sftp://"
- `swiftspec`_ for OpenStack SWIFT, with protocol "swift://"
- `tosfs`_ for ByteDance volcano engine Tinder Object Storage (TOS)
- `wandbfs`_ to access Wandb run data (experimental)
- `webdav4`_ for WebDAV
- `wandbfsspec`_ to access Weights & Biases (experimental)
- `webdav4`_ for WebDAV, with protocol "webdav://" or "dav://"
- `xrootd`_ for xrootd, with protocol "root://"

.. _abfs: https://github.com/dask/adlfs
.. _adl: https://github.com/dask/adlfs
.. _alluxiofs: https://github.com/fsspec/alluxiofs
.. _boxfs: https://github.com/IBM/boxfs
.. _dropbox: https://github.com/MarineChap/intake_dropbox
.. _csvbase: https://github.com/calpaterson/csvbase-client
.. _dropbox: https://github.com/fsspec/dropboxdrivefs
.. _dvc: https://github.com/iterative/dvc
.. _fsspec-encrypted: https://github.com/thevgergroup/fsspec-encrypted
.. _gcsfs: https://gcsfs.readthedocs.io/en/latest/
.. _gdrive: https://github.com/fsspec/gdrivefs
.. _git: https://github.com/iterative/scmrepo
.. _hdfs-native: https://github.com/Kimahriman/hdfs-native/blob/master/python/hdfs_native/fsspec.py
.. _httpfs-sync: https://github.com/moradology/httpfs-sync
.. _huggingface_hub: https://huggingface.co/docs/huggingface_hub/main/en/guides/hf_file_system
.. _lakefs: https://github.com/appliedAI-Initiative/lakefs-spec
.. _ocifs: https://pypi.org/project/ocifs
.. _ipfsspec: https://github.com/fsspec/ipfsspec
.. _irods: https://github.com/xwcl/irods_fsspec
.. _lakefs: https://github.com/aai-institute/lakefs-spec
.. _morefs: https://github.com/iterative/morefs
.. _ocifs: https://ocifs.readthedocs.io/en/latest/
.. _ocilake: https://github.com/oracle/ocifs
.. _ossfs: https://github.com/fsspec/ossfs
.. _p9fs: https://github.com/pbchekin/p9fs-py
.. _PyAthena: https://github.com/laughingman7743/PyAthena
.. _PyDrive2: https://github.com/iterative/PyDrive2
.. _s3fs: https://s3fs.readthedocs.io/en/latest/
.. _sshfs: https://github.com/fsspec/sshfs
.. _swiftspec: https://github.com/fsspec/swiftspec
.. _tosfs: https://tosfs.readthedocs.io/en/latest/
.. _wandbfs: https://github.com/jkulhanek/wandbfs
.. _wandbfsspec: https://github.com/alvarobartt/wandbfsspec
.. _webdav4: https://github.com/skshetry/webdav4
.. _xrootd: https://github.com/CoffeaTeam/fsspec-xrootd
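
The protocol prefixes listed above are how ``fsspec`` selects an implementation at runtime.
As a brief, hedged sketch (assuming the relevant third-party package, here ``gcsfs``, is
installed; the bucket name is a placeholder):

.. code-block:: python

    import fsspec

    # The "gcs" protocol resolves to gcsfs via the registry of known implementations
    fs = fsspec.filesystem("gcs")
    files = fs.ls("my-example-bucket")  # hypothetical bucket

    # Equivalently, open a file directly from a URL carrying the protocol prefix
    with fsspec.open("gcs://my-example-bucket/data.csv", "rb") as f:
        head = f.read(100)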

34 changes: 34 additions & 0 deletions docs/source/async.rst
@@ -152,3 +152,37 @@ available as the attribute ``.loop``.

<script data-goatcounter="https://fsspec.goatcounter.com/count"
async src="//gc.zgo.at/count.js"></script>

AsyncFileSystemWrapper
----------------------

The `AsyncFileSystemWrapper` class is an experimental feature that allows you to convert
a synchronous filesystem into an asynchronous one. This is useful for quickly integrating
synchronous filesystems into workflows that may expect `AsyncFileSystem` instances.

Basic Usage
~~~~~~~~~~~

To use `AsyncFileSystemWrapper`, wrap any synchronous filesystem to work in an asynchronous context.
In this example, the synchronous `LocalFileSystem` is wrapped, creating an `AsyncFileSystem` instance
backed by the normal, synchronous methods of `LocalFileSystem`:

.. code-block:: python

    import asyncio
    import fsspec
    from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

    async def async_copy_file():
        sync_fs = fsspec.filesystem('file')  # by default a synchronous, local filesystem
        async_fs = AsyncFileSystemWrapper(sync_fs)
        return await async_fs._copy('/source/file.txt', '/destination/file.txt')

    asyncio.run(async_copy_file())

Limitations
-----------

This is experimental. Users should not expect this wrapper to magically make things faster.
It is primarily provided to allow usage of synchronous filesystems with interfaces that expect
`AsyncFileSystem` instances.
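
As a quick check (a minimal sketch), the wrapper passes ordinary ``isinstance`` tests against
``AsyncFileSystem``, which is typically what such interfaces require:

.. code-block:: python

    import fsspec
    from fsspec.asyn import AsyncFileSystem
    from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

    sync_fs = fsspec.filesystem("file")
    async_fs = AsyncFileSystemWrapper(sync_fs)

    assert isinstance(async_fs, AsyncFileSystem)
    assert async_fs.async_impl  # flag marking async-capable implementations
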
21 changes: 21 additions & 0 deletions docs/source/changelog.rst
@@ -1,6 +1,27 @@
Changelog
=========

2024.10.0
---------

Fixes

- Performance of memoryFS rm (#1725)
- Performance of git FS info (#1712)
- Avoid git hex for newer pygit (#1703)
- tests fix for zip (#1700, 1691)
- missing open_async for dirFS (#1698)
- handle pathlib in zip (#1689)
- skip tests needing kerchunk if not installed (#1689)
- allow repeated kwargs in unchain (#1673)

Other

- Code style (#1704, 1706)
- allow pyarrow in referenceFS parquet (#1692)
- don't hardcode test port for parallel runs (#1690)


2024.9.0
--------

8 changes: 3 additions & 5 deletions fsspec/asyn.py
@@ -816,11 +816,9 @@ async def _glob(self, path, maxdepth=None, **kwargs):
p: info
for p, info in sorted(allpaths.items())
if pattern.match(
(
p + "/"
if append_slash_to_dirname and info["type"] == "directory"
else p
)
p + "/"
if append_slash_to_dirname and info["type"] == "directory"
else p
)
}

53 changes: 34 additions & 19 deletions fsspec/caching.py
@@ -8,6 +8,8 @@
import threading
import warnings
from concurrent.futures import Future, ThreadPoolExecutor
from itertools import groupby
from operator import itemgetter
from typing import (
TYPE_CHECKING,
Any,
@@ -85,12 +87,7 @@ def _log_stats(self) -> str:
if self.hit_count == 0 and self.miss_count == 0:
# a cache that does nothing, this is for logs only
return ""
return " , %s: %d hits, %d misses, %d total requested bytes" % (
self.name,
self.hit_count,
self.miss_count,
self.total_requested_bytes,
)
return f" , {self.name}: {self.hit_count} hits, {self.miss_count} misses, {self.total_requested_bytes} total requested bytes"

def __repr__(self) -> str:
# TODO: use rich for better formatting
@@ -161,21 +158,39 @@ def _fetch(self, start: int | None, end: int | None) -> bytes:
return b""
start_block = start // self.blocksize
end_block = end // self.blocksize
need = [i for i in range(start_block, end_block + 1) if i not in self.blocks]
hits = [i for i in range(start_block, end_block + 1) if i in self.blocks]
self.miss_count += len(need)
self.hit_count += len(hits)
while need:
# TODO: not a for loop so we can consolidate blocks later to
# make fewer fetch calls; this could be parallel
i = need.pop(0)

sstart = i * self.blocksize
send = min(sstart + self.blocksize, self.size)
block_range = range(start_block, end_block + 1)
# Determine which blocks need to be fetched. This sequence is sorted by construction.
need = (i for i in block_range if i not in self.blocks)
# Count the number of blocks already cached
self.hit_count += sum(1 for i in block_range if i in self.blocks)

# Consolidate needed blocks.
# Algorithm adapted from Python 2.x itertools documentation.
# We are grouping an enumerated sequence of blocks. By comparing the difference
# between an ascending range (provided by enumerate) and the needed block numbers,
# we can detect when the block number skips values. The key computes this difference.
# Whenever the difference changes, we know that we have previously cached block(s),
# and a new group is started. In other words, this algorithm neatly groups
# runs of consecutive block numbers so they can be fetched together.
for _, _blocks in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
# Extract the blocks from the enumerated sequence
_blocks = tuple(map(itemgetter(1), _blocks))
# Compute start of first block
sstart = _blocks[0] * self.blocksize
# Compute the end of the last block. Last block may not be full size.
send = min(_blocks[-1] * self.blocksize + self.blocksize, self.size)

# Fetch bytes (could be multiple consecutive blocks)
self.total_requested_bytes += send - sstart
logger.debug(f"MMap get block #{i} ({sstart}-{send})")
logger.debug(
f"MMap get blocks {_blocks[0]}-{_blocks[-1]} ({sstart}-{send})"
)
self.cache[sstart:send] = self.fetcher(sstart, send)
self.blocks.add(i)

# Update set of cached blocks
self.blocks.update(_blocks)
# Update cache statistics with number of blocks we had to cache
self.miss_count += len(_blocks)

return self.cache[start:end]
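
For illustration only (not part of the diff), a standalone sketch of the run-grouping trick used above, with hypothetical block numbers:

from itertools import groupby
from operator import itemgetter

need = [2, 3, 4, 7, 8, 12]  # hypothetical blocks missing from the cache
runs = []
for _, group in groupby(enumerate(need), key=lambda x: x[0] - x[1]):
    # index - value is constant within a run of consecutive numbers,
    # so each group is one contiguous run that can be fetched in a single call
    runs.append(tuple(map(itemgetter(1), group)))
print(runs)  # [(2, 3, 4), (7, 8), (12,)]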

33 changes: 19 additions & 14 deletions fsspec/core.py
@@ -329,12 +329,19 @@ def open_files(


def _un_chain(path, kwargs):
x = re.compile(".*[^a-z]+.*") # test for non protocol-like single word
bits = (
[p if "://" in p or x.match(p) else p + "://" for p in path.split("::")]
if "::" in path
else [path]
)
# Avoid a circular import
from fsspec.implementations.cached import CachingFileSystem

if "::" in path:
x = re.compile(".*[^a-z]+.*") # test for non protocol-like single word
bits = []
for p in path.split("::"):
if "://" in p or x.match(p):
bits.append(p)
else:
bits.append(p + "://")
else:
bits = [path]
# [[url, protocol, kwargs], ...]
out = []
previous_bit = None
@@ -346,12 +353,12 @@ def _un_chain(path, kwargs):
kws = kwargs.pop(protocol, {})
if bit is bits[0]:
kws.update(kwargs)
kw = dict(**extra_kwargs, **kws)
kw = dict(
**{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]},
**kws,
)
bit = cls._strip_protocol(bit)
if (
protocol in {"blockcache", "filecache", "simplecache"}
and "target_protocol" not in kw
):
if "target_protocol" not in kw and issubclass(cls, CachingFileSystem):
bit = previous_bit
out.append((bit, protocol, kw))
previous_bit = bit
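
As a hedged usage sketch of the URL chaining that _un_chain unpacks (assuming s3fs is installed; the bucket and key are placeholders):

import fsspec

# "simplecache::" wraps the remote protocol; per-protocol kwargs are routed by name
with fsspec.open(
    "simplecache::s3://example-bucket/data.csv",
    mode="rb",
    s3={"anon": True},
) as f:
    header = f.readline()
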
@@ -673,9 +680,7 @@ def get_fs_token_paths(
elif not isinstance(paths, list):
paths = list(paths)
else:
if "w" in mode and expand:
paths = _expand_paths(paths, name_function, num)
elif "x" in mode and expand:
if ("w" in mode or "x" in mode) and expand:
paths = _expand_paths(paths, name_function, num)
elif "*" in paths:
paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]