Skip to content

Commit

Permalink
Feature/alcs 1710 etl srw submission (#1467)
Browse files Browse the repository at this point in the history
SRW submission import
  • Loading branch information
mhuseinov authored Mar 4, 2024
1 parent 5812485 commit d57503d
Show file tree
Hide file tree
Showing 9 changed files with 298 additions and 2 deletions.
4 changes: 2 additions & 2 deletions bin/migrate-oats-data/menu/post_launch_commands/srws.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from srw.post_launch import (
init_srw,
process_srw,
clean_srw,
)
from common import setup_and_get_logger
Expand All @@ -17,7 +17,7 @@ def srw_import(console, args):

logger.debug(f"Processing SRW import in batch size = {import_batch_size}")

init_srw(batch_size=import_batch_size)
process_srw(batch_size=import_batch_size)


def srw_clean(console):
Expand Down
9 changes: 9 additions & 0 deletions bin/migrate-oats-data/srw/post_launch/srw_migration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from ..srw_base import init_srw_base, clean_initial_srw
from ..srw_base_update import update_srw_base_fields
from ..submission.srw_submission_init import init_srw_submissions, clean_srw_submissions
from ..submission.srw_proposal_fields import process_alcs_srw_proposal_fields


def process_srw(batch_size):
Expand All @@ -9,7 +11,14 @@ def process_srw(batch_size):
def init_srw(batch_size):
    # Create base notification data, then update its fields, then import
    # the SRW submissions on top of it.
    init_srw_base(batch_size)
    update_srw_base_fields(batch_size)
    _process_srw_submission(batch_size)


def _process_srw_submission(batch_size):
    # Insert the base notification_submission rows first, then populate
    # their proposal fields from OATS data.
    init_srw_submissions(batch_size)
    process_alcs_srw_proposal_fields(batch_size)


def clean_srw():
    # Cleanup runs in reverse import order: submissions first, then the
    # base SRW notification data.
    clean_srw_submissions()
    clean_initial_srw()
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Source rows for updating proposal fields on alcs.notification_submission
-- for SRW notifications already imported into ALCS.
--   * has_survey_plan is TRUE when OATS has a plan or control number
--   * au."uuid" maps the OATS creator GUID to an ALCS user (LEFT JOIN: may be NULL)
-- NOTE: the Python caller appends "WHERE oaa.alr_application_id > ... ORDER BY ..."
-- at runtime, so this file must not end with a semicolon or trailing comment.
SELECT oaa.alr_application_id,
    oaa.applicant_file_no,
    n.summary,
    CASE
        WHEN oaa.plan_no IS NOT NULL
        OR oaa.control_no IS NOT NULL THEN TRUE
        ELSE FALSE
    END AS has_survey_plan,
    au."uuid",
    oaac.component_area
FROM oats.oats_alr_applications oaa
    JOIN alcs.notification_submission nos ON nos.file_number = oaa.alr_application_id::TEXT AND type_code = 'SRW'
    JOIN alcs.notification n ON n.file_number = nos.file_number
    JOIN oats.alcs_etl_srw aes ON aes.alr_application_id = oaa.alr_application_id
    JOIN oats.oats_alr_appl_components oaac ON oaac.alr_appl_component_id = aes.alr_appl_component_id
    LEFT JOIN alcs."user" au ON oaa.created_guid = au.bceid_guid
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Row count used only for progress/failure reporting by
-- process_alcs_srw_proposal_fields.
-- NOTE(review): this query joins fewer tables than srw_proposal_fields.sql
-- (no notification / alcs_etl_srw / component joins and no 'SRW' type filter),
-- so the reported total may differ from the rows actually updated — confirm
-- this is intentional.
SELECT count(*)
FROM oats.oats_alr_applications oaa
    JOIN alcs.notification_submission nos ON nos.file_number = oaa.alr_application_id::TEXT
    LEFT JOIN alcs."user" au ON oaa.created_guid = au.bceid_guid
16 changes: 16 additions & 0 deletions bin/migrate-oats-data/srw/sql/submission/srw_submission_init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Source rows for inserting alcs.notification_submission records for OATS
-- SRW applications (application_class_code = 'NAN', alr_change_code = 'SRW').
-- LEFT JOIN on alcs.notification keeps component rows even when no matching
-- 'SRW' notification exists yet (n.* columns are then NULL).
-- NOTE: the Python caller appends "WHERE srwg.alr_application_id > ... ORDER BY ..."
-- at runtime, so this file must not end with a semicolon or trailing comment.
WITH srw_components_grouped AS (
    SELECT *
    FROM oats.alcs_etl_srw oaa
    WHERE oaa.application_class_code = 'NAN'
        and oaa.alr_change_code = 'SRW'
)
SELECT n.file_number,
    n.type_code,
    n.local_government_uuid,
    oc.alr_change_code,
    srwg.alr_application_id,
    oc.alr_appl_component_id
FROM srw_components_grouped srwg
    LEFT JOIN alcs.notification n ON n.file_number = srwg.alr_application_id::TEXT
    AND n.type_code = 'SRW'
    JOIN oats.oats_alr_appl_components oc ON srwg.alr_appl_component_id = oc.alr_appl_component_id
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- Row count used only for progress/failure reporting by init_srw_submissions.
-- NOTE(review): this counts from oats.alcs_etl_applications_nois while the
-- main query (srw_submission_init.sql) reads oats.alcs_etl_srw — confirm the
-- two sources are expected to agree.
WITH srw_components_grouped AS (
    SELECT oaa.alr_application_id
    FROM oats.alcs_etl_applications_nois oaa
    WHERE oaa.application_class_code = 'NAN'
        and oaa.alr_change_code = 'SRW'
)
SELECT count(*)
FROM srw_components_grouped srwg
2 changes: 2 additions & 0 deletions bin/migrate-oats-data/srw/submission/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .srw_submission_init import init_srw_submissions, clean_srw_submissions
from .srw_proposal_fields import process_alcs_srw_proposal_fields
121 changes: 121 additions & 0 deletions bin/migrate-oats-data/srw/submission/srw_proposal_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from common import BATCH_UPLOAD_SIZE, NO_DATA_IN_OATS, setup_and_get_logger
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor, execute_batch

etl_name = "process_alcs_srw_proposal_fields"
logger = setup_and_get_logger(etl_name)


@inject_conn_pool
def process_alcs_srw_proposal_fields(conn=None, batch_size=BATCH_UPLOAD_SIZE):
    """
    This function is responsible for populating of the notification_submission in ALCS: proposal fields and soil fields populated with default values.

    Updates created_by_uuid, has_survey_plan, purpose, submitters_file_number
    and total_area on existing alcs.notification_submission rows, paging
    through the OATS source data in batches keyed on alr_application_id.

    Args:
        conn (psycopg2.extensions.connection): PostgreSQL database connection. Provided by the decorator.
        batch_size (int): The number of items to process at once. Defaults to BATCH_UPLOAD_SIZE.
    """

    logger.info(f"Start {etl_name}")
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        # Total row count is used only for progress/failure reporting below.
        with open(
            "srw/sql/submission/proposal_fields/srw_proposal_fields_count.sql",
            "r",
            encoding="utf-8",
        ) as sql_file:
            count_query = sql_file.read()
            cursor.execute(count_query)
            count_total = dict(cursor.fetchone())["count"]
            logger.info(f"Total SRW data to update: {count_total}")

        failed_inserts = 0
        successful_updates_count = 0
        last_application_id = 0

        with open(
            "srw/sql/submission/proposal_fields/srw_proposal_fields.sql",
            "r",
            encoding="utf-8",
        ) as sql_file:
            application_sql = sql_file.read()
            # Keyset pagination: re-run the query restricted to ids above the
            # last processed one and take the next batch_size rows.
            while True:
                cursor.execute(
                    f"""
                        {application_sql}
                        WHERE oaa.alr_application_id > {last_application_id} ORDER BY oaa.alr_application_id;
                    """
                )

                rows = cursor.fetchmany(batch_size)

                if not rows:
                    break
                try:
                    records_to_be_updated_count = len(rows)

                    _update_records(conn, batch_size, cursor, rows)

                    successful_updates_count = (
                        successful_updates_count + records_to_be_updated_count
                    )
                    # Remember the last processed id for the next page.
                    last_application_id = dict(rows[-1])["alr_application_id"]

                    logger.debug(
                        f"retrieved/updated items count: {records_to_be_updated_count}; total successfully updated SRWs so far {successful_updates_count}; last updated alr_application_id: {last_application_id}"
                    )
                except Exception as err:
                    # this is NOT going to be caused by actual data update failure. This code is only executed when the code error appears or connection to DB is lost
                    logger.exception(err)
                    conn.rollback()
                    failed_inserts = count_total - successful_updates_count
                    # Skip one id forward so the loop cannot spin on the same row.
                    last_application_id = last_application_id + 1

    logger.info(
        f"Finished {etl_name}: total amount of successful updates {successful_updates_count}, total failed updates {failed_inserts}"
    )


def _update_records(conn, batch_size, cursor, rows):
    """Apply the proposal-field UPDATE for every fetched row, then commit."""
    payload = _prepare_oats_data(rows)
    if payload:
        execute_batch(cursor, _update_query, payload, page_size=batch_size)
    conn.commit()


_update_query = """
UPDATE
alcs.notification_submission
SET
created_by_uuid = %(created_by_uuid)s,
has_survey_plan = %(has_survey_plan)s,
purpose= %(purpose)s,
submitters_file_number = %(submitters_file_number)s,
total_area = %(total_area)s
WHERE
alcs.notification_submission.file_number = %(file_number)s::TEXT
"""


def _prepare_oats_data(row_data_list):
data_list = []
for row in row_data_list:
data_list.append(_map_fields(dict(row)))
return data_list


def _map_fields(data):
return {
"created_by_uuid": data["uuid"],
"has_survey_plan": data["has_survey_plan"],
"purpose": data["summary"],
"submitters_file_number": data["applicant_file_no"],
"total_area": data["component_area"],
"file_number": data["alr_application_id"],
}
120 changes: 120 additions & 0 deletions bin/migrate-oats-data/srw/submission/srw_submission_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
from common import BATCH_UPLOAD_SIZE, OATS_ETL_USER, setup_and_get_logger
from db import inject_conn_pool
from psycopg2.extras import RealDictCursor, execute_batch

etl_name = "init_srw_submissions"
logger = setup_and_get_logger(etl_name)


@inject_conn_pool
def init_srw_submissions(conn=None, batch_size=BATCH_UPLOAD_SIZE):
    """
    This function is responsible for initializing the srw_submission in ALCS.

    Inserts one alcs.notification_submission row per OATS SRW component,
    paging through the source data in batches keyed on alr_application_id.

    Args:
        conn (psycopg2.extensions.connection): PostgreSQL database connection. Provided by the decorator.
        batch_size (int): The number of items to process at once. Defaults to BATCH_UPLOAD_SIZE.
    """

    logger.info(f"Start {etl_name}")
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        # Total row count is used only for progress/failure reporting below.
        with open(
            "srw/sql/submission/srw_submission_init_count.sql",
            "r",
            encoding="utf-8",
        ) as sql_file:
            count_query = sql_file.read()
            cursor.execute(count_query)
            count_total = dict(cursor.fetchone())["count"]
            logger.info(f"Total SRW Submission data to insert: {count_total}")

        failed_inserts = 0
        successful_inserts_count = 0
        last_submission_id = 0

        with open(
            "srw/sql/submission/srw_submission_init.sql",
            "r",
            encoding="utf-8",
        ) as sql_file:
            submission_sql = sql_file.read()
            # Keyset pagination: re-run the query restricted to ids above the
            # last inserted one and take the next batch_size rows.
            while True:
                cursor.execute(
                    f"{submission_sql} WHERE srwg.alr_application_id > {last_submission_id} ORDER BY srwg.alr_application_id;"
                )

                rows = cursor.fetchmany(batch_size)

                if not rows:
                    break
                try:
                    submissions_to_be_inserted_count = len(rows)

                    _insert_srw_submissions(conn, batch_size, cursor, rows)

                    successful_inserts_count = (
                        successful_inserts_count + submissions_to_be_inserted_count
                    )
                    # Remember the last processed id for the next page.
                    last_submission_id = dict(rows[-1])["alr_application_id"]

                    logger.debug(
                        f"retrieved/inserted items count: {submissions_to_be_inserted_count}; total successfully inserted submissions so far {successful_inserts_count}; last inserted alr_application_id: {last_submission_id}"
                    )
                except Exception as err:
                    # Reached on code errors or lost DB connection; roll back
                    # the current batch and skip one id forward so the loop
                    # cannot spin on the same row.
                    logger.exception(err)
                    conn.rollback()
                    failed_inserts = count_total - successful_inserts_count
                    last_submission_id = last_submission_id + 1

    logger.info(
        f"Finished {etl_name}: total amount of successful inserts {successful_inserts_count}, total failed inserts {failed_inserts}"
    )


def _insert_srw_submissions(conn, batch_size, cursor, rows):
    """Insert one notification_submission per fetched row, then commit."""
    records = _prepare_oats_alr_applications_data(rows)
    if records:
        execute_batch(
            cursor,
            _get_insert_query(),
            records,
            page_size=batch_size,
        )
    conn.commit()


def _get_insert_query():
    """Build the INSERT for alcs.notification_submission.

    file_number, local_government_uuid and type_code come from the source
    row; audit_created_by is stamped with the ETL user so cleanup can find
    these rows, and applicant defaults to 'Unknown'.
    """
    query = f"""
        INSERT INTO alcs.notification_submission (
            file_number,
            local_government_uuid,
            type_code,
            audit_created_by,
            applicant
        )
        VALUES (
            %(file_number)s,
            %(local_government_uuid)s,
            %(type_code)s,
            '{OATS_ETL_USER}',
            'Unknown'
        )
    """
    return query


def _prepare_oats_alr_applications_data(row_data_list):
data_list = []
for row in row_data_list:
data_list.append(dict(row))
return data_list


@inject_conn_pool
def clean_srw_submissions(conn=None):
    """Delete notification_submission rows created by this ETL.

    Rows with a non-NULL audit_updated_by (touched after import) are
    deliberately left in place.

    Args:
        conn (psycopg2.extensions.connection): PostgreSQL database connection. Provided by the decorator.
    """
    logger.info("Start srw_submissions cleaning")
    with conn.cursor() as cursor:
        cursor.execute(
            f"DELETE FROM alcs.notification_submission nos WHERE nos.audit_created_by = '{OATS_ETL_USER}' and nos.audit_updated_by is NULL"
        )
        logger.info(f"Deleted items count = {cursor.rowcount}")

    conn.commit()

0 comments on commit d57503d

Please sign in to comment.