Skip to content

Commit

Permalink
Merge pull request #53 from olxbr/feature/addCallbackEndpoint
Browse files Browse the repository at this point in the history
Added feature that enables the creation of an endpoint that call a hook and returns information about its usage
  • Loading branch information
timotta authored Aug 15, 2022
2 parents 954ce6c + 66d1a1e commit c2d7269
Show file tree
Hide file tree
Showing 6 changed files with 428 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .bandit
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
[bandit]
exclude: ./.eggs/
exclude: ./.eggs/,./tests_unit/,./tests_integration/
128 changes: 126 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def your_consumer(msg: Message): # you receive only one message and we par
data=msg.body
)
if msg.body == "fail":

my_metric.inc() # you can use prometheus metrics
healthcheck.force_fail() # you can use your hooks inside consumer too
msg.reject(requeue=False) # You can force to reject a message, exactly equal https://b2wdigital.github.io/async-worker/src/asyncworker/asyncworker.rabbitmq.html#asyncworker.rabbitmq.message.RabbitMQMessage
Expand Down Expand Up @@ -266,7 +266,131 @@ This configuration just controls BarterDude's default Logging Hook and doesn't h
from baterdude.conf import BARTERDUDE_LOG_REDACTED
```

### Testing
## HTTP Endpoints

### Simple endpoints

If you want to expose and HTTP endpoint, you can easily do that to your Barterdude worker by adding a route mapped to a hook.

```python
barterdude.add_endpoint(
routes=["/some_hook"],
methods=["GET"],
hook=some_hook,
)
```

### Barterdude's callback endpoint

You can also expose an HTTP endpoint that calls the worker's callback to emulate a message being consumed and processed from a queue. This way you can make a request passing a body and header of a message and the response of this request will have all information of what the worker would do without really publishing the message.

```python
barterdude.add_callback_endpoint(
routes=["/execute"],
hook=execute,
)
```

In order to use a mock instance of the barterdude object, you also need to modify the signature of your callback method to receive a optional argument for the barterdude mock. Then you'll have to choose which one to use. Only the callback endpoint calls will pass the barterdude object to your callback.

```python
async def execute(rabbitmq_message: RabbitMQMessage, barterdude_arg=None):
bd = barterdude_arg if barterdude_arg is not None else barterdude
```

#### Request and response example:
```json
# Request
{
"body": {
"list_id": 105515152
},
"headers": {
"trace_id": "random_id"
}
}

# Response
{
"message_calls": [
{
"method": "accept",
"args": [],
"kwargs": {}
}
],
"barterdude_calls": [
{
"method": "publish_amqp",
"args": [],
"kwargs": {
"exchange": "NEXT_EXCHANGE_TO_BE_CALLED",
"data": {
"list_id": 1055151521,
"subject": "vendo samsung galaxy s21",
"timestamp": 1657231231000
},
"properties": {
"headers": {
"has_failed": false,
"trace_id": "random_id"
}
}
}
}
]
}
```

### Side-effects

If your callback has services with side-effects such as inserting a row in a database or updating an API, you can pass fake instances of these services that are going to be injected to prevent side-effects from happenning.

```python
barterdude.add_callback_endpoint(
routes=["/execute"],
methods=["POST"],
hook=execute,
mock_dependencies=[
(
fake_database_service, # fake service instance to be used by the worker
"database_service", # name used in the data sharing/dependency injection
),
]
)
```

#### Forcing side-effects

If you want the message to be published when calling the callback endpoint, you can pass the parameter `should_mock_barterdude: false`. This way the message will be published. Also, you don't have to mock the services used by your worker, all side-effects will happen and you'll have your worker processing your message just like it would be when consuming from a queue.

#### Request and response example:
```json
# Request
{
"body": {
"list_id": 105515152
},
"headers": {
"trace_id": "random_id"
},
"should_mock": "should_mock_barterdude"
}

