Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(exports): tasks stuck in processing TASK-1243 #5436

Merged
merged 11 commits into from
Jan 28, 2025
Merged
35 changes: 21 additions & 14 deletions kobo/apps/audit_log/views.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.db import transaction
from rest_framework import mixins, status, viewsets
from rest_framework.decorators import action
from rest_framework.renderers import BrowsableAPIRenderer, JSONRenderer
Expand Down Expand Up @@ -618,12 +619,14 @@ def export(self, request, *args, **kwargs):
'type': 'project_history_logs_export',
},
)

export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.ProjectHistoryLogExportTask',
transaction.on_commit(
lambda: export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.ProjectHistoryLogExportTask',
)
)

return Response(
{f'status: {export_task.status}'},
status=status.HTTP_202_ACCEPTED,
Expand Down Expand Up @@ -961,10 +964,12 @@ def export(self, request, *args, **kwargs):
},
)

export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.ProjectHistoryLogExportTask',
transaction.on_commit(
lambda: export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.ProjectHistoryLogExportTask',
)
)
return Response(
{f'status: {export_task.status}'},
Expand All @@ -985,12 +990,14 @@ def create_task(self, request, get_all_logs):
'type': 'access_logs_export',
},
)

export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.AccessLogExportTask',
transaction.on_commit(
lambda: export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.AccessLogExportTask',
)
)

return Response(
{f'status: {export_task.status}'},
status=status.HTTP_202_ACCEPTED,
Expand Down
23 changes: 10 additions & 13 deletions kobo/apps/project_views/views.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Union, Optional
from typing import Optional, Union

from django.conf import settings
from django.db import transaction
from django.db.models.query import QuerySet
from django.http import Http404
from rest_framework import viewsets
Expand All @@ -9,10 +10,7 @@

from kobo.apps.kobo_auth.shortcuts import User
from kpi.constants import ASSET_TYPE_SURVEY
from kpi.filters import (
AssetOrderingFilter,
SearchFilter,
)
from kpi.filters import AssetOrderingFilter, SearchFilter
from kpi.mixins.asset import AssetViewSetListMixin
from kpi.mixins.object_permission import ObjectPermissionViewSetMixin
from kpi.models import Asset, ProjectViewExportTask
Expand All @@ -22,10 +20,7 @@
from kpi.serializers.v2.user import UserListSerializer
from kpi.tasks import export_task_in_background
from kpi.utils.object_permission import get_database_user
from kpi.utils.project_views import (
get_region_for_view,
user_has_view_perms,
)
from kpi.utils.project_views import get_region_for_view, user_has_view_perms
from .models.project_view import ProjectView
from .serializers import ProjectViewSerializer

Expand Down Expand Up @@ -110,10 +105,12 @@ def export(self, request, uid, obj_type):
)

# Have Celery run the export in the background
export_task_in_background.delay(
export_task_uid=export_task.uid,
username=user.username,
export_task_name='kpi.ProjectViewExportTask',
transaction.on_commit(
lambda: export_task_in_background.delay(
export_task_uid=export_task.uid,
username=user.username,
export_task_name='kpi.ProjectViewExportTask',
)
)

return Response({'status': export_task.status})
Expand Down
2 changes: 2 additions & 0 deletions kobo/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,8 @@ def __init__(self, *args, **kwargs):
# REMOVE the oldest if a user exceeds this many exports for a particular form
MAXIMUM_EXPORTS_PER_USER_PER_FORM = 10

MAX_RETRIES_FOR_IMPORT_EXPORT_TASK = 10

# Private media file configuration
PRIVATE_STORAGE_ROOT = os.path.join(BASE_DIR, 'media')
PRIVATE_STORAGE_AUTH_FUNCTION = \
Expand Down
15 changes: 9 additions & 6 deletions kpi/serializers/v2/export_task.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# coding: utf-8
from typing import Optional

from django.db import transaction
from django.utils.translation import gettext as t
from rest_framework import serializers
from rest_framework.request import Request
from rest_framework.reverse import reverse
from formpack.constants import (
EXPORT_SETTING_FIELDS,
EXPORT_SETTING_FIELDS_FROM_ALL_VERSIONS,
Expand All @@ -26,9 +24,12 @@
VALID_EXPORT_TYPES,
VALID_MULTIPLE_SELECTS,
)
from rest_framework import serializers
from rest_framework.request import Request
from rest_framework.reverse import reverse

from kpi.fields import ReadOnlyJSONField
from kpi.models import SubmissionExportTask, Asset
from kpi.models import Asset, SubmissionExportTask
from kpi.tasks import export_in_background
from kpi.utils.export_task import format_exception_values
from kpi.utils.object_permission import get_database_user
Expand Down Expand Up @@ -65,7 +66,9 @@ def create(self, validated_data: dict) -> SubmissionExportTask:
user=user, data=validated_data
)
# Have Celery run the export in the background
export_in_background.delay(export_task_uid=export_task.uid)
transaction.on_commit(
lambda: export_in_background.delay(export_task_uid=export_task.uid)
)

return export_task

Expand Down Expand Up @@ -152,7 +155,7 @@ def validate_fields(self, data: dict) -> list:
{EXPORT_SETTING_FIELDS: t('Must be an array')}
)

