Lebovits/issu1015 cleanup new pipeline #1037

Merged
26 commits merged on Dec 11, 2024
Changes from 12 commits
Commits
26 commits
021dd0e
update deps
nlebovits Nov 27, 2024
0666f1f
lint, format
nlebovits Nov 27, 2024
b51385f
add docstrings, typing; set num workers dynamically; minor tweaks to …
nlebovits Nov 27, 2024
f36f1d1
switch priority level calc to use z-score instead of percentile
nlebovits Nov 27, 2024
457099b
remove some logging
nlebovits Nov 28, 2024
f1d1c58
add draft database connection --no-verify
nlebovits Nov 28, 2024
87538b4
commit working draft of hypertable creation (still needs to be cleane…
nlebovits Dec 4, 2024
f858b84
successfully posting to postgres w tsdb extension
nlebovits Dec 4, 2024
11231c6
get constituent tables set up as hypertables
nlebovits Dec 5, 2024
b8268c7
add month partitioning and compression policies to hypertables
nlebovits Dec 5, 2024
0b6f407
add slack reporter for hypertable sizes
nlebovits Dec 5, 2024
6496cf4
ruff
nlebovits Dec 5, 2024
dddca54
relocate dev deps in pipfile; reinstall
nlebovits Dec 8, 2024
54d78e1
fix issue with incorrect comment
nlebovits Dec 8, 2024
7c7fce2
remove outdated parquet write to GCS
nlebovits Dec 8, 2024
48bf627
restore post to GCP; data diff not yet working correctly
nlebovits Dec 9, 2024
b113fde
modularize large parts of featurelayer; add slack reporter for data QC
nlebovits Dec 9, 2024
8df4b56
create new draft of diff report for new timestamp approach
nlebovits Dec 9, 2024
dd998ed
modularize database, data loaders components of featurelayer.py
nlebovits Dec 9, 2024
36eb609
set up data diffing
nlebovits Dec 10, 2024
f566695
update pip deps
nlebovits Dec 10, 2024
5b436b7
get diff report working
nlebovits Dec 11, 2024
432cbb5
clean up logging; add try-except block if main.py fails
nlebovits Dec 11, 2024
0521199
track data_diff.py class
nlebovits Dec 11, 2024
928da95
re-add geoalchemy2 to pipfile
nlebovits Dec 11, 2024
6e82443
remove duplicate, outdated diff report file
nlebovits Dec 11, 2024
1 change: 1 addition & 0 deletions data/src/Pipfile
@@ -34,6 +34,7 @@ tqdm = "*"
vulture = "*"
pylint = "*"
radon = "*"
ruff = "*"

[dev-packages]

1,931 changes: 981 additions & 950 deletions data/src/Pipfile.lock

Large diffs are not rendered by default.

126 changes: 98 additions & 28 deletions data/src/main.py
@@ -1,4 +1,10 @@
import sys
import pandas as pd
from config.psql import conn
from sqlalchemy import text
import traceback

from new_etl.classes.slack_pg_reporter import send_pg_stats_to_slack

from new_etl.data_utils.access_process import access_process
from new_etl.data_utils.contig_neighbors import contig_neighbors
@@ -28,8 +34,8 @@
from new_etl.data_utils.park_priority import park_priority
from new_etl.data_utils.ppr_properties import ppr_properties

import pandas as pd

send_pg_stats_to_slack(conn) # Send PostgreSQL stats to Slack

# Ensure the directory containing awkde is in the Python path
awkde_path = "/usr/src/app"
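
The commit history also adds a Slack reporter for Postgres and hypertable stats, imported above as send_pg_stats_to_slack from new_etl.classes.slack_pg_reporter; its implementation is not shown in this diff. A minimal hypothetical sketch of that kind of reporter (assuming slack_sdk and a SQLAlchemy connection; not the repository's actual code) could look like:

# Hypothetical sketch only; the real slack_pg_reporter is not shown in this diff.
import os

from slack_sdk import WebClient
from sqlalchemy import text


def report_table_sizes_to_slack(conn, tables, channel="#data-pipeline"):
    """Post the on-disk size of each listed table to a Slack channel."""
    lines = []
    for table in tables:
        size = conn.execute(
            text("SELECT pg_size_pretty(pg_total_relation_size(CAST(:t AS regclass)))"),
            {"t": table},
        ).scalar()
        lines.append(f"{table}: {size}")

    client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
    client.chat_postMessage(channel=channel, text="\n".join(lines))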
@@ -71,42 +77,15 @@

dataset = opa_properties()

print("Initial Dataset:")
print("Shape:", dataset.gdf.shape)
print("Head:\n", dataset.gdf.head())
print("NA Counts:\n", dataset.gdf.isna().sum())

for service in services:
dataset = service(dataset)
print(f"After {service.__name__}:")
print("Dataset type:", type(dataset.gdf).__name__)
print("Shape:", dataset.gdf.shape)
print("Head:\n", dataset.gdf.head())
print("NA Counts:\n", dataset.gdf.isna().sum())

before_drop = dataset.gdf.shape[0]
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
after_drop = dataset.gdf.shape[0]
print(
f"Duplicate dataset rows dropped after initial services: {before_drop - after_drop}"
)

# Add Priority Level
dataset = priority_level(dataset)

# Print the distribution of "priority_level"
distribution = dataset.gdf["priority_level"].value_counts()
print("Distribution of priority level:")
print(distribution)

# Add Access Process
dataset = access_process(dataset)

# Print the distribution of "access_process"
distribution = dataset.gdf["access_process"].value_counts()
print("Distribution of access process:")
print(distribution)

before_drop = dataset.gdf.shape[0]
dataset.gdf = dataset.gdf.drop_duplicates(subset="opa_id")
after_drop = dataset.gdf.shape[0]
@@ -152,4 +131,95 @@
unique_values = dataset.gdf[string_columns].nunique()
print(unique_values)


dataset.gdf.to_parquet("tmp/test_output.parquet")
print("Final dataset saved to tmp/ folder.")

try:
# Save GeoDataFrame to PostgreSQL
dataset.gdf.to_postgis(
"vacant_properties_end",
conn,
if_exists="replace", # Replace the table if it already exists
)

# Ensure the `create_date` column exists
conn.execute(
text("""
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_name = 'vacant_properties_end' AND column_name = 'create_date'
) THEN
ALTER TABLE vacant_properties_end ADD COLUMN create_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP;
END IF;
END $$;
""")
)
Contributor

What's the situation where a table exists without having this column? Ideally, table creation would be isolated from data processing. We could consider adding the column in opa_properties, using an "add_create_date" flag if there are cases where we don't want to create the date.
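
A minimal sketch of the suggestion above, stamping the timestamp at load time behind a flag instead of patching the table afterwards (hypothetical: the add_create_date parameter and load_opa_properties helper are illustrative, not code from this PR):

# Hypothetical sketch of the reviewer's suggestion; not part of this PR.
import pandas as pd


def opa_properties(add_create_date: bool = True):
    dataset = load_opa_properties()  # assumed existing loader returning an object with a .gdf GeoDataFrame
    if add_create_date:
        # Stamp rows at load time so the hypertable's time column is already
        # populated before the data reaches PostgreSQL.
        dataset.gdf["create_date"] = pd.Timestamp.now(tz="UTC")
    return dataset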


# Convert the table to a hypertable
try:
conn.execute(
text("""
SELECT create_hypertable('vacant_properties_end', 'create_date', migrate_data => true);
""")
)
print("Table successfully converted to a hypertable.")
except Exception as e:
if "already a hypertable" in str(e):
print("Table is already a hypertable.")
else:
raise

# Set chunk interval to 1 month
try:
conn.execute(
text("""
SELECT set_chunk_time_interval('vacant_properties_end', INTERVAL '1 month');
""")
)
print("Chunk time interval set to 1 month.")
except Exception as e:
print(f"Error setting chunk interval: {e}")
traceback.print_exc()

# Enable compression on the hypertable
try:
conn.execute(
text("""
ALTER TABLE vacant_properties_end SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'opa_id'
);
""")
)
print("Compression enabled on table vacant_properties_end.")
except Exception as e:
print(f"Error enabling compression on table vacant_properties_end: {e}")

# Set up compression policy for chunks older than 6 months
try:
conn.execute(
text("""
SELECT add_compression_policy('vacant_properties_end', INTERVAL '6 months');
""")
)
print("Compression policy added for chunks older than 6 months.")
except Exception as e:
print(f"Error adding compression policy: {e}")
traceback.print_exc()

# Commit the transaction
conn.commit()
print(
"Data successfully saved and table prepared with partitioning and compression."
)

except Exception as e:
print(f"Error during the table operation: {e}")
traceback.print_exc()
conn.rollback() # Rollback the transaction in case of failure
finally:
conn.close()