Lebovits/issu1015 cleanup new pipeline #1037

Merged 26 commits on Dec 11, 2024
Commits
021dd0e
update deps
nlebovits Nov 27, 2024
0666f1f
lint, format
nlebovits Nov 27, 2024
b51385f
add docstrings, typing; set num workers dynamically; minor tweaks to …
nlebovits Nov 27, 2024
f36f1d1
switch priority level calc to use z-score instead of percentile
nlebovits Nov 27, 2024
457099b
remove some logging
nlebovits Nov 28, 2024
f1d1c58
add draft database connection --no-verify
nlebovits Nov 28, 2024
87538b4
commit working draft of hypertable creation (still needs to be cleane…
nlebovits Dec 4, 2024
f858b84
successfully posting to postgres w tsdb extension
nlebovits Dec 4, 2024
11231c6
get constituent tables set up as hypertables
nlebovits Dec 5, 2024
b8268c7
add month partitioning and compression policies to hypertables
nlebovits Dec 5, 2024
0b6f407
add slack reporter for hypertable sizes
nlebovits Dec 5, 2024
6496cf4
ruff
nlebovits Dec 5, 2024
dddca54
relocate dev deps in pipfile; reinstall
nlebovits Dec 8, 2024
54d78e1
fix issue with incorrect comment
nlebovits Dec 8, 2024
7c7fce2
remove outdated parquet write to GCS
nlebovits Dec 8, 2024
48bf627
restore post to GCP; data diff not yet working correctly
nlebovits Dec 9, 2024
b113fde
modularize large parts of featurelayer; add slack reporter for data QC
nlebovits Dec 9, 2024
8df4b56
create new draft of diff report for new timestamp approach
nlebovits Dec 9, 2024
dd998ed
modularize database, data loaders components of featurelayer.py
nlebovits Dec 9, 2024
36eb609
set up data diffing
nlebovits Dec 10, 2024
f566695
update pip deps
nlebovits Dec 10, 2024
5b436b7
get diff report working
nlebovits Dec 11, 2024
432cbb5
clean up logging; add try-except block if main.py fails
nlebovits Dec 11, 2024
0521199
track data_diff.py class
nlebovits Dec 11, 2024
928da95
re-add geoalchemy2 to pipfile
nlebovits Dec 11, 2024
6e82443
remove duplicate, outdated diff report file
nlebovits Dec 11, 2024
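
Commit f36f1d1 above switches the priority level calculation from percentile ranks to z-scores. A minimal sketch of what a z-score-based binning can look like follows; the column name, cut points, and labels are illustrative assumptions, not this repository's actual implementation.

import pandas as pd

def zscore_priority(scores: pd.Series) -> pd.Series:
    """Bin a numeric score into Low/Medium/High priority using z-scores."""
    z = (scores - scores.mean()) / scores.std()
    # Cut points are assumptions chosen for illustration only.
    return pd.cut(
        z,
        bins=[-float("inf"), -0.5, 0.5, float("inf")],
        labels=["Low", "Medium", "High"],
    )

# Hypothetical usage:
# dataset.gdf["priority_level"] = zscore_priority(dataset.gdf["priority_score"])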
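Commits 11231c6, b8268c7, and 0b6f407 describe turning constituent tables into TimescaleDB hypertables, adding month partitioning and compression policies, and reporting hypertable sizes to Slack. A hedged sketch of that sequence using SQLAlchemy, standard TimescaleDB SQL functions, and slack_sdk; the connection string, table names, timestamp column, and Slack channel are placeholders rather than values from this repository.

from slack_sdk import WebClient
from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://user:pass@localhost:5432/etl")  # placeholder DSN
tables = ["vacant_properties", "li_complaints"]  # illustrative table names

with engine.begin() as conn:
    for table in tables:
        # Convert each table to a hypertable chunked by month on an assumed timestamp column.
        conn.execute(text(
            f"SELECT create_hypertable('{table}', 'create_date', "
            "chunk_time_interval => INTERVAL '1 month', migrate_data => true);"
        ))
        # Enable compression and compress chunks older than one month.
        conn.execute(text(f"ALTER TABLE {table} SET (timescaledb.compress);"))
        conn.execute(text(f"SELECT add_compression_policy('{table}', INTERVAL '1 month');"))

    # Collect hypertable sizes (in bytes) for a short Slack report.
    sizes = {
        table: conn.execute(text(f"SELECT hypertable_size('{table}');")).scalar()
        for table in tables
    }

report = "\n".join(f"{table}: {size / 1e6:.1f} MB" for table, size in sizes.items())
WebClient(token="xoxb-placeholder").chat_postMessage(channel="#etl-reports", text=report)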
12 changes: 6 additions & 6 deletions data/src/Pipfile
@@ -13,29 +13,29 @@ matplotlib = "*"
rasterio = "*"
scikit-learn = "*"
mapclassify = "*"
black = "*"
fiona = "*"
esridump = "*"
sqlalchemy = "*"
psycopg2-binary = "*"
geoalchemy2 = "*"
mapbox = "*"
google-cloud-storage = "*"
pydantic = "==2.8.2"
data-diff = {extras = ["postgresql"], version = "*"}
future = "*"
slack-sdk = "*"
pytest = "*"
networkx = "*"
libpysal = "*"
jenkspy = "*"
pyarrow = "*"
tqdm = "*"
geoalchemy2 ="*"

[dev-packages]
black = "*"
pytest = "*"
vulture = "*"
pylint = "*"
radon = "*"

[dev-packages]
ruff = "*"

[requires]
python_version = "3.11"
2,599 changes: 1,157 additions & 1,442 deletions data/src/Pipfile.lock

Large diffs are not rendered by default.

244 changes: 98 additions & 146 deletions data/src/main.py
@@ -1,155 +1,107 @@
import sys

from new_etl.data_utils.access_process import access_process
from new_etl.data_utils.contig_neighbors import contig_neighbors
from new_etl.data_utils.dev_probability import dev_probability
from new_etl.data_utils.negligent_devs import negligent_devs
from new_etl.data_utils.opa_properties import opa_properties
from new_etl.data_utils.priority_level import priority_level
from new_etl.data_utils.vacant_properties import vacant_properties
from new_etl.data_utils.pwd_parcels import pwd_parcels
from new_etl.data_utils.city_owned_properties import city_owned_properties
from new_etl.data_utils.phs_properties import phs_properties
from new_etl.data_utils.li_violations import li_violations
from new_etl.data_utils.li_complaints import li_complaints
from new_etl.data_utils.rco_geoms import rco_geoms
from new_etl.data_utils.council_dists import council_dists
from new_etl.data_utils.tree_canopy import tree_canopy
from new_etl.data_utils.nbhoods import nbhoods
from new_etl.data_utils.gun_crimes import gun_crimes
from new_etl.data_utils.drug_crimes import drug_crimes
from new_etl.data_utils.delinquencies import delinquencies
from new_etl.data_utils.unsafe_buildings import unsafe_buildings
from new_etl.data_utils.imm_dang_buildings import imm_dang_buildings
from new_etl.data_utils.tactical_urbanism import tactical_urbanism
from new_etl.data_utils.conservatorship import conservatorship
from new_etl.data_utils.owner_type import owner_type
from new_etl.data_utils.community_gardens import community_gardens
from new_etl.data_utils.park_priority import park_priority
from new_etl.data_utils.ppr_properties import ppr_properties

import pandas as pd
import traceback

from config.psql import conn
from config.config import tiles_file_id_prefix

from new_etl.classes.slack_reporters import send_dataframe_profile_to_slack, send_pg_stats_to_slack, send_error_to_slack
from new_etl.classes.data_diff import DiffReport
from new_etl.data_utils import *
from new_etl.database import to_postgis_with_schema

# Ensure the directory containing awkde is in the Python path
awkde_path = "/usr/src/app"
if awkde_path not in sys.path:
sys.path.append(awkde_path)

services = [
# vacant designation
vacant_properties, # needs to run early so that other utils can make use of the `vacant` designation
# geometries/areas
pwd_parcels,
council_dists,
nbhoods,
rco_geoms,
# ownership
city_owned_properties,
phs_properties,
community_gardens,
ppr_properties,
owner_type,
# quality of life
li_violations,
li_complaints,
tree_canopy,
gun_crimes,
drug_crimes,
delinquencies,
unsafe_buildings,
imm_dang_buildings,
# development
contig_neighbors,
dev_probability,
negligent_devs,
# access/interventions
tactical_urbanism,
conservatorship,
park_priority,
]

dataset = opa_properties()

print("Initial Dataset:")
print("Shape:", dataset.gdf.shape)
print("Head:\n", dataset.gdf.head())
print("NA Counts:\n", dataset.gdf.isna().sum())

for service in services:
dataset = service(dataset)
print(f"After {service.__name__}:")
print("Dataset type:", type(dataset.gdf).__name__)
print("Shape:", dataset.gdf.shape)
print("Head:\n", dataset.gdf.head())
print("NA Counts:\n", dataset.gdf.isna().sum())

before_drop = dataset.gdf.shape[0]
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
after_drop = dataset.gdf.shape[0]
print(
f"Duplicate dataset rows dropped after initial services: {before_drop - after_drop}"
)

# Add Priority Level
dataset = priority_level(dataset)

# Print the distribution of "priority_level"
distribution = dataset.gdf["priority_level"].value_counts()
print("Distribution of priority level:")
print(distribution)

# Add Access Process
dataset = access_process(dataset)

# Print the distribution of "access_process"
distribution = dataset.gdf["access_process"].value_counts()
print("Distribution of access process:")
print(distribution)

before_drop = dataset.gdf.shape[0]
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
after_drop = dataset.gdf.shape[0]
print(f"Duplicate final dataset rows droppeds: {before_drop - after_drop}")

# Convert problematic columns to numeric
numeric_columns = [
"market_value",
"sale_price",
"total_assessment",
"total_due",
"num_years_owed",
"permit_count",
]
for col in numeric_columns:
dataset.gdf[col] = pd.to_numeric(dataset.gdf[col], errors="coerce")

dataset.gdf["most_recent_year_owed"] = dataset.gdf["most_recent_year_owed"].astype(str)

print("Column data types before exporting to Parquet:")
print(dataset.gdf.dtypes)

# Quick dataset profiling
print("\nQuick dataset profile:")

# 1) Number of NA values per column
print("\nNumber of NA values per column:")
print(dataset.gdf.isna().sum())

# 2) Mean, median, and std of numeric columns
print("\nMean, Median, and Standard Deviation of numeric columns:")
numeric_columns = dataset.gdf.select_dtypes(include=["float", "int"]).columns

for column in numeric_columns:
mean = dataset.gdf[column].mean()
median = dataset.gdf[column].median()
std = dataset.gdf[column].std()
print(f"{column}:\n Mean: {mean:.2f}\n Median: {median:.2f}\n Std: {std:.2f}")

# 3) Number of unique values in string columns
print("\nNumber of unique values in string columns:")
string_columns = dataset.gdf.select_dtypes(include=["object", "string"]).columns
unique_values = dataset.gdf[string_columns].nunique()
print(unique_values)

dataset.gdf.to_parquet("tmp/test_output.parquet")

try:

print("Starting ETL process.")

services = [
vacant_properties, # Run early for other utils to use the `vacant` designation
pwd_parcels,
council_dists,
nbhoods,
rco_geoms,
city_owned_properties,
phs_properties,
community_gardens,
ppr_properties,
owner_type,
li_violations,
li_complaints,
tree_canopy,
gun_crimes,
drug_crimes,
delinquencies,
unsafe_buildings,
imm_dang_buildings,
contig_neighbors,
dev_probability,
negligent_devs,
tactical_urbanism,
conservatorship,
park_priority,
]

print("Loading OPA properties dataset.")
dataset = opa_properties()

for service in services:
print(f"Running service: {service.__name__}")
dataset = service(dataset)

print("Applying final dataset transformations.")
dataset = priority_level(dataset)
dataset = access_process(dataset)

# Drop duplicates
before_drop = dataset.gdf.shape[0]
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
print(f"Duplicate rows dropped: {before_drop - dataset.gdf.shape[0]}")

# Convert columns to numeric where necessary
numeric_columns = [
"market_value",
"sale_price",
"total_assessment",
"total_due",
"num_years_owed",
"permit_count",
]
dataset.gdf[numeric_columns] = dataset.gdf[numeric_columns].apply(pd.to_numeric, errors="coerce")
dataset.gdf["most_recent_year_owed"] = dataset.gdf["most_recent_year_owed"].astype(str)

# Dataset profiling
send_dataframe_profile_to_slack(dataset.gdf, "all_properties_end")

# Save dataset to PostgreSQL
to_postgis_with_schema(dataset.gdf, "all_properties_end", conn)

# Generate and send diff report
diff_report = DiffReport()
diff_report.run()

send_pg_stats_to_slack(conn) # Send PostgreSQL stats to Slack

# Save local Parquet file
parquet_path = "tmp/test_output.parquet"
dataset.gdf.to_parquet(parquet_path)
print(f"Dataset saved to Parquet: {parquet_path}")

# Publish only vacant properties
dataset.gdf = dataset.gdf[dataset.gdf["vacant"]]
dataset.build_and_publish(tiles_file_id_prefix)

# Finalize
conn.commit()
conn.close()
print("ETL process completed successfully.")

except Exception as e:
error_message = f"Error in backend job: {str(e)}\n\n{traceback.format_exc()}"
send_error_to_slack(error_message)
raise # Optionally re-raise the exception
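
The new main.py writes the final GeoDataFrame to Postgres through to_postgis_with_schema (imported from new_etl.database), whose implementation is not part of this excerpt. A plausible minimal sketch of such a helper, built on GeoPandas' to_postgis and stamping rows with a load timestamp to fit the hypertable approach; the function body, timestamp column, and engine handling are assumptions.

from datetime import datetime, timezone

import geopandas as gpd

def to_postgis_with_schema(gdf: gpd.GeoDataFrame, table_name: str, engine) -> None:
    """Append a GeoDataFrame to PostGIS with a per-run load timestamp."""
    out = gdf.copy()
    out["create_date"] = datetime.now(timezone.utc)  # assumed timestamp column for monthly chunking
    out.to_postgis(table_name, engine, if_exists="append", index=False)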
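The DiffReport generated after the database write comes from new_etl.classes.data_diff, also outside this excerpt; the Pipfile pins data-diff with the postgresql extra. A minimal sketch of comparing two snapshots with data-diff and summarizing the result for Slack; the DSN, table names, and key column are assumptions, and this is not the repository's DiffReport class.

from data_diff import connect_to_table, diff_tables
from slack_sdk import WebClient

db_url = "postgresql://user:pass@localhost:5432/etl"  # placeholder DSN

# Compare the previous snapshot against the current one, keyed on opa_id.
previous = connect_to_table(db_url, "all_properties_end_prev", key_columns=("opa_id",))
current = connect_to_table(db_url, "all_properties_end", key_columns=("opa_id",))

added = removed = 0
for sign, _row in diff_tables(previous, current):
    # data-diff yields ('+', row) for rows only in the second table, ('-', row) for rows only in the first.
    if sign == "+":
        added += 1
    else:
        removed += 1

summary = f"Data diff on opa_id: {added} new rows, {removed} missing rows."
WebClient(token="xoxb-placeholder").chat_postMessage(channel="#etl-reports", text=summary)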
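main.py also calls send_dataframe_profile_to_slack, send_pg_stats_to_slack, and send_error_to_slack from new_etl.classes.slack_reporters, none of which appear in this diff. A hedged sketch of the profile and error reporters using slack_sdk; the channel, token handling, and message format are assumptions.

import os

import pandas as pd
from slack_sdk import WebClient

client = WebClient(token=os.environ.get("SLACK_BOT_TOKEN", ""))
CHANNEL = "#etl-reports"  # assumed channel

def send_dataframe_profile_to_slack(df: pd.DataFrame, label: str) -> None:
    """Post a short shape/NA summary of a dataframe to Slack."""
    profile = (
        f"*{label}*\n"
        f"rows: {len(df)}, columns: {df.shape[1]}\n"
        f"total NA values: {int(df.isna().sum().sum())}"
    )
    client.chat_postMessage(channel=CHANNEL, text=profile)

def send_error_to_slack(message: str) -> None:
    """Post an error message (traceback already included by the caller) to Slack."""
    client.chat_postMessage(channel=CHANNEL, text=message)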