From 9bca0b953dce357506bb538bcf32b7225c084867 Mon Sep 17 00:00:00 2001 From: albertkun Date: Mon, 11 Dec 2023 12:57:13 -0800 Subject: [PATCH 1/3] fix: `route_id` as index doesn't work for `route_overview` because it is repeated refactor: get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async function --- fastapi/app/crud.py | 16 +++++++++++----- fastapi/app/main.py | 4 +++- fastapi/app/models.py | 4 ++-- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/fastapi/app/crud.py b/fastapi/app/crud.py index efce214..04ce1bc 100644 --- a/fastapi/app/crud.py +++ b/fastapi/app/crud.py @@ -129,16 +129,12 @@ def get_unique_keys(db: Session, model, agency_id, key_column=None): unique_keys = [row.__dict__ for row in this_data] return unique_keys - - - #### async def get_vehicle_data_async(db: AsyncSession, agency_id: str, vehicle_id: str): result = await db.execute(select(models.VehiclePositions).where(models.VehiclePositions.agency_id == agency_id,models.VehiclePositions.vehicle_id == vehicle_id)) data = result.scalars().one_or_none() return data -import pickle async def get_data_async(async_session: Session, model: Type[DeclarativeMeta], agency_id: str, field_name: Optional[str] = None, field_value: Optional[str] = None, cache_expiration: int = None): # Create a unique key for this query key = f"{model.__name__}:{agency_id}:{field_name}:{field_value}" @@ -481,7 +477,15 @@ async def get_gtfs_rt_vehicle_positions_trip_data_by_route_code(session: AsyncSe async def get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async(session,route_code: str, geojson:bool,agency_id:str): - the_query = await session.execute(select(models.VehiclePositions).where(models.VehiclePositions.route_code == route_code,models.VehiclePositions.agency_id == agency_id).order_by(models.VehiclePositions.route_code)) + cache_key = f'vehicle_positions_trip_data:{agency_id}:{route_code}:{geojson}' + if redis_connection is None: + initialize_redis() + cached_result = await 
redis_connection.get(cache_key) + if cached_result is not None: + yield pickle.loads(cached_result) + + the_query = session.execute(select(models.VehiclePositions).where(models.VehiclePositions.route_code == route_code,models.VehiclePositions.agency_id == agency_id).order_by(models.VehiclePositions.route_code)) + result = the_query.scalars().all() if geojson == True: this_json = {} count = 0 @@ -517,6 +521,7 @@ async def get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async(sessio this_json['metadata'] = {'title': 'Vehicle Positions'} this_json['type'] = "FeatureCollection" this_json['features'] = features + await redis_connection.set(cache_key, pickle.dumps(this_json)) yield this_json else: result = [] @@ -543,6 +548,7 @@ async def get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async(sessio message_object = [{'message': 'No vehicle data for this vehicle id: ' + str(route_code)}] yield message_object else: + await redis_connection.set(cache_key, pickle.dumps(result)) yield result def get_distinct_stop_ids(the_query): diff --git a/fastapi/app/main.py b/fastapi/app/main.py index caa41bf..84bad38 100644 --- a/fastapi/app/main.py +++ b/fastapi/app/main.py @@ -510,7 +510,9 @@ async def websocket_vehicle_positions_by_ids(websocket: WebSocket, agency_id: Ag @app.get("/{agency_id}/trip_detail/route_code/{route_code}",tags=["Real-Time data"]) async def get_trip_detail_by_route_code(agency_id: AgencyIdEnum, route_code: str, geojson:bool=False, db: AsyncSession = Depends(get_db)): - result = await crud.get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async(session=db, route_code=route_code, geojson=geojson, agency_id=agency_id.value) + result = [] + async for item in crud.get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async(session=db, route_code=route_code, geojson=geojson, agency_id=agency_id.value): + result.append(item) return result @app.get("/{agency_id}/trip_detail/vehicle/{vehicle_id?}", tags=["Real-Time data"]) diff --git 
a/fastapi/app/models.py b/fastapi/app/models.py index 89de957..3864145 100644 --- a/fastapi/app/models.py +++ b/fastapi/app/models.py @@ -102,8 +102,8 @@ class Routes(Base): class RouteOverview(Base): __tablename__ = "route_overview" - route_id = Column(Integer, primary_key=True, index=True) - route_code = Column(String) + route_id = Column(Integer) + route_code = Column(String,primary_key=True, index=True) route_code_padded= Column(Integer) route_short_name = Column(String) route_long_name = Column(String) From c0e50bbe270f425e3a1dcd8608eae558379e96fa Mon Sep 17 00:00:00 2001 From: albertkun Date: Mon, 11 Dec 2023 12:58:00 -0800 Subject: [PATCH 2/3] fix: `route_id` as index doesn't work for `route_overview` because it is repeated refactor: get_gtfs_rt_vehicle_positions_trip_data_by_route_code_for_async function --- fastapi/tests/load_testing.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/fastapi/tests/load_testing.py b/fastapi/tests/load_testing.py index 14e7f1d..fed8f1b 100644 --- a/fastapi/tests/load_testing.py +++ b/fastapi/tests/load_testing.py @@ -33,17 +33,17 @@ def load_vehicle_positions_bus(self): def load_vehicle_positions_rail(self): self.client.get("/LACMTA_Rail/vehicle_positions?format=geojson") - @task - def load_trip_updates_for_801(self): - self.client.get("/LACMTA_Rail/trip_updates/route_id/801") + # @task + # def load_trip_updates_for_801(self): + # self.client.get("/LACMTA_Rail/trip_updates/route_id/801") - @task - def load_trip_updates_all_bus(self): - self.client.get("/LACMTA/trip_updates") + # @task + # def load_trip_updates_all_bus(self): + # self.client.get("/LACMTA/trip_updates") - @task - def load_trip_updates_all_rail(self): - self.client.get("/LACMTA_Rail/trip_updates") + # @task + # def load_trip_updates_all_rail(self): + # self.client.get("/LACMTA_Rail/trip_updates") # @task # def load_agency(self): From 9005c8c8fe804e61c1813a74535b90cb70b2a163 Mon Sep 17 00:00:00 2001 From: albertkun Date: Mon, 
18 Dec 2023 09:49:53 -0800 Subject: [PATCH 3/3] refactor: added more failure attempts --- data-loading-service/app/main.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/data-loading-service/app/main.py b/data-loading-service/app/main.py index e149036..de523c2 100644 --- a/data-loading-service/app/main.py +++ b/data-loading-service/app/main.py @@ -12,14 +12,15 @@ lock = threading.Lock() -def retry_on_failure(task, retries=3, delay=15): +def retry_on_failure(task, retries=5, delay=15): for i in range(retries): try: task() - break + return # If the task succeeds, return immediately except Exception as e: print(f'Error on attempt {i+1}: {str(e)}') time.sleep(delay) + raise Exception('Task failed after all retries') # If all retries fail, raise an exception @crython.job(second='*/15') def gtfs_rt_scheduler(): @@ -41,17 +42,17 @@ def canceled_trips_update_scheduler(): except Exception as e: print('Error updating canceled trips: ' + str(e)) -@crython.job(expr='@weekly') -def calendar_dates_update_scheduler(): - try: - gtfs_static_helper.update_calendar_dates() - except Exception as e: - print('Error updating calendar dates: ' + str(e)) +# @crython.job(expr='@weekly') +# def calendar_dates_update_scheduler(): +# try: +# gtfs_static_helper.update_calendar_dates() +# except Exception as e: +# print('Error updating calendar dates: ' + str(e)) def initial_load(): gopass_helper.update_go_pass_data() update_canceled_trips.run_update() - # gtfs_rt_helper.update_gtfs_realtime_data() + gtfs_rt_helper.update_gtfs_realtime_data() gtfs_static_helper.update_calendar_dates()