Skip to content

Commit

Permalink
Merge pull request #20 from olxbr/feature/connection-fail
Browse files Browse the repository at this point in the history
Add on_connection_fail callback
  • Loading branch information
dmvieira authored Mar 18, 2020
2 parents f50984b + 98549b0 commit 1392d3c
Show file tree
Hide file tree
Showing 16 changed files with 141 additions and 5 deletions.
4 changes: 3 additions & 1 deletion barterdude/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ async def process_message(message: RabbitMQMessage):
type=RouteTypes.AMQP_RABBITMQ,
options={
Options.BULK_SIZE: coroutines,
Options.BULK_FLUSH_INTERVAL: bulk_flush_interval
Options.BULK_FLUSH_INTERVAL: bulk_flush_interval,
Options.CONNECTION_FAIL_CALLBACK:
monitor.dispatch_on_connection_fail,
}
)
async def wrapper(messages: RabbitMQMessage):
Expand Down
7 changes: 7 additions & 0 deletions barterdude/hooks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ async def on_fail(self, message: RabbitMQMessage, error: Exception):
async def before_consume(self, message: RabbitMQMessage):
'''Called before consuming the message'''

@abstractmethod
async def on_connection_fail(self, error: Exception, retries: int):
'''Called when the consumer fails to connect to the broker'''


class HttpHook(BaseHook):
def __init__(self, barterdude: BarterDude, path: str):
Expand All @@ -37,3 +41,6 @@ async def on_fail(self, message: RabbitMQMessage, error: Exception):

async def before_consume(self, message: RabbitMQMessage):
raise NotImplementedError

async def on_connection_fail(self, error: Exception, retries: int):
raise NotImplementedError
16 changes: 15 additions & 1 deletion barterdude/hooks/healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ def __init__(
barterdude: BarterDude,
path: str = "/healthcheck",
success_rate: float = 0.95,
health_window: float = 60.0 # seconds
health_window: float = 60.0, # seconds
max_connection_fails: int = 3
):
self.__success_rate = success_rate
self.__health_window = health_window
self.__success = deque()
self.__fail = deque()
self.__force_fail = False
self.__connection_fails = 0
self.__max_connection_fails = max_connection_fails
super(Healthcheck, self).__init__(barterdude, path)

def force_fail(self):
Expand All @@ -48,12 +51,23 @@ async def on_success(self, message: RabbitMQMessage):
async def on_fail(self, message: RabbitMQMessage, error: Exception):
self.__fail.append(time())

async def on_connection_fail(self, error: Exception, retries: int):
self.__connection_fails = retries

async def __call__(self, req: web.Request):
if self.__force_fail:
return _response(500, {
"message": "Healthcheck fail called manually"
})

if self.__connection_fails >= self.__max_connection_fails:
return _response(500, {
"message": (
"Reached max connection fails "
f"({self.__max_connection_fails})"
)
})

old_timestamp = time() - self.__health_window
success = _remove_old(self.__success, old_timestamp)
fail = _remove_old(self.__fail, old_timestamp)
Expand Down
8 changes: 8 additions & 0 deletions barterdude/hooks/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,11 @@ async def on_fail(self, message: RabbitMQMessage, error: Exception):
"exception": repr(error),
"traceback": format_tb(error.__traceback__),
})

async def on_connection_fail(self, error: Exception, retries: int):
logger.error({
"message": "Failed to connect to the broker",
"retries": retries,
"exception": repr(error),
"traceback": format_tb(error.__traceback__),
})
5 changes: 5 additions & 0 deletions barterdude/hooks/metrics/prometheus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ async def on_success(self, message: RabbitMQMessage):
async def on_fail(self, message: RabbitMQMessage, error: Exception):
await self._on_complete(message, self.__definitions.FAIL, error)

async def on_connection_fail(self, error: Exception, retries: int):
self.metrics[self.__definitions.CONNECTION_FAIL].labels(
**self.__labels
).inc()

async def __call__(self, req: web.Request):
return web.Response(
content_type=CONTENT_TYPE_LATEST.split(";")[0],
Expand Down
20 changes: 20 additions & 0 deletions barterdude/hooks/metrics/prometheus/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
class Definitions:

MESSAGE_UNITS = "messages"
ERROR_UNITS = "errors"
TIME_UNITS = "seconds"
NAMESPACE = "barterdude"
BEFORE_CONSUME = "before_consume"
SUCCESS = "success"
FAIL = "fail"
TIME_MEASURE = "time_measure"
CONNECTION_FAIL = "connection_fail"

def __init__(
self,
Expand Down Expand Up @@ -51,6 +53,11 @@ def save_metrics(self):
namespace=self.NAMESPACE,
unit=self.TIME_UNITS,
)
self._prepare_on_connection_fail(
self.CONNECTION_FAIL,
namespace=self.NAMESPACE,
unit=self.ERROR_UNITS,
)

def _prepare_before_consume(
self, name: str, namespace: str = "", unit: str = ""):
Expand Down Expand Up @@ -90,3 +97,16 @@ def _prepare_time_measure(
unit=unit,
registry=self.__registry,
)

def _prepare_on_connection_fail(
self, state: str, namespace: str, unit: str):

self.__metrics[state] = Counter(
name=f"connection_fail",
documentation=("Number of times barterdude failed "
"to connect to the AMQP broker"),
labelnames=self.__labelkeys,
namespace=namespace,
unit=unit,
registry=self.__registry
)
7 changes: 7 additions & 0 deletions barterdude/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,10 @@ async def dispatch_on_success(self, message: RabbitMQMessage):
async def dispatch_on_fail(self, message: RabbitMQMessage,
error: Exception):
await gather(*self._prepare_callbacks("on_fail", message, error))

async def dispatch_on_connection_fail(
self, error: Exception, retries: int
):
await gather(*self._prepare_callbacks(
"on_connection_fail", error, retries
))
2 changes: 1 addition & 1 deletion requirements/requirements_base.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
async-worker==0.11.3
async-worker==0.12.1
aioamqp==0.14.0
python-json-logger==0.1.11
7 changes: 5 additions & 2 deletions tests/test__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,18 @@ def test_should_create_connection(self):
self.App.assert_called_once_with(connections=[self.connection])

def test_should_call_route_when_created(self):
monitor = Mock()
self.barterdude.consume_amqp(
["queue"]
["queue"], monitor=monitor
)(CoroutineMock())
self.app.route.assert_called_once_with(
["queue"],
type=RouteTypes.AMQP_RABBITMQ,
options={
Options.BULK_SIZE: 10,
Options.BULK_FLUSH_INTERVAL: 60
Options.BULK_FLUSH_INTERVAL: 60,
Options.CONNECTION_FAIL_CALLBACK:
monitor.dispatch_on_connection_fail,
}
)

Expand Down
2 changes: 2 additions & 0 deletions tests/test_hooks/test__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ async def test_should_fail_when_calling_unimplemented_methods(self):
await hook.on_success(None)
with self.assertRaises(NotImplementedError):
await hook.on_fail(None, None)
with self.assertRaises(NotImplementedError):
await hook.on_connection_fail(None, None)
8 changes: 8 additions & 0 deletions tests/test_hooks/test_healthcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,11 @@ async def test_should_erase_old_messages(self):
'{"message": "Success rate: 0.125 (expected: 0.9)", '
'"fail": 7, "success": 1, "status": "fail"}'
)

