diff --git a/gcsfs/core.py b/gcsfs/core.py
index f6f50036..3c02f21d 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -16,7 +16,7 @@
 from urllib.parse import urlsplit
 
 import fsspec
-from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
+from fsspec import asyn
 from fsspec.callbacks import NoOpCallback
 from fsspec.implementations.http import get_client
 from fsspec.utils import setup_logging, stringify_path
@@ -151,7 +151,7 @@ def _coalesce_generation(*args):
         return generations.pop()
 
 
-class GCSFileSystem(AsyncFileSystem):
+class GCSFileSystem(asyn.AsyncFileSystem):
     r"""
     Connect to Google Cloud Storage.
 
@@ -349,7 +349,7 @@ def close_session(loop, session):
             pass
         try:
-            sync(loop, session.close, timeout=0.1)
+            asyn.sync(loop, session.close, timeout=0.1)
         except fsspec.FSTimeoutError:
             pass
     else:
@@ -452,13 +452,14 @@ async def _call(
         else:
             return headers, contents
 
-    call = sync_wrapper(_call)
+    call = asyn.sync_wrapper(_call)
 
     @property
     def buckets(self):
         """Return list of available project buckets."""
         return [
-            b["name"] for b in sync(self.loop, self._list_buckets, timeout=self.timeout)
+            b["name"]
+            for b in asyn.sync(self.loop, self._list_buckets, timeout=self.timeout)
         ]
 
     def _process_object(self, bucket, object_metadata):
@@ -492,7 +493,7 @@ async def _make_bucket_requester_pays(self, path, state=True):
         json = {"billing": {"requesterPays": state}}
         await self._call("PATCH", f"b/{path}", json=json)
 
-    make_bucket_requester_pays = sync_wrapper(_make_bucket_requester_pays)
+    make_bucket_requester_pays = asyn.sync_wrapper(_make_bucket_requester_pays)
 
     async def _get_object(self, path):
         """Return object information at the given path."""
@@ -894,7 +895,7 @@ async def _mkdir(
             )
         self.invalidate_cache(bucket)
 
-    mkdir = sync_wrapper(_mkdir)
+    mkdir = asyn.sync_wrapper(_mkdir)
 
     async def _rmdir(self, bucket):
         """Delete an empty bucket
@@ -912,7 +913,7 @@ async def _rmdir(self, bucket):
         self.invalidate_cache(bucket)
         self.invalidate_cache("")
 
-    rmdir = sync_wrapper(_rmdir)
+    rmdir = asyn.sync_wrapper(_rmdir)
 
     def modified(self, path):
         return self.info(path)["mtime"]
@@ -1040,7 +1041,7 @@ async def _getxattr(self, path, attr):
         meta = (await self._info(path)).get("metadata", {})
         return meta[attr]
 
-    getxattr = sync_wrapper(_getxattr)
+    getxattr = asyn.sync_wrapper(_getxattr)
 
     async def _setxattrs(
         self,
@@ -1103,7 +1104,7 @@ async def _setxattrs(
         )
         return o_json.get("metadata", {})
 
-    setxattrs = sync_wrapper(_setxattrs)
+    setxattrs = asyn.sync_wrapper(_setxattrs)
 
     async def _merge(self, path, paths, acl=None):
         """Concatenate objects within a single bucket"""
@@ -1123,7 +1124,7 @@ async def _merge(self, path, paths, acl=None):
             },
         )
 
-    merge = sync_wrapper(_merge)
+    merge = asyn.sync_wrapper(_merge)
 
     async def _cp_file(self, path1, path2, acl=None, **kwargs):
         """Duplicate remote file"""
@@ -1167,7 +1168,7 @@ async def _rm_file(self, path, **kwargs):
         else:
             await self._rmdir(path)
 
-    async def _rm_files(self, paths):
+    async def _rm_files(self, paths, batchsize=20):
         import random
 
         template = (
@@ -1179,12 +1180,11 @@ async def _rm_files(self, paths):
             "Content-Type: application/json\n"
             "accept: application/json\ncontent-length: 0\n"
         )
-        errors = []
-        success = []
+        out = []
         # Splitting requests into 100 chunk batches
         # See https://cloud.google.com/storage/docs/batch
        for retry in range(1, 6):
