Include zarrs in unembargo_dandiset
jjnesbitt committed Nov 4, 2024
1 parent 9cca565 commit e729824
Showing 3 changed files with 123 additions and 46 deletions.
53 changes: 11 additions & 42 deletions dandiapi/api/services/embargo/__init__.py
@@ -1,67 +1,32 @@
 from __future__ import annotations
 
-from concurrent.futures import ThreadPoolExecutor
 import logging
 from typing import TYPE_CHECKING
 
-from botocore.config import Config
-from django.conf import settings
 from django.db import transaction
-from more_itertools import chunked
 
 from dandiapi.api.mail import send_dandiset_unembargoed_message
 from dandiapi.api.models import AssetBlob, Dandiset, Version
 from dandiapi.api.services import audit
 from dandiapi.api.services.asset.exceptions import DandisetOwnerRequiredError
+from dandiapi.api.services.embargo.utils import _delete_object_tags, remove_dandiset_embargo_tags
 from dandiapi.api.services.exceptions import DandiError
 from dandiapi.api.services.metadata import validate_version_metadata
 from dandiapi.api.storage import get_boto_client
 from dandiapi.api.tasks import unembargo_dandiset_task
+from dandiapi.zarr.models import ZarrArchive
 
 from .exceptions import (
     AssetBlobEmbargoedError,
-    AssetTagRemovalError,
     DandisetActiveUploadsError,
     DandisetNotEmbargoedError,
 )
 
 if TYPE_CHECKING:
     from django.contrib.auth.models import User
-    from mypy_boto3_s3 import S3Client
 
 
 logger = logging.getLogger(__name__)
-ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE = 5000
-
-
-def _delete_asset_blob_tags(client: S3Client, blob: str):
-    client.delete_object_tagging(
-        Bucket=settings.DANDI_DANDISETS_BUCKET_NAME,
-        Key=blob,
-    )
-
-
-# NOTE: In testing this took ~2 minutes for 100,000 files
-def _remove_dandiset_asset_blob_embargo_tags(dandiset: Dandiset):
-    client = get_boto_client(config=Config(max_pool_connections=100))
-    embargoed_asset_blobs = (
-        AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=dandiset)
-        .values_list('blob', flat=True)
-        .iterator(chunk_size=ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE)
-    )
-
-    # Chunk the blobs so we're never storing a list of all embargoed blobs
-    chunks = chunked(embargoed_asset_blobs, ASSET_BLOB_TAG_REMOVAL_CHUNK_SIZE)
-    for chunk in chunks:
-        with ThreadPoolExecutor(max_workers=100) as e:
-            futures = [
-                e.submit(_delete_asset_blob_tags, client=client, blob=blob) for blob in chunk
-            ]
-
-        # Check if any failed and raise exception if so
-        failed = [blob for i, blob in enumerate(chunk) if futures[i].exception() is not None]
-        if failed:
-            raise AssetTagRemovalError('Some blobs failed to remove tags', blobs=failed)
 
 
 @transaction.atomic()
@@ -80,13 +45,17 @@ def unembargo_dandiset(ds: Dandiset, user: User):
 
     # Remove tags in S3
     logger.info('Removing tags...')
-    _remove_dandiset_asset_blob_embargo_tags(ds)
+    remove_dandiset_embargo_tags(ds)
 
-    # Update embargoed flag on asset blobs
-    updated = AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=ds).update(
+    # Update embargoed flag on asset blobs and zarrs
+    updated_blobs = AssetBlob.objects.filter(embargoed=True, assets__versions__dandiset=ds).update(
         embargoed=False
     )
-    logger.info('Updated %s asset blobs', updated)
+    updated_zarrs = ZarrArchive.objects.filter(
+        embargoed=True, assets__versions__dandiset=ds
+    ).update(embargoed=False)
+    logger.info('Updated %s asset blobs', updated_blobs)
+    logger.info('Updated %s zarrs', updated_zarrs)
 
     # Set status to OPEN
     Dandiset.objects.filter(pk=ds.pk).update(embargo_status=Dandiset.EmbargoStatus.OPEN)
@@ -118,7 +87,7 @@ def remove_asset_blob_embargoed_tag(asset_blob: AssetBlob) -> None:
     if asset_blob.embargoed:
         raise AssetBlobEmbargoedError
 
-    _delete_asset_blob_tags(client=get_boto_client(), blob=asset_blob.blob.name)
+    _delete_object_tags(client=get_boto_client(), blob=asset_blob.blob.name)
 
 
 def kickoff_dandiset_unembargo(*, user: User, dandiset: Dandiset):
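The flag updates above rely on Django's QuerySet.update(), which executes a single SQL UPDATE per call, returns the number of rows matched, and bypasses model save() methods and signals, so no AssetBlob or ZarrArchive instances are ever loaded into memory. A minimal sketch of that pattern (clear_embargo_flags is an illustrative name, not part of the codebase):

from dandiapi.api.models import AssetBlob, Dandiset
from dandiapi.zarr.models import ZarrArchive


def clear_embargo_flags(ds: Dandiset) -> tuple[int, int]:
    # Each .update() runs as one SQL UPDATE and returns the number of
    # rows matched; the service above logs these counts.
    blobs = AssetBlob.objects.filter(
        embargoed=True, assets__versions__dandiset=ds
    ).update(embargoed=False)
    zarrs = ZarrArchive.objects.filter(
        embargoed=True, assets__versions__dandiset=ds
    ).update(embargoed=False)
    return blobs, zarrs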
103 changes: 103 additions & 0 deletions dandiapi/api/services/embargo/utils.py
@@ -0,0 +1,103 @@
+from __future__ import annotations
+
+from concurrent.futures import ThreadPoolExecutor
+import logging
+from typing import TYPE_CHECKING
+
+from botocore.config import Config
+from django.conf import settings
+from django.db.models import Q
+from more_itertools import chunked
+
+from dandiapi.api.models.asset import Asset
+from dandiapi.api.storage import get_boto_client
+from dandiapi.zarr.models import zarr_s3_path
+
+from .exceptions import AssetTagRemovalError
+
+if TYPE_CHECKING:
+    from mypy_boto3_s3 import S3Client
+
+    from dandiapi.api.models.dandiset import Dandiset
+
+
+logger = logging.getLogger(__name__)
+TAG_REMOVAL_CHUNK_SIZE = 5000
+
+
+def retry(times: int, exceptions: tuple[type[Exception]]):
+    """
+    Retry Decorator.
+
+    Retries the wrapped function/method `times` times if the exceptions listed
+    in ``exceptions`` are thrown
+    :param times: The number of times to repeat the wrapped function/method
+    :param exceptions: Lists of exceptions that trigger a retry attempt
+    """
+
+    def decorator(func):
+        def newfn(*args, **kwargs):
+            attempt = 0
+            while attempt < times:
+                try:
+                    return func(*args, **kwargs)
+                except exceptions:
+                    attempt += 1
+            return func(*args, **kwargs)
+
+        return newfn
+
+    return decorator
+
+
+@retry(times=3, exceptions=(Exception,))
+def _delete_object_tags(client: S3Client, blob: str):
+    client.delete_object_tagging(
+        Bucket=settings.DANDI_DANDISETS_BUCKET_NAME,
+        Key=blob,
+    )
+
+
+@retry(times=3, exceptions=(Exception,))
+def _delete_zarr_object_tags(client: S3Client, zarr: str):
+    paginator = client.get_paginator('list_objects_v2')
+    pages = paginator.paginate(
+        Bucket=settings.DANDI_DANDISETS_BUCKET_NAME, Prefix=zarr_s3_path(zarr_id=zarr)
+    )
+
+    with ThreadPoolExecutor(max_workers=100) as e:
+        for page in pages:
+            keys = [obj['Key'] for obj in page.get('Contents', [])]
+            futures = [e.submit(_delete_object_tags, client=client, blob=key) for key in keys]
+
+            # Check if any failed and raise exception if so
+            failed = [key for i, key in enumerate(keys) if futures[i].exception() is not None]
+            if failed:
+                raise AssetTagRemovalError('Some zarr files failed to remove tags', blobs=failed)
+
+
+def remove_dandiset_embargo_tags(dandiset: Dandiset):
+    client = get_boto_client(config=Config(max_pool_connections=100))
+    embargoed_assets = (
+        Asset.objects.filter(versions__dandiset=dandiset)
+        .filter(Q(blob__embargoed=True) | Q(zarr__embargoed=True))
+        .values_list('blob__blob', 'zarr__zarr_id')
+        .iterator(chunk_size=TAG_REMOVAL_CHUNK_SIZE)
+    )
+
+    # Chunk the blobs so we're never storing a list of all embargoed blobs
+    chunks = chunked(embargoed_assets, TAG_REMOVAL_CHUNK_SIZE)
+    for chunk in chunks:
+        futures = []
+        with ThreadPoolExecutor(max_workers=100) as e:
+            for blob, zarr in chunk:
+                if blob is not None:
+                    futures.append(e.submit(_delete_object_tags, client=client, blob=blob))
+                if zarr is not None:
+                    futures.append(e.submit(_delete_zarr_object_tags, client=client, zarr=zarr))
+
+        # Check if any failed and raise exception if so
+        failed = [blob for i, blob in enumerate(chunk) if futures[i].exception() is not None]
+        if failed:
+            raise AssetTagRemovalError('Some assets failed to remove tags', blobs=failed)
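As written, the retry decorator in this new module makes up to `times` attempts inside the while loop, swallowing the listed exceptions, then one final attempt outside the loop whose exception propagates, so retry(times=3, ...) permits up to four invocations in total. A hypothetical usage sketch (flaky_fetch is illustrative, not part of the codebase):

import random

from dandiapi.api.services.embargo.utils import retry  # the decorator defined above


@retry(times=3, exceptions=(ConnectionError,))
def flaky_fetch() -> str:
    # Fails randomly so the retry path is exercised.
    if random.random() < 0.5:
        raise ConnectionError('transient failure')
    return 'ok'


# Up to `times` exceptions are swallowed inside the loop; the final call is
# made outside the loop, so its exception (if any) reaches the caller.
print(flaky_fetch())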
13 changes: 9 additions & 4 deletions dandiapi/zarr/models.py
@@ -15,6 +15,14 @@
 logger = logging.getLogger(name=__name__)
 
 
+# TODO: Move this somewhere better?
+def zarr_s3_path(zarr_id: str, zarr_path: str = ''):
+    return (
+        f'{settings.DANDI_DANDISETS_BUCKET_PREFIX}{settings.DANDI_ZARR_PREFIX_NAME}/'
+        f'{zarr_id}/{zarr_path}'
+    )
+
+
 # The status of the zarr ingestion (checksums, size, file count)
 class ZarrArchiveStatus(models.TextChoices):
     PENDING = 'Pending'
@@ -76,10 +84,7 @@ def s3_url(self):
 
     def s3_path(self, zarr_path: str) -> str:
         """Generate a full S3 object path from a path in this zarr_archive."""
-        return (
-            f'{settings.DANDI_DANDISETS_BUCKET_PREFIX}{settings.DANDI_ZARR_PREFIX_NAME}/'
-            f'{self.zarr_id}/{zarr_path}'
-        )
+        return zarr_s3_path(str(self.zarr_id), zarr_path)
 
     def generate_upload_urls(self, path_md5s: list[dict]):
         return [
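The extracted zarr_s3_path() helper simply joins the two bucket settings with the zarr ID and an optional object path. A sketch of the resulting key shape, using made-up prefix values (both settings values below are assumptions, not real configuration):

def zarr_s3_path_sketch(bucket_prefix: str, zarr_prefix: str, zarr_id: str, zarr_path: str = '') -> str:
    # Mirrors zarr_s3_path() above, with the Django settings passed explicitly.
    return f'{bucket_prefix}{zarr_prefix}/{zarr_id}/{zarr_path}'


# With assumed values DANDI_DANDISETS_BUCKET_PREFIX = 'test-prefix/' and
# DANDI_ZARR_PREFIX_NAME = 'zarr':
assert zarr_s3_path_sketch('test-prefix/', 'zarr', 'abc123', '0/0/0') == 'test-prefix/zarr/abc123/0/0/0'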
