Skip to content

Commit

Permalink
Merge pull request #480 from LACMTA:2023-api-optimization
Browse files Browse the repository at this point in the history
Refactor WebSocket endpoint for vehicle positions
  • Loading branch information
albertkun authored Feb 12, 2024
2 parents d05d592 + 3180396 commit 50d409b
Showing 1 changed file with 8 additions and 45 deletions.
53 changes: 8 additions & 45 deletions fastapi/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,12 +510,13 @@ async def get_trip_detail_by_vehicle(agency_id: AgencyIdEnum, vehicle_id: Option
from shapely.geometry import mapping
from geoalchemy2 import WKBElement


from .utils.gtfs_rt_swiftly import connect_to_swiftly, SWIFTLY_API_REALTIME, SWIFTLY_GTFS_RT_TRIP_UPDATES, SWIFTLY_GTFS_RT_VEHICLE_POSITIONS, SERVICE_DICT

connected_clients = 0

@app.websocket("/ws/{agency_id}/vehicle_positions")
async def websocket_endpoint(websocket: WebSocket, agency_id: str):
async def websocket_vehicle_positions_endpoint(websocket: WebSocket, agency_id: str):
global connected_clients
connected_clients += 1
try:
Expand All @@ -527,19 +528,16 @@ async def websocket_endpoint(websocket: WebSocket, agency_id: str):
async def reader(channel: aioredis.client.PubSub):
while True:
try:
async with async_timeout.timeout(2.5): # Set a timeout of 2.5 seconds
async with async_timeout.timeout(1):
message = await channel.get_message(ignore_subscribe_messages=True)
if message is not None:
if message["type"] == "message":
try:
item = json.loads(message['data'])
if item == 'ping':
await websocket.send_text('pong')
else:
await websocket.send_text(json.dumps(item))
await websocket.send_text(json.dumps(item))
except Exception as e:
await websocket.send_text(f"Error: {str(e)}")
await asyncio.sleep(1.5)
await asyncio.sleep(0.1)
except asyncio.TimeoutError:
pass

Expand All @@ -561,14 +559,12 @@ async def publisher():
response_data = await connect_to_swiftly(service, SWIFTLY_GTFS_RT_VEHICLE_POSITIONS, Config.SWIFTLY_AUTH_KEY_BUS, Config.SWIFTLY_AUTH_KEY_RAIL)
if response_data is not False:
data = json.loads(response_data)
# Filter out items with no trip_ids
data = [item for item in data if item.get('trip_id')]
# Store the result in Redis cache
await redis.set(cache_key, json.dumps(data), ex=3) # Set an expiration time of 3 seconds
await redis.set(cache_key, json.dumps(data), ex=5) # Set an expiration time of 5 seconds
# Publish the data
if data is not None:
await redis.publish(f'vehicle_positions_{agency_id}', json.dumps(data))
await asyncio.sleep(5) # Sleep for 3.6 seconds
await asyncio.sleep(3.6) # Sleep for 3.6 seconds
except Exception as e:
print(f"Error: {str(e)}")
# Start the publisher and reader as separate tasks
Expand Down Expand Up @@ -632,7 +628,7 @@ async def publisher():
if response_data is not False:
data = json.loads(response_data)
# Store the result in Redis cache
await redis.set(cache_key, json.dumps(data), ex=60) # Set an expiration time of 60 seconds
await redis.set(cache_key, json.dumps(data), ex=15) # Set an expiration time of 60 seconds
# Publish the data
if data is not None:
await redis.publish(f'trip_updates_{agency_id}', json.dumps(data))
Expand Down Expand Up @@ -686,39 +682,6 @@ async def websocket_endpoint(websocket: WebSocket, agency_id: str, route_code: s



@app.websocket("/ws/{agency_id}/vehicle_positions")
async def websocket_endpoint(websocket: WebSocket, agency_id: str, async_db: AsyncSession = Depends(get_async_db)):
await websocket.accept()

channel = (await crud.redis_connection.subscribe('live_vehicle_positions'))[0]

async def listen_to_redis():
async for message in channel.iter(encoding='utf-8'):
# Unserialize message with json before sending
message_data = json.loads(message)
await websocket.send_json(message_data)

listen_task = asyncio.create_task(listen_to_redis())

try:
data = await asyncio.wait_for(crud.get_all_data_async(async_db, models.VehiclePositions, agency_id), timeout=120)
if data is not None:
# Serialize data with json before publishing
data_json = json.dumps(data)
await crud.redis_connection.publish('live_vehicle_positions', data_json)
except asyncio.TimeoutError:
raise HTTPException(status_code=408, detail="Request timed out")
except WebSocketDisconnect:
# Handle the WebSocket disconnect event
print("WebSocket disconnected")
finally:
listen_task.cancel()
crud.redis_connection.unsubscribe('live_vehicle_positions')
crud.redis_connection.close()
await crud.redis_connection.wait_closed()
from geojson import Feature, Point, FeatureCollection


@app.websocket("/ws/{agency_id}/vehicle_positions/{field}/{ids}")
async def websocket_vehicle_positions_by_ids(websocket: WebSocket, agency_id: AgencyIdEnum, field: VehiclePositionsFieldsEnum, ids: str, async_db: AsyncSession = Depends(get_async_db)):
await websocket.accept()
Expand Down

0 comments on commit 50d409b

Please sign in to comment.