Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UnitOfWork #175

Merged
merged 3 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 17 additions & 44 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of Invenio.
# Copyright (C) 2020 CERN.
# Copyright (C) 2020-2024 CERN.
# Copyright (C) 2022 Graz University of Technology.
#
# Invenio is free software; you can redistribute it and/or modify it
Expand All @@ -23,37 +23,17 @@ on:
description: 'Reason'
required: false
default: 'Manual trigger'

jobs:
Tests:
runs-on: ubuntu-20.04
runs-on: ubuntu-24.04
strategy:
fail-fast: false
matrix:
python-version: [3.7, 3.8, 3.9]
requirements-level: [pypi]
db-service: [postgresql11, postgresql14, mysql8, sqlite]
exclude:
- python-version: 3.7
db-service: postgresql14
requirements-level: pypi

- python-version: 3.7
db-service: mysql8
requirements-level: pypi

- python-version: 3.8
db-service: postgresql11

- python-version: 3.9
db-service: postgresql11

- python-version: 3.7
db-service: sqlite

- python-version: 3.8
db-service: sqlite
python-version: [3.9, 3.12]
db-service: [postgresql11, postgresql14, mysql8, sqlite]

include:
include:
- db-service: postgresql11
EXTRAS: "tests,postgresql"

Expand All @@ -69,34 +49,27 @@ jobs:
env:
DB: ${{ matrix.db-service }}
EXTRAS: ${{ matrix.EXTRAS }}

steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: pip
cache-dependency-path: setup.cfg

- name: Generate dependencies
run: |
pip install wheel requirements-builder
requirements-builder -e "$EXTRAS" --level=${{ matrix.requirements-level }} setup.py > .${{ matrix.requirements-level }}-${{ matrix.python-version }}-requirements.txt

- name: Cache pip
uses: actions/cache@v2
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-${{ hashFiles('.${{ matrix.requirements-level }}-${{ matrix.python-version }}-requirements.txt') }}
- name: Pre-install
uses: ./.github/actions/pre-install
if: ${{ hashFiles('.github/actions/pre-install/action.yml') != '' }}

- name: Install dependencies
run: |
pip install -r .${{ matrix.requirements-level }}-${{ matrix.python-version }}-requirements.txt -c constraints-${{ matrix.requirements-level }}.txt
pip install ".[$EXTRAS]"
pip freeze
docker --version
docker-compose --version
docker version

- name: Run tests
run: |
./run-tests.sh
run: ./run-tests.sh
244 changes: 244 additions & 0 deletions invenio_db/services/uow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2024 CERN.
#
# Invenio is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Unit of work.

Moved from invenio-records-resources.

Used to group multiple service operations into a single atomic unit. The Unit
of Work maintains a list of operations and coordinates the commit, indexing and
task execution.

Note, this is NOT a clean implementation of the UoW design pattern. The
main purpose of the Unit of Work in Invenio is to coordinate when the database
transaction commit is called, and ensure tasks that have to run after the
transcation are executed (such as indexing and running celery tasks).

This ensures that we can group multiple service calls into a single database
transaction and perform the necessary indexing/task execution afterwards.

**When to use?**

You should use the unit of work instead of running an explicit
``db.session.commit()`` or ``db.session.rollback()`` in the code. Basically
any function where you would normally have called a ``db.session.commit()``
should be changed to something like:

.. code-block:: python

from invenio_db.services.uow import \
ModelCommit, unit_of_work,

@unit_of_work()
def create(self, ... , uow=None):
# ...
uow.register(ModelCommit(model))
# ...
# Do not use `db.session.commit()` in service.

**When not to use?**

If you're not changing the database state there's no need to use the unit of
work. Examples include:

- Reading a model
- Search for models

**How to group multiple service calls?**

In order to group multiple service calls into one atomic operation you can use
the following pattern:

.. code-block:: python

from invenio_db.services.uow import UnitOfWork

