diff --git a/data-loading-service/app/utils/gtfs_rt_helper.py b/data-loading-service/app/utils/gtfs_rt_helper.py index ea85b25..9074c0f 100644 --- a/data-loading-service/app/utils/gtfs_rt_helper.py +++ b/data-loading-service/app/utils/gtfs_rt_helper.py @@ -6,9 +6,8 @@ import datetime from multiprocessing.resource_sharer import stop import time - - -from sqlalchemy.exc import ProgrammingError +# from ..gtfs_rt import * +# from ..models import * import json import requests @@ -20,13 +19,14 @@ from soupsieve import match from sqlalchemy.orm import Session,sessionmaker from sqlalchemy import create_engine, inspect - +# from sqlalchemy.dialects.postgresql import ARRAY,JSON from models.gtfs_rt import * from config import Config from utils.gtfs_realtime_pb2 import FeedMessage from .database_connector import * +# from ..schemas import TripUpdates, StopTimeUpdates,VehiclePositions from datetime import datetime @@ -41,6 +41,13 @@ SWIFTLY_GTFS_RT_TRIP_UPDATES = 'gtfs-rt-trip-updates' SWIFTLY_GTFS_RT_VEHICLE_POSITIONS = 'gtfs-rt-vehicle-positions' +# engine = create_engine(Config.API_DB_URI, echo=False,executemany_mode="values") + +# Session = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +# insp = inspect(engine) +# session = Session() + SERVICE_DICT = { 'LACMTA': 'lametro', 'LACMTA_Rail': 'lametro-rail' @@ -207,33 +214,24 @@ def update_gtfs_realtime_data(): # logging('vehicle_position_updates Data Frame: ' + str(vehicle_position_updates)) combined_trip_update_df = pd.concat(combined_trip_update_dataframes) combined_stop_time_df = pd.concat(combined_stop_time_dataframes) - combined_trip_update_df = combined_trip_update_df[combined_trip_update_df['trip_id'].notnull()] - combined_stop_time_df = combined_stop_time_df[combined_stop_time_df['trip_id'].notnull()] combined_vehicle_position_df = gpd.GeoDataFrame(pd.concat(combined_vehicle_position_dataframes, ignore_index=True), geometry='geometry') - combined_vehicle_position_df = combined_vehicle_position_df[combined_vehicle_position_df['trip_id'].notnull()] combined_vehicle_position_df.crs = 'EPSG:4326' combined_vehicle_position_df.to_postgis('vehicle_position_updates',engine,index=True,if_exists="replace",schema=Config.TARGET_DB_SCHEMA) combined_stop_time_df.to_sql('stop_time_updates',engine,index=True,if_exists="replace",schema=Config.TARGET_DB_SCHEMA) combined_trip_update_df['stop_time_json'].astype(str) combined_trip_update_df.to_sql('trip_updates',engine,index=True,if_exists="replace",schema=Config.TARGET_DB_SCHEMA) - index_creation_statements = [ - "CREATE INDEX idx_vehicle_position_updates_route_code ON vehicle_position_updates(route_code);", - "CREATE INDEX idx_vehicle_position_updates_vehicle_id ON vehicle_position_updates(vehicle_id);", - "CREATE INDEX idx_vehicle_position_updates_trip_route_id ON vehicle_position_updates(trip_route_id);", - "CREATE INDEX idx_vehicle_position_updates_stop_id ON vehicle_position_updates(stop_id);", - "CREATE INDEX idx_stop_time_updates_trip_id ON stop_time_updates(trip_id);", - "CREATE INDEX idx_stop_time_updates_route_id ON stop_time_updates(route_id);", - "CREATE INDEX idx_trip_updates_trip_id ON trip_updates(trip_id);", - "CREATE INDEX idx_trip_updates_route_id ON trip_updates(route_id);" - ] # Create the indexes with engine.connect() as connection: - for statement in index_creation_statements: - try: - connection.execute(text(statement)) - except ProgrammingError as e: - print(f"Error creating index: {e}") + connection.execute(text("CREATE INDEX idx_vehicle_position_updates_route_code ON vehicle_position_updates(route_code);")) + connection.execute(text("CREATE INDEX idx_vehicle_position_updates_vehicle_id ON vehicle_position_updates(vehicle_id);")) + connection.execute(text("CREATE INDEX idx_vehicle_position_updates_trip_route_id ON vehicle_position_updates(trip_route_id);")) + connection.execute(text("CREATE INDEX idx_vehicle_position_updates_stop_id ON vehicle_position_updates(stop_id);")) + connection.execute(text("CREATE INDEX idx_stop_time_updates_trip_id ON stop_time_updates(trip_id);")) + connection.execute(text("CREATE INDEX idx_stop_time_updates_route_id ON stop_time_updates(route_id);")) + connection.execute(text("CREATE INDEX idx_trip_updates_trip_id ON trip_updates(trip_id);")) + connection.execute(text("CREATE INDEX idx_trip_updates_route_id ON trip_updates(route_id);")) + process_end = timeit.default_timer() print('===GTFS Update process took {} seconds'.format(process_end - process_start)+"===")