# Response
{
"message_calls": [
{
"method": "accept",
"args": [],
"kwargs": {}
}
]
# message will be published, so we won't have information about publish method's calls
}
```

## Testing

To test async consumers we recommend `asynctest` lib

Expand Down
60 changes: 59 additions & 1 deletion barterdude/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import json
import traceback
from aiohttp import web
from asyncio import gather
from asyncworker import App, RouteTypes
from asyncworker.options import Options
from asyncworker.connections import AMQPConnection
from asyncworker.rabbitmq.message import RabbitMQMessage
from collections import MutableMapping
from typing import Iterable, Optional
from typing import Iterable, Optional, Callable, Any, Tuple
from barterdude.monitor import Monitor
from barterdude.message import MessageValidation, ValidationException
from barterdude.mocks import RabbitMQMessageMock, BarterdudeMock


class BarterDude(MutableMapping):
Expand Down Expand Up @@ -34,6 +38,60 @@ def add_endpoint(self, routes, methods, hook):
type=RouteTypes.HTTP
)(hook)

def add_callback_endpoint(
self,
routes: Iterable[str],
hook: Callable,
mock_dependencies: Iterable[Tuple[Any, str]] = None,
):
def hook_to_callback(req):
return self._call_callback_endpoint(req, hook, mock_dependencies)

self.add_endpoint(
routes=routes,
methods=['POST'],
hook=hook_to_callback
)

async def _call_callback_endpoint(
self,
request: web.Request,
hook: Callable,
mock_dependencies: Iterable[Tuple[Any, str]],
):
payload = await request.json()
body = payload.get('body')
headers = payload.get('headers')
should_mock_barterdude = payload.get('should_mock_barterdude', True)

if body is None:
return web.Response(
status=400,
body=json.dumps({
'msg': 'Missing "body" attribute in payload.'
})
)

rabbitmq_message_mock = RabbitMQMessageMock(body, headers)

barterdude_mock = None
if should_mock_barterdude:
barterdude_mock = BarterdudeMock(mock_dependencies)

response = {}

try:
await hook(rabbitmq_message_mock, barterdude=barterdude_mock)
except Exception:
response['exception'] = traceback.format_exc()

response['message_calls'] = rabbitmq_message_mock.get_calls()

if barterdude_mock is not None:
response['barterdude_calls'] = barterdude_mock.get_calls()

return web.Response(status=200, body=json.dumps(response))

def consume_amqp(
self,
queues: Iterable[str],
Expand Down
90 changes: 90 additions & 0 deletions barterdude/mocks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import asyncio
from typing import Dict, Any, Iterable, Tuple
from collections import MutableMapping
from aioamqp.properties import Properties


class ObjectWithCallsTracking:

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.method_calls = []

def __getattribute__(self, name: str) -> Any:
attr = super().__getattribute__(name)
if not hasattr(attr, '__call__') or name == 'get_calls':
return attr

if asyncio.iscoroutine(attr) or asyncio.iscoroutinefunction(attr):
async def async_track_calls(*args, **kwargs):
self.method_calls.append((
name, args, kwargs
))
return await attr(*args, **kwargs)
return async_track_calls

def track_calls(*args, **kwargs):
self.method_calls.append((
name, args, kwargs
))
return attr(*args, **kwargs)
return track_calls

def get_calls(self):
return [
{'method': method, 'args': args, 'kwargs': kwargs}
for method, args, kwargs in self.method_calls
]


class RabbitMQMessageMock(ObjectWithCallsTracking):
def __init__(
self,
body: Dict = None,
headers: Dict = None,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.body = body
self.properties = Properties()
self.properties.headers = headers

def accept(self, *args, **kwargs):
pass

def reject(self, *args, **kwargs):
pass


class BarterdudeMock(MutableMapping, ObjectWithCallsTracking):
def __init__(
self,
mock_dependencies: Iterable[Tuple[Any, str]] = None,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.__app = {}

if mock_dependencies:
for service, name in mock_dependencies:
self[name] = service

async def publish_amqp(self, *args, **kwargs):
pass

def __getitem__(self, key):
return self.__app[key]

def __setitem__(self, key, value):
self.__app[key] = value

def __delitem__(self, key):
del self.__app[key]

def __len__(self):
return len(self.__app)

def __iter__(self):
return iter(self.__app)
Loading

0 comments on commit c2d7269

Please sign in to comment.