Skip to content

Commit

Permalink
retry async task incase of exception
Browse files Browse the repository at this point in the history
  • Loading branch information
kelvin-muchiri committed Mar 28, 2024
1 parent 0ced18e commit 1f83755
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 10 deletions.
15 changes: 12 additions & 3 deletions onadata/apps/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.core.files.storage import default_storage
from django.contrib.auth import get_user_model
from django.db import DatabaseError
from django.utils import timezone
from django.utils.datastructures import MultiValueDict

Expand Down Expand Up @@ -188,7 +189,15 @@ def regenerate_form_instance_json(xform_id: int):
safe_delete(cache_key)


@app.task()
class ShareProjectBaseTask(app.Task):
autoretry_for = (
DatabaseError,
ConnectionError,
)
retry_backoff = 3


@app.task(base=ShareProjectBaseTask)
def add_org_user_and_share_projects_async(
org_id, user_id, role=None
): # pylint: disable=invalid-name
Expand All @@ -207,7 +216,7 @@ def add_org_user_and_share_projects_async(
tools.add_org_user_and_share_projects(organization, user, role)


@app.task()
@app.task(base=ShareProjectBaseTask)
def remove_org_user_async(org_id, user_id):
"""Remove user from organization asynchronously"""
try:
Expand All @@ -224,7 +233,7 @@ def remove_org_user_async(org_id, user_id):
tools.remove_user_from_organization(organization, user)


@app.task()
@app.task(base=ShareProjectBaseTask)
def share_project_async(project_id, username, role, remove=False):
"""Share project asynchronously"""
try:
Expand Down
71 changes: 64 additions & 7 deletions onadata/apps/api/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from django.core.cache import cache
from django.contrib.auth import get_user_model
from django.db import DatabaseError, OperationalError

from onadata.apps.api.tasks import (
send_project_invitation_email_async,
Expand Down Expand Up @@ -131,28 +132,57 @@ def setUp(self):

def test_user_added_to_org(self, mock_add):
"""User is added to organization"""
add_org_user_and_share_projects_async(self.org.pk, self.user.pk, "manager")
add_org_user_and_share_projects_async.delay(
self.org.pk, self.user.pk, "manager"
)
mock_add.assert_called_once_with(self.org, self.user, "manager")

def test_role_optional(self, mock_add):
"""role param is optional"""
add_org_user_and_share_projects_async(self.org.pk, self.user.pk)
add_org_user_and_share_projects_async.delay(self.org.pk, self.user.pk)
mock_add.assert_called_once_with(self.org, self.user, None)

@patch("onadata.apps.api.tasks.logger.exception")
def test_invalid_org_id(self, mock_log, mock_add):
"""Invalid org_id is handled"""
add_org_user_and_share_projects_async(sys.maxsize, self.user.pk)
add_org_user_and_share_projects_async.delay(sys.maxsize, self.user.pk)
mock_add.assert_not_called()
mock_log.assert_called_once()

@patch("onadata.apps.api.tasks.logger.exception")
def test_invalid_user_id(self, mock_log, mock_add):
"""Invalid org_id is handled"""
add_org_user_and_share_projects_async(self.org.pk, sys.maxsize)
add_org_user_and_share_projects_async.delay(self.org.pk, sys.maxsize)
mock_add.assert_not_called()
mock_log.assert_called_once()

@patch("onadata.apps.api.tasks.add_org_user_and_share_projects_async.retry")
def test_database_error(self, mock_retry, mock_add):
"""We retry calls if DatabaseError is raised"""
mock_add.side_effect = DatabaseError()
add_org_user_and_share_projects_async.delay(self.org.pk, self.user.pk)
self.assertTrue(mock_retry.called)
_, kwargs = mock_retry.call_args_list[0]
self.assertTrue(isinstance(kwargs["exc"], DatabaseError))

@patch("onadata.apps.api.tasks.add_org_user_and_share_projects_async.retry")
def test_connection_error(self, mock_retry, mock_add):
"""We retry calls if ConnectionError is raised"""
mock_add.side_effect = ConnectionError()
add_org_user_and_share_projects_async.delay(self.org.pk, self.user.pk)
self.assertTrue(mock_retry.called)
_, kwargs = mock_retry.call_args_list[0]
self.assertTrue(isinstance(kwargs["exc"], ConnectionError))

@patch("onadata.apps.api.tasks.add_org_user_and_share_projects_async.retry")
def test_operation_error(self, mock_retry, mock_add):
"""We retry calls if OperationError is raised"""
mock_add.side_effect = OperationalError()
add_org_user_and_share_projects_async.delay(self.org.pk, self.user.pk)
self.assertTrue(mock_retry.called)
_, kwargs = mock_retry.call_args_list[0]
self.assertTrue(isinstance(kwargs["exc"], OperationalError))


@patch("onadata.apps.api.tasks.tools.remove_user_from_organization")
class RemoveOrgUserAsyncTestCase(TestBase):
Expand All @@ -169,19 +199,46 @@ def setUp(self):

def test_user_removed_from_org(self, mock_remove):
"""User is removed from organization"""
remove_org_user_async(self.org.pk, self.user.pk)
remove_org_user_async.delay(self.org.pk, self.user.pk)
mock_remove.assert_called_once_with(self.org, self.user)

@patch("onadata.apps.api.tasks.logger.exception")
def test_invalid_org_id(self, mock_log, mock_remove):
"""Invalid org_id is handled"""
remove_org_user_async(sys.maxsize, self.user.pk)
remove_org_user_async.delay(sys.maxsize, self.user.pk)
mock_remove.assert_not_called()
mock_log.assert_called_once()

@patch("onadata.apps.api.tasks.logger.exception")
def test_invalid_user_id(self, mock_log, mock_remove):
"""Invalid org_id is handled"""
remove_org_user_async(self.org.pk, sys.maxsize)
remove_org_user_async.delay(self.org.pk, sys.maxsize)
mock_remove.assert_not_called()
mock_log.assert_called_once()

@patch("onadata.apps.api.tasks.remove_org_user_async.retry")
def test_database_error(self, mock_retry, mock_remove):
"""We retry calls if DatabaseError is raised"""
mock_remove.side_effect = DatabaseError()
remove_org_user_async.delay(self.org.pk, self.user.pk)
self.assertTrue(mock_retry.called)
_, kwargs = mock_retry.call_args_list[0]
self.assertTrue(isinstance(kwargs["exc"], DatabaseError))

@patch("onadata.apps.api.tasks.remove_org_user_async.retry")
def test_connection_error(self, mock_retry, mock_remove):
"""We retry calls if ConnectionError is raised"""
mock_remove.side_effect = ConnectionError()
remove_org_user_async.delay(self.org.pk, self.user.pk)
self.assertTrue(mock_retry.called)
_, kwargs = mock_retry.call_args_list[0]
self.assertTrue(isinstance(kwargs["exc"], ConnectionError))

@patch("onadata.apps.api.tasks.remove_org_user_async.retry")
def test_operation_error(self, mock_retry, mock_remove):
"""We retry calls if OperationError is raised"""
mock_remove.side_effect = OperationalError()
remove_org_user_async.delay(self.org.pk, self.user.pk)
self.assertTrue(mock_retry.called)
_, kwargs = mock_retry.call_args_list[0]
self.assertTrue(isinstance(kwargs["exc"], OperationalError))

0 comments on commit 1f83755

Please sign in to comment.