-
Notifications
You must be signed in to change notification settings - Fork 72
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #1037 from CodeForPhilly/lebovits/issu1015-cleanup…
…-new-pipeline Lebovits/issu1015 cleanup new pipeline
- Loading branch information
Showing
31 changed files
with
2,249 additions
and
1,881 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,155 +1,107 @@ | ||
import sys | ||
|
||
from new_etl.data_utils.access_process import access_process | ||
from new_etl.data_utils.contig_neighbors import contig_neighbors | ||
from new_etl.data_utils.dev_probability import dev_probability | ||
from new_etl.data_utils.negligent_devs import negligent_devs | ||
from new_etl.data_utils.opa_properties import opa_properties | ||
from new_etl.data_utils.priority_level import priority_level | ||
from new_etl.data_utils.vacant_properties import vacant_properties | ||
from new_etl.data_utils.pwd_parcels import pwd_parcels | ||
from new_etl.data_utils.city_owned_properties import city_owned_properties | ||
from new_etl.data_utils.phs_properties import phs_properties | ||
from new_etl.data_utils.li_violations import li_violations | ||
from new_etl.data_utils.li_complaints import li_complaints | ||
from new_etl.data_utils.rco_geoms import rco_geoms | ||
from new_etl.data_utils.council_dists import council_dists | ||
from new_etl.data_utils.tree_canopy import tree_canopy | ||
from new_etl.data_utils.nbhoods import nbhoods | ||
from new_etl.data_utils.gun_crimes import gun_crimes | ||
from new_etl.data_utils.drug_crimes import drug_crimes | ||
from new_etl.data_utils.delinquencies import delinquencies | ||
from new_etl.data_utils.unsafe_buildings import unsafe_buildings | ||
from new_etl.data_utils.imm_dang_buildings import imm_dang_buildings | ||
from new_etl.data_utils.tactical_urbanism import tactical_urbanism | ||
from new_etl.data_utils.conservatorship import conservatorship | ||
from new_etl.data_utils.owner_type import owner_type | ||
from new_etl.data_utils.community_gardens import community_gardens | ||
from new_etl.data_utils.park_priority import park_priority | ||
from new_etl.data_utils.ppr_properties import ppr_properties | ||
|
||
import pandas as pd | ||
import traceback | ||
|
||
from config.psql import conn | ||
from config.config import tiles_file_id_prefix | ||
|
||
from new_etl.classes.slack_reporters import send_dataframe_profile_to_slack, send_pg_stats_to_slack, send_error_to_slack | ||
from new_etl.classes.data_diff import DiffReport | ||
from new_etl.data_utils import * | ||
from new_etl.database import to_postgis_with_schema | ||
|
||
# Ensure the directory containing awkde is in the Python path | ||
awkde_path = "/usr/src/app" | ||
if awkde_path not in sys.path: | ||
sys.path.append(awkde_path) | ||
|
||
services = [ | ||
# vacant designation | ||
vacant_properties, # needs to run early so that other utils can make use of the `vacant` designation | ||
# geometries/areas | ||
pwd_parcels, | ||
council_dists, | ||
nbhoods, | ||
rco_geoms, | ||
# ownership | ||
city_owned_properties, | ||
phs_properties, | ||
community_gardens, | ||
ppr_properties, | ||
owner_type, | ||
# quality of life | ||
li_violations, | ||
li_complaints, | ||
tree_canopy, | ||
gun_crimes, | ||
drug_crimes, | ||
delinquencies, | ||
unsafe_buildings, | ||
imm_dang_buildings, | ||
# development | ||
contig_neighbors, | ||
dev_probability, | ||
negligent_devs, | ||
# access/interventions | ||
tactical_urbanism, | ||
conservatorship, | ||
park_priority, | ||
] | ||
|
||
dataset = opa_properties() | ||
|
||
print("Initial Dataset:") | ||
print("Shape:", dataset.gdf.shape) | ||
print("Head:\n", dataset.gdf.head()) | ||
print("NA Counts:\n", dataset.gdf.isna().sum()) | ||
|
||
for service in services: | ||
dataset = service(dataset) | ||
print(f"After {service.__name__}:") | ||
print("Dataset type:", type(dataset.gdf).__name__) | ||
print("Shape:", dataset.gdf.shape) | ||
print("Head:\n", dataset.gdf.head()) | ||
print("NA Counts:\n", dataset.gdf.isna().sum()) | ||
|
||
before_drop = dataset.gdf.shape[0] | ||
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id") | ||
after_drop = dataset.gdf.shape[0] | ||
print( | ||
f"Duplicate dataset rows dropped after initial services: {before_drop - after_drop}" | ||
) | ||
|
||
# Add Priority Level | ||
dataset = priority_level(dataset) | ||
|
||
# Print the distribution of "priority_level" | ||
distribution = dataset.gdf["priority_level"].value_counts() | ||
print("Distribution of priority level:") | ||
print(distribution) | ||
|
||
# Add Access Process | ||
dataset = access_process(dataset) | ||
|
||
# Print the distribution of "access_process" | ||
distribution = dataset.gdf["access_process"].value_counts() | ||
print("Distribution of access process:") | ||
print(distribution) | ||
|
||
before_drop = dataset.gdf.shape[0] | ||
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id") | ||
after_drop = dataset.gdf.shape[0] | ||
print(f"Duplicate final dataset rows droppeds: {before_drop - after_drop}") | ||
|
||
# Convert problematic columns to numeric | ||
numeric_columns = [ | ||
"market_value", | ||
"sale_price", | ||
"total_assessment", | ||
"total_due", | ||
"num_years_owed", | ||
"permit_count", | ||
] | ||
for col in numeric_columns: | ||
dataset.gdf[col] = pd.to_numeric(dataset.gdf[col], errors="coerce") | ||
|
||
dataset.gdf["most_recent_year_owed"] = dataset.gdf["most_recent_year_owed"].astype(str) | ||
|
||
print("Column data types before exporting to Parquet:") | ||
print(dataset.gdf.dtypes) | ||
|
||
# Quick dataset profiling | ||
print("\nQuick dataset profile:") | ||
|
||
# 1) Number of NA values per column | ||
print("\nNumber of NA values per column:") | ||
print(dataset.gdf.isna().sum()) | ||
|
||
# 2) Mean, median, and std of numeric columns | ||
print("\nMean, Median, and Standard Deviation of numeric columns:") | ||
numeric_columns = dataset.gdf.select_dtypes(include=["float", "int"]).columns | ||
|
||
for column in numeric_columns: | ||
mean = dataset.gdf[column].mean() | ||
median = dataset.gdf[column].median() | ||
std = dataset.gdf[column].std() | ||
print(f"{column}:\n Mean: {mean:.2f}\n Median: {median:.2f}\n Std: {std:.2f}") | ||
|
||
# 3) Number of unique values in string columns | ||
print("\nNumber of unique values in string columns:") | ||
string_columns = dataset.gdf.select_dtypes(include=["object", "string"]).columns | ||
unique_values = dataset.gdf[string_columns].nunique() | ||
print(unique_values) | ||
|
||
dataset.gdf.to_parquet("tmp/test_output.parquet") | ||
|
||
try: | ||
|
||
print("Starting ETL process.") | ||
|
||
services = [ | ||
vacant_properties, # Run early for other utils to use the `vacant` designation | ||
pwd_parcels, | ||
council_dists, | ||
nbhoods, | ||
rco_geoms, | ||
city_owned_properties, | ||
phs_properties, | ||
community_gardens, | ||
ppr_properties, | ||
owner_type, | ||
li_violations, | ||
li_complaints, | ||
tree_canopy, | ||
gun_crimes, | ||
drug_crimes, | ||
delinquencies, | ||
unsafe_buildings, | ||
imm_dang_buildings, | ||
contig_neighbors, | ||
dev_probability, | ||
negligent_devs, | ||
tactical_urbanism, | ||
conservatorship, | ||
park_priority, | ||
] | ||
|
||
print("Loading OPA properties dataset.") | ||
dataset = opa_properties() | ||
|
||
for service in services: | ||
print(f"Running service: {service.__name__}") | ||
dataset = service(dataset) | ||
|
||
print("Applying final dataset transformations.") | ||
dataset = priority_level(dataset) | ||
dataset = access_process(dataset) | ||
|
||
# Drop duplicates | ||
before_drop = dataset.gdf.shape[0] | ||
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id") | ||
print(f"Duplicate rows dropped: {before_drop - dataset.gdf.shape[0]}") | ||
|
||
# Convert columns to numeric where necessary | ||
numeric_columns = [ | ||
"market_value", | ||
"sale_price", | ||
"total_assessment", | ||
"total_due", | ||
"num_years_owed", | ||
"permit_count", | ||
] | ||
dataset.gdf[numeric_columns] = dataset.gdf[numeric_columns].apply(pd.to_numeric, errors="coerce") | ||
dataset.gdf["most_recent_year_owed"] = dataset.gdf["most_recent_year_owed"].astype(str) | ||
|
||
# Dataset profiling | ||
send_dataframe_profile_to_slack(dataset.gdf, "all_properties_end") | ||
|
||
# Save dataset to PostgreSQL | ||
to_postgis_with_schema(dataset.gdf, "all_properties_end", conn) | ||
|
||
# Generate and send diff report | ||
diff_report = DiffReport() | ||
diff_report.run() | ||
|
||
send_pg_stats_to_slack(conn) # Send PostgreSQL stats to Slack | ||
|
||
# Save local Parquet file | ||
parquet_path = "tmp/test_output.parquet" | ||
dataset.gdf.to_parquet(parquet_path) | ||
print(f"Dataset saved to Parquet: {parquet_path}") | ||
|
||
# Publish only vacant properties | ||
dataset.gdf = dataset.gdf[dataset.gdf["vacant"]] | ||
dataset.build_and_publish(tiles_file_id_prefix) | ||
|
||
# Finalize | ||
conn.commit() | ||
conn.close() | ||
print("ETL process completed successfully.") | ||
|
||
except Exception as e: | ||
error_message = f"Error in backend job: {str(e)}\n\n{traceback.format_exc()}" | ||
send_error_to_slack(error_message) | ||
raise # Optionally re-raise the exception |
Oops, something went wrong.