Skip to content

Commit

Permalink
Working async rsync (fsspec#1345)
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant authored Aug 31, 2023
1 parent 55e3042 commit 212c26f
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 16 deletions.
123 changes: 108 additions & 15 deletions fsspec/generic.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from __future__ import annotations

import inspect
import logging
import os
import shutil
import uuid
from typing import Optional

from .asyn import AsyncFileSystem
from .asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper
from .callbacks import _DEFAULT_CALLBACK
from .core import filesystem, get_filesystem_class, split_protocol, url_to_fs

Expand Down Expand Up @@ -52,19 +58,23 @@ def rsync(
Parameters
----------
source: str
Root of the directory tree to take files from.
Root of the directory tree to take files from. This must be a directory, but
do not include any terminating "/" character
destination: str
Root path to copy into. The contents of this location should be
identical to the contents of ``source`` when done.
identical to the contents of ``source`` when done. This will be made a
directory, and the terminal "/" should not be included.
delete_missing: bool
If there are paths in the destination that don't exist in the
source and this is True, delete them. Otherwise, leave them alone.
source_field: str
source_field: str | callable
If ``update_field`` is "different", this is the key in the info
of source files to consider for difference.
dest_field: str
of source files to consider for difference. Maybe a function of the
info dict.
dest_field: str | callable
If ``update_field`` is "different", this is the key in the info
of destination files to consider for difference.
of destination files to consider for difference. May be a function of
the info dict.
update_cond: "different"|"always"|"never"
If "always", every file is copied, regardless of whether it exists in
the destination. If "never", files that exist in the destination are
Expand All @@ -91,9 +101,10 @@ def rsync(
if v["type"] == "directory" and a.replace(source, destination) not in otherfiles
]
logger.debug(f"{len(dirs)} directories to create")
for dirn in dirs:
# no async
fs.mkdirs(dirn.replace(source, destination), exist_ok=True)
if dirs:
fs.make_many_dirs(
[dirn.replace(source, destination) for dirn in dirs], exist_ok=True
)
allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"}
logger.debug(f"{len(allfiles)} files to consider for copy")
to_delete = [
Expand All @@ -107,7 +118,10 @@ def rsync(
if update_cond == "always":
allfiles[k] = otherfile
elif update_cond == "different":
if v[source_field] != otherfiles[otherfile][dest_field]:
inf1 = source_field(v) if callable(source_field) else v[source_field]
v2 = otherfiles[otherfile]
inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field]
if inf1 != inf2:
# details mismatch, make copy
allfiles[k] = otherfile
else:
Expand All @@ -116,12 +130,12 @@ def rsync(
else:
# file not in target yet
allfiles[k] = otherfile
logger.debug(f"{len(allfiles)} files to copy")
if allfiles:
source_files, target_files = zip(*allfiles.items())
logger.debug(f"{len(source_files)} files to copy")
fs.cp(source_files, target_files, **kwargs)
logger.debug(f"{len(to_delete)} files to delete")
if delete_missing:
logger.debug(f"{len(to_delete)} files to delete")
fs.rm(to_delete)


Expand Down Expand Up @@ -166,11 +180,11 @@ async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwarg
fs = _resolve_fs(path, self.method)
if fs.async_impl:
out = await fs._find(
path, maxdepth=maxdepth, withdirs=withdirs, detail=detail, **kwargs
path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
)
else:
out = fs.find(
path, maxdepth=maxdepth, withdirs=withdirs, detail=detail, **kwargs
path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
)
result = {}
for k, v in out.items():
Expand Down Expand Up @@ -239,6 +253,7 @@ async def _rm(self, url, **kwargs):
fs.rm(url, **kwargs)

async def _makedirs(self, path, exist_ok=False):
logger.debug("Make dir %s", path)
fs = _resolve_fs(path, self.method)
if fs.async_impl:
await fs._makedirs(path, exist_ok=exist_ok)
Expand Down Expand Up @@ -295,6 +310,84 @@ async def _cp_file(
# fail while opening f1 or f2
pass

async def _make_many_dirs(self, urls, exist_ok=True):
fs = _resolve_fs(urls[0], self.method)
if fs.async_impl:
coros = [fs._makedirs(u, exist_ok=exist_ok) for u in urls]
await _run_coros_in_chunks(coros)
else:
for u in urls:
fs.makedirs(u, exist_ok=exist_ok)

make_many_dirs = sync_wrapper(_make_many_dirs)

async def _copy(
self,
path1: list[str],
path2: list[str],
recursive: bool = False,
on_error: str = "ignore",
maxdepth: Optional[int] = None,
batch_size: Optional[int] = None,
tempdir: Optional[str] = None,
**kwargs,
):
if recursive:
raise NotImplementedError
fs = _resolve_fs(path1[0], self.method)
fs2 = _resolve_fs(path2[0], self.method)
# not expanding paths atm., assume call is from rsync()
if fs is fs2:
# pure remote
if fs.async_impl:
return await fs._copy(path1, path2, **kwargs)
else:
return fs.copy(path1, path2, **kwargs)
await copy_file_op(
fs, path1, fs2, path2, tempdir, batch_size, on_error=on_error
)


async def copy_file_op(
fs1, url1, fs2, url2, tempdir=None, batch_size=20, on_error="ignore"
):
import tempfile

tempdir = tempdir or tempfile.mkdtemp()
try:
coros = [
_copy_file_op(
fs1,
u1,
fs2,
u2,
os.path.join(tempdir, uuid.uuid4().hex),
on_error=on_error,
)
for u1, u2 in zip(url1, url2)
]
await _run_coros_in_chunks(coros, batch_size=batch_size)
finally:
shutil.rmtree(tempdir)


async def _copy_file_op(fs1, url1, fs2, url2, local, on_error="ignore"):
ex = () if on_error == "raise" else Exception
logger.debug("Copy %s -> %s", url1, url2)
try:
if fs1.async_impl:
await fs1._get_file(url1, local)
else:
fs1.get_file(url1, local)
if fs2.async_impl:
await fs2._put_file(local, url2)
else:
fs2.put_file(local, url2)
os.unlink(local)
logger.debug("Copy %s -> %s; done", url1, url2)
except ex as e:
logger.debug("ignoring cp exception for %s: %s", url1, e)


async def maybe_await(cor):
if inspect.iscoroutine(cor):
Expand Down
2 changes: 1 addition & 1 deletion fsspec/tests/test_generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_touch_rm(m):
def test_cp_async_to_sync(server, m):
fsspec.filesystem("http", headers={"give_length": "true", "head_ok": "true"})
fs = fsspec.filesystem("generic", default_method="current")
fs.cp(server + "/index/realfile", "memory://realfile")
fs.cp([server + "/index/realfile"], ["memory://realfile"])
assert m.cat("realfile") == data

fs.rm("memory://realfile")
Expand Down

0 comments on commit 212c26f

Please sign in to comment.