-            for chunk in _chunks(paths, 100):
+            for chunk in _chunks(paths, 20):
                 parts = []
                 for i, p in enumerate(chunk):
                     bucket, key, generation = self.split_path(p)
@@ -1209,8 +1209,8 @@
                 )
                 boundary = headers["Content-Type"].split("=", 1)[1]
headers["Content-Type"].split("=", 1)[1] - parents = [self._parent(p) for p in paths] - [self.invalidate_cache(parent) for parent in parents + list(paths)] + parents = set(self._parent(p) for p in paths) | set(paths) + [self.invalidate_cache(parent) for parent in parents] txt = content.decode() responses = txt.split(boundary)[1:-1] remaining = [] @@ -1218,26 +1218,28 @@ async def _rm_files(self, paths): m = re.search("HTTP/[0-9.]+ ([0-9]+)", response) code = int(m.groups()[0]) if m else None if code in [200, 204]: - success.append(path) - elif code in errs: + out.append(path) + elif code in errs and i < 5: remaining.append(path) else: - msg = re.search("({.*})", response.replace("\n", "")) + msg = re.search("{(.*)}", response.replace("\n", "")) if msg: - errors.append(json.loads(msg.groups()[0])["error"]) + msg2 = re.search("({.*})", msg.groups()[0]) else: - errors.append((path, code)) - if errors: - break + msg2 = None + if msg and msg2: + out.append(IOError(msg2.groups()[0])) + else: + out.append(IOError(str(path, code))) if remaining: paths = remaining await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32)) else: + end = True break - - if errors: - raise OSError(errors) - return success + if end: + break + return out @property def on_google(self): @@ -1249,33 +1251,47 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20): dirs = [p for p in paths if not self.split_path(p)[1]] if self.on_google: # emulators do not support batch - exs = await asyncio.gather( - *( + exs = sum( + await asyn._run_coros_in_chunks( [ self._rm_files(files[i : i + batchsize]) for i in range(0, len(files), batchsize) - ] + ], + return_exceptions=True, ), - return_exceptions=True, + [], ) else: - exs = await asyncio.gather( - *([self._rm_file(f) for f in files]), - return_exceptions=True, + exs = await asyn._run_coros_in_chunks( + [self._rm_file(f) for f in files], return_exceptions=True, batch_size=5 ) - exs = [ + # buckets + exs.extend( + await asyncio.gather( + *[self._rmdir(d) for d in dirs], return_exceptions=True + ) + ) + errors = [ ex for ex in exs - if ex is not None + if isinstance(ex, Exception) and "No such object" not in str(ex) and not isinstance(ex, FileNotFoundError) ] - if exs: - raise exs[0] - await asyncio.gather(*[self._rmdir(d) for d in dirs]) + if errors: + raise errors[0] + exs = [ + ex + for ex in exs + if "No such object" not in str(ex) and not isinstance(ex, FileNotFoundError) + ] + if not exs: + # nothing got deleted + raise FileNotFoundError(path) + return exs - rm = sync_wrapper(_rm) + rm = asyn.sync_wrapper(_rm) async def _pipe_file( self, @@ -1824,7 +1840,7 @@ def commit(self): def _initiate_upload(self): """Create multi-upload""" - self.location = sync( + self.location = asyn.sync( self.gcsfs.loop, initiate_upload, self.gcsfs, @@ -1854,7 +1870,7 @@ def _simple_upload(self): """One-shot upload, less than 5MB""" self.buffer.seek(0) data = self.buffer.read() - sync( + asyn.sync( self.gcsfs.loop, simple_upload, self.gcsfs, diff --git a/gcsfs/tests/test_core.py b/gcsfs/tests/test_core.py index 1415d66f..f9a9c65d 100644 --- a/gcsfs/tests/test_core.py +++ b/gcsfs/tests/test_core.py @@ -183,9 +183,8 @@ def test_rm(gcs): assert gcs.exists(a) gcs.rm(a) assert not gcs.exists(a) - # silently ignored for now - # with pytest.raises((OSError, IOError)): - # gcs.rm(TEST_BUCKET + "/nonexistent") + with pytest.raises((OSError, IOError)): + gcs.rm(TEST_BUCKET + "/nonexistent") with pytest.raises((OSError, IOError)): gcs.rm("nonexistent")