if not all((isinstance(field, str) for field in fields)):
if not all(isinstance(field, str) for field in fields):
raise serializers.ValidationError(
{
EXPORT_SETTING_FIELDS: t(
Expand Down
20 changes: 17 additions & 3 deletions kpi/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django.apps import apps
from django.conf import settings
from django.core import mail
from django.core.exceptions import ObjectDoesNotExist
from django.core.management import call_command

from kobo.apps.kobo_auth.shortcuts import User
Expand All @@ -15,25 +16,38 @@
from kpi.models.import_export_task import ImportTask, SubmissionExportTask


@celery_app.task
@celery_app.task(
autoretry_for=(ObjectDoesNotExist,),
max_retries=settings.MAX_RETRIES_FOR_IMPORT_EXPORT_TASK,
retry_backoff=True,
)
def import_in_background(import_task_uid):
    """
    Run the `ImportTask` identified by `import_task_uid`.

    The task row is fetched by its `uid`, executed through its `run()`
    method, and the `uid` stored on the fetched object is returned.
    """
    task = ImportTask.objects.get(uid=import_task_uid)
    task.run()
    return task.uid


@celery_app.task
@celery_app.task(
autoretry_for=(ObjectDoesNotExist,),
max_retries=settings.MAX_RETRIES_FOR_IMPORT_EXPORT_TASK,
retry_backoff=True,
)
def export_in_background(export_task_uid):
    """
    Run the `SubmissionExportTask` identified by `export_task_uid`.

    The task row is fetched by its `uid` and executed through its `run()`
    method; nothing is returned.
    """
    SubmissionExportTask.objects.get(uid=export_task_uid).run()


@celery_app.task
@celery_app.task(
autoretry_for=(ObjectDoesNotExist,),
max_retries=settings.MAX_RETRIES_FOR_IMPORT_EXPORT_TASK,
retry_backoff=True,
)
def export_task_in_background(
export_task_uid: str, username: str, export_task_name: str
) -> None:
user = User.objects.get(username=username)
export_task_class = apps.get_model(export_task_name)

export_task = export_task_class.objects.get(uid=export_task_uid)
export = export_task.run()
if export.status == 'complete' and export.result:
Expand Down
8 changes: 5 additions & 3 deletions kpi/tests/api/v1/test_api_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from urllib.parse import unquote_plus

from django.urls import reverse
from formpack.utils.expand_content import SCHEMA_VERSION
from rest_framework import status
from rest_framework.authtoken.models import Token

from formpack.utils.expand_content import SCHEMA_VERSION
from kobo.apps.kobo_auth.shortcuts import User
from kpi.constants import ASSET_TYPE_COLLECTION
from kpi.models import Asset, SubmissionExportTask
Expand All @@ -18,6 +18,7 @@
from kpi.tests.api.v2 import test_api_assets
from kpi.tests.base_test_case import BaseTestCase
from kpi.tests.kpi_test_case import KpiTestCase
from kpi.tests.utils.transaction import immediate_on_commit
from kpi.utils.xml import check_lxml_fromstring

EMPTY_SURVEY = {'survey': [], 'schema': SCHEMA_VERSION, 'settings': {}}
Expand Down Expand Up @@ -296,8 +297,9 @@ def test_owner_can_create_export(self):
'source': asset_url,
'type': 'csv',
}
# Create the export task
response = self.client.post(post_url, task_data)
with immediate_on_commit():
# Create the export task
response = self.client.post(post_url, task_data)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
# Task should complete right away due to `CELERY_TASK_ALWAYS_EAGER`
detail_response = self.client.get(response.data['url'])
Expand Down
13 changes: 8 additions & 5 deletions kpi/tests/api/v2/test_api_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
PERM_VIEW_ASSET,
PERM_VIEW_SUBMISSIONS,
)
from kpi.models import Asset, SubmissionExportTask, AssetExportSettings
from kpi.models import Asset, AssetExportSettings, SubmissionExportTask
from kpi.tests.base_test_case import BaseTestCase
from kpi.tests.test_mock_data_exports import MockDataExportsBase
from kpi.tests.utils.transaction import immediate_on_commit
from kpi.urls.router_api_v2 import URL_NAMESPACE as ROUTER_URL_NAMESPACE
from kpi.utils.object_permission import get_anonymous_user

Expand Down Expand Up @@ -335,9 +336,10 @@ def test_synchronous_csv_export_matches_async_export(self):
self._get_endpoint('asset-export-list'),
kwargs={'format': 'json', 'parent_lookup_asset': self.asset.uid},
)
exports_list_response = self.client.post(
exports_list_url, data=es.export_settings
)
with immediate_on_commit():
exports_list_response = self.client.post(
exports_list_url, data=es.export_settings
)
assert exports_list_response.status_code == status.HTTP_201_CREATED

exports_detail_response = self.client.get(
Expand Down Expand Up @@ -473,7 +475,8 @@ def test_export_asset_with_slashes(self):
'fields_from_all_versions': 'false',
'multiple_select': 'both',
}
response = self.client.post(list_url, data=data)
with immediate_on_commit():
response = self.client.post(list_url, data=data)
assert response.status_code == status.HTTP_201_CREATED
export_response = self.client.get(response.data['url'])
filepath = export_response.data['result']
Expand Down
6 changes: 5 additions & 1 deletion kpi/views/v1/export_task.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# coding: utf-8
from django.db import transaction
from django.db.models import TextField
from django.db.models.functions import Cast
from rest_framework import exceptions, serializers, status
Expand Down Expand Up @@ -211,7 +212,10 @@ def create(self, request, *args, **kwargs):
user=request.user, data=task_data
)
# Have Celery run the export in the background
export_in_background.delay(export_task_uid=export_task.uid)
transaction.on_commit(
lambda: export_in_background.delay(export_task_uid=export_task.uid)
)

return Response({
'uid': export_task.uid,
'url': reverse(
Expand Down