Skip to content

Commit

Permalink
Merge pull request #475 from LACMTA:2023-api-optimization
Browse files Browse the repository at this point in the history
Refactor GTFS-RT data loading and indexing
  • Loading branch information
albertkun authored Feb 2, 2024
2 parents 8ec3875 + 6b2d3c7 commit b166cf8
Showing 1 changed file with 20 additions and 22 deletions.
42 changes: 20 additions & 22 deletions data-loading-service/app/utils/gtfs_rt_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
import datetime
from multiprocessing.resource_sharer import stop
import time


from sqlalchemy.exc import ProgrammingError
# from ..gtfs_rt import *
# from ..models import *

import json
import requests
Expand All @@ -20,13 +19,14 @@
from soupsieve import match
from sqlalchemy.orm import Session,sessionmaker
from sqlalchemy import create_engine, inspect

# from sqlalchemy.dialects.postgresql import ARRAY,JSON
from models.gtfs_rt import *
from config import Config

from utils.gtfs_realtime_pb2 import FeedMessage
from .database_connector import *

# from ..schemas import TripUpdates, StopTimeUpdates,VehiclePositions
from datetime import datetime


Expand All @@ -41,6 +41,13 @@
SWIFTLY_GTFS_RT_TRIP_UPDATES = 'gtfs-rt-trip-updates'
SWIFTLY_GTFS_RT_VEHICLE_POSITIONS = 'gtfs-rt-vehicle-positions'

# engine = create_engine(Config.API_DB_URI, echo=False,executemany_mode="values")

# Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# insp = inspect(engine)
# session = Session()

SERVICE_DICT = {
'LACMTA': 'lametro',
'LACMTA_Rail': 'lametro-rail'
Expand Down Expand Up @@ -207,33 +214,24 @@ def update_gtfs_realtime_data():
# logging('vehicle_position_updates Data Frame: ' + str(vehicle_position_updates))
combined_trip_update_df = pd.concat(combined_trip_update_dataframes)
combined_stop_time_df = pd.concat(combined_stop_time_dataframes)
combined_trip_update_df = combined_trip_update_df[combined_trip_update_df['trip_id'].notnull()]
combined_stop_time_df = combined_stop_time_df[combined_stop_time_df['trip_id'].notnull()]
combined_vehicle_position_df = gpd.GeoDataFrame(pd.concat(combined_vehicle_position_dataframes, ignore_index=True), geometry='geometry')
combined_vehicle_position_df = combined_vehicle_position_df[combined_vehicle_position_df['trip_id'].notnull()]
combined_vehicle_position_df.crs = 'EPSG:4326'
combined_vehicle_position_df.to_postgis('vehicle_position_updates',engine,index=True,if_exists="replace",schema=Config.TARGET_DB_SCHEMA)
combined_stop_time_df.to_sql('stop_time_updates',engine,index=True,if_exists="replace",schema=Config.TARGET_DB_SCHEMA)
combined_trip_update_df['stop_time_json'].astype(str)
combined_trip_update_df.to_sql('trip_updates',engine,index=True,if_exists="replace",schema=Config.TARGET_DB_SCHEMA)
index_creation_statements = [
"CREATE INDEX idx_vehicle_position_updates_route_code ON vehicle_position_updates(route_code);",
"CREATE INDEX idx_vehicle_position_updates_vehicle_id ON vehicle_position_updates(vehicle_id);",
"CREATE INDEX idx_vehicle_position_updates_trip_route_id ON vehicle_position_updates(trip_route_id);",
"CREATE INDEX idx_vehicle_position_updates_stop_id ON vehicle_position_updates(stop_id);",
"CREATE INDEX idx_stop_time_updates_trip_id ON stop_time_updates(trip_id);",
"CREATE INDEX idx_stop_time_updates_route_id ON stop_time_updates(route_id);",
"CREATE INDEX idx_trip_updates_trip_id ON trip_updates(trip_id);",
"CREATE INDEX idx_trip_updates_route_id ON trip_updates(route_id);"
]

# Create the indexes
with engine.connect() as connection:
for statement in index_creation_statements:
try:
connection.execute(text(statement))
except ProgrammingError as e:
print(f"Error creating index: {e}")
connection.execute(text("CREATE INDEX idx_vehicle_position_updates_route_code ON vehicle_position_updates(route_code);"))
connection.execute(text("CREATE INDEX idx_vehicle_position_updates_vehicle_id ON vehicle_position_updates(vehicle_id);"))
connection.execute(text("CREATE INDEX idx_vehicle_position_updates_trip_route_id ON vehicle_position_updates(trip_route_id);"))
connection.execute(text("CREATE INDEX idx_vehicle_position_updates_stop_id ON vehicle_position_updates(stop_id);"))
connection.execute(text("CREATE INDEX idx_stop_time_updates_trip_id ON stop_time_updates(trip_id);"))
connection.execute(text("CREATE INDEX idx_stop_time_updates_route_id ON stop_time_updates(route_id);"))
connection.execute(text("CREATE INDEX idx_trip_updates_trip_id ON trip_updates(trip_id);"))
connection.execute(text("CREATE INDEX idx_trip_updates_route_id ON trip_updates(route_id);"))

process_end = timeit.default_timer()
print('===GTFS Update process took {} seconds'.format(process_end - process_start)+"===")

Expand Down

0 comments on commit b166cf8

Please sign in to comment.