diff --git a/gcsfs/core.py b/gcsfs/core.py
index f425afc0..f6f50036 100644
--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -25,7 +25,7 @@
 from .checkers import get_consistency_checker
 from .credentials import GoogleCredentials
 from .inventory_report import InventoryReport
-from .retry import retry_request, validate_response
+from .retry import errs, retry_request, validate_response

 logger = logging.getLogger("gcsfs")

@@ -1168,6 +1168,8 @@ async def _rm_file(self, path, **kwargs):
             await self._rmdir(path)

     async def _rm_files(self, paths):
+        import random
+
         template = (
             "\n--===============7330845974216740156==\n"
             "Content-Type: application/http\n"
@@ -1178,46 +1180,64 @@ async def _rm_files(self, paths):
             "accept: application/json\ncontent-length: 0\n"
         )
         errors = []
+        success = []
         # Splitting requests into 100 chunk batches
         # See https://cloud.google.com/storage/docs/batch
-        for chunk in _chunks(paths, 100):
-            parts = []
-            for i, p in enumerate(chunk):
-                bucket, key, generation = self.split_path(p)
-                query = f"?generation={generation}" if generation else ""
-                parts.append(
-                    template.format(
-                        i=i + 1,
-                        bucket=quote(bucket),
-                        key=quote(key),
-                        query=query,
+        for retry in range(1, 6):
+            for chunk in _chunks(paths, 100):
+                parts = []
+                for i, p in enumerate(chunk):
+                    bucket, key, generation = self.split_path(p)
+                    query = f"?generation={generation}" if generation else ""
+                    parts.append(
+                        template.format(
+                            i=i + 1,
+                            bucket=quote(bucket),
+                            key=quote(key),
+                            query=query,
+                        )
                     )
+                body = "".join(parts)
+                headers, content = await self._call(
+                    "POST",
+                    f"{self._location}/batch/storage/v1",
+                    headers={
+                        "Content-Type": 'multipart/mixed; boundary="=========='
+                        '=====7330845974216740156=="'
+                    },
+                    data=body + "\n--===============7330845974216740156==--",
                 )
-            body = "".join(parts)
-            headers, content = await self._call(
-                "POST",
-                f"{self._location}/batch/storage/v1",
-                headers={
-                    "Content-Type": 'multipart/mixed; boundary="=========='
-                    '=====7330845974216740156=="'
-                },
-                data=body + "\n--===============7330845974216740156==--",
-            )
-            boundary = headers["Content-Type"].split("=", 1)[1]
-            parents = [self._parent(p) for p in paths]
-            [self.invalidate_cache(parent) for parent in parents + list(paths)]
-            txt = content.decode()
-            if any(
-                not ("200 OK" in c or "204 No Content" in c)
-                for c in txt.split(boundary)[1:-1]
-            ):
-                pattern = '"message": "([^"]+)"'
-                out = set(re.findall(pattern, txt))
-                errors.extend(out)
+                boundary = headers["Content-Type"].split("=", 1)[1]
+                parents = [self._parent(p) for p in paths]
+                [self.invalidate_cache(parent) for parent in parents + list(paths)]
+                txt = content.decode()
+                responses = txt.split(boundary)[1:-1]
+                remaining = []
+                for path, response in zip(paths, responses):
+                    m = re.search("HTTP/[0-9.]+ ([0-9]+)", response)
+                    code = int(m.groups()[0]) if m else None
+                    if code in [200, 204]:
+                        success.append(path)
+                    elif code in errs:
+                        remaining.append(path)
+                    else:
+                        msg = re.search("({.*})", response.replace("\n", ""))
+                        if msg:
+                            errors.append(json.loads(msg.groups()[0])["error"])
+                        else:
+                            errors.append((path, code))
+                if errors:
+                    break
+            if remaining:
+                paths = remaining
+                await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32))
+            else:
+                break
         if errors:
             raise OSError(errors)
+        return success

     @property
     def on_google(self):
diff --git a/gcsfs/retry.py b/gcsfs/retry.py
index 4bb860ba..793eaba5 100644
--- a/gcsfs/retry.py
+++ b/gcsfs/retry.py
@@ -58,14 +58,17 @@ class ChecksumError(Exception):
 )


+errs = list(range(500, 505)) + [
+    # Request Timeout
+    408,
+    # Too Many Requests
+    429,
+]
+errs += [str(e) for e in errs]
+
+
 def is_retriable(exception):
     """Returns True if this exception is retriable."""
-    errs = list(range(500, 505)) + [
-        # Request Timeout
-        408,
-        # Too Many Requests
-        429,
-    ]
-    errs += [str(e) for e in errs]
     if isinstance(exception, HttpError):
         return exception.code in errs
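
Reviewer note: the new `_rm_files` loop retries only the objects whose batch sub-response status is in the shared `errs` list (500-504, plus 408 and 429), sleeping `min(random.random() + 2 ** (retry - 1), 32)` seconds between rounds. A minimal standalone sketch of that retry shape, where `delete_batch` is a hypothetical coroutine standing in for the multipart batch call (not part of gcsfs):

```python
import asyncio
import random

# Mirrors gcsfs.retry.errs: HTTP 500-504 plus 408 (timeout) and 429 (rate limit).
RETRIABLE = set(range(500, 505)) | {408, 429}


async def rm_with_backoff(paths, delete_batch, attempts=5):
    """Sketch only: retry the paths whose per-object status is retriable.

    ``delete_batch`` is a hypothetical coroutine returning {path: status};
    it stands in for the real multipart batch DELETE request.
    """
    success, errors = [], []
    for retry in range(1, attempts + 1):
        remaining = []
        for path, code in (await delete_batch(paths)).items():
            if code in (200, 204):
                success.append(path)  # deleted
            elif code in RETRIABLE:
                remaining.append(path)  # transient failure, try again
            else:
                errors.append((path, code))  # permanent failure
        if errors or not remaining:
            break
        paths = remaining
        # Jittered exponential backoff capped at 32 s: ~1, 2, 4, 8, 16 s.
        await asyncio.sleep(min(random.random() + 2 ** (retry - 1), 32))
    if errors:
        raise OSError(errors)
    return success
```

With five attempts the waits come to roughly 1, 2, 4, 8 and 16 seconds plus up to a second of jitter; the 32-second cap only binds if the attempt count is ever raised.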