updates 2025-01-11 - Refactored pipeline for Version 0.1.0
CHRISCARLON committed Jan 11, 2025
1 parent 96fb024 commit 07fff9a
Showing 7 changed files with 116 additions and 53 deletions.
46 changes: 25 additions & 21 deletions README.md
@@ -1,4 +1,5 @@
# Open Street Works Data Pipeline

[![codecov](https://codecov.io/github/CHRISCARLON/Open-Street-Works-Data-Pipeline/branch/new-data-dev-branch/graph/badge.svg?token=T4PLSPAXDE)](https://codecov.io/github/CHRISCARLON/Open-Street-Works-Data-Pipeline)

**Example pipeline processing Street Manager permit data, Ordnance Survey Open USRN Data, and Geoplace SWA Code data.**
@@ -21,32 +22,35 @@

4. Scottish Road Works Register (SRWR) archived permit data (TBC)

## Open Street Works Data Pipeline in 3 points:
## Open Street Works Data Pipeline in 3 points

>[!NOTE]
> [!NOTE]
> The aim of this project is simple.
>
> Reduce the time it takes to deliver value from open street works data.
1. **It's fast**
- Process an entire month of Street Manager archived permit data ready for analysis in 5 minutes.
- Process an entire year of Street Manager archived permit data ready for analysis in around 1 hour.
- Process all Street Manager archived permit data from 2020 to 2024 in the morning and be ready to analyse the data in the afternoon.
- The pipeline utilises batch processing, so there is no need to download, unzip, and save files to disk - everything is kept in memory.

2. **It's not fussy**
- Run it where you want.
- Run it locally with Docker or a Python Venv if you want.
- Run it on a Google Cloud Function/AWS Lambda Function (with some caveats).
- Run it as a Fargate Task on AWS or on Google Compute Engine.

3. **It's flexible**
- The project is modular so you can customise it to fit your own needs.
- Don't want to use AWS Secrets Manager for environment variables? Use another provider or a simple .env file (recommended for local dev only).
- Don't want to use MotherDuck as your data warehouse? Add a function so the end destination is Google BigQuery instead.
- Only want to focus on Street Manager data? Launch the entry point that doesn't process SRWR data.
- You can integrate other tools from the Modern Data Stack such as DLT, DBT, or orchestrators like Airflow and Mage if you want more functionality.
- You can run several instances of the project for different analytical requirements.
**It's fast**

- Process an entire month of Street Manager archived permit data ready for analysis in 5 minutes.
- Process an entire year of Street Manager archived permit data ready for analysis in around 1 hour.
- Process all Street Manager archived permit data from 2020 to 2024 in the morning and be ready to analyse the data in the afternoon.
- The pipeline utilises batch processing, so there is no need to download, unzip, and save files to disk - everything is kept in memory.

**It's not fussy**

- Run it where you want.
- Run it locally with Docker or a Python Venv if you want.
- Run it on a Google Cloud Function/AWS Lambda Function (with some caveats).
- Run it as a Fargate Task on AWS or on Google Compute Engine.

**It's flexible**

- The project is modular so you can customise it to fit your own needs.
- Don't want to use AWS Secrets Manager for environment variables? Use another provider or a simple .env file (recommended for local dev only).
- Don't want to use MotherDuck as your data warehouse? Add a function so the end destination is Google BigQuery instead.
- Only want to focus on Street Manager data? Launch the entry point that doesn't process SRWR data.
- You can integrate other tools from the Modern Data Stack such as DLT, DBT, or orchestrators like Airflow and Mage if you want more functionality.
- You can run several instances of the project for different analytical requirements.

## Why did I create this Project?

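The README's "everything is kept in memory" point is the core design idea behind the pipeline. A minimal sketch of that pattern follows; the archive URL, target table name, and one-JSON-file-per-permit layout are assumptions for illustration only, not taken from this repository.

```python
# Hedged sketch of in-memory batch processing: nothing is written to disk.
# The table name and one-JSON-file-per-record layout are assumptions for illustration.
import io
import json
import zipfile

import duckdb
import pandas as pd
import requests


def stream_archive_to_duckdb(url: str, conn: duckdb.DuckDBPyConnection, chunk_size: int = 100_000) -> None:
    # Pull the whole archive into memory rather than saving it to disk
    response = requests.get(url, timeout=300)
    response.raise_for_status()

    rows: list[dict] = []
    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        for name in archive.namelist():
            # Each member is assumed to be a single JSON record
            with archive.open(name) as member:
                rows.append(json.load(member))
            # Flush a batch once the chunk size is reached
            if len(rows) == chunk_size:
                _flush(rows, conn)
                rows = []
    if rows:
        _flush(rows, conn)


def _flush(rows: list[dict], conn: duckdb.DuckDBPyConnection) -> None:
    batch_df = pd.DataFrame(rows)
    conn.register("batch_df", batch_df)
    conn.execute("INSERT INTO permits SELECT * FROM batch_df")
    conn.unregister("batch_df")
```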
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -23,7 +23,6 @@ idna = "^3.7"
requests = "^2.31.0"
diagrams = "^0.23.4"
graphviz = "^0.20.3"
dbt-duckdb = "^1.8.0"
python-dotenv = "^1.0.1"
pytest = "^8.2.1"
geopandas = "^0.14.4"
@@ -36,6 +35,7 @@ fastexcel = "^0.11.5"
msoffcrypto-tool = "^5.4.1"
beautifulsoup4 = "^4.12.3"
odfpy = "^1.4.1"
dbt-duckdb = "^1.9.1"


[build-system]
18 changes: 9 additions & 9 deletions src/main.py
@@ -1,17 +1,17 @@
from pipeline_launch_scripts import (
sm_monthly_permit_main,
os_open_usrns_main,
swa_codes_main,
os_open_linked_ids_main,
# sm_monthly_permit_main,
# os_open_usrns_main,
# swa_codes_main,
# os_open_linked_ids_main,
os_open_roads_main,
)

def main():
sm_monthly_permit_main.main(100000)
os_open_usrns_main.main(100000)
swa_codes_main.main()
os_open_linked_ids_main.main(200000)
# sm_monthly_permit_main.main(100000)
# os_open_usrns_main.main(100000)
# swa_codes_main.main()
# os_open_linked_ids_main.main(200000)
os_open_roads_main.main(100000)

if __name__ == "__main__":
main()
main()
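The entry points here are toggled by commenting lines in and out. A possible alternative, sketched purely as an illustration rather than part of this commit (the flag names and batch sizes are assumptions), is to select pipelines from the command line:

```python
# Hypothetical CLI wrapper around the existing launch scripts; names and sizes are illustrative.
import argparse

from pipeline_launch_scripts import os_open_roads_main, sm_monthly_permit_main

PIPELINES = {
    "os-open-roads": lambda: os_open_roads_main.main(100000),
    "street-manager": lambda: sm_monthly_permit_main.main(100000),
}


def main() -> None:
    parser = argparse.ArgumentParser(description="Run selected open street works pipelines")
    parser.add_argument("pipelines", nargs="+", choices=sorted(PIPELINES))
    args = parser.parse_args()
    for name in args.pipelines:
        PIPELINES[name]()


if __name__ == "__main__":
    main()
```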
10 changes: 5 additions & 5 deletions src/os_open_roads/create_motherduck_table.py
@@ -13,7 +13,7 @@ def create_table(conn):
try:
table_command = f"""CREATE OR REPLACE TABLE "{schema}"."open_roads_latest" (
id VARCHAR,
fictitious BOOLEAN,
fictitious VARCHAR,
road_classification VARCHAR,
road_function VARCHAR,
form_of_way VARCHAR,
@@ -23,11 +23,11 @@
name_2 VARCHAR,
name_2_lang VARCHAR,
road_structure VARCHAR,
length DOUBLE,
length VARCHAR,
length_uom VARCHAR,
loop BOOLEAN,
primary_route BOOLEAN,
trunk_road BOOLEAN,
loop VARCHAR,
primary_route VARCHAR,
trunk_road VARCHAR,
start_node VARCHAR,
end_node VARCHAR,
road_number_toid VARCHAR,
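One reading of this change: loading every column as VARCHAR keeps the raw ingest tolerant of mixed or malformed values, with typing deferred to a later step. A hedged sketch of what such a downstream cast could look like in DuckDB; the typed table name is an assumption, while the source table and columns come from the diff above.

```python
# Hypothetical downstream step, not part of this commit: cast the raw VARCHAR load
# back to typed columns once ingestion has succeeded. "open_roads_typed" is an
# assumed target name; the source table and columns come from the diff above.
def cast_open_roads_to_types(conn) -> None:
    conn.execute("""
        CREATE OR REPLACE TABLE "os_open_roads"."open_roads_typed" AS
        SELECT
            id,
            TRY_CAST(fictitious AS BOOLEAN) AS fictitious,
            road_classification,
            road_function,
            form_of_way,
            TRY_CAST(length AS DOUBLE) AS length,
            length_uom,
            TRY_CAST(loop AS BOOLEAN) AS loop,
            TRY_CAST(primary_route AS BOOLEAN) AS primary_route,
            TRY_CAST(trunk_road AS BOOLEAN) AS trunk_road,
            start_node,
            end_node,
            geometry
        FROM "os_open_roads"."open_roads_latest"
    """)
```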
75 changes: 67 additions & 8 deletions src/os_open_roads/os_open_roads_processor.py
@@ -76,24 +76,56 @@ def load_geopackage_open_roads(url, conn, limit):
try:
# Convert geometry to WKT string
geom = shape(feature['geometry'])
feature['properties']['geometry'] = wkt.dumps(geom)
properties = feature['properties']
properties['geometry'] = wkt.dumps(geom)

# Append the flattened properties to features list
features.append(properties)

except Exception as e:
# If there's an error converting the geometry, set it to None
# and log the index of the feature that failed so we're aware of what features failed
# This could be made better - could we store these errors somewhere for future use?
feature['properties']['geometry'] = None
# If there's an error converting the geometry, get properties and set geometry to None
properties = feature['properties']
properties['geometry'] = None

error_msg = f"Error converting geometry for feature {i}: {e}"
logger.warning(error_msg)
errors.append(error_msg)

# Append each feature to the list
features.append(feature['properties'])
# Append the properties with null geometry
features.append(properties)

# When the list hits the limit size - e.g. 75,000
# Process list into DataFrame
if len(features) == chunk_size:
# Process the chunk
df_chunk = pd.DataFrame(features)

dtype_mapping = {
'id': str,
'fictitious': str,
'road_classification': str,
'road_function': str,
'form_of_way': str,
'road_classification_number': str,
'name_1': str,
'name_1_lang': str,
'name_2': str,
'name_2_lang': str,
'road_structure': str,
'length': str,
'length_uom': str,
'loop': str,
'primary_route': str,
'trunk_road': str,
'start_node': str,
'end_node': str,
'road_number_toid': str,
'road_name_toid': str,
'geometry': str
}

df_chunk = df_chunk.astype(dtype_mapping)

process_chunk(df_chunk, conn)
logger.info(f"Processed features {i-chunk_size+1} to {i}")

@@ -103,7 +135,34 @@
# Process any remaining features outside the loop
if features:
df_chunk = pd.DataFrame(features)

dtype_mapping = {
'id': str,
'fictitious': str,
'road_classification': str,
'road_function': str,
'form_of_way': str,
'road_classification_number': str,
'name_1': str,
'name_1_lang': str,
'name_2': str,
'name_2_lang': str,
'road_structure': str,
'length': str,
'length_uom': str,
'loop': str,
'primary_route': str,
'trunk_road': str,
'start_node': str,
'end_node': str,
'road_number_toid': str,
'road_name_toid': str,
'geometry': str
}

df_chunk = df_chunk.astype(dtype_mapping)
process_chunk(df_chunk, conn)

logger.info("Processed remaining features")
# Empty the list
features = []
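The dtype mapping above is declared twice, once inside the chunk loop and once for the trailing remainder. A possible follow-up, sketched here as a suggestion rather than part of this commit, is to hoist it into a module-level constant shared by both call sites:

```python
# Hypothetical refactor: one module-level mapping reused for both the in-loop
# chunks and the final remainder, so the two copies cannot drift apart.
import pandas as pd

OPEN_ROADS_DTYPES: dict[str, type] = {
    'id': str, 'fictitious': str, 'road_classification': str, 'road_function': str,
    'form_of_way': str, 'road_classification_number': str, 'name_1': str,
    'name_1_lang': str, 'name_2': str, 'name_2_lang': str, 'road_structure': str,
    'length': str, 'length_uom': str, 'loop': str, 'primary_route': str,
    'trunk_road': str, 'start_node': str, 'end_node': str, 'road_number_toid': str,
    'road_name_toid': str, 'geometry': str,
}


def build_typed_chunk(features: list[dict]) -> pd.DataFrame:
    """Build a DataFrame from accumulated features and coerce every column to string."""
    return pd.DataFrame(features).astype(OPEN_ROADS_DTYPES)
```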
2 changes: 1 addition & 1 deletion src/os_open_roads/process_into_motherduck.py
@@ -10,7 +10,7 @@ def process_chunk(df, conn):
Dataframe
Connection object
"""
schema = "os_open_usrns"
schema = "os_open_roads"

if conn:
try:
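For context, a minimal sketch of where the corrected schema feeds into a chunk insert; the register/insert pattern is an assumption for illustration, not taken from this file, while the schema and table names come from the diffs above.

```python
# Sketch only: the register/insert pattern below is an assumption for illustration,
# not taken from this file; the schema and table names come from the diffs above.
import duckdb
import pandas as pd


def insert_chunk(df: pd.DataFrame, conn: duckdb.DuckDBPyConnection) -> None:
    schema = "os_open_roads"
    conn.register("chunk_df", df)
    conn.execute(f'INSERT INTO "{schema}"."open_roads_latest" SELECT * FROM chunk_df')
    conn.unregister("chunk_df")
```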
16 changes: 8 additions & 8 deletions src/pipeline_launch_scripts/os_open_linked_ids_main.py
@@ -28,16 +28,16 @@ def main(batch_limit: int):
database = secrets["motherdb"]

# # 1
# conn_usrn_uprn = connect_to_motherduck(token, database)
conn_usrn_uprn = connect_to_motherduck(token, database)

# logger.success("OS USRN to URPN DATA STARTED")
# create_table(conn_usrn_uprn, schema="os_open_linked_identifiers", name="os_open_linked_identifiers_uprn_usrn_latest")
# url = fetch_redirect_url(url="https://api.os.uk/downloads/v1/products/LIDS/downloads?area=GB&format=CSV&fileName=lids-2024-12_csv_BLPU-UPRN-Street-USRN-11.zip&redirect")
# load_csv_data(url, conn_usrn_uprn, batch_limit, schema="os_open_linked_identifiers", name="os_open_linked_identifiers_uprn_usrn_latest")
# logger.success("OS USRN to UPRN DATA PROCESSED")
logger.success("OS USRN to URPN DATA STARTED")
create_table_1(conn_usrn_uprn, schema="os_open_linked_identifiers", name="os_open_linked_identifiers_uprn_usrn_latest")
url = fetch_redirect_url(url="https://api.os.uk/downloads/v1/products/LIDS/downloads?area=GB&format=CSV&fileName=lids-2024-12_csv_BLPU-UPRN-Street-USRN-11.zip&redirect")
load_csv_data(url, conn_usrn_uprn, batch_limit, schema="os_open_linked_identifiers", name="os_open_linked_identifiers_uprn_usrn_latest")
logger.success("OS USRN to UPRN DATA PROCESSED")

# if conn_usrn_uprn:
# conn_usrn_uprn.close()
if conn_usrn_uprn:
conn_usrn_uprn.close()

# 2
conn_road_toid_usrn = connect_to_motherduck(token, database)