Skip to content

Commit

Permalink
Refactor to move MQ logic to IrisConnector class
Browse files Browse the repository at this point in the history
Update to support streaming MQ connections
  • Loading branch information
NeonDaniel committed Jan 21, 2025
1 parent eff769b commit 99ba1c6
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 20 deletions.
24 changes: 6 additions & 18 deletions neon_iris/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,9 @@
from uuid import uuid4
from ovos_bus_client.message import Message
from ovos_utils.json_helper import merge_dict
from pika.exceptions import StreamLostError
from neon_iris.mq_connector import IrisConnector
from neon_utils.configuration_utils import get_neon_user_config
from neon_utils.metrics_utils import Stopwatch
from neon_mq_connector.utils.client_utils import NeonMQHandler
from neon_utils.socket_utils import b64_to_dict
from neon_utils.file_utils import decode_base64_string_to_file, \
encode_file_to_base64_string
Expand Down Expand Up @@ -107,21 +106,10 @@ def user_config(self) -> dict:
return json.loads(json.dumps(self._user_config.content))

@property
def connection(self) -> NeonMQHandler:
def connection(self) -> IrisConnector:
"""
Returns a connected NeonMQHandler object
"""
if not self._connection.connection.is_open:
LOG.warning("Connection closed")
self._connection.stop()
self._connection = self._init_mq_connection()
try:
self._connection.connection.channel()
except StreamLostError:
LOG.warning("Connection unexpectedly closed, recreating")
self._connection.stop()
self._connection = self._init_mq_connection()

return self._connection

def shutdown(self):
Expand Down Expand Up @@ -382,17 +370,17 @@ def _send_serialized_message(self, serialized: dict):

def _init_mq_connection(self):
mq_config = self._config.get("MQ") or self._config
NeonMQHandler.async_consumers_enabled = mq_config.get("async_consumers",
True)
mq_connection = NeonMQHandler(mq_config, "mq_handler", self._vhost)
mq_connection = IrisConnector(vhost=self._vhost, config=mq_config,
service_name="mq_handler")
mq_connection.register_consumer("neon_response_handler", self._vhost,
self.uid, self.handle_neon_response,
auto_ack=False)
mq_connection.register_consumer("neon_error_handler", self._vhost,
"neon_chat_api_error",
self.handle_neon_error,
auto_ack=False)
mq_connection.run(daemonize_consumers=True)
mq_connection.start()
mq_connection.wait_for_connection()
return mq_connection


Expand Down
130 changes: 130 additions & 0 deletions neon_iris/mq_connector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# NEON AI (TM) SOFTWARE, Software Development Kit & Application Development System
# All trademark and other rights reserved by their respective owners
# Copyright 2008-2021 Neongecko.com Inc.
# BSD-3
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# 3. Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from this
# software without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
# OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from time import sleep

import pika.exceptions

from asyncio import Event as AsyncEvent
from threading import Event, Thread
from neon_mq_connector.utils.client_utils import MQConnector
from ovos_utils import LOG
from pika.adapters.select_connection import SelectConnection
from pika.channel import Channel


class IrisConnector(MQConnector, Thread):
async_consumers_enabled = True

def __init__(self, *args, **kwargs):
vhost = kwargs.pop('vhost')
Thread.__init__(self, daemon=True)
MQConnector.__init__(self, *args, **kwargs)
self.vhost = vhost
self._ready = AsyncEvent()
self._channel_closed = Event()
self._stopping = False

self._connection = self.init_connection()

def wait_for_connection(self):
LOG.info("Waiting for connection")
while not self._ready.is_set():
sleep(0.5)
LOG.info("Connected!")

@property
def connection(self) -> SelectConnection:
if self._connection is None:
self._connection = self.init_connection()
return self._connection

def run(self, *args, **kwargs):
MQConnector.run(self, daemonize_consumers=True)
self._connection.ioloop.start()

def init_connection(self) -> SelectConnection:
return SelectConnection(
parameters=self.get_connection_params(self.vhost),
on_open_callback=self.on_connected,
on_open_error_callback=self.on_connection_fail,
on_close_callback=self.on_close)

def on_connected(self, _):
"""Called when we are fully connected to RabbitMQ"""
LOG.info("MQ Connected")
self.connection.channel(on_open_callback=self.on_channel_open)

def on_connection_fail(self, *_, **__):
""" Called when connection to RabbitMQ fails"""
LOG.error(f"Failed to connect to MQ")
self._connection = None

def on_channel_open(self, new_channel: Channel):
"""Called when our channel has opened"""
LOG.info(f"MQ Channel opened.")
new_channel.add_on_close_callback(self.on_channel_close)
self._ready.set()

def on_channel_close(self, *_, **__):
LOG.info(f"Channel closed.")
self._channel_closed.set()

def on_close(self, _, e):
if isinstance(e, pika.exceptions.ConnectionClosed):
LOG.info(f"Connection closed normally: {e}")
elif isinstance(e, pika.exceptions.StreamLostError):
LOG.warning("MQ connection lost; "
"RabbitMQ is likely temporarily unavailable.")
else:
LOG.error(f"MQ connection closed due to exception: {e}")
if not self._stopping:
# Connection was gracefully closed by the server. Try to re-connect
LOG.info(f"Trying to reconnect after server closed connection")
self._connection = self.init_connection()

def shutdown(self):
try:
self._stopping = True
if self.connection and not (self.connection.is_closed or
self.connection.is_closing):
self.connection.close()
LOG.info(f"Waiting for channel close")
if not self._channel_closed.wait(15):
raise TimeoutError(f"Timeout waiting for channel close.")

# Wait for the connection to close
waiter = Event()
while not self.connection.is_closed:
waiter.wait(1)
LOG.info(f"Connection closed")

if self.connection:
self.connection.ioloop.stop()
# self.connection = None

except Exception as e:
LOG.error(f"Failed to close connection: {e}")
5 changes: 3 additions & 2 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
click~=8.0
click-default-group~=1.2
neon-utils~=1.0,>=1.11.1a7
neon-utils~=1.12
pyyaml>=5.4,<7.0.0
neon-mq-connector~=0.7,>=0.7.2a11
#neon-mq-connector~=0.7,>=0.7.2a11
neon-mq-connector@git+https://github.com/neongeckocom/neon_mq_connector@FEAT_SupportSelectConnections
ovos-bus-client~=0.0,>=0.0.3,<0.2.0
ovos-config~=0.1,<0.2.0

0 comments on commit 99ba1c6

Please sign in to comment.