Retry aggregate_assets_summary_task on version metadata race condition #1803

Merged (1 commit) on Jan 16, 2024
2 changes: 2 additions & 0 deletions dandiapi/api/services/metadata/__init__.py
@@ -10,6 +10,7 @@
 from dandiapi.api.services.metadata.exceptions import (
     AssetHasBeenPublishedError,
     VersionHasBeenPublishedError,
+    VersionMetadataConcurrentlyModified,
 )
 from dandiapi.api.services.publish import _build_publishable_version_from_draft

@@ -95,6 +96,7 @@ def version_aggregate_assets_summary(version: Version) -> None:
     )
     if updated_count == 0:
         logger.info('Skipped updating assetsSummary for version %s', version.id)
+        raise VersionMetadataConcurrentlyModified
Member

Will this trigger a Sentry error? I imagine we wouldn't want that to happen in this case.

Member Author

jjnesbitt, Jan 16, 2024

I don't think it will. I believe the following happens:

  1. This exception is raised.
  2. Since the task is decorated with autoretry_for, Celery catches that exception and raises a Retry exception (this is documented in the Celery docs).
  3. Sentry (specifically its Celery integration) sees that the Retry exception is raised but ignores it, since it's recognized as part of Celery's control flow. The Sentry SDK's Celery integration contains the code that controls that.

So we should be good, but I'll do a little more testing to make sure things behave as we expect before merging.
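
The three steps above can be sketched without Celery or Sentry installed. Everything below is a hypothetical stand-in written for illustration: `Retry`, `autoretry`, `run_with_reporting`, and `CONTROL_FLOW_EXCEPTIONS` only imitate the behaviour described in the comment and are not the real Celery or Sentry code.

```python
import functools


class Retry(Exception):
    """Hypothetical stand-in for Celery's control-flow Retry exception."""


class ConcurrentlyModified(Exception):
    """Hypothetical stand-in for VersionMetadataConcurrentlyModified."""


# Assumption: the error reporter keeps a list of exceptions it treats as
# control flow rather than as failures.
CONTROL_FLOW_EXCEPTIONS = (Retry,)


def autoretry(*retry_for):
    """Imitate autoretry_for: turn the listed exceptions into a Retry."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except retry_for as exc:
                raise Retry(f"retry requested after {exc!r}") from exc
        return wrapper
    return decorator


def run_with_reporting(task, reported):
    """Run a task; record every failure except control-flow exceptions."""
    try:
        task()
    except CONTROL_FLOW_EXCEPTIONS:
        return "retry scheduled"
    except Exception as exc:
        reported.append(exc)
        return "error reported"
    return "ok"


@autoretry(ConcurrentlyModified)
def racy_task():
    raise ConcurrentlyModified()


@autoretry(ConcurrentlyModified)
def broken_task():
    raise ValueError("unrelated failure")


reported = []
retry_outcome = run_with_reporting(racy_task, reported)
error_outcome = run_with_reporting(broken_task, reported)
```

In this sketch only the unrelated `ValueError` ends up in `reported`; the concurrent-modification case is converted into a retry before the reporter ever sees it.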



def validate_version_metadata(*, version: Version) -> None:
5 changes: 5 additions & 0 deletions dandiapi/api/services/metadata/exceptions.py
@@ -11,3 +11,8 @@ class AssetHasBeenPublishedError(DandiError):
 class VersionHasBeenPublishedError(DandiError):
     http_status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
     message = 'This version has been published and cannot be modified.'
+
+
+class VersionMetadataConcurrentlyModified(DandiError):
+    http_status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
+    message = 'The metadata for this version has been modified since the request began.'
7 changes: 6 additions & 1 deletion dandiapi/api/tasks/scheduled.py
@@ -21,6 +21,7 @@
 from dandiapi.api.models import UserMetadata, Version
 from dandiapi.api.models.asset import Asset
 from dandiapi.api.services.metadata import version_aggregate_assets_summary
+from dandiapi.api.services.metadata.exceptions import VersionMetadataConcurrentlyModified
 from dandiapi.api.tasks import (
     validate_asset_metadata_task,
     validate_version_metadata_task,
@@ -43,7 +44,11 @@ def throttled_iterator(iterable: Iterable, max_per_second: int = 100) -> Iterabl
         time.sleep(1 / max_per_second)


-@shared_task(soft_time_limit=60)
+@shared_task(
+    soft_time_limit=60,
+    autoretry_for=(VersionMetadataConcurrentlyModified,),
+    retry_backoff=True,
+)
 def aggregate_assets_summary_task(version_id: int):
     version = Version.objects.get(id=version_id)
     version_aggregate_assets_summary(version)
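
For a rough sense of what `retry_backoff=True` implies, the sketch below computes an exponential backoff schedule. It is an approximation under assumptions, not Celery's code: `backoff_delay` is a hypothetical helper, and the defaults it uses (a factor of 1 second, a 600 second cap, and random jitter between zero and the computed delay) reflect my reading of Celery's documented defaults and should be checked against the Celery version in use.

```python
import random


def backoff_delay(retries, factor=1, maximum=600, jitter=True):
    """Delay in seconds before the next retry (hypothetical helper).

    retries: number of retries already attempted (0 for the first retry).
    """
    # Double the delay on each attempt, capped at `maximum`.
    delay = min(maximum, factor * (2 ** retries))
    if jitter:
        # Pick a random delay between 0 and the computed upper bound.
        delay = random.randrange(delay + 1)
    return max(0, delay)


# Upper bounds for the first three retries, with jitter disabled.
upper_bounds = [backoff_delay(n, jitter=False) for n in range(3)]
```

With jitter enabled, each actual delay falls anywhere between zero and the corresponding upper bound, which spreads out retries from tasks that hit the race at the same moment.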
15 changes: 15 additions & 0 deletions dandiapi/api/tests/test_version.py
@@ -11,6 +11,7 @@
 import pytest

 from dandiapi.api.services.metadata import version_aggregate_assets_summary
+from dandiapi.api.services.metadata.exceptions import VersionMetadataConcurrentlyModified

 if TYPE_CHECKING:
     from rest_framework.test import APIClient
@@ -359,6 +360,20 @@ def test_version_aggregate_assets_summary(draft_version_factory, draft_asset_fac
     assert version.metadata['assetsSummary']['schemaKey'] == 'AssetsSummary'


+@pytest.mark.django_db()
+def test_version_aggregate_assets_summary_metadata_modified(
+    draft_version_factory, draft_asset_factory
+):
+    version = draft_version_factory(status=Version.Status.VALID)
+    asset = draft_asset_factory(status=Asset.Status.VALID)
+    version.assets.add(asset)
+
+    # Modify the metadata passed to the function so that it's mismatched
+    version.metadata['foo'] = 'bar'
+    with pytest.raises(VersionMetadataConcurrentlyModified):
+        version_aggregate_assets_summary(version)
+
+
 @pytest.mark.django_db()
 def test_version_size(
     version,
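
The `test_version_aggregate_assets_summary_metadata_modified` test works by handing the function metadata that no longer matches what is stored. The diff does not show how `updated_count` is computed, so the following is only a sketch of one plausible pattern, a conditional ("compare-and-swap") update, using the standard library's `sqlite3` instead of the Django ORM. The table layout, `update_summary`, and the `numberOfFiles` value are all illustrative assumptions.

```python
import json
import sqlite3


class ConcurrentlyModified(Exception):
    """Hypothetical stand-in for VersionMetadataConcurrentlyModified."""


def update_summary(conn, version_id, seen_metadata, summary):
    """Write the summary only if the stored metadata is still what we read."""
    new_metadata = {**seen_metadata, "assetsSummary": summary}
    cursor = conn.execute(
        "UPDATE version SET metadata = ? WHERE id = ? AND metadata = ?",
        (
            json.dumps(new_metadata, sort_keys=True),
            version_id,
            json.dumps(seen_metadata, sort_keys=True),
        ),
    )
    # Zero affected rows means the stored metadata changed underneath us.
    if cursor.rowcount == 0:
        raise ConcurrentlyModified


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE version (id INTEGER PRIMARY KEY, metadata TEXT NOT NULL)")
stored = {"name": "demo"}
conn.execute("INSERT INTO version VALUES (1, ?)", (json.dumps(stored, sort_keys=True),))

# Mirror the test: the in-memory copy differs from what the database holds.
stale = {**stored, "foo": "bar"}
try:
    update_summary(conn, 1, stale, {"numberOfFiles": 1})
    outcome = "updated"
except ConcurrentlyModified:
    outcome = "conflict"

# A caller holding the current metadata succeeds.
update_summary(conn, 1, stored, {"numberOfFiles": 1})
row = conn.execute("SELECT metadata FROM version WHERE id = 1").fetchone()
final = json.loads(row[0])
```

Raising instead of silently skipping is what lets the task's `autoretry_for` setting pick the work up again with fresh metadata.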