with UnitOfWork() as uow:
# Be careful to always inject "uow" to the service. If not, the
# service will create its own unit of work and commit.
service.communities.add(..., uow=uow)
service.publish(... , uow=uow)
uow.commit()

If you're not grouping multiple service calls, then simply just call the
service method (and it will commit automatically):

.. code-block:: python

service.publish(...)

**Writing your own operation?**

You can write your own unit of work operation by subclassing the operation
class and implementing the desired methods:

.. code-block:: python

from invenio_db.services.uow import Operation

class BulkIndexOp(Operation):
def on_commit(self, uow):
# ... executed after the database transaction commit ...
"""

from functools import wraps

from ..shared import db


#
# Unit of work operations
#
class Operation:
"""Base class for unit of work operations."""

def on_register(self, uow):
"""Called upon operation registration."""
pass

def on_commit(self, uow):
"""Called in the commit phase (after the transaction is committed)."""
pass

def on_post_commit(self, uow):
"""Called right after the commit phase."""
pass

def on_rollback(self, uow):
"""Called in the rollback phase (after the transaction rollback)."""
pass

def on_post_rollback(self, uow):
"""Called right after the rollback phase."""
pass


class ModelCommitOp(Operation):
"""SQLAlchemy model add/update operation."""

def __init__(self, model):
"""Initialize the commit operation."""
super().__init__()
self._model = model

def on_register(self, uow):
"""Add model to db session."""
uow.session.add(self._model)


class ModelDeleteOp(Operation):
"""SQLAlchemy model delete operation."""

def __init__(self, model):
"""Initialize the set delete operation."""
super().__init__()
self._model = model

def on_register(self, uow):
"""Delete model."""
uow.session.delete(self._model)


#
# Unit of work context manager
#
class UnitOfWork:
"""Unit of work context manager.

Note, the unit of work does not currently take duplication of work into
account. Thus, you can e.g. add two record commit operations of the same
record which will then index the record twice, even though only one time
is needed.
"""

def __init__(self, session=None):
"""Initialize unit of work context."""
self._session = session or db.session
self._operations = []
self._dirty = False

def __enter__(self):
"""Entering the context."""
return self

def __exit__(self, exc_type, *args):
"""Rollback on exception."""
if exc_type is not None:
self.rollback()
self._mark_dirty()

@property
def session(self):
"""The SQLAlchemy database session associated with this UoW."""
return self._session

def _mark_dirty(self):
"""Mark the unit of work as dirty."""
if self._dirty:
raise RuntimeError("The unit of work is already committed or rolledback.")
self._dirty = True

def commit(self):
"""Commit the unit of work."""
self.session.commit()
# Run commit operations
for op in self._operations:
op.on_commit(self)
# Run post commit operations
for op in self._operations:
op.on_post_commit(self)
self._mark_dirty()

def rollback(self):
"""Rollback the database session."""
self.session.rollback()
# Run rollback operations
for op in self._operations:
op.on_rollback(self)
# Run post rollback operations
for op in self._operations:
op.on_post_rollback(self)

def register(self, op):
"""Register an operation."""
# Run on register
op.on_register(self)
# Append to list of operations.
self._operations.append(op)


def unit_of_work(**kwargs):
"""Decorator to auto-inject a unit of work if not provided.

If no unit of work is provided, this decorator will create a new unit of
work and commit it after the function has been executed.

.. code-block:: python

@unit_of_work()
def aservice_method(self, ...., uow=None):
# ...
uow.register(...)

"""

def decorator(f):
@wraps(f)
def inner(self, *args, **kwargs):
if "uow" not in kwargs:
# Migration path - start a UoW and commit
with UnitOfWork(db.session) as uow:
kwargs["uow"] = uow
res = f(self, *args, **kwargs)
uow.commit()
return res
else:
return f(self, *args, **kwargs)

return inner

return decorator