Skip to content

Commit

Permalink
fix: retry logic can silently just give up when creating indices (#1598)
Browse files Browse the repository at this point in the history
* Limit the number of retries for CREATE INDEX to 3
* After 3 retries, just give up and keep going, rather than raise an
  exception
  • Loading branch information
ryscheng authored Jun 6, 2024
1 parent d955a1e commit 0a3c000
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions warehouse/bq2cloudsql/cloudsql.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def retry(
check_success: Callable[[Optional[Exception], bool], bool] = lambda e, r: r,
retries: int = 10,
wait_time: int = 2,
give_up_after_retry: bool = False,
) -> Callable:
"""Wraps a function in retry logic
Expand All @@ -54,6 +55,9 @@ def retry(
wait_time: int
Time to wait in between first and second try (in seconds)
This is doubled after every failed attempt
give_up_after_retry: bool
After all retries, should we just let it slide?
If this is False, we raise an exception
Returns
-------
Expand All @@ -66,16 +70,29 @@ def call(*args, **kwargs):
last_run_finished = False
for i in range(retries):
try:
# Allow check_success to bypass the initial run
if check_success(last_exception, last_run_finished):
return last_result
print(f"{'' if i < 1 else 're'}trying attempt {i+1}...")
last_run_finished = False
print("[re]trying...")
last_result = callable(*args, **kwargs)
last_run_finished = True
except Exception as e:
last_exception = e
# Exponential back-off
time.sleep(wait_time * (i+1))
raise last_exception
if check_success(last_exception, last_run_finished):
# Check the last run
return last_result
elif give_up_after_retry:
print(f"Exhausted retries, giving up")
return last_result
elif last_exception is not None:
# Raise last known exception
raise last_exception
else:
raise Exception(f"Failed {retries} retry attempts")

return call

def cloudsql_check_success(e: Optional[Exception], run_success: bool):
Expand Down Expand Up @@ -187,8 +204,9 @@ def index_check_success(e: Optional[Exception], run_success: bool):
return True
# We don't actually care whether there's an exception or success
# If the index doesn't exist, try again
print(f"Index {key} is missing...")
return False
retry(create_single_index, index_check_success)(key, index_definitions[key])
retry(create_single_index, index_check_success, retries=3, give_up_after_retry=True)(key, index_definitions[key])

def import_csv(self, csv_uri: str, table: str, columns: None | List[str] = None):
print("importing into %s" % table)
Expand Down

0 comments on commit 0a3c000

Please sign in to comment.