Skip to content

Commit

Permalink
Fix: reopen the API MQ channels if they are closed (#442)
Browse files Browse the repository at this point in the history
Problem: the MQ channels can close for a variety of issues.
Once closed, the channel cannot be used anymore.

Solution: detect if the channel is closed and reopen it if needed.
  • Loading branch information
odesenfans authored May 16, 2023
1 parent f16abaa commit 64c97e2
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 7 deletions.
41 changes: 36 additions & 5 deletions src/aleph/web/controllers/app_state_getters.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This module provides an abstraction layer over the dictionary keys used to
address these objects.
"""

import logging
from typing import Optional, cast, TypeVar

import aio_pika.abc
Expand Down Expand Up @@ -50,12 +50,43 @@ def get_mq_conn_from_request(request: web.Request) -> aio_pika.abc.AbstractConne
return cast(aio_pika.abc.AbstractConnection, request.app[APP_STATE_MQ_CONN])


def get_mq_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel:
return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_CHANNEL])
async def _get_open_channel(
request: web.Request, channel_name: str, logger: logging.Logger
) -> aio_pika.abc.AbstractChannel:
channel = cast(aio_pika.abc.AbstractChannel, request.app[channel_name])
if channel.is_closed:
# This should not happen, but does happen in practice because of RabbitMQ
# RPC timeouts. We need to figure out where this timeout comes from,
# but reopening the channel is mandatory to keep the endpoints using the MQ
# functional.
logger.error("%s channel is closed, reopening it", channel_name)
await channel.reopen()

return channel


async def get_mq_channel_from_request(
request: web.Request, logger: logging.Logger
) -> aio_pika.abc.AbstractChannel:
"""
Gets the MQ channel from the app state and reopens it if needed.
"""

return await _get_open_channel(
request=request, channel_name=APP_STATE_MQ_CHANNEL, logger=logger
)


async def get_mq_ws_channel_from_request(
request: web.Request, logger: logging.Logger
) -> aio_pika.abc.AbstractChannel:
"""
Gets the websocket MQ channel from the app state and reopens it if needed.
"""

def get_mq_ws_channel_from_request(request: web.Request) -> aio_pika.abc.AbstractChannel:
return cast(aio_pika.abc.AbstractChannel, request.app[APP_STATE_MQ_WS_CHANNEL])
return await _get_open_channel(
request=request, channel_name=APP_STATE_MQ_WS_CHANNEL, logger=logger
)


def get_node_cache_from_request(request: web.Request) -> NodeCache:
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/web/controllers/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ async def messages_ws(request: web.Request) -> web.WebSocketResponse:

config = get_config_from_request(request)
session_factory = get_session_factory_from_request(request)
mq_channel = get_mq_ws_channel_from_request(request)
mq_channel = await get_mq_ws_channel_from_request(request=request, logger=LOGGER)

try:
query_params = WsMessageQueryParams.parse_obj(request.query)
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/web/controllers/p2p.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def pub_message(request: web.Request):
config = get_config_from_request(request)

if request_data.sync:
mq_channel = get_mq_channel_from_request(request)
mq_channel = await get_mq_channel_from_request(request=request, logger=LOGGER)
mq_queue = await mq_make_aleph_message_topic_queue(
channel=mq_channel,
config=config,
Expand Down

0 comments on commit 64c97e2

Please sign in to comment.