Skip to content

Commit

Permalink
Delete bulk submissions asynchronously (#2738)
Browse files Browse the repository at this point in the history
* delete submissions asynchronously

* add type hints, docstring

* update docstring

* fix redefined argument

* only allow hard deletion if setting enabled

* update instance date_modified when deleting instance

* update docstring

* fix failing tests

* exclude deleting submissions from data list

* suppress lint warning

* add test

* add test

* add test

* return with message after deleting submissions

* add tests

* add tests

* make deleted_by required

* update doc string

* refactor code
  • Loading branch information
kelvin-muchiri authored Nov 21, 2024
1 parent bd97270 commit 1cffdf3
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 132 deletions.
47 changes: 37 additions & 10 deletions onadata/apps/api/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,33 @@
"""
Celery api.tasks module.
"""

import logging
import os
import sys
import logging
from datetime import timedelta

from celery.result import AsyncResult
from django.conf import settings
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.core.files.storage import default_storage
from django.contrib.auth import get_user_model
from django.core.files.storage import default_storage
from django.core.files.uploadedfile import TemporaryUploadedFile
from django.db import DatabaseError
from django.utils import timezone
from django.utils.datastructures import MultiValueDict

from celery.result import AsyncResult

from onadata.apps.api import tools
from onadata.apps.logger.models import Instance, ProjectInvitation, XForm, Project
from onadata.apps.logger.models import Instance, Project, ProjectInvitation, XForm
from onadata.celeryapp import app
from onadata.libs.utils.email import send_generic_email
from onadata.libs.utils.model_tools import queryset_iterator
from onadata.libs.models.share_project import ShareProject
from onadata.libs.utils.cache_tools import (
safe_delete,
XFORM_REGENERATE_INSTANCE_JSON_TASK,
safe_delete,
)
from onadata.libs.models.share_project import ShareProject
from onadata.libs.utils.email import ProjectInvitationEmail
from onadata.libs.utils.email import ProjectInvitationEmail, send_generic_email
from onadata.libs.utils.logger_tools import delete_xform_submissions
from onadata.libs.utils.model_tools import queryset_iterator

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -200,3 +202,28 @@ def share_project_async(project_id, username, role, remove=False):
else:
share = ShareProject(project, username, role, remove)
share.save()


@app.task(retry_backoff=3, autoretry_for=(DatabaseError, ConnectionError))
def delete_xform_submissions_async(
    xform_id: int,
    deleted_by_id: int,
    instance_ids: list[int] | None = None,
    soft_delete: bool = True,
):
    """Celery task: delete submissions of an XForm in the background.

    The form and the deleting user are looked up by primary key and the
    work is delegated to ``delete_xform_submissions``. If either record
    does not exist, the error is logged and nothing is deleted. The task is
    retried automatically (with backoff) on ``DatabaseError`` and
    ``ConnectionError``.

    :param xform_id: Primary key of the XForm whose submissions are deleted
    :param deleted_by_id: Primary key of the user performing the deletion
    :param instance_ids: Ids of the instances to delete, None to delete all
    :param soft_delete: Soft delete when True, otherwise hard delete
    """
    try:
        form = XForm.objects.get(pk=xform_id)
        user = User.objects.get(pk=deleted_by_id)
    except (XForm.DoesNotExist, User.DoesNotExist) as missing:
        # Nothing to act on; record the failure and stop without deleting.
        logger.exception(missing)
        return

    # Kept outside the ``try`` so errors raised by the deletion itself
    # propagate to Celery and can trigger the configured auto-retry.
    delete_xform_submissions(form, user, instance_ids, soft_delete)
57 changes: 50 additions & 7 deletions onadata/apps/api/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,26 @@
"""Tests for module onadata.apps.api.tasks"""

import sys

from unittest.mock import patch

from django.core.cache import cache
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.db import DatabaseError, OperationalError

from onadata.apps.api.tasks import (
send_project_invitation_email_async,
ShareProject,
delete_xform_submissions_async,
regenerate_form_instance_json,
send_project_invitation_email_async,
share_project_async,
ShareProject,
)
from onadata.apps.logger.models import ProjectInvitation, Instance
from onadata.apps.logger.models import Instance, ProjectInvitation
from onadata.apps.main.tests.test_base import TestBase
from onadata.libs.permissions import ManagerRole
from onadata.libs.serializers.organization_serializer import OrganizationSerializer
from onadata.libs.utils.user_auth import get_user_default_project
from onadata.libs.utils.email import ProjectInvitationEmail
from onadata.libs.utils.cache_tools import ORG_PROFILE_CACHE
from onadata.libs.utils.email import ProjectInvitationEmail
from onadata.libs.utils.user_auth import get_user_default_project

User = get_user_model()

Expand Down Expand Up @@ -185,3 +185,46 @@ def test_operation_error(self, mock_retry, mock_share):
self.assertTrue(mock_retry.called)
_, kwargs = mock_retry.call_args_list[0]
self.assertTrue(isinstance(kwargs["exc"], OperationalError))


@patch("onadata.apps.api.tasks.delete_xform_submissions")
class DeleteXFormSubmissionsAsyncTestCase(TestBase):
    """Tests for delete_xform_submissions_async"""

    # Dotted paths patched inside individual tests.
    _RETRY_PATH = "onadata.apps.api.tasks.delete_xform_submissions_async.retry"
    _LOG_EXCEPTION_PATH = "onadata.apps.api.tasks.logger.exception"

    def setUp(self):
        super().setUp()
        self._publish_transportation_form()

    def test_delete(self, mock_delete):
        """Submissions are deleted"""
        task_args = (self.xform.pk, self.user.pk, [1, 2], False)
        delete_xform_submissions_async.delay(*task_args)
        # The task resolves the ids to model instances before delegating.
        mock_delete.assert_called_once_with(self.xform, self.user, [1, 2], False)

    def test_database_error(self, mock_delete):
        """We retry calls if DatabaseError is raised"""
        mock_delete.side_effect = DatabaseError()

        with patch(self._RETRY_PATH) as mock_retry:
            delete_xform_submissions_async.delay(self.xform.pk, self.user.pk)

        self.assertTrue(mock_retry.called)

    def test_connection_error(self, mock_delete):
        """We retry calls if ConnectionError is raised"""
        mock_delete.side_effect = ConnectionError()

        with patch(self._RETRY_PATH) as mock_retry:
            delete_xform_submissions_async.delay(self.xform.pk, self.user.pk)

        self.assertTrue(mock_retry.called)

    def test_xform_id_invalid(self, mock_delete):
        """Invalid xform_id is handled"""
        with patch(self._LOG_EXCEPTION_PATH) as mock_log_exception:
            # sys.maxsize stands in for a primary key that does not exist.
            delete_xform_submissions_async.delay(sys.maxsize, self.user.pk)

        self.assertFalse(mock_delete.called)
        mock_log_exception.assert_called_once()

    def test_user_id_invalid(self, mock_delete):
        """Invalid user_id is handled"""
        with patch(self._LOG_EXCEPTION_PATH) as mock_log_exception:
            delete_xform_submissions_async.delay(self.xform.pk, sys.maxsize)

        self.assertFalse(mock_delete.called)
        mock_log_exception.assert_called_once()
127 changes: 86 additions & 41 deletions onadata/apps/api/tests/viewsets/test_data_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""
Test /data API endpoint implementation.
"""

from __future__ import unicode_literals

import csv
Expand Down Expand Up @@ -1671,8 +1672,7 @@ def test_data_w_attachment(self):
self.assertIsInstance(response.data, dict)
self.assertDictContainsSubset(data, response.data)

@patch("onadata.apps.api.viewsets.data_viewset.send_message")
def test_delete_submission(self, send_message_mock):
def test_delete_submission(self):
self._make_submissions()
formid = self.xform.pk
dataid = self.xform.instances.all().order_by("id")[0].pk
Expand All @@ -1691,15 +1691,6 @@ def test_delete_submission(self, send_message_mock):
self.assertEqual(response.status_code, 204)
first_xform_instance = self.xform.instances.filter(pk=dataid)
self.assertEqual(first_xform_instance[0].deleted_by, request.user)
# message sent upon delete
self.assertTrue(send_message_mock.called)
send_message_mock.assert_called_with(
instance_id=dataid,
target_id=formid,
target_type=XFORM,
user=request.user,
message_verb=SUBMISSION_DELETED,
)

# second delete of same submission should return 404
request = self.factory.delete("/", **self.extra)
Expand Down Expand Up @@ -1767,8 +1758,8 @@ def test_post_save_signal_on_submission_deletion(self, mock, send_message_mock):
self.assertEqual(mock.call_count, 1)
self.assertTrue(send_message_mock.called)

@patch("onadata.apps.api.viewsets.data_viewset.send_message")
def test_deletion_of_bulk_submissions(self, send_message_mock):
@patch("onadata.apps.api.viewsets.data_viewset.safe_cache_set")
def test_deletion_of_bulk_submissions(self, mock_cache_set):
self._make_submissions()
self.xform.refresh_from_db()
formid = self.xform.pk
Expand Down Expand Up @@ -1804,19 +1795,16 @@ def test_deletion_of_bulk_submissions(self, send_message_mock):
response.data.get("message"),
"%d records were deleted" % len(records_to_be_deleted),
)
self.assertTrue(send_message_mock.called)
send_message_mock.assert_called_with(
instance_id=[str(i.pk) for i in records_to_be_deleted],
target_id=formid,
target_type=XFORM,
user=request.user,
message_verb=SUBMISSION_DELETED,
)
self.xform.refresh_from_db()
current_count = self.xform.instances.filter(deleted_at=None).count()
self.assertNotEqual(current_count, initial_count)
self.assertEqual(current_count, 2)
self.assertEqual(self.xform.num_of_submissions, 2)
mock_cache_set.assert_called_once_with(
f"xfm-submissions-deleting-{formid}",
[str(i.pk) for i in records_to_be_deleted],
3600,
)

@override_settings(ENABLE_SUBMISSION_PERMANENT_DELETE=True)
@patch("onadata.apps.api.viewsets.data_viewset.send_message")
Expand Down Expand Up @@ -1879,8 +1867,7 @@ def test_submissions_permanent_deletion(self, send_message_mock):
self.assertEqual(self.xform.num_of_submissions, 3)

@override_settings(ENABLE_SUBMISSION_PERMANENT_DELETE=True)
@patch("onadata.apps.api.viewsets.data_viewset.send_message")
def test_permanent_deletions_bulk_submissions(self, send_message_mock):
def test_permanent_deletions_bulk_submissions(self):
"""
Test that permanent bulk submission deletions work
"""
Expand All @@ -1904,14 +1891,6 @@ def test_permanent_deletions_bulk_submissions(self, send_message_mock):
response.data.get("message"),
"%d records were deleted" % len(records_to_be_deleted),
)
self.assertTrue(send_message_mock.called)
send_message_mock.assert_called_with(
instance_id=[str(i.pk) for i in records_to_be_deleted],
target_id=formid,
target_type=XFORM,
user=request.user,
message_verb=SUBMISSION_DELETED,
)
self.xform.refresh_from_db()
current_count = self.xform.num_of_submissions
self.assertNotEqual(current_count, initial_count)
Expand Down Expand Up @@ -2016,8 +1995,7 @@ def test_delete_submission_inactive_form(self, send_message_mock):
self.assertEqual(response.status_code, 400)
self.assertTrue(send_message_mock.called)

@patch("onadata.apps.api.viewsets.data_viewset.send_message")
def test_delete_submissions(self, send_message_mock):
def test_delete_submissions(self):
xls_file_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"../fixtures/tutorial/tutorial.xlsx",
Expand Down Expand Up @@ -2057,14 +2035,6 @@ def test_delete_submissions(self, send_message_mock):
response.data.get("message"),
"%d records were deleted" % len(deleted_instances_subset),
)
self.assertTrue(send_message_mock.called)
send_message_mock.assert_called_with(
instance_id=[str(i.pk) for i in deleted_instances_subset],
target_id=formid,
target_type=XFORM,
user=request.user,
message_verb=SUBMISSION_DELETED,
)

# Test that num of submissions for the form is successfully updated
self.xform.refresh_from_db()
Expand Down Expand Up @@ -3825,6 +3795,81 @@ def test_merged_dataset_geojson(self):
response.data,
)

def test_submissions_deletion_in_progress(self):
    """Submissions whose deletion is in progress are excluded from list"""
    self._make_submissions()
    self.assertEqual(self.xform.instances.count(), 4)

    list_view = DataViewSet.as_view({"get": "list"})
    formid = self.xform.pk
    cache_key = f"xfm-submissions-deleting-{formid}"
    instances = self.xform.instances.all()
    first_pk, second_pk = instances[0].pk, instances[1].pk

    def assert_two_listed(params=None):
        # Two of the four submissions are flagged as being deleted, so
        # exactly two should be returned by the list endpoint.
        request = self.factory.get("/", data=params, **self.extra)
        response = list_view(request, pk=formid)
        self.assertEqual(response.status_code, 200)
        self.assertEqual(len(response.data), 2)

    # Ids cached as integers
    cache.set(cache_key, [first_pk, second_pk])
    # No query
    assert_two_listed()
    # With query
    assert_two_listed({"query": '{"_submission_time":{"$gt":"2018-04-19"}}'})
    # With sort
    assert_two_listed({"sort": 1})

    # Cached submission ids saved as strings
    cache.set(cache_key, [str(first_pk), str(second_pk)])
    assert_two_listed()

@override_settings(ENABLE_SUBMISSION_PERMANENT_DELETE=True)
@patch(
    "onadata.apps.api.viewsets.data_viewset.delete_xform_submissions_async.delay"
)
def test_deletion_of_bulk_submissions_async(self, mock_del_async):
    """Deletion of bulk submissions is done asynchronously"""
    self._make_submissions()

    destroy_view = DataViewSet.as_view({"delete": "destroy"})
    records_to_be_deleted = self.xform.instances.all()[:2]
    instance_ids = ",".join([str(i.pk) for i in records_to_be_deleted])
    expected_ids = [
        str(records_to_be_deleted[0].pk),
        str(records_to_be_deleted[1].pk),
    ]

    # (request payload, expected ``soft_delete`` argument passed to the task)
    scenarios = [
        # Default: soft deletion
        ({"instance_ids": instance_ids}, True),
        # Permanent deletion
        ({"permanent_delete": True, "instance_ids": instance_ids}, False),
    ]

    for payload, expected_soft_delete in scenarios:
        # Start each scenario with a clean call history.
        mock_del_async.reset_mock()
        request = self.factory.delete("/", data=payload, **self.extra)
        response = destroy_view(request, pk=self.xform.pk)

        self.assertEqual(response.status_code, 200)
        mock_del_async.assert_called_once_with(
            self.xform.pk,
            self.user.pk,
            expected_ids,
            expected_soft_delete,
        )


class TestOSM(TestAbstractViewSet):
"""
Expand Down
Loading

0 comments on commit 1cffdf3

Please sign in to comment.