- We’ll show how dependency injection supports our architectural goals.
- We’ll introduce a composition root pattern to bootstrap our system.
- We’ll offer some guidance on managing application configuration.
- We’ll compare different approaches to dependency injection and discuss their trade-offs.
(diagram to follow)
Note: placeholder chapter, under construction.
Depending on your particular brain type, you may have a slight feeling of unease at the back of your mind at this point. Let’s bring it out into the open. We’ve currently shown two different ways of managing dependencies, and testing them.
For our database dependency, we’ve built a careful framework of explicit dependencies and easy options for overriding them in tests:
Tip: If you haven’t already, it’s worth reading [chapter_03_abstractions] before continuing with this chapter.
Our main handler functions declare an explicit dependency on the unit of work:
def allocate(
    cmd: commands.Allocate, uow: unit_of_work.AbstractUnitOfWork
):
And that makes it easy to swap in a fake unit of work in our service-layer tests:
uow = FakeUnitOfWork()
messagebus.handle([...], uow)
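(As a refresher, the FakeUnitOfWork is just an in-memory implementation of the abstraction. A minimal sketch, assuming the FakeRepository from earlier chapters:)

class FakeUnitOfWork(unit_of_work.AbstractUnitOfWork):

    def __init__(self):
        self.products = FakeRepository([])  # in-memory stand-in for the real repository
        self.committed = False

    def commit(self):
        self.committed = True  # lets tests assert that work was committed

    def rollback(self):
        pass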
The UoW itself declares an explicit dependency on the session factory:
class SqlAlchemyUnitOfWork(AbstractUnitOfWork):

    def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):
        self.session_factory = session_factory
        ...
We take advantage of it in our integration tests, where it sometimes lets us use SQLite instead of Postgres:
def test_rolls_back_uncommitted_work_by_default(sqlite_session_factory):
    uow = unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory)  #(1)
(1) Integration tests swap out the default Postgres session_factory for a SQLite one.
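(We haven’t shown the sqlite_session_factory fixture in this chapter. A minimal sketch, assuming SQLAlchemy classic mappings and assuming our orm module exposes its metadata and start_mappers:)

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, clear_mappers

from allocation import orm

@pytest.fixture
def sqlite_session_factory():
    engine = create_engine('sqlite:///:memory:')  # throwaway in-memory db per test
    orm.metadata.create_all(engine)
    orm.start_mappers()
    yield sessionmaker(bind=engine)
    clear_mappers()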
If you’re used to the way things normally happen in Python, you’ll be thinking all this is a bit weird. The standard way to do things is to declare our dependency "implicitly" by simply importing it, and then if we ever need to change it for tests, we can monkeypatch, as is Right and True in dynamic languages:
from allocation import commands, events, email, exceptions, model, redis_pubsub  #(1)
...

def send_out_of_stock_notification(
    event: events.OutOfStock, uow: unit_of_work.AbstractUnitOfWork,
):
    email.send(  #(2)
        '[email protected]',
        f'Out of stock for {event.sku}',
    )
(1) Hardcoded import.
(2) Calls the specific email sender directly.
Why pollute our application code with unnecessary arguments just for the sake of our tests? mock.patch makes monkeypatching nice and easy:
with mock.patch("allocation.email.send") as mock_send_mail:
    ...
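(To make that concrete, here’s a sketch of how the out-of-stock test might look in the monkeypatching style. We’ll see the same test written against a fake later in the chapter; the details here are illustrative:)

from unittest import mock

def test_sends_email_on_out_of_stock_error():
    uow = FakeUnitOfWork()
    with mock.patch("allocation.email.send") as mock_send_mail:
        messagebus.handle([commands.CreateBatch("b1", "POPULAR-CURTAINS", 9, None)], uow)
        messagebus.handle([commands.Allocate("o1", "POPULAR-CURTAINS", 10)], uow)
    # the mock records the call our handler made
    assert mock_send_mail.call_args == mock.call(
        "[email protected]",
        "Out of stock for POPULAR-CURTAINS",
    )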
The trouble is that we’ve made it look easy because our toy example doesn’t send real emails (email.send just does a print), but in real life you’d end up having to call mock.patch for every single test that might cause an out-of-stock notification. If you’ve worked on codebases with lots of mocks used to prevent unwanted side effects, you’ll know how annoying that mocky boilerplate gets.
And you’ll know that mocks tightly couple us to the implementation. By choosing to monkeypatch email.send, we are tied to doing import email, and if we ever want to do from email import send, a trivial refactor, we’d have to change all our mocks.
So it’s a trade-off. Yes, declaring explicit dependencies is "unnecessary," strictly speaking, and using them would make our application code marginally more complex. But in return, we’d get tests that are easier to write and manage.
On top of which, declaring an explicit dependency is an implementation of the DIP — rather than having an (implicit) dependency on a specific detail, we have an (explicit) dependency on an abstraction:
def send_out_of_stock_notification(
    event: events.OutOfStock, send_mail: Callable,
):
    send_mail(
        '[email protected]',
        f'Out of stock for {event.sku}',
    )
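(With that signature, a test doesn’t need mock.patch at all; any callable will do. A sketch, with hypothetical names:)

def test_sends_out_of_stock_notification():
    emails = []

    def fake_send_mail(destination, message):  # a plain function is a perfectly good fake
        emails.append((destination, message))

    event = events.OutOfStock(sku="POPULAR-CURTAINS")
    send_out_of_stock_notification(event, send_mail=fake_send_mail)

    assert emails == [
        ("[email protected]", "Out of stock for POPULAR-CURTAINS"),
    ]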
But if we do declare these dependencies explicitly, who will inject them and how? So far, we’ve only really been dealing with passing the UoW around. What about all these other things?
Since we’ve now made the messagebus into the core of our application, it’s the ideal place to manage these dependencies.
Here’s one way to do it:
class MessageBus:  #(1)

    def __init__(
        self,
        uow: unit_of_work.AbstractUnitOfWork,  #(2)
        send_mail: Callable,  #(2)
        publish: Callable,  #(2)
    ):
        self.uow = uow
        self.dependencies = dict(uow=uow, send_mail=send_mail, publish=publish)  #(3)

    def handle(self, message: Message):
        if isinstance(message, events.Event):
            self.handle_event(message)
        elif isinstance(message, commands.Command):
            self.handle_command(message)
        else:
            raise Exception(f'{message} was not an Event or Command')
(1) The messagebus becomes a class…
(2) …which asks for all our dependencies in one place…
(3) …and stores them in a dict.
What else changes in the bus?
def handle_event(self, event: events.Event):  #(1)
    for handler in EVENT_HANDLERS[type(event)]:
        try:
            print('handling event', event, 'with handler', handler, flush=True)
            self.call_handler_with_dependencies(handler, event)  #(2)
        except Exception:
            print(f'Exception handling event {event}:\n{traceback.format_exc()}')
            continue

def handle_command(self, command: commands.Command):  #(1)
    print('handling command', command, flush=True)
    try:
        handler = COMMAND_HANDLERS[type(command)]
        self.call_handler_with_dependencies(handler, command)  #(2)
    except Exception as e:
        print(f'Exception handling command {command}: {e}')
        raise
(1) handle_event and handle_command are substantially the same, but instead of calling handlers directly and passing in only the UoW, they call a new method:
(2) self.call_handler_with_dependencies(), which takes the handler function and the message we want to handle:
Here’s the core of our dependency injection approach, then. As you’ll see, there’s not much to it:
def call_handler_with_dependencies(self, handler: Callable, message: Message):
    params = inspect.signature(handler).parameters  #(1)
    deps = {
        name: dependency for name, dependency in self.dependencies.items()  #(2)
        if name in params
    }
    handler(message, **deps)  #(3)
(1) We inspect our command/event handler’s arguments.
(2) We match them by name to our dependencies.
(3) And we inject them in as kwargs when we actually call the handler.
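(If you want to see the trick in isolation, here’s a self-contained sketch you can run; nothing in it is specific to our app:)

import inspect

def handler_one(message, uow, send_mail):  # wants two dependencies
    print('called with', uow, send_mail)

def handler_two(message, publish):  # wants a different one
    publish('channel', message)

dependencies = dict(uow='<uow>', send_mail=print, publish=print)

for handler in [handler_one, handler_two]:
    params = inspect.signature(handler).parameters  # names of the handler's arguments
    deps = {name: dep for name, dep in dependencies.items() if name in params}
    handler('a-message', **deps)  # inject only what each handler asked for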
Note that this simple approach is only really possible because we’ve made the messagebus into the core of our app — if we still had a mixture of service functions and event handlers and other entrypoints, our dependencies would be all over the place.
In our Flask app, we can just initialise the messagebus inline with the rest of our app config and setup, passing in the actual dependencies we want to use:
from allocation import (
    commands, email, exceptions, messagebus, orm, redis_pubsub, unit_of_work,
    views,
)

app = Flask(__name__)
orm.start_mappers()

uow = unit_of_work.SqlAlchemyUnitOfWork()
bus = messagebus.MessageBus(
    uow=uow,
    send_mail=email.send,
    publish=redis_pubsub.publish
)
uow.set_bus(bus)
In our Redis pub/sub entrypoint, meanwhile, we build the bus inside a function:

def get_bus():  #(1)
    uow = unit_of_work.SqlAlchemyUnitOfWork()
    bus = messagebus.MessageBus(
        uow=uow,
        send_mail=email.send,
        publish=publish
    )
    uow.set_bus(bus)
    return bus
def main():
    pubsub = r.pubsub(ignore_subscribe_messages=True)
    pubsub.subscribe('change_batch_quantity')
    bus = get_bus()  #(1)
    for m in pubsub.listen():
        handle_change_batch_quantity(m, bus)

def handle_change_batch_quantity(m, bus: messagebus.MessageBus):
(1) In the Redis case we can’t do the initialisation at import time, because we have a circular dependency between Flask and Redis (we’ll look at fixing that in [appendix_bootstrap]).
And in our tests, we can subclass the messagebus to build a FakeBus with all the fake dependencies wired up:

class FakeBus(messagebus.MessageBus):
    def __init__(self):
        uow = FakeUnitOfWork()
        super().__init__(
            uow=uow,
            send_mail=mock.Mock(),
            publish=mock.Mock(),
        )
        uow.set_bus(self)

...

class TestAddBatch:

    @staticmethod
    def test_for_new_product():
        bus = FakeBus()
        bus.handle(commands.CreateBatch("b1", "CRUNCHY-ARMCHAIR", 100, None))
        assert bus.uow.products.get("CRUNCHY-ARMCHAIR") is not None
        assert bus.uow.committed
We’ve got two types of dependency:
uow: unit_of_work.AbstractUnitOfWork, #(1)
send_mail: Callable, #(2)
publish: Callable, #(2)
(1) The UoW has an abstract base class. This is the heavyweight option for declaring and managing an external dependency; we’d use it for cases where the dependency is relatively complex.
(2) Our email sender and pubsub publisher are just defined as functions. That works just fine for simple things.
Here are some of the things we find ourselves injecting at work:
- an S3 filesystem client
- a key/value store client
- a requests session object.
Most of these will have more complex APIs that you can’t capture as a single function. Read and write, GET and POST, and so on.
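(For example, a key/value store client might get an ABC along these lines. This is a hypothetical sketch, not code from our app:)

import abc

class AbstractKeyValueStore(abc.ABC):

    @abc.abstractmethod
    def get(self, key: str) -> str:
        raise NotImplementedError

    @abc.abstractmethod
    def set(self, key: str, value: str):
        raise NotImplementedError

class FakeKeyValueStore(AbstractKeyValueStore):
    """The fake is just a dict underneath."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data[key]

    def set(self, key, value):
        self._data[key] = value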
Even though it’s simple, let’s use send_mail as an example to talk through how you might define a more complex dependency.
We’ll imagine a more generic "notifications" API. Could be email, could be SMS, could be Slack posts one day.
import abc
import smtplib

class AbstractNotifications(abc.ABC):

    @abc.abstractmethod
    def send(self, destination, message):
        raise NotImplementedError

...

class EmailNotifications(AbstractNotifications):

    def __init__(self, smtp_host=DEFAULT_HOST, port=DEFAULT_PORT):  # defaults come from config
        self.server = smtplib.SMTP(smtp_host, port=port)
        self.server.noop()

    def send(self, destination, message):
        msg = f'Subject: allocation service notification\n{message}'
        self.server.sendmail(
            from_addr='[email protected]',
            to_addrs=[destination],
            msg=msg
        )
We change the dependency in the messagebus:
class MessageBus:

    def __init__(
        self,
        uow: unit_of_work.AbstractUnitOfWork,
        notifications: notifications.AbstractNotifications,
        publish: Callable,
    ):
We work through and define a fake version for unit testing:
class FakeNotifications(notifications.AbstractNotifications):

    def __init__(self):
        self.sent = defaultdict(list)  # type: Dict[str, List[str]]

    def send(self, destination, message):
        self.sent[destination].append(message)

...

class FakeBus(messagebus.MessageBus):
    def __init__(self):
        uow = FakeUnitOfWork()
        super().__init__(
            uow=uow,
            notifications=FakeNotifications(),
            publish=mock.Mock(),
        )
        uow.set_bus(self)
And we can use it in our tests:
def test_sends_email_on_out_of_stock_error():
    bus = FakeBus()
    bus.handle(commands.CreateBatch("b1", "POPULAR-CURTAINS", 9, None))
    bus.handle(commands.Allocate("o1", "POPULAR-CURTAINS", 10))
    assert bus.dependencies['notifications'].sent['[email protected]'] == [
        "Out of stock for POPULAR-CURTAINS",
    ]
Now we test the real thing, usually with an end-to-end or integration test. We’ve used MailHog as a real-ish email server for our Docker dev environment.
cfg = config.get_email_host_and_port()

@pytest.fixture
def bus(sqlite_session_factory):
    uow = unit_of_work.SqlAlchemyUnitOfWork(sqlite_session_factory)
    bus = messagebus.MessageBus(
        uow=uow,
        notifications=notifications.EmailNotifications(
            smtp_host=cfg['host'],
            port=cfg['port'],
        ),
        publish=lambda *_, **__: None
    )
    uow.set_bus(bus)
    return bus
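(config.get_email_host_and_port isn’t shown in this chapter. A minimal version might read MailHog’s connection details from the environment, something like this sketch; the env var name and port numbers are assumptions:)

import os

def get_email_host_and_port():
    host = os.environ.get('EMAIL_HOST', 'localhost')
    # assumed port mappings for a dockerised MailHog: 1025 for SMTP, 8025 for its HTTP API
    port = 11025 if host == 'localhost' else 1025
    http_port = 18025 if host == 'localhost' else 8025
    return dict(host=host, port=port, http_port=http_port)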
def random_sku():
    return uuid.uuid4().hex[:6]

def test_out_of_stock_email(bus):
    sku = random_sku()
    bus.handle(commands.CreateBatch('batch1', sku, 9, None))
    bus.handle(commands.Allocate('order1', sku, 10))
    messages = requests.get(
        f'http://{cfg["host"]}:{cfg["http_port"]}/api/v2/messages'
    ).json()
    message = next(
        m for m in messages['items']
        if sku in str(m)
    )
    assert message['Raw']['From'] == '[email protected]'
    assert message['Raw']['To'] == ['[email protected]']
    assert f'Out of stock for {sku}' in message['Raw']['Data']
Against all the odds, this actually worked, pretty much first go!
And, erm, that’s it really.
1. Define your API using an ABC.
2. Implement the real thing.
3. Build a fake and use it for unit/service-layer/handler tests.
4. Find a less-fake version you can put into your Docker environment.
5. Test the less-fake "real" thing.
6. Profit!
Note: TODO, under construction.
Why not have a go at changing from email to, say, Twilio or Slack notifications?
Oh yeah, step 4 is a bit challenging…
Or, do the same thing for Redis. You’ll need to split pub from sub.
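(For the Redis exercise, the pub side might end up as its own abstraction, leaving the subscriber loop as a separate entrypoint. A hypothetical first cut:)

import abc
import json
from dataclasses import asdict

import redis

class AbstractPublisher(abc.ABC):

    @abc.abstractmethod
    def publish(self, channel, event):
        raise NotImplementedError

class RedisPublisher(AbstractPublisher):

    def __init__(self, host, port):
        self.client = redis.Redis(host=host, port=port)

    def publish(self, channel, event):
        # serialise the event and push it onto the channel
        self.client.publish(channel, json.dumps(asdict(event)))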