async def test_should_fail_healthcheck_when_fail_to_connect(self):
await self.healthcheck.on_connection_fail(None, 3)
response = await self.healthcheck(Mock())
self.assertEqual(
response.body._value.decode('utf-8'),
'{"message": "Reached max connection fails (3)", "status": "fail"}'
)
18 changes: 18 additions & 0 deletions tests/test_hooks/test_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,21 @@ async def test_should_log_on_fail(self, format_tb, repr, dumps, logger):
"exception": repr.return_value,
"traceback": format_tb.return_value,
})

@patch("barterdude.hooks.logging.logger")
@patch("barterdude.hooks.logging.repr")
@patch("barterdude.hooks.logging.format_tb")
async def test_should_log_on_connection_fail(
self, format_tb, repr, logger
):
retries = Mock()
exception = Exception()
await self.logging.on_connection_fail(exception, retries)
repr.assert_called_once_with(exception)
format_tb.assert_called_once_with(exception.__traceback__)
logger.error.assert_called_once_with({
"message": "Failed to connect to the broker",
"retries": retries,
"exception": repr.return_value,
"traceback": format_tb.return_value,
})
10 changes: 10 additions & 0 deletions tests/test_hooks/test_metrics/test_prometheus/test__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ async def test_should_remove_message_from_timer_on_complete(
{}
)

@patch("barterdude.hooks.metrics.prometheus.Prometheus.metrics")
async def test_should_increment_on_connection_fail_counter(self, metrics):
counter = Mock()
labels = Mock(labels=Mock(return_value=counter))
metrics.__getitem__ = Mock(return_value=labels)
await self.prometheus.on_connection_fail(Mock(), Mock())
self.assertEqual(labels.labels.call_count, 1)
labels.labels.assert_called_with(test='my_test')
counter.inc.assert_called_once()

def test_should_call_counter(self):
self.assertTrue(
isinstance(self.prometheus.metrics.counter(
Expand Down
11 changes: 11 additions & 0 deletions tests/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ async def test_should_call_hooks_on_fail(self):
self.hook1.on_fail.assert_called_with({}, exception)
self.hook2.on_fail.assert_called_with({}, exception)

async def test_should_call_hooks_on_connection_fail(self):
exception = Mock()
retries = Mock()
self.hook1.on_connection_fail = CoroutineMock()
self.hook2.on_connection_fail = CoroutineMock()
await self.monitor.dispatch_on_connection_fail(exception, retries)
self.hook1.on_connection_fail.assert_called_once()
self.hook2.on_connection_fail.assert_called_once()
self.hook1.on_connection_fail.assert_called_with(exception, retries)
self.hook2.on_connection_fail.assert_called_with(exception, retries)

@patch("barterdude.monitor.logger")
@patch("barterdude.monitor.repr")
@patch("barterdude.monitor.format_tb")
Expand Down
3 changes: 3 additions & 0 deletions tests_integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ async def on_fail(self, message: RabbitMQMessage, error: Exception):

async def before_consume(self, message: RabbitMQMessage):
raise NotImplementedError

async def on_connection_fail(self, error: Exception, retries: int):
raise NotImplementedError
18 changes: 18 additions & 0 deletions tests_integration/test_rabbitmq_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,3 +340,21 @@ async def handler(message):
self.assertIn("'delivery_tag': 1", cm.output[1])
self.assertIn(f"'exception': \"{error_str}\"", cm.output[1])
self.assertIn("'traceback': [", cm.output[1])

async def test_fails_to_connect_to_rabbitmq(self):
monitor = Monitor(Logging())

self.app = BarterDude(hostname="invalid_host")

@self.app.consume_amqp([self.input_queue], monitor)
async def handler(message):
pass

await self.app.startup()
with self.assertLogs("barterdude") as cm:
await asyncio.sleep(2)

self.assertIn(
"{'message': 'Failed to connect to the broker', 'retries': 1,",
cm.output[0]
)

0 comments on commit 1392d3c

Please sign in to comment.