feat: retry building indices until we see it in the DB (#1595)
* Previously, index creation would report success, yet a non-deterministic
  set of indices would still be missing from the database.
* This changes the retry logic so that every retry is preceded by a check of
  the success criteria (a toy sketch of the pattern follows this list).
* We then select for the index and retry until we actually see it in the
  database.
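
In effect, retry() now takes a check_success callback, consults it before every
attempt, and only returns once that callback reports success. A toy,
self-contained sketch of the pattern (all names below are illustrative, not
part of the commit):

    import time

    # Keep retrying an action until an explicit check sees its effect in the
    # "database", rather than trusting the action's own return value.
    visible_indices: set[str] = set()
    attempts = 0

    def create_index(name: str) -> None:
        # Simulates a CREATE INDEX whose effect only becomes visible after a while.
        global attempts
        attempts += 1
        if attempts >= 3:
            visible_indices.add(name)

    def index_exists(name: str) -> bool:
        # Simulates SELECT * FROM pg_indexes WHERE indexname = '<name>'
        return name in visible_indices

    def retry_until_visible(name: str, retries: int = 10, wait_time: float = 0.1) -> None:
        for i in range(retries):
            if index_exists(name):  # success criteria checked before every (re)try
                return
            create_index(name)
            time.sleep(wait_time * (i + 1))
        raise RuntimeError(f"index {name} never showed up in the database")

    retry_until_visible("users_by_id")

The real implementation in cloudsql.py below additionally remembers the last
exception and result so that the check_success callback can inspect them.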
ryscheng authored Jun 5, 2024
1 parent 7e6af78 commit d955a1e
Showing 2 changed files with 85 additions and 28 deletions.
110 changes: 82 additions & 28 deletions warehouse/bq2cloudsql/cloudsql.py
@@ -32,38 +32,76 @@ def _get_conn():
 
     return _get_conn
 
 
 def retry(
     callable: Callable,
-    error_handler: Callable[[Exception], bool],
+    check_success: Callable[[Optional[Exception], bool], bool] = lambda e, r: r,
     retries: int = 10,
     wait_time: int = 2,
 ) -> Callable:
"""Wraps a function in retry logic
Parameters
----------
callable: Callable
The function to call
check_success: Callable[[Optional[Exception], bool], bool]
Pass in 2 parameters
- most recently caught exception
- boolean of whether the callable completed
Returns whether we need to try again
retries: int
Max number of times to retry
wait_time: int
Time to wait in between first and second try (in seconds)
This is doubled after every failed attempt
Returns
-------
Callable
a callable function that embodies retry logic
"""
     def call(*args, **kwargs):
-        wait = wait_time
-        exc = None
-        for _i in range(retries):
+        last_exception = None
+        last_result = None
+        last_run_finished = False
+        for i in range(retries):
             try:
-                return callable(*args, **kwargs)
+                if check_success(last_exception, last_run_finished):
+                    return last_result
+                last_run_finished = False
+                print("[re]trying...")
+                last_result = callable(*args, **kwargs)
+                last_run_finished = True
             except Exception as e:
-                exc = e
-                if not error_handler(e):
-                    raise e
-                print("retrying")
-                time.sleep(wait)
-                wait += wait
-        raise exc
+                last_exception = e
+            time.sleep(wait_time * (i+1))
+        raise last_exception
 
     return call


-def handle_cloudsql_error(e: Exception):
+def cloudsql_check_success(e: Optional[Exception], run_success: bool):
+    """ Generic check success handler for using `retry()` with CloudSQL
+    Parameters
+    ----------
+    e: Exception
+        The most recent exception
+    run_success: bool
+        Whether the last run was successful
+    Returns
+    -------
+    bool
+        True if we are safe to return
+        False if we need to try again
+    """
+    # HttpError and SSLEOFError are worth trying again
     if isinstance(e, HttpError):
-        return True
+        return False
     if isinstance(e, ssl.SSLEOFError):
-        return True
-
-    return False
+        return False
+    # All other exceptions, treat as fatal
+    if e is not None:
+        raise e
+    return run_success

 class CloudSQLClient(object):
     @classmethod
@@ -118,24 +156,40 @@ def ensure_table(self, table_name: str, columns: List[Column]):
         Table(table_name, metadata, *columns)
         metadata.create_all(self.sql_conn)
 
-    def build_index(self, table_name: str, index_names: Optional[Dict[str, List[str]]]):
+    def build_index(self, table_name: str, index_definitions: Optional[Dict[str, List[str]]]):
         print(f"Building indices for {table_name}:")
 
         # Check that the table exists
         if not sqlalchemy.inspect(self.sql_conn).has_table(table_name):
             raise Exception(f"Failed to build indices for {table_name}: the table does not exist")
-        elif index_names is None:
+        elif index_definitions is None:
             print(f"No indices found for {table_name}")
             return
 
-        for key in index_names:
-            column_names = index_names[key]
-            print(f"- creating Index({key}) as {column_names}")
+        def create_single_index(index_name: str, column_names: List[str]):
             with self.begin() as conn:
-                sql_str = f"CREATE INDEX IF NOT EXISTS {key} ON {table_name}({','.join(column_names)})"
+                sql_str = f"CREATE INDEX {index_name} ON {table_name}({','.join(column_names)})"
                 print(sql_str)
                 conn.execute(text(sql_str))
 
+        for key in index_definitions:
+            print(f"- creating Index({key})")
+            def index_check_success(e: Optional[Exception], run_success: bool):
+                # First check if the index already exists, this is truth
+                with self.begin() as conn:
+                    sql_str = f"SELECT * FROM pg_indexes WHERE tablename = '{table_name}' AND indexname = '{key}'"
+                    print(f"Checking if index exists: {sql_str}")
+                    index_results = conn.execute(text(sql_str))
+                    index_list = [*index_results]
+                    #print(index_list)
+                    if len(index_list) > 0:
+                        print(f"Index {key} already exists! Skipping...")
+                        return True
+                # We don't actually care whether there's an exception or success
+                # If the index doesn't exist, try again
+                return False
+            retry(create_single_index, index_check_success)(key, index_definitions[key])
 
     def import_csv(self, csv_uri: str, table: str, columns: None | List[str] = None):
         print("importing into %s" % table)
         csv_import_options = dict(table=table)
@@ -155,7 +209,7 @@ def import_csv(self, csv_uri: str, table: str, columns: None | List[str] = None)
             instance=self._instance_id,
             body=dict(importContext=body),
         )
-        response = retry(request.execute, handle_cloudsql_error)()
+        response = retry(request.execute, cloudsql_check_success)()
 
         print("resp")
         pp.pprint(response)
@@ -166,7 +220,7 @@ def import_csv(self, csv_uri: str, table: str, columns: None | List[str] = None)
             req = self._client.operations().get(
                 project=self._project_id, operation=operation_id
             )
-            r = retry(req.execute, handle_cloudsql_error)()
+            r = retry(req.execute, cloudsql_check_success)()
             pp.pprint(r)
             if r["status"] not in ["PENDING", "RUNNING"]:
                 if r["status"] != "DONE":
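For reference, the existence check that index_check_success performs reduces to
a single catalog query against pg_indexes. A minimal standalone sketch (assuming
SQLAlchemy and a Postgres-compatible target such as Cloud SQL for PostgreSQL;
the engine URL and names in the usage comment are hypothetical):

    import sqlalchemy
    from sqlalchemy import text

    def index_exists(engine: sqlalchemy.engine.Engine, table_name: str, index_name: str) -> bool:
        # pg_indexes lists every index Postgres actually knows about, so a row
        # here is the ground truth that CREATE INDEX really took effect.
        query = text(
            "SELECT 1 FROM pg_indexes "
            "WHERE tablename = :table_name AND indexname = :index_name"
        )
        with engine.begin() as conn:
            row = conn.execute(
                query, {"table_name": table_name, "index_name": index_name}
            ).first()
        return row is not None

    # Hypothetical usage:
    # engine = sqlalchemy.create_engine("postgresql+psycopg2://user:pass@host/db")
    # index_exists(engine, "events", "idx_events_time")

Unlike the f-string query in the diff, this sketch binds the names as
parameters; for pg_indexes both are ordinary column values, so the behaviour is
the same.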
3 changes: 3 additions & 0 deletions warehouse/bq2cloudsql/script.py
@@ -78,6 +78,9 @@ def run():
     # Automatically discover dbt marts
     table_sync_configs = table_sync_config_from_dbt_marts(os.environ.get("DBT_TARGET"))
 
+    ### Testing
+    #table_sync_configs = table_sync_configs[3:4]
+
     # Run sync
     synchronizer = BigQueryCloudSQLSynchronizer(
         bq,
