From 3180396667fa0e3ee28a1d5aec967efa941aa4dd Mon Sep 17 00:00:00 2001 From: albertkun Date: Mon, 12 Feb 2024 11:14:42 -0800 Subject: [PATCH] refactor: WebSocket endpoint for vehicle positions --- fastapi/app/main.py | 53 +++++++-------------------------------------- 1 file changed, 8 insertions(+), 45 deletions(-) diff --git a/fastapi/app/main.py b/fastapi/app/main.py index 46fdb1c..4408112 100644 --- a/fastapi/app/main.py +++ b/fastapi/app/main.py @@ -510,12 +510,13 @@ async def get_trip_detail_by_vehicle(agency_id: AgencyIdEnum, vehicle_id: Option from shapely.geometry import mapping from geoalchemy2 import WKBElement + from .utils.gtfs_rt_swiftly import connect_to_swiftly, SWIFTLY_API_REALTIME, SWIFTLY_GTFS_RT_TRIP_UPDATES, SWIFTLY_GTFS_RT_VEHICLE_POSITIONS, SERVICE_DICT connected_clients = 0 @app.websocket("/ws/{agency_id}/vehicle_positions") -async def websocket_endpoint(websocket: WebSocket, agency_id: str): +async def websocket_vehicle_positions_endpoint(websocket: WebSocket, agency_id: str): global connected_clients connected_clients += 1 try: @@ -527,19 +528,16 @@ async def websocket_endpoint(websocket: WebSocket, agency_id: str): async def reader(channel: aioredis.client.PubSub): while True: try: - async with async_timeout.timeout(2.5): # Set a timeout of 2.5 seconds + async with async_timeout.timeout(1): message = await channel.get_message(ignore_subscribe_messages=True) if message is not None: if message["type"] == "message": try: item = json.loads(message['data']) - if item == 'ping': - await websocket.send_text('pong') - else: - await websocket.send_text(json.dumps(item)) + await websocket.send_text(json.dumps(item)) except Exception as e: await websocket.send_text(f"Error: {str(e)}") - await asyncio.sleep(1.5) + await asyncio.sleep(0.1) except asyncio.TimeoutError: pass @@ -561,14 +559,12 @@ async def publisher(): response_data = await connect_to_swiftly(service, SWIFTLY_GTFS_RT_VEHICLE_POSITIONS, Config.SWIFTLY_AUTH_KEY_BUS, Config.SWIFTLY_AUTH_KEY_RAIL) if response_data is not False: data = json.loads(response_data) - # Filter out items with no trip_ids - data = [item for item in data if item.get('trip_id')] # Store the result in Redis cache - await redis.set(cache_key, json.dumps(data), ex=3) # Set an expiration time of 3 seconds + await redis.set(cache_key, json.dumps(data), ex=5) # Set an expiration time of 5 seconds # Publish the data if data is not None: await redis.publish(f'vehicle_positions_{agency_id}', json.dumps(data)) - await asyncio.sleep(5) # Sleep for 3.6 seconds + await asyncio.sleep(3.6) # Sleep for 3.6 seconds except Exception as e: print(f"Error: {str(e)}") # Start the publisher and reader as separate tasks @@ -632,7 +628,7 @@ async def publisher(): if response_data is not False: data = json.loads(response_data) # Store the result in Redis cache - await redis.set(cache_key, json.dumps(data), ex=60) # Set an expiration time of 60 seconds + await redis.set(cache_key, json.dumps(data), ex=15) # Set an expiration time of 60 seconds # Publish the data if data is not None: await redis.publish(f'trip_updates_{agency_id}', json.dumps(data)) @@ -686,39 +682,6 @@ async def websocket_endpoint(websocket: WebSocket, agency_id: str, route_code: s -@app.websocket("/ws/{agency_id}/vehicle_positions") -async def websocket_endpoint(websocket: WebSocket, agency_id: str, async_db: AsyncSession = Depends(get_async_db)): - await websocket.accept() - - channel = (await crud.redis_connection.subscribe('live_vehicle_positions'))[0] - - async def listen_to_redis(): - async for message in channel.iter(encoding='utf-8'): - # Unserialize message with json before sending - message_data = json.loads(message) - await websocket.send_json(message_data) - - listen_task = asyncio.create_task(listen_to_redis()) - - try: - data = await asyncio.wait_for(crud.get_all_data_async(async_db, models.VehiclePositions, agency_id), timeout=120) - if data is not None: - # Serialize data with json before publishing - data_json = json.dumps(data) - await crud.redis_connection.publish('live_vehicle_positions', data_json) - except asyncio.TimeoutError: - raise HTTPException(status_code=408, detail="Request timed out") - except WebSocketDisconnect: - # Handle the WebSocket disconnect event - print("WebSocket disconnected") - finally: - listen_task.cancel() - crud.redis_connection.unsubscribe('live_vehicle_positions') - crud.redis_connection.close() - await crud.redis_connection.wait_closed() -from geojson import Feature, Point, FeatureCollection - - @app.websocket("/ws/{agency_id}/vehicle_positions/{field}/{ids}") async def websocket_vehicle_positions_by_ids(websocket: WebSocket, agency_id: AgencyIdEnum, field: VehiclePositionsFieldsEnum, ids: str, async_db: AsyncSession = Depends(get_async_db)): await websocket.accept()