Skip to content

Commit

Permalink
Merge pull request #430 from LACMTA:2023-api-optimization
Browse files Browse the repository at this point in the history
fix: `route_id` as index doesnt work for `route_overview` because it is repeated
  • Loading branch information
albertkun authored Dec 18, 2023
2 parents 589aabb + 9005c8c commit be63a7c
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 26 deletions.
19 changes: 10 additions & 9 deletions data-loading-service/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@

lock = threading.Lock()

def retry_on_failure(task, retries=3, delay=15):
def retry_on_failure(task, retries=5, delay=15):
for i in range(retries):
try:
task()
break
return # If the task succeeds, return immediately
except Exception as e:
print(f'Error on attempt {i+1}: {str(e)}')
time.sleep(delay)
raise Exception('Task failed after all retries') # If all retries fail, raise an exception

@crython.job(second='*/15')
def gtfs_rt_scheduler():
Expand All @@ -41,17 +42,17 @@ def canceled_trips_update_scheduler():
except Exception as e:
print('Error updating canceled trips: ' + str(e))

@crython.job(expr='@weekly')
def calendar_dates_update_scheduler():
try:
gtfs_static_helper.update_calendar_dates()
except Exception as e:
print('Error updating calendar dates: ' + str(e))
# @crython.job(expr='@weekly')
# def calendar_dates_update_scheduler():
# try:
# gtfs_static_helper.update_calendar_dates()
# except Exception as e:
# print('Error updating calendar dates: ' + str(e))

def initial_load():
gopass_helper.update_go_pass_data()
update_canceled_trips.run_update()
# gtfs_rt_helper.update_gtfs_realtime_data()
gtfs_rt_helper.update_gtfs_realtime_data()
gtfs_static_helper.update_calendar_dates()


Expand Down
16 changes: 11 additions & 5 deletions fastapi/app/crud.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,16 +129,12 @@ def get_unique_keys(db: Session, model, agency_id, key_column=None):
unique_keys = [row.__dict__ for row in this_data]
return unique_keys




####
async def get_vehicle_data_async(db: AsyncSession, agency_id: str, vehicle_id: str):
result = await db.execute(select(models.VehiclePositions).where(models.VehiclePositions.agency_id == agency_id,models.VehiclePositions.vehicle_id == vehicle_id))
data = result.scalars().one_or_none()
return data

import pickle
async def get_data_async(async_session: Session, model: Type[DeclarativeMeta], agency_id: str, field_name: Optional[str] = None, field_value: Optional[str] = None, cache_expiration: int = None):
# Create a unique key for this query
key = f"{model.__name__}:{agency_id}:{field_name}:{field_value}"
Expand Down Expand Up @@ -481,7 +477,15 @@ async def get_gtfs_rt_vehicle_positions_trip_data_by_route_code(session: AsyncSe


async def get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async(session,route_code: str, geojson:bool,agency_id:str):
the_query = await session.execute(select(models.VehiclePositions).where(models.VehiclePositions.route_code == route_code,models.VehiclePositions.agency_id == agency_id).order_by(models.VehiclePositions.route_code))
cache_key = f'vehicle_positions_trip_data:{agency_id}:{route_code}:{geojson}'
if redis_connection is None:
initialize_redis()
cached_result = await redis_connection.get(cache_key)
if cached_result is not None:
yield pickle.loads(cached_result)

the_query = session.execute(select(models.VehiclePositions).where(models.VehiclePositions.route_code == route_code,models.VehiclePositions.agency_id == agency_id).order_by(models.VehiclePositions.route_code))
result = the_query.scalars().all()
if geojson == True:
this_json = {}
count = 0
Expand Down Expand Up @@ -517,6 +521,7 @@ async def get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async(sessio
this_json['metadata'] = {'title': 'Vehicle Positions'}
this_json['type'] = "FeatureCollection"
this_json['features'] = features
await redis_connection.set(cache_key, pickle.dumps(this_json))
yield this_json
else:
result = []
Expand All @@ -543,6 +548,7 @@ async def get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async(sessio
message_object = [{'message': 'No vehicle data for this vehicle id: ' + str(route_code)}]
yield message_object
else:
await redis_connection.set(cache_key, pickle.dumps(result))
yield result

def get_distinct_stop_ids(the_query):
Expand Down
4 changes: 3 additions & 1 deletion fastapi/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,9 @@ async def websocket_vehicle_positions_by_ids(websocket: WebSocket, agency_id: Ag

@app.get("/{agency_id}/trip_detail/route_code/{route_code}",tags=["Real-Time data"])
async def get_trip_detail_by_route_code(agency_id: AgencyIdEnum, route_code: str, geojson:bool=False, db: AsyncSession = Depends(get_db)):
result = await crud.get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async(session=db, route_code=route_code, geojson=geojson, agency_id=agency_id.value)
result = []
async for item in crud.get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async(session=db, route_code=route_code, geojson=geojson, agency_id=agency_id.value):
result.append(item)
return result

@app.get("/{agency_id}/trip_detail/vehicle/{vehicle_id?}", tags=["Real-Time data"])
Expand Down
4 changes: 2 additions & 2 deletions fastapi/app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ class Routes(Base):

class RouteOverview(Base):
__tablename__ = "route_overview"
route_id = Column(Integer, primary_key=True, index=True)
route_code = Column(String)
route_id = Column(Integer)
route_code = Column(String,primary_key=True, index=True)
route_code_padded= Column(Integer)
route_short_name = Column(String)
route_long_name = Column(String)
Expand Down
18 changes: 9 additions & 9 deletions fastapi/tests/load_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ def load_vehicle_positions_bus(self):
def load_vehicle_positions_rail(self):
self.client.get("/LACMTA_Rail/vehicle_positions?format=geojson")

@task
def load_trip_updates_for_801(self):
self.client.get("/LACMTA_Rail/trip_updates/route_id/801")
# @task
# def load_trip_updates_for_801(self):
# self.client.get("/LACMTA_Rail/trip_updates/route_id/801")

@task
def load_trip_updates_all_bus(self):
self.client.get("/LACMTA/trip_updates")
# @task
# def load_trip_updates_all_bus(self):
# self.client.get("/LACMTA/trip_updates")

@task
def load_trip_updates_all_rail(self):
self.client.get("/LACMTA_Rail/trip_updates")
# @task
# def load_trip_updates_all_rail(self):
# self.client.get("/LACMTA_Rail/trip_updates")

# @task
# def load_agency(self):
Expand Down

0 comments on commit be63a7c

Please sign in to comment.