diff --git a/.bandit b/.bandit index 8817e08..3818f7b 100644 --- a/.bandit +++ b/.bandit @@ -1,2 +1,2 @@ [bandit] -exclude: ./.eggs/ \ No newline at end of file +exclude: ./.eggs/,./tests_unit/,./tests_integration/ \ No newline at end of file diff --git a/README.md b/README.md index e6f1bf4..7430be7 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ async def your_consumer(msg: Message): # you receive only one message and we par data=msg.body ) if msg.body == "fail": - + my_metric.inc() # you can use prometheus metrics healthcheck.force_fail() # you can use your hooks inside consumer too msg.reject(requeue=False) # You can force to reject a message, exactly equal https://b2wdigital.github.io/async-worker/src/asyncworker/asyncworker.rabbitmq.html#asyncworker.rabbitmq.message.RabbitMQMessage @@ -266,7 +266,131 @@ This configuration just controls BarterDude's default Logging Hook and doesn't h from baterdude.conf import BARTERDUDE_LOG_REDACTED ``` -### Testing +## HTTP Endpoints + +### Simple endpoints + +If you want to expose and HTTP endpoint, you can easily do that to your Barterdude worker by adding a route mapped to a hook. + +```python +barterdude.add_endpoint( + routes=["/some_hook"], + methods=["GET"], + hook=some_hook, +) +``` + +### Barterdude's callback endpoint + +You can also expose an HTTP endpoint that calls the worker's callback to emulate a message being consumed and processed from a queue. This way you can make a request passing a body and header of a message and the response of this request will have all information of what the worker would do without really publishing the message. + +```python +barterdude.add_callback_endpoint( + routes=["/execute"], + hook=execute, +) +``` + +In order to use a mock instance of the barterdude object, you also need to modify the signature of your callback method to receive a optional argument for the barterdude mock. Then you'll have to choose which one to use. Only the callback endpoint calls will pass the barterdude object to your callback. + +```python +async def execute(rabbitmq_message: RabbitMQMessage, barterdude_arg=None): + bd = barterdude_arg if barterdude_arg is not None else barterdude +``` + +#### Request and response example: +```json +# Request +{ + "body": { + "list_id": 105515152 + }, + "headers": { + "trace_id": "random_id" + } +} + +# Response +{ + "message_calls": [ + { + "method": "accept", + "args": [], + "kwargs": {} + } + ], + "barterdude_calls": [ + { + "method": "publish_amqp", + "args": [], + "kwargs": { + "exchange": "NEXT_EXCHANGE_TO_BE_CALLED", + "data": { + "list_id": 1055151521, + "subject": "vendo samsung galaxy s21", + "timestamp": 1657231231000 + }, + "properties": { + "headers": { + "has_failed": false, + "trace_id": "random_id" + } + } + } + } + ] +} +``` + +### Side-effects + +If your callback has services with side-effects such as inserting a row in a database or updating an API, you can pass fake instances of these services that are going to be injected to prevent side-effects from happenning. + +```python +barterdude.add_callback_endpoint( + routes=["/execute"], + methods=["POST"], + hook=execute, + mock_dependencies=[ + ( + fake_database_service, # fake service instance to be used by the worker + "database_service", # name used in the data sharing/dependency injection + ), + ] +) +``` + +#### Forcing side-effects + +If you want the message to be published when calling the callback endpoint, you can pass the parameter `should_mock_barterdude: false`. This way the message will be published. Also, you don't have to mock the services used by your worker, all side-effects will happen and you'll have your worker processing your message just like it would be when consuming from a queue. + +#### Request and response example: +```json +# Request +{ + "body": { + "list_id": 105515152 + }, + "headers": { + "trace_id": "random_id" + }, + "should_mock": "should_mock_barterdude" +} + +# Response +{ + "message_calls": [ + { + "method": "accept", + "args": [], + "kwargs": {} + } + ] + # message will be published, so we won't have information about publish method's calls +} +``` + +## Testing To test async consumers we recommend `asynctest` lib diff --git a/barterdude/__init__.py b/barterdude/__init__.py index 5b89a29..d5bd5fb 100644 --- a/barterdude/__init__.py +++ b/barterdude/__init__.py @@ -1,12 +1,16 @@ +import json +import traceback +from aiohttp import web from asyncio import gather from asyncworker import App, RouteTypes from asyncworker.options import Options from asyncworker.connections import AMQPConnection from asyncworker.rabbitmq.message import RabbitMQMessage from collections import MutableMapping -from typing import Iterable, Optional +from typing import Iterable, Optional, Callable, Any, Tuple from barterdude.monitor import Monitor from barterdude.message import MessageValidation, ValidationException +from barterdude.mocks import RabbitMQMessageMock, BarterdudeMock class BarterDude(MutableMapping): @@ -34,6 +38,60 @@ def add_endpoint(self, routes, methods, hook): type=RouteTypes.HTTP )(hook) + def add_callback_endpoint( + self, + routes: Iterable[str], + hook: Callable, + mock_dependencies: Iterable[Tuple[Any, str]] = None, + ): + def hook_to_callback(req): + return self._call_callback_endpoint(req, hook, mock_dependencies) + + self.add_endpoint( + routes=routes, + methods=['POST'], + hook=hook_to_callback + ) + + async def _call_callback_endpoint( + self, + request: web.Request, + hook: Callable, + mock_dependencies: Iterable[Tuple[Any, str]], + ): + payload = await request.json() + body = payload.get('body') + headers = payload.get('headers') + should_mock_barterdude = payload.get('should_mock_barterdude', True) + + if body is None: + return web.Response( + status=400, + body=json.dumps({ + 'msg': 'Missing "body" attribute in payload.' + }) + ) + + rabbitmq_message_mock = RabbitMQMessageMock(body, headers) + + barterdude_mock = None + if should_mock_barterdude: + barterdude_mock = BarterdudeMock(mock_dependencies) + + response = {} + + try: + await hook(rabbitmq_message_mock, barterdude=barterdude_mock) + except Exception: + response['exception'] = traceback.format_exc() + + response['message_calls'] = rabbitmq_message_mock.get_calls() + + if barterdude_mock is not None: + response['barterdude_calls'] = barterdude_mock.get_calls() + + return web.Response(status=200, body=json.dumps(response)) + def consume_amqp( self, queues: Iterable[str], diff --git a/barterdude/mocks.py b/barterdude/mocks.py new file mode 100644 index 0000000..5409eaa --- /dev/null +++ b/barterdude/mocks.py @@ -0,0 +1,90 @@ +import asyncio +from typing import Dict, Any, Iterable, Tuple +from collections import MutableMapping +from aioamqp.properties import Properties + + +class ObjectWithCallsTracking: + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.method_calls = [] + + def __getattribute__(self, name: str) -> Any: + attr = super().__getattribute__(name) + if not hasattr(attr, '__call__') or name == 'get_calls': + return attr + + if asyncio.iscoroutine(attr) or asyncio.iscoroutinefunction(attr): + async def async_track_calls(*args, **kwargs): + self.method_calls.append(( + name, args, kwargs + )) + return await attr(*args, **kwargs) + return async_track_calls + + def track_calls(*args, **kwargs): + self.method_calls.append(( + name, args, kwargs + )) + return attr(*args, **kwargs) + return track_calls + + def get_calls(self): + return [ + {'method': method, 'args': args, 'kwargs': kwargs} + for method, args, kwargs in self.method_calls + ] + + +class RabbitMQMessageMock(ObjectWithCallsTracking): + def __init__( + self, + body: Dict = None, + headers: Dict = None, + *args, + **kwargs + ): + super().__init__(*args, **kwargs) + self.body = body + self.properties = Properties() + self.properties.headers = headers + + def accept(self, *args, **kwargs): + pass + + def reject(self, *args, **kwargs): + pass + + +class BarterdudeMock(MutableMapping, ObjectWithCallsTracking): + def __init__( + self, + mock_dependencies: Iterable[Tuple[Any, str]] = None, + *args, + **kwargs + ): + super().__init__(*args, **kwargs) + self.__app = {} + + if mock_dependencies: + for service, name in mock_dependencies: + self[name] = service + + async def publish_amqp(self, *args, **kwargs): + pass + + def __getitem__(self, key): + return self.__app[key] + + def __setitem__(self, key, value): + self.__app[key] = value + + def __delitem__(self, key): + del self.__app[key] + + def __len__(self): + return len(self.__app) + + def __iter__(self): + return iter(self.__app) diff --git a/tests_unit/test__init__.py b/tests_unit/test__init__.py index 1c2fdd7..2adcea6 100644 --- a/tests_unit/test__init__.py +++ b/tests_unit/test__init__.py @@ -63,6 +63,112 @@ def test_should_call_route_when_adding_endpoint(self): ) self.decorator.assert_called_once_with(hook) + async def test_should_call_route_when_adding_callback_endpoint(self): + hook = Mock() + self.barterdude.add_callback_endpoint( + ['/my_route'], + hook, + [(Mock(), 'service')] + ) + self.app.route.assert_called_once_with( + routes=['/my_route'], + methods=['POST'], + type=RouteTypes.HTTP + ) + self.decorator.assert_called_once() + + async def test_should_hook_call_on_callback_endpoint(self): + async def mock_hook(message, barterdude): + barterdude['service'].method_one() + await barterdude['service'].method_two() + message.accept() + await barterdude.publish_amqp(data={'a': 1}) + + request = Mock() + request.json = CoroutineMock(return_value={'body': {}}) + service_mock = Mock() + service_mock.method_one.return_value = 123 + service_mock.method_two = CoroutineMock(return_value=234) + dependencies = [(service_mock, 'service')] + response = await self.barterdude._call_callback_endpoint( + request, mock_hook, dependencies) + + request.json.assert_called_once() + service_mock.method_one.assert_called_once() + service_mock.method_two.assert_called_once() + assert response.status == 200 + assert response.body._value == ( + b'{"message_calls": [{"method": "accept", "args": [],' + b' "kwargs": {}}], "barterdude_calls": [{"method": ' + b'"publish_amqp", "args": [], "kwargs": {"data": {"a": 1}}}]}' + ) + + async def test_should_hook_call_on_callback_endpoint_without_body(self): + async def mock_hook(message, barterdude): + barterdude['service'].method_one() + await barterdude['service'].method_two() + message.accept() + await barterdude.publish_amqp(data={'a': 1}) + + request = Mock() + request.json = CoroutineMock(return_value={}) + service_mock = Mock() + service_mock.method_one.return_value = 123 + service_mock.method_two = CoroutineMock(return_value=234) + dependencies = [(service_mock, 'service')] + response = await self.barterdude._call_callback_endpoint( + request, mock_hook, dependencies) + + request.json.assert_called_once() + service_mock.method_one.assert_not_called() + service_mock.method_two.assert_not_called() + assert response.status == 400 + expected_msg = b'{"msg": "Missing \\"body\\" attribute in payload."}' + assert response.body._value == expected_msg + + async def test_should_hook_call_on_callback_endpoint_with_exception(self): + async def mock_hook(message, barterdude): + raise Exception + + request = Mock() + request.json = CoroutineMock(return_value={'body': {}}) + service_mock = Mock() + dependencies = [(service_mock, 'service')] + response = await self.barterdude._call_callback_endpoint( + request, mock_hook, dependencies) + + request.json.assert_called_once() + assert response.status == 200 + assert b'exception' in response.body._value + assert b'message_calls' in response.body._value + assert b'barterdude_calls' in response.body._value + + async def test_should_hook_call_on_callback_endpoint_with_dependency(self): + async def mock_hook(message, barterdude): + barterdude['service'].method_one() + barterdude['service'].method_two() + message.accept() + await barterdude.publish_amqp(data={'a': 1}) + + request = Mock() + request.json = CoroutineMock() + service_mock = Mock() + service_mock.method_one.return_value = 123 + service_mock.method_two = CoroutineMock(return_value=234) + dependencies = [(service_mock, 'service')] + response = await self.barterdude._call_callback_endpoint( + request, mock_hook, dependencies) + + request.json.assert_called_once() + service_mock.method_one.assert_called_once() + service_mock.method_two.assert_called_once() + assert response.status == 200 + assert response.body._value == ( + b'{"message_calls": [{"method": "accept", "args": [],' + b' "kwargs": {}}], "barterdude_calls": [{"method": ' + b'"publish_amqp", "args": [], "kwargs": {"data": {"a": 1}}}]}' + ) + async def test_should_call_callback_for_each_message(self): self.barterdude.consume_amqp(["queue"], self.monitor)(self.callback) self.decorator.assert_called_once() diff --git a/tests_unit/test_mocks.py b/tests_unit/test_mocks.py new file mode 100644 index 0000000..7a47a28 --- /dev/null +++ b/tests_unit/test_mocks.py @@ -0,0 +1,46 @@ +from asynctest import TestCase +from barterdude.mocks import BarterdudeMock, RabbitMQMessageMock + + +class TestMocks(TestCase): + + def test_mock_rabbitmqmessage(self): + mock = RabbitMQMessageMock({'a': 1}, {'b': 2}) + mock.accept() + mock.reject(requeue=False) + + assert mock.get_calls() == [ + { + 'method': 'accept', + 'args': (), + 'kwargs': {} + }, + { + 'method': 'reject', + 'args': (), + 'kwargs': {'requeue': False} + }, + ] + + async def test_mock_barterdude(self): + mock = BarterdudeMock() + + mock['value'] = 10 + await mock.publish_amqp(data={'a': 1}) + + assert mock['value'] == 10 + assert len(mock) == 1 + + for key in mock: + assert key == 'value' + + del mock['value'] + assert 'value' not in mock + + assert mock.get_calls() == [ + { + 'method': 'publish_amqp', + 'args': (), + 'kwargs': {'data': {'a': 1}} + }, + ]