cleanup
martindurant committed Feb 29, 2024
1 parent bffcd93 commit a26390e
Showing 2 changed files with 61 additions and 46 deletions.
102 changes: 59 additions & 43 deletions gcsfs/core.py
@@ -16,7 +16,7 @@
from urllib.parse import urlsplit

import fsspec
from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper
from fsspec import asyn
from fsspec.callbacks import NoOpCallback
from fsspec.implementations.http import get_client
from fsspec.utils import setup_logging, stringify_path
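
Note: the import change above replaces direct imports of sync and sync_wrapper with the namespaced fsspec.asyn module; throughout the diff, calls become asyn.sync(...) and asyn.sync_wrapper(...). A minimal, hedged sketch of how sync_wrapper is typically used (MyFS and _touch are illustrative stand-ins, not gcsfs code):

import asyncio

from fsspec import asyn


class MyFS(asyn.AsyncFileSystem):
    async def _touch(self, path, **kwargs):
        # stand-in for a real storage API call
        await asyncio.sleep(0)
        return path

    # expose the coroutine as a blocking method dispatched onto self.loop
    touch = asyn.sync_wrapper(_touch)


# fs = MyFS()
# fs.touch("bucket/key")  # blocks until _touch completes on the filesystem's IO loop
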
@@ -151,7 +151,7 @@ def _coalesce_generation(*args):
return generations.pop()


class GCSFileSystem(AsyncFileSystem):
class GCSFileSystem(asyn.AsyncFileSystem):
r"""
Connect to Google Cloud Storage.
@@ -349,7 +349,7 @@ def close_session(loop, session):
pass

try:
sync(loop, session.close, timeout=0.1)
asyn.sync(loop, session.close, timeout=0.1)
except fsspec.FSTimeoutError:
pass
else:
@@ -452,13 +452,14 @@ async def _call(
else:
return headers, contents

call = sync_wrapper(_call)
call = asyn.sync_wrapper(_call)

@property
def buckets(self):
"""Return list of available project buckets."""
return [
b["name"] for b in sync(self.loop, self._list_buckets, timeout=self.timeout)
b["name"]
for b in asyn.sync(self.loop, self._list_buckets, timeout=self.timeout)
]

def _process_object(self, bucket, object_metadata):
@@ -492,7 +493,7 @@ async def _make_bucket_requester_pays(self, path, state=True):
json = {"billing": {"requesterPays": state}}
await self._call("PATCH", f"b/{path}", json=json)

make_bucket_requester_pays = sync_wrapper(_make_bucket_requester_pays)
make_bucket_requester_pays = asyn.sync_wrapper(_make_bucket_requester_pays)

async def _get_object(self, path):
"""Return object information at the given path."""
@@ -894,7 +895,7 @@ async def _mkdir(
)
self.invalidate_cache(bucket)

mkdir = sync_wrapper(_mkdir)
mkdir = asyn.sync_wrapper(_mkdir)

async def _rmdir(self, bucket):
"""Delete an empty bucket
@@ -912,7 +913,7 @@ async def _rmdir(self, bucket):
self.invalidate_cache(bucket)
self.invalidate_cache("")

rmdir = sync_wrapper(_rmdir)
rmdir = asyn.sync_wrapper(_rmdir)

def modified(self, path):
return self.info(path)["mtime"]
@@ -1040,7 +1041,7 @@ async def _getxattr(self, path, attr):
meta = (await self._info(path)).get("metadata", {})
return meta[attr]

getxattr = sync_wrapper(_getxattr)
getxattr = asyn.sync_wrapper(_getxattr)

async def _setxattrs(
self,
@@ -1103,7 +1104,7 @@ async def _setxattrs(
)
return o_json.get("metadata", {})

setxattrs = sync_wrapper(_setxattrs)
setxattrs = asyn.sync_wrapper(_setxattrs)

async def _merge(self, path, paths, acl=None):
"""Concatenate objects within a single bucket"""
@@ -1123,7 +1124,7 @@ async def _merge(self, path, paths, acl=None):
},
)

merge = sync_wrapper(_merge)
merge = asyn.sync_wrapper(_merge)

async def _cp_file(self, path1, path2, acl=None, **kwargs):
"""Duplicate remote file"""
@@ -1167,7 +1168,7 @@ async def _rm_file(self, path, **kwargs):
else:
await self._rmdir(path)

async def _rm_files(self, paths):
async def _rm_files(self, paths, batchsize=20):
import random

template = (
@@ -1179,12 +1180,11 @@ async def _rm_files(self, paths):
"Content-Type: application/json\n"
"accept: application/json\ncontent-length: 0\n"
)
errors = []
success = []
out = []
# Splitting requests into 100 chunk batches
# See https://cloud.google.com/storage/docs/batch
for retry in range(1, 6):
for chunk in _chunks(paths, 100):
for chunk in _chunks(paths, batchsize):
parts = []
for i, p in enumerate(chunk):
bucket, key, generation = self.split_path(p)
@@ -1209,35 +1209,37 @@
)

boundary = headers["Content-Type"].split("=", 1)[1]
parents = [self._parent(p) for p in paths]
[self.invalidate_cache(parent) for parent in parents + list(paths)]
parents = set(self._parent(p) for p in paths) | set(paths)
[self.invalidate_cache(parent) for parent in parents]
txt = content.decode()
responses = txt.split(boundary)[1:-1]
remaining = []
for path, response in zip(paths, responses):
m = re.search("HTTP/[0-9.]+ ([0-9]+)", response)
code = int(m.groups()[0]) if m else None
if code in [200, 204]:
success.append(path)
elif code in errs:
out.append(path)
elif code in errs and retry < 5:
remaining.append(path)
else:
msg = re.search("({.*})", response.replace("\n", ""))
msg = re.search("{(.*)}", response.replace("\n", ""))
if msg:
errors.append(json.loads(msg.groups()[0])["error"])
msg2 = re.search("({.*})", msg.groups()[0])
else:
errors.append((path, code))
if errors:
break
msg2 = None
if msg and msg2:
out.append(IOError(msg2.groups()[0]))
else:
out.append(IOError(str((path, code))))
if remaining:
paths = remaining
await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32))
else:
end = True
break

if errors:
raise OSError(errors)
return success
if end:
break
return out
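
For reference, a minimal standalone sketch of the retry pattern used in _rm_files above: paths whose delete returned a retryable code are collected and retried with jittered, capped exponential backoff. delete_batch here is a hypothetical stand-in for the real batch DELETE request, not gcsfs API:

import asyncio
import random


async def delete_batch(paths):
    # hypothetical stand-in: pretend ~20% of deletes fail transiently
    return [p for p in paths if random.random() < 0.2]


async def delete_with_retries(paths, retries=5):
    for retry in range(1, retries + 1):
        remaining = await delete_batch(paths)
        if not remaining:
            return []
        if retry < retries:
            # same backoff formula as the code above: jitter + 2**(retry-1) seconds, capped at 32
            await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32))
        paths = remaining
    return paths  # paths that still failed after all retries


# asyncio.run(delete_with_retries(["a", "b", "c"]))
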

@property
def on_google(self):
@@ -1249,33 +1251,47 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
dirs = [p for p in paths if not self.split_path(p)[1]]
if self.on_google:
# emulators do not support batch
exs = await asyncio.gather(
*(
exs = sum(
await asyn._run_coros_in_chunks(
[
self._rm_files(files[i : i + batchsize])
for i in range(0, len(files), batchsize)
]
],
return_exceptions=True,
),
return_exceptions=True,
[],
)
else:
exs = await asyncio.gather(
*([self._rm_file(f) for f in files]),
return_exceptions=True,
exs = await asyn._run_coros_in_chunks(
[self._rm_file(f) for f in files], return_exceptions=True, batch_size=5
)

exs = [
# buckets
exs.extend(
await asyncio.gather(
*[self._rmdir(d) for d in dirs], return_exceptions=True
)
)
errors = [
ex
for ex in exs
if ex is not None
if isinstance(ex, Exception)
and "No such object" not in str(ex)
and not isinstance(ex, FileNotFoundError)
]
if exs:
raise exs[0]
await asyncio.gather(*[self._rmdir(d) for d in dirs])
if errors:
raise errors[0]
exs = [
ex
for ex in exs
if "No such object" not in str(ex) and not isinstance(ex, FileNotFoundError)
]
if not exs:
# nothing got deleted
raise FileNotFoundError(path)
return exs

rm = sync_wrapper(_rm)
rm = asyn.sync_wrapper(_rm)
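
The non-batch branch of _rm above bounds concurrency with fsspec's asyn._run_coros_in_chunks rather than one unbounded asyncio.gather. A rough pure-asyncio illustration of that idea (do_delete is a placeholder, not gcsfs code):

import asyncio


async def run_in_chunks(coros, batch_size=5, return_exceptions=True):
    # await the coroutines a few at a time instead of all at once
    results = []
    for i in range(0, len(coros), batch_size):
        results.extend(
            await asyncio.gather(
                *coros[i : i + batch_size], return_exceptions=return_exceptions
            )
        )
    return results


async def do_delete(path):
    await asyncio.sleep(0)  # placeholder for a DELETE request
    return path


# asyncio.run(run_in_chunks([do_delete(p) for p in ["a", "b", "c"]], batch_size=2))
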

async def _pipe_file(
self,
@@ -1824,7 +1840,7 @@ def commit(self):

def _initiate_upload(self):
"""Create multi-upload"""
self.location = sync(
self.location = asyn.sync(
self.gcsfs.loop,
initiate_upload,
self.gcsfs,
@@ -1854,7 +1870,7 @@ def _simple_upload(self):
"""One-shot upload, less than 5MB"""
self.buffer.seek(0)
data = self.buffer.read()
sync(
asyn.sync(
self.gcsfs.loop,
simple_upload,
self.gcsfs,
5 changes: 2 additions & 3 deletions gcsfs/tests/test_core.py
@@ -183,9 +183,8 @@ def test_rm(gcs):
assert gcs.exists(a)
gcs.rm(a)
assert not gcs.exists(a)
# silently ignored for now
# with pytest.raises((OSError, IOError)):
# gcs.rm(TEST_BUCKET + "/nonexistent")
with pytest.raises((OSError, IOError)):
gcs.rm(TEST_BUCKET + "/nonexistent")
with pytest.raises((OSError, IOError)):
gcs.rm("nonexistent")

