Commit b652449: retry bulk rm

martindurant committed Feb 27, 2024
1 parent d615109
Showing 2 changed files with 62 additions and 39 deletions.
gcsfs/core.py (86 changes: 53 additions & 33 deletions)
@@ -25,7 +25,7 @@
 from .checkers import get_consistency_checker
 from .credentials import GoogleCredentials
 from .inventory_report import InventoryReport
-from .retry import retry_request, validate_response
+from .retry import errs, retry_request, validate_response
 
 logger = logging.getLogger("gcsfs")

@@ -1168,6 +1168,8 @@ async def _rm_file(self, path, **kwargs):
             await self._rmdir(path)
 
     async def _rm_files(self, paths):
+        import random
+
         template = (
             "\n--===============7330845974216740156==\n"
             "Content-Type: application/http\n"
@@ -1178,46 +1180,64 @@ async def _rm_files(self, paths):
             "accept: application/json\ncontent-length: 0\n"
         )
         errors = []
+        success = []
         # Splitting requests into 100 chunk batches
         # See https://cloud.google.com/storage/docs/batch
-        for chunk in _chunks(paths, 100):
-            parts = []
-            for i, p in enumerate(chunk):
-                bucket, key, generation = self.split_path(p)
-                query = f"?generation={generation}" if generation else ""
-                parts.append(
-                    template.format(
-                        i=i + 1,
-                        bucket=quote(bucket),
-                        key=quote(key),
-                        query=query,
-                    )
-                )
-            body = "".join(parts)
-            headers, content = await self._call(
-                "POST",
-                f"{self._location}/batch/storage/v1",
-                headers={
-                    "Content-Type": 'multipart/mixed; boundary="=========='
-                    '=====7330845974216740156=="'
-                },
-                data=body + "\n--===============7330845974216740156==--",
-            )
-
-            boundary = headers["Content-Type"].split("=", 1)[1]
-            parents = [self._parent(p) for p in paths]
-            [self.invalidate_cache(parent) for parent in parents + list(paths)]
-            txt = content.decode()
-            if any(
-                not ("200 OK" in c or "204 No Content" in c)
-                for c in txt.split(boundary)[1:-1]
-            ):
-                pattern = '"message": "([^"]+)"'
-                out = set(re.findall(pattern, txt))
-                errors.extend(out)
+        for retry in range(1, 6):
+            for chunk in _chunks(paths, 100):
+                parts = []
+                for i, p in enumerate(chunk):
+                    bucket, key, generation = self.split_path(p)
+                    query = f"?generation={generation}" if generation else ""
+                    parts.append(
+                        template.format(
+                            i=i + 1,
+                            bucket=quote(bucket),
+                            key=quote(key),
+                            query=query,
+                        )
+                    )
+                body = "".join(parts)
+                headers, content = await self._call(
+                    "POST",
+                    f"{self._location}/batch/storage/v1",
+                    headers={
+                        "Content-Type": 'multipart/mixed; boundary="=========='
+                        '=====7330845974216740156=="'
+                    },
+                    data=body + "\n--===============7330845974216740156==--",
+                )
+
+                boundary = headers["Content-Type"].split("=", 1)[1]
+                parents = [self._parent(p) for p in paths]
+                [self.invalidate_cache(parent) for parent in parents + list(paths)]
+                txt = content.decode()
+                responses = txt.split(boundary)[1:-1]
+                remaining = []
+                for path, response in zip(paths, responses):
+                    m = re.search("HTTP/[0-9.]+ ([0-9]+)", response)
+                    code = int(m.groups()[0]) if m else None
+                    if code in [200, 204]:
+                        success.append(path)
+                    elif code in errs:
+                        remaining.append(path)
+                    else:
+                        msg = re.search("({.*})", response.replace("\n", ""))
+                        if msg:
+                            errors.append(json.loads(msg.groups()[0])["error"])
+                        else:
+                            errors.append((path, code))
+            if errors:
+                break
+            if remaining:
+                paths = remaining
+                await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32))
+            else:
+                break
 
         if errors:
             raise OSError(errors)
+        return success

     @property
     def on_google(self):
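Read as a whole, the new _rm_files follows a standard pattern: send the batched deletes, sort each path by its per-part status into succeeded, retriable, or permanently failed, then sleep with exponential backoff plus jitter before retrying only the retriable remainder. The sketch below is a minimal, self-contained illustration of that pattern rather than gcsfs's actual API: delete_batch is a hypothetical stand-in for the multipart batch call, and RETRIABLE mirrors the codes in retry.errs.

import asyncio
import random

RETRIABLE = set(range(500, 505)) | {408, 429}  # mirrors gcsfs.retry.errs


async def delete_batch(chunk):
    """Hypothetical stand-in for the multipart batch call: returns one
    HTTP status per path, as parsed from the batch response parts."""
    return {path: random.choice([204] * 9 + [429]) for path in chunk}


async def rm_with_retries(paths, attempts=5):
    success, errors = [], []
    for retry in range(1, attempts + 1):
        remaining = []
        # GCS batch requests are limited to 100 operations each
        for start in range(0, len(paths), 100):
            statuses = await delete_batch(paths[start : start + 100])
            for path, code in statuses.items():
                if code in (200, 204):
                    success.append(path)
                elif code in RETRIABLE:
                    remaining.append(path)  # transient; retry next round
                else:
                    errors.append((path, code))  # permanent failure
        if errors or not remaining:
            break
        paths = remaining
        # exponential backoff (1, 2, 4, 8, 16 s) plus jitter, capped at 32 s
        await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32))
    if errors:
        raise OSError(errors)
    return success


deleted = asyncio.run(rm_with_retries([f"bucket/key{i}" for i in range(250)]))
print(f"deleted {len(deleted)} objects")

With 2 ** (retry - 1) the waits grow 1, 2, 4, 8, 16 seconds across the five rounds; the min(..., 32) cap only matters if the schedule is ever lengthened, and the jitter spreads out clients that would otherwise retry in lockstep.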
gcsfs/retry.py (15 changes: 9 additions & 6 deletions)
@@ -58,14 +58,17 @@ class ChecksumError(Exception):
 )
 
 
+errs = list(range(500, 505)) + [
+    # Request Timeout
+    408,
+    # Too Many Requests
+    429,
+]
+
+
 def is_retriable(exception):
     """Returns True if this exception is retriable."""
-    errs = list(range(500, 505)) + [
-        # Request Timeout
-        408,
-        # Too Many Requests
-        429,
-    ]
+
     errs += [str(e) for e in errs]
     if isinstance(exception, HttpError):
         return exception.code in errs
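Hoisting errs to module scope is what lets core.py import it and test the integer status parsed from each part of the batch response, while is_retriable keeps extending the same list with string forms. A minimal sketch of the shared-list idea, assuming (the diff itself does not say) that the string forms exist for exception codes that arrive as strings:

# Shared list of transient HTTP statuses, as in gcsfs/retry.py
errs = list(range(500, 505)) + [
    408,  # Request Timeout
    429,  # Too Many Requests
]


def is_transient(code):
    """True for retriable statuses, whether given as an int or a str."""
    return code in errs or str(code) in [str(e) for e in errs]


assert is_transient(503) and is_transient("429")
assert not is_transient(404)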
