
Commit

Reduce flakiness of just init (#627)
dhruvkb authored Apr 7, 2022
1 parent 045e05f commit 6bcf2ee
Showing 2 changed files with 35 additions and 20 deletions.
43 changes: 25 additions & 18 deletions ingestion_server/ingestion_server/ingest.py
@@ -30,6 +30,7 @@
from ingestion_server.indexer import database_connect
from ingestion_server.queries import (
get_copy_data_query,
+ get_create_ext_query,
get_fdw_query,
get_go_live_query,
)
@@ -259,12 +260,13 @@ def reload_upstream(
process involves the following steps.
1. Get the list of overlapping columns: ``_get_shared_cols``
- 2. Create FDW for the data transfer: ``get_fdw_query``
- 3. Import data into a temporary table: ``get_copy_data_query``
- 4. Clean the data: ``clean_image_data``
- 5. Recreate indices from the original table: ``_generate_indices``
- 6. Recreate constraints from the original table: ``_generate_constraints``
- 7. Promote the temp table and delete the original: ``get_go_live_query``
+ 2. Create the FDW extension if it does not exist
+ 3. Create FDW for the data transfer: ``get_fdw_query``
+ 4. Import data into a temporary table: ``get_copy_data_query``
+ 5. Clean the data: ``clean_image_data``
+ 6. Recreate indices from the original table: ``_generate_indices``
+ 7. Recreate constraints from the original table: ``_generate_constraints``
+ 8. Promote the temp table and delete the original: ``get_go_live_query``
This is the main function of this module.
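The renumbered steps go hand in hand with a change in transaction handling: the copy, index, and go-live steps now run inside ``with downstream_db, downstream_db.cursor() as downstream_cur:`` blocks instead of relying on explicit ``commit()``/``close()`` calls between steps, because a psycopg2 connection used as a context manager wraps the block in its own transaction. A minimal, illustrative sketch of that pattern (not the commit's code; the DSN and statements below are placeholders):

import psycopg2

conn = psycopg2.connect("dbname=example user=example")  # placeholder DSN

with conn, conn.cursor() as cur:
    # Step A: runs and commits in its own transaction on block exit.
    cur.execute("CREATE TEMP TABLE step_a (id integer);")

with conn, conn.cursor() as cur:
    # Step B: a new transaction on the same, still-open connection;
    # a failure here rolls back only this block's work.
    cur.execute("INSERT INTO step_a VALUES (1);")

# The context manager commits but does not close, so close explicitly.
conn.close()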
@@ -288,9 +290,17 @@ def reload_upstream(
shared_cols = _get_shared_cols(downstream_db, upstream_db, table)
upstream_db.close()

- with downstream_db.cursor() as downstream_cur:
- # Step 2: Create FDW for the data transfer
+ with downstream_db, downstream_db.cursor() as downstream_cur:
+ # Step 2: Create the FDW extension if it does not exist
log.info("(Re)initializing foreign data wrapper")
+ try:
+ create_ext = get_create_ext_query()
+ downstream_cur.execute(create_ext)
+ except psycopg2.errors.UniqueViolation:
+ log.warning("Extension already exists, possible race condition.")

+ with downstream_db, downstream_db.cursor() as downstream_cur:
+ # Step 3: Create FDW for the data transfer
init_fdw = get_fdw_query(
RELATIVE_UPSTREAM_DB_HOST,
RELATIVE_UPSTREAM_DB_PORT,
@@ -301,7 +311,7 @@
)
downstream_cur.execute(init_fdw)

- # Step 3: Import data into a temporary table
+ # Step 4: Import data into a temporary table
log.info("Copying upstream data...")
environment = config("ENVIRONMENT", default="local").lower()
limit_default = 100_000
@@ -321,42 +331,39 @@
else "re-applying indices & constraints"
)
slack.verbose(f"`{table}`: Data copy complete | _Next: {next_step}_")
- downstream_db.commit()
- downstream_db.close()

if table == "image":
- # Step 4: Clean the data
+ # Step 5: Clean the data
log.info("Cleaning data...")
clean_image_data(table)
slack.verbose(
f"`{table}`: Data cleaning complete | "
f"_Next: re-applying indices & constraints_"
)

- downstream_db = database_connect()
- with downstream_db.cursor() as downstream_cur:
- # Step 5: Recreate indices from the original table
+ with downstream_db, downstream_db.cursor() as downstream_cur:
+ # Step 6: Recreate indices from the original table
log.info("Copying finished! Recreating database indices...")
create_indices, index_mapping = _generate_indices(downstream_db, table)
_update_progress(progress, 50.0)
if create_indices != "":
downstream_cur.execute(";\n".join(create_indices))
_update_progress(progress, 70.0)

- # Step 6: Recreate constraints from the original table
+ # Step 7: Recreate constraints from the original table
log.info("Done creating indices! Remapping constraints...")
remap_constraints = SQL(";\n").join(_generate_constraints(downstream_db, table))
if len(remap_constraints.seq) != 0:
downstream_cur.execute(remap_constraints)
_update_progress(progress, 99.0)
slack.verbose(f"`{table}`: Indices & constraints applied | _Next: go-live_")

- # Step 7: Promote the temporary table and delete the original
+ # Step 8: Promote the temporary table and delete the original
log.info("Done remapping constraints! Going live with new table...")
go_live = get_go_live_query(table, index_mapping)
log.info(f"Running go-live: \n{go_live.as_string(downstream_cur)}")
downstream_cur.execute(go_live)
- downstream_db.commit()

downstream_db.close()
log.info(f"Finished refreshing table '{table}'.")
_update_progress(progress, 100.0)
12 changes: 10 additions & 2 deletions ingestion_server/ingestion_server/queries.py
@@ -33,6 +33,16 @@ def get_existence_queries(table):
return exists_in_deleted_table, exists_in_mature_table


+ def get_create_ext_query():
+ """
+ Get the query for creating the ``postgres_fdw`` extension, if it does not exist.
+ :return: the SQL query for creating the FDW extension
+ """

+ return SQL("CREATE EXTENSION IF NOT EXISTS postgres_fdw;")
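
Pulling the ``CREATE EXTENSION`` statement out of ``get_fdw_query`` (see the deletion below) lets the caller run it in its own transaction and tolerate the race in which another worker creates the extension first. A rough usage sketch under those assumptions; the connection parameters are placeholders, not the project's configuration:

import psycopg2
from psycopg2 import errors

from ingestion_server.queries import get_create_ext_query

conn = psycopg2.connect("dbname=example user=example")  # placeholder DSN
try:
    with conn, conn.cursor() as cur:
        # cursor.execute() accepts the composed psycopg2 SQL object directly.
        cur.execute(get_create_ext_query())
except errors.UniqueViolation:
    # "IF NOT EXISTS" can still lose a race between the existence check and
    # the catalog insert; losing that race is harmless here.
    pass
finally:
    conn.close()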


def get_fdw_query(
host: str, port: int, dbname: str, user: str, password: str, table: str
):
@@ -52,8 +62,6 @@ def get_fdw_query(

return SQL(
"""
- CREATE EXTENSION IF NOT EXISTS postgres_fdw;
DROP SERVER IF EXISTS upstream CASCADE;
CREATE SERVER upstream FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host {host}, dbname {dbname}, port {port});