
Introduce cron jobs to clean up the export cache (data/cache/export/) and temporary files/directories (data/tmp/) #8804

Open · wants to merge 62 commits into base: develop · changes from 59 commits

Commits
4723737
Move export cache cleaning into the cron jobs
Marishka17 Dec 9, 2024
92f9d0b
Refactor a bit
Marishka17 Dec 10, 2024
1605bdc
Merge branch 'develop' into mk/clear_cache_cron_job
Marishka17 Dec 10, 2024
e4c24ea
Remove outdated code
Marishka17 Dec 11, 2024
3412993
Add todo comments
Marishka17 Dec 11, 2024
137386d
Merge branch 'develop' into mk/clear_cache_cron_job
Marishka17 Dec 13, 2024
2cc395c
[Backups] clean up export cache from a cron job && use locks during …
Marishka17 Dec 17, 2024
35137fc
Merge branch 'develop' into mk/clear_cache_cron_job
Marishka17 Dec 17, 2024
028313f
Fix pylint issues
Marishka17 Dec 17, 2024
5292d29
Remove todo
Marishka17 Dec 17, 2024
b18a008
typo
Marishka17 Dec 17, 2024
2b25b10
Resolve conflicts
Marishka17 Dec 20, 2024
8fec24a
Update tests
Marishka17 Dec 20, 2024
81ec1fc
Update after merging #8721
Marishka17 Dec 20, 2024
0a6dbdc
Move touch_last_export_date() call into worker
Marishka17 Dec 20, 2024
529b5c6
[Unit tests] Check that cron job deletes files from cache
Marishka17 Dec 20, 2024
32ef451
Move functions into separate engine module
Marishka17 Dec 20, 2024
504cd80
Run project|task|job export cache cleaning at different times
Marishka17 Dec 20, 2024
f505335
Add new module to dev/format_python_code.sh
Marishka17 Dec 20, 2024
42c82f2
Update test_can_remove_export_cache_automatically_after_successful_ex…
Marishka17 Dec 23, 2024
033db68
Merge develop
Marishka17 Dec 23, 2024
59536b6
Apply a few comments
Marishka17 Dec 24, 2024
685bdf4
Apply comments
Marishka17 Dec 24, 2024
52b0746
Update cvat/apps/engine/cron.py
Marishka17 Dec 24, 2024
f830cbb
Do not pass logger arg into clear_export_cache
Marishka17 Dec 24, 2024
7962751
Remove unused imports
Marishka17 Dec 24, 2024
5b5d0fb
Add changelog
Marishka17 Dec 24, 2024
e01e483
Refactor a bit
Marishka17 Dec 24, 2024
f316a90
Switch to using a common export cache dir
Marishka17 Dec 26, 2024
18a832e
Fix pylint issues && use os.scandir
Marishka17 Dec 27, 2024
5ee9082
Return re usage
Marishka17 Dec 27, 2024
1717013
Update tests
Marishka17 Dec 27, 2024
4eedeff
Merge branch 'develop' into mk/clear_cache_cron_job
Marishka17 Dec 27, 2024
4760fe7
fix
Marishka17 Dec 27, 2024
3616a25
[doc] draft migration
Marishka17 Dec 27, 2024
209e8fd
pylint
Marishka17 Dec 27, 2024
481540a
Update documentation
Marishka17 Dec 30, 2024
abbf3b5
Move cron.py into dataset_manager app
Marishka17 Dec 31, 2024
7461d3d
Add EXPORT_CACHE_ROOT for unit tests
Marishka17 Dec 31, 2024
071c493
Use kwonly arguments
Marishka17 Dec 31, 2024
3e42a3c
Refactor CleanupExportCacheThread
Marishka17 Dec 31, 2024
c658eb6
Fix path in dev/format_python_code.sh
Marishka17 Dec 31, 2024
6963465
Cleanup temp export dirs after failed unit tests
Marishka17 Dec 31, 2024
a82db50
Rename class
Marishka17 Jan 3, 2025
ed7ae38
[unit tests] Remove sleep usage
Marishka17 Jan 3, 2025
5708cd8
[Export logic] Use common tmp dir && do not create tmp dir twice
Marishka17 Jan 7, 2025
bdadb6c
Resolve conflicts
Marishka17 Jan 7, 2025
820f63d
Sort imports
Marishka17 Jan 7, 2025
eaa3024
Fix merging issues
Marishka17 Jan 7, 2025
0f88933
Cron job to clean up tmp files/dirs && small fixes
Marishka17 Jan 8, 2025
7a1db7b
Add exportcachecleanup command
Marishka17 Jan 8, 2025
476c688
Refactor a bit
Marishka17 Jan 9, 2025
edd2268
Fix imports sorting
Marishka17 Jan 9, 2025
ae68b9b
Remove todo
Marishka17 Jan 9, 2025
47c0f65
Merge branch 'develop' into mk/clear_cache_cron_job
Marishka17 Jan 9, 2025
aab5350
Run CI
Marishka17 Jan 9, 2025
716f13c
Try to fix tests
Marishka17 Jan 9, 2025
7abd5ab
os.replace -> shutil.move
Marishka17 Jan 9, 2025
f3c4120
Write to self.stdout
Marishka17 Jan 10, 2025
15cf958
Reduce max lifetime of tmp files/dirs
Marishka17 Jan 10, 2025
e393a10
Suppress exception
Marishka17 Jan 10, 2025
606c382
Update doc
Marishka17 Jan 10, 2025
12 changes: 12 additions & 0 deletions changelog.d/20241224_150942_maria_clear_cache_cron_job.md
@@ -0,0 +1,12 @@
### Added

- Setting `TMP_FILE_OR_DIR_RETENTION_DAYS`, which defines the maximum retention period
of a file or directory in the temporary directory
(<https://github.com/cvat-ai/cvat/pull/8804>)
- Cron job to remove outdated files and directories from CVAT tmp directory
(<https://github.com/cvat-ai/cvat/pull/8804>)

### Changed

- Export cache cleaning moved to a separate cron job
(<https://github.com/cvat-ai/cvat/pull/8804>)
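The retention rule from the changelog entry above can be sketched as a small predicate. Note this is an illustrative stand-in: the name `is_outdated` is not part of the PR, and the real cron job reads `TMP_FILE_OR_DIR_RETENTION_DAYS` from Django settings via `TmpDirManager`.

```python
from __future__ import annotations

import os
import time
from datetime import timedelta

# Hypothetical stand-in for the Django setting introduced by this PR;
# the real value comes from settings via TmpDirManager.
TMP_FILE_OR_DIR_RETENTION_DAYS = 3

def is_outdated(path: str, *, now: float | None = None) -> bool:
    """True if the entry's last access time is past the retention window."""
    retention = timedelta(days=TMP_FILE_OR_DIR_RETENTION_DAYS).total_seconds()
    now = time.time() if now is None else now
    return os.stat(path).st_atime + retention < now
```

A freshly created file is kept; a file whose access time is older than the retention window is eligible for removal.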
191 changes: 191 additions & 0 deletions cvat/apps/dataset_manager/cron.py
@@ -0,0 +1,191 @@
# Copyright (C) 2025 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

from __future__ import annotations

import os
import shutil
from abc import ABCMeta, abstractmethod
from datetime import timedelta
from functools import wraps
from pathlib import Path
from threading import Event, Thread
from time import sleep
from typing import Callable, ClassVar

from django.conf import settings
from django.utils import timezone
from django.utils.module_loading import import_string
from rq import get_current_job

from cvat.apps.dataset_manager.util import (
CacheFileOrDirPathParseError,
ExportCacheManager,
TmpDirManager,
get_export_cache_lock,
)
from cvat.apps.dataset_manager.views import (
EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
EXPORT_CACHE_LOCK_TTL,
get_export_cache_ttl,
log_exception,
)
from cvat.apps.engine.log import ServerLogManager

logger = ServerLogManager(__name__).glob


def suppress_exceptions(func: Callable[[CleanupExportCacheThread], None]):
@wraps(func)
def wrapper(self: CleanupExportCacheThread):
try:
func(self)
except Exception as ex:
self.set_exception(ex)

return wrapper


def clear_export_cache(file_path: Path) -> bool:
with get_export_cache_lock(
file_path,
block=True,
acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
ttl=EXPORT_CACHE_LOCK_TTL,
):
parsed_filename = ExportCacheManager.parse_filename(file_path.name)
cache_ttl = get_export_cache_ttl(parsed_filename.instance_type)

if timezone.now().timestamp() <= file_path.stat().st_mtime + cache_ttl.total_seconds():
logger.debug(f"Export cache file {file_path.name!r} was recently accessed")
return False

os.remove(file_path)
logger.debug(f"Export cache file {file_path.name!r} was successfully removed")
return True


class BaseCleanupThread(Thread, metaclass=ABCMeta):
description: ClassVar[str]

def __init__(self, stop_event: Event, *args, **kwargs) -> None:
self._stop_event = stop_event
self._number_of_removed_objects = 0
self._exception = None
super().__init__(*args, **kwargs, target=self._cleanup)

@property
def number_of_removed_objects(self) -> int:
return self._number_of_removed_objects

@abstractmethod
def _cleanup(self) -> None: ...

def set_exception(self, ex: Exception) -> None:
assert isinstance(ex, Exception)
self._exception = ex

def raise_if_exception(self) -> None:
if isinstance(self._exception, Exception):
raise self._exception


class CleanupTmpDirThread(BaseCleanupThread):
description: ClassVar[str] = "Cleanup common temporary directory"

@suppress_exceptions
def _cleanup(self) -> None:
# we do not use locks here when handling objects from the tmp directory,
# because undesired race conditions are not possible here:
# 1. A temporary file/directory can be removed while its access time is being checked.
# In that case an exception is expected and is handled by the cron process.
# 2. A temporary file/directory can be removed by the cron job only when it is outdated.
# 3. Each temporary file/directory has a unique name, so a race condition between one process
# creating an object and another removing it is impossible.
for child in os.scandir(TmpDirManager.TMP_ROOT):
# stop the cleanup process gracefully before the rq job timeout expires
if self._stop_event.is_set():
return

try:
if (
child.stat().st_atime
+ timedelta(days=TmpDirManager.TMP_FILE_OR_DIR_RETENTION_DAYS).total_seconds()
< timezone.now().timestamp()
):
if child.is_dir():
shutil.rmtree(child.path)
else:
os.remove(child.path)
logger.debug(f"The {child.name} was successfully removed")
self._number_of_removed_objects += 1
except FileNotFoundError:
# file or directory has been removed by another process
continue
except Exception:
log_exception(logger)


class CleanupExportCacheThread(BaseCleanupThread):
description: ClassVar[str] = "Cleanup export cache"

@suppress_exceptions
def _cleanup(self) -> None:
export_cache_dir_path = settings.EXPORT_CACHE_ROOT
assert os.path.exists(export_cache_dir_path)

for child in os.scandir(export_cache_dir_path):
# stop the cleanup process gracefully before the rq job timeout expires
if self._stop_event.is_set():
return

# export cache directory is expected to contain only files
if not child.is_file():
logger.warning(f"The {child.name} is not a file, skipping...")
continue

try:
if clear_export_cache(child):
self._number_of_removed_objects += 1
except CacheFileOrDirPathParseError:
logger.warning(f"Cannot parse {child.name}, skipping...")
continue

except Exception:
log_exception(logger)


def cleanup(thread_class_path: str) -> None:
zhiltsov-max (Contributor) commented on Jan 10, 2025:
python manage.py runperiodicjob cron_export_cache_cleanup

TypeError: cleanup() missing 1 required positional argument: 'thread_class_path'

    seconds_left = rq_job.timeout - 60
AttributeError: 'NoneType' object has no attribute 'timeout'

Probably, default args should be passed.

Marishka17 (Author) replied on Jan 10, 2025:
The runperiodicjob command was added this week. This command should support passing args rather than setting the default value for the cleanup function.

AttributeError: 'NoneType' object has no attribute 'timeout'

because this command should be executed only from the worker process (by the current design). If you think that it will be useful to allow running this command not only by worker process, please provide reasons.

Contributor replied:

It's a periodic job, and we have a command to run periodic jobs manually. Probably, it should support such execution or clarify why it doesn't.

ThreadClass = import_string(thread_class_path)
assert issubclass(ThreadClass, BaseCleanupThread)

started_at = timezone.now()
rq_job = get_current_job()
seconds_left = rq_job.timeout - 60
sleep_interval = 10
assert seconds_left > sleep_interval
finish_before = started_at + timedelta(seconds=seconds_left)

stop_event = Event()
cleanup_thread = ThreadClass(stop_event=stop_event)
cleanup_thread.start()

while timezone.now() < finish_before:
if not cleanup_thread.is_alive():
stop_event.set()
break
sleep(sleep_interval)

if not stop_event.is_set():
stop_event.set()

cleanup_thread.join()
cleanup_thread.raise_if_exception()

finished_at = timezone.now()
logger.info(
f"The {cleanup_thread.description!r} process has been successfully "
f"completed after {int((finished_at - started_at).total_seconds())} seconds. "
f"{cleanup_thread.number_of_removed_objects} elements have been removed"
)
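The supervisor loop in `cleanup()` above follows a cooperative-stop pattern: the worker thread checks a `threading.Event` between units of work, and the supervisor sets the event before a hard deadline (the rq job timeout minus a margin). A minimal standalone sketch of the pattern — names, timings, and the work loop are illustrative, not CVAT code:

```python
from threading import Event, Thread
from time import monotonic, sleep

def worker(stop_event: Event, processed: list) -> None:
    # The worker checks the event between units of work so it can
    # exit cleanly mid-run instead of being killed by a job timeout.
    for item in range(1000):
        if stop_event.is_set():
            return
        processed.append(item)
        sleep(0.001)

stop_event = Event()
processed: list = []
t = Thread(target=worker, args=(stop_event, processed))
t.start()

# Stand-in for "rq job timeout minus a safety margin".
deadline = monotonic() + 0.05
while monotonic() < deadline and t.is_alive():
    sleep(0.01)

stop_event.set()  # ask the worker to finish its current item and exit
t.join()
```

The worker stops partway through its 1000 items: it exits at the first event check after the supervisor's deadline passes.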
4 changes: 4 additions & 0 deletions cvat/apps/dataset_manager/management/__init__.py
@@ -0,0 +1,4 @@
# Copyright (C) 2025 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

4 changes: 4 additions & 0 deletions cvat/apps/dataset_manager/management/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Copyright (C) 2025 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

@@ -0,0 +1,46 @@
# Copyright (C) 2025 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

import os
import shutil
from pathlib import Path

from django.core.management.base import BaseCommand
from django.utils import timezone

from cvat.apps.engine.models import Job, Project, Task


class Command(BaseCommand):
help = "Cleanup outdated export cache"

def handle(self, *args, **options):
def update_progress():
progress = (i + 1) / objects_count
done = int(progress_bar_len * progress)
progress_bar = "#" * done + "-" * (progress_bar_len - done)
self.stdout.write(f"\rProgress: |{progress_bar}| {progress:.0%}", ending="")
Contributor commented:

This might not work on mac and windows. Why not use tqdm?

Marishka17 (Author) replied:

Ask Roman about it

Contributor replied:

@SpecLad, I saw you were saying it's not installed, but why not install it? Do you see any problems with that?


now = timezone.now()
progress_bar_len = os.get_terminal_size().columns // 2

for Model in (Project, Task, Job):
self.stdout.write(f"\nDeleting the export cache for {Model.__name__.lower()}s...")
queryset = Model.objects.filter(created_date__lt=now)
objects_count = queryset.count()
if objects_count < 1:
continue

msg = (
f"{objects_count} folders are going to be checked"
if objects_count > 1
else "1 folder is going to be checked"
)
self.stdout.write(msg)

for i, obj in enumerate(queryset.iterator()):
update_progress()
export_cache_dir = Path(obj.get_dirname()) / "export_cache"
if export_cache_dir.exists():
shutil.rmtree(export_cache_dir)
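The command's `update_progress` helper rewrites a `'#'`/`'-'` bar in place with a carriage return; as the review thread notes, this may not render on every platform (and `os.get_terminal_size()` fails when stdout is not a tty), which is why `tqdm` was suggested. The bar construction itself can be isolated as follows — `render_progress_bar` is a sketch, not a function from the PR:

```python
def render_progress_bar(i: int, total: int, bar_len: int = 20) -> str:
    # Mirrors update_progress(): fraction done maps to a fixed-width
    # bar of '#' (done) and '-' (remaining), plus a percentage.
    progress = (i + 1) / total
    done = int(bar_len * progress)
    return f"|{'#' * done}{'-' * (bar_len - done)}| {progress:.0%}"
```

For example, halfway through ten items the bar is half filled, and the last item renders a full bar.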
40 changes: 28 additions & 12 deletions cvat/apps/dataset_manager/project.py
Original file line number Diff line number Diff line change
@@ -3,9 +3,9 @@
#
# SPDX-License-Identifier: MIT

import os
import io
from collections.abc import Mapping
from tempfile import TemporaryDirectory
from contextlib import nullcontext
from typing import Any, Callable

import rq
@@ -14,6 +14,7 @@
from django.db import transaction

from cvat.apps.dataset_manager.task import TaskAnnotation
from cvat.apps.dataset_manager.util import TmpDirManager
from cvat.apps.engine import models
from cvat.apps.engine.log import DatasetLogManager
from cvat.apps.engine.rq_job_handler import RQJobMetaField
@@ -26,8 +27,15 @@

dlogger = DatasetLogManager()

def export_project(project_id, dst_file, format_name,
server_url=None, save_images=False):
def export_project(
project_id: int,
dst_file: str,
*,
format_name: str,
server_url: str | None = None,
save_images: bool = False,
temp_dir: str | None = None,
):
# For big tasks the dump function may run for a long time and
# we don't need to acquire a lock after the task has been initialized from the DB.
# But there is the bug with corrupted dump file in case 2 or
@@ -39,7 +47,7 @@ def export_project(project_id, dst_file, format_name,

exporter = make_exporter(format_name)
with open(dst_file, 'wb') as f:
project.export(f, exporter, host=server_url, save_images=save_images)
project.export(f, exporter, host=server_url, save_images=save_images, temp_dir=temp_dir)

class ProjectAnnotationAndData:
def __init__(self, pk: int):
@@ -131,16 +139,26 @@ def init_from_db(self):
self.task_annotations[task.id] = annotation
self.annotation_irs[task.id] = annotation.ir_data

def export(self, dst_file: str, exporter: Callable, host: str='', **options):
def export(
self,
dst_file: io.BufferedWriter,
exporter: Callable[..., None],
*,
host: str = '',
temp_dir: str | None = None,
**options
):
project_data = ProjectData(
annotation_irs=self.annotation_irs,
db_project=self.db_project,
host=host
)

temp_dir_base = self.db_project.get_tmp_dirname()
os.makedirs(temp_dir_base, exist_ok=True)
with TemporaryDirectory(dir=temp_dir_base) as temp_dir:
with (
TmpDirManager.get_tmp_directory_for_export(
instance_type=self.db_project.__class__.__name__,
) if not temp_dir else nullcontext(temp_dir)
) as temp_dir:
exporter(dst_file, temp_dir, project_data, **options)

def load_dataset_data(self, *args, **kwargs):
Expand All @@ -155,9 +173,7 @@ def import_dataset(self, dataset_file, importer, **options):
)
project_data.soft_attribute_import = True

temp_dir_base = self.db_project.get_tmp_dirname()
os.makedirs(temp_dir_base, exist_ok=True)
with TemporaryDirectory(dir=temp_dir_base) as temp_dir:
with TmpDirManager.get_tmp_directory() as temp_dir:
try:
importer(dataset_file, temp_dir, project_data, load_data_callback=self.load_dataset_data, **options)
except (DatasetNotFoundError, CvatDatasetNotFoundError) as not_found:
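The `temp_dir` plumbing in `export()` above uses a pattern worth noting: reuse a caller-provided temporary directory via `contextlib.nullcontext`, otherwise create (and afterwards clean up) a fresh one. A self-contained sketch, with the standard library's `TemporaryDirectory` standing in for `TmpDirManager.get_tmp_directory_for_export` and `run_with_temp_dir` as an illustrative helper:

```python
from __future__ import annotations

import os
from contextlib import nullcontext
from tempfile import TemporaryDirectory
from typing import Callable

def run_with_temp_dir(action: Callable[[str], object], temp_dir: str | None = None):
    # nullcontext yields the caller's directory unchanged and leaves it
    # intact on exit; TemporaryDirectory creates one and removes it.
    with (TemporaryDirectory() if temp_dir is None else nullcontext(temp_dir)) as d:
        return action(d)
```

When the caller passes a directory it is used as-is; otherwise the helper owns the directory's lifetime, which is exactly what lets the export code accept a shared `temp_dir` from the worker without double-creating one.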