-
Notifications
You must be signed in to change notification settings - Fork 3.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce cron jobs to clean up the export cache (data/cache/export/) and temporary files/directories (data/tmp/)
#8804
base: develop
Are you sure you want to change the base?
Changes from 59 commits
4723737
92f9d0b
1605bdc
e4c24ea
3412993
137386d
2cc395c
35137fc
028313f
5292d29
b18a008
2b25b10
8fec24a
81ec1fc
0a6dbdc
529b5c6
32ef451
504cd80
f505335
42c82f2
033db68
59536b6
685bdf4
52b0746
f830cbb
7962751
5b5d0fb
e01e483
f316a90
18a832e
5ee9082
1717013
4eedeff
4760fe7
3616a25
209e8fd
481540a
abbf3b5
7461d3d
071c493
3e42a3c
c658eb6
6963465
a82db50
ed7ae38
5708cd8
bdadb6c
820f63d
eaa3024
0f88933
7a1db7b
476c688
edd2268
ae68b9b
47c0f65
aab5350
716f13c
7abd5ab
f3c4120
15cf958
e393a10
606c382
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
### Added | ||
|
||
- Setting `TMP_FILE_OR_DIR_RETENTION_DAYS`, which defines the maximum retention period | ||
of a file or directory in the temporary directory | ||
(<https://github.com/cvat-ai/cvat/pull/8804>) | ||
- Cron job to remove outdated files and directories from CVAT tmp directory | ||
(<https://github.com/cvat-ai/cvat/pull/8804>) | ||
|
||
### Changed | ||
|
||
- Export cache cleaning moved to a separate cron job | ||
(<https://github.com/cvat-ai/cvat/pull/8804>) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
# Copyright (C) 2025 CVAT.ai Corporation | ||
# | ||
# SPDX-License-Identifier: MIT | ||
|
||
from __future__ import annotations | ||
|
||
import os | ||
import shutil | ||
from abc import ABCMeta, abstractmethod | ||
from datetime import timedelta | ||
from functools import wraps | ||
from pathlib import Path | ||
from threading import Event, Thread | ||
from time import sleep | ||
from typing import Callable, ClassVar | ||
|
||
from django.conf import settings | ||
from django.utils import timezone | ||
from django.utils.module_loading import import_string | ||
from rq import get_current_job | ||
|
||
from cvat.apps.dataset_manager.util import ( | ||
CacheFileOrDirPathParseError, | ||
ExportCacheManager, | ||
TmpDirManager, | ||
get_export_cache_lock, | ||
) | ||
from cvat.apps.dataset_manager.views import ( | ||
EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT, | ||
EXPORT_CACHE_LOCK_TTL, | ||
get_export_cache_ttl, | ||
log_exception, | ||
) | ||
from cvat.apps.engine.log import ServerLogManager | ||
|
||
logger = ServerLogManager(__name__).glob | ||
|
||
|
||
def suppress_exceptions(func: Callable[[BaseCleanupThread], None]):
    """Decorator for cleanup thread bodies that must not let exceptions escape.

    A raised exception would otherwise die silently inside the worker thread;
    instead it is stored on the thread object so the parent process can
    re-raise it via BaseCleanupThread.raise_if_exception() after join().

    NOTE: the parameter is annotated as BaseCleanupThread (not
    CleanupExportCacheThread) because the decorator is applied to the
    _cleanup() method of every cleanup thread subclass.
    """

    @wraps(func)
    def wrapper(self: BaseCleanupThread) -> None:
        try:
            func(self)
        except Exception as ex:
            # Saved for the parent thread; re-raised by raise_if_exception().
            self.set_exception(ex)

    return wrapper
|
||
|
||
def clear_export_cache(file_path: Path) -> bool:
    """Delete a single export cache file once its TTL has expired.

    Returns True when the file was removed, False when it was accessed
    recently enough to keep. Runs under the export cache lock so that
    concurrent export jobs cannot race with the removal.
    """
    with get_export_cache_lock(
        file_path,
        block=True,
        acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
        ttl=EXPORT_CACHE_LOCK_TTL,
    ):
        parsed = ExportCacheManager.parse_filename(file_path.name)
        ttl = get_export_cache_ttl(parsed.instance_type)

        # The file "expires" one TTL after its last modification time.
        expires_at = file_path.stat().st_mtime + ttl.total_seconds()
        if timezone.now().timestamp() > expires_at:
            os.remove(file_path)
            logger.debug(f"Export cache file {file_path.name!r} was successfully removed")
            return True

        logger.debug(f"Export cache file {file_path.name!r} was recently accessed")
        return False
|
||
|
||
class BaseCleanupThread(Thread, metaclass=ABCMeta):
    """Base class for cleanup worker threads.

    Subclasses implement _cleanup(), which runs as the thread target and is
    expected to poll the stop event and return early once it is set.
    Exceptions raised inside the thread should be recorded with
    set_exception() and are surfaced to the parent via raise_if_exception()
    after join().
    """

    # Human-readable name of the cleanup job, used in log messages.
    description: ClassVar[str]

    def __init__(self, stop_event: Event, *args, **kwargs) -> None:
        self._stop_event = stop_event
        self._number_of_removed_objects = 0
        self._exception = None
        super().__init__(*args, **kwargs, target=self._cleanup)

    @property
    def number_of_removed_objects(self) -> int:
        """Count of filesystem objects removed so far by this thread."""
        return self._number_of_removed_objects

    @abstractmethod
    def _cleanup(self) -> None:
        """Thread body; must be provided by each concrete subclass."""
        ...

    def set_exception(self, ex: Exception) -> None:
        """Record an exception raised inside the thread for later re-raising."""
        assert isinstance(ex, Exception)
        self._exception = ex

    def raise_if_exception(self) -> None:
        """Re-raise the stored exception (if any) in the caller's thread."""
        if isinstance(self._exception, Exception):
            raise self._exception
|
||
|
||
class CleanupTmpDirThread(BaseCleanupThread):
    """Removes outdated files and directories from the common tmp directory."""

    description: ClassVar[str] = "Cleanup common temporary directory"

    @suppress_exceptions
    def _cleanup(self) -> None:
        # We do not use locks here when handling objects from the tmp directory
        # because undesired race conditions are not possible here:
        # 1. A temporary file/directory can be removed while checking access time.
        #    In that case an exception is expected and is handled by the cron process.
        # 2. A temporary file/directory can be removed by the cron job only when it is outdated.
        # 3. Each temporary file/directory has a unique name, so a race between one
        #    process creating an object and another removing it is impossible.

        # Hoisted out of the loop: the retention threshold is loop-invariant.
        # An entry is outdated when its atime is older than now - retention.
        retention = timedelta(days=TmpDirManager.TMP_FILE_OR_DIR_RETENTION_DAYS)
        outdated_before = timezone.now().timestamp() - retention.total_seconds()

        for child in os.scandir(TmpDirManager.TMP_ROOT):
            # stop the cleanup process correctly before the rq job timeout ends
            if self._stop_event.is_set():
                return

            try:
                if child.stat().st_atime < outdated_before:
                    if child.is_dir():
                        shutil.rmtree(child.path)
                    else:
                        os.remove(child.path)
                    logger.debug(f"The {child.name} was successfully removed")
                    self._number_of_removed_objects += 1
            except FileNotFoundError:
                # file or directory has been removed by another process
                continue
            except Exception:
                # best-effort: log and keep processing the remaining entries
                log_exception(logger)
|
||
|
||
class CleanupExportCacheThread(BaseCleanupThread):
    """Removes expired export files from the export cache directory."""

    description: ClassVar[str] = "Cleanup export cache"

    @suppress_exceptions
    def _cleanup(self) -> None:
        export_cache_dir_path = settings.EXPORT_CACHE_ROOT
        # Explicit check instead of `assert`: asserts are stripped when Python
        # runs with -O, and a missing cache directory is a deployment problem
        # that must be reported, not silently ignored.
        if not os.path.exists(export_cache_dir_path):
            raise FileNotFoundError(
                f"The export cache directory {export_cache_dir_path} does not exist"
            )

        for child in os.scandir(export_cache_dir_path):
            # stop the cleanup process correctly before the rq job timeout ends
            if self._stop_event.is_set():
                return

            # the export cache directory is expected to contain only files
            if not child.is_file():
                logger.warning(f"The {child.name} is not a file, skipping...")
                continue

            try:
                if clear_export_cache(child):
                    self._number_of_removed_objects += 1
            except CacheFileOrDirPathParseError:
                logger.warning(f"Cannot parse {child.name}, skipping...")
                continue
            except Exception:
                # best-effort: log and keep processing the remaining entries
                log_exception(logger)
|
||
|
||
def cleanup(thread_class_path: str) -> None:
    """Run a cleanup thread until it finishes or the rq job deadline nears.

    The thread class is resolved from its dotted path, started with a stop
    event, and polled; the event is set shortly before the rq job timeout so
    the thread can terminate gracefully. Any exception raised inside the
    thread is re-raised here.
    """
    thread_cls = import_string(thread_class_path)
    assert issubclass(thread_cls, BaseCleanupThread)

    started_at = timezone.now()
    rq_job = get_current_job()
    sleep_interval = 10
    # Keep a one-minute safety margin before the rq job timeout.
    seconds_left = rq_job.timeout - 60
    assert seconds_left > sleep_interval
    finish_before = started_at + timedelta(seconds=seconds_left)

    stop_event = Event()
    worker = thread_cls(stop_event=stop_event)
    worker.start()

    # Poll until the worker finishes on its own or the deadline is reached.
    while worker.is_alive() and timezone.now() < finish_before:
        sleep(sleep_interval)

    # Idempotent: signal the worker to stop (no-op if it already finished).
    stop_event.set()
    worker.join()
    worker.raise_if_exception()

    finished_at = timezone.now()
    logger.info(
        f"The {worker.description!r} process has been successfully "
        f"completed after {int((finished_at - started_at).total_seconds())} seconds. "
        f"{worker.number_of_removed_objects} elements have been removed"
    )
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# Copyright (C) 2025 CVAT.ai Corporation | ||
# | ||
# SPDX-License-Identifier: MIT | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
# Copyright (C) 2025 CVAT.ai Corporation | ||
# | ||
# SPDX-License-Identifier: MIT | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
# Copyright (C) 2025 CVAT.ai Corporation | ||
# | ||
# SPDX-License-Identifier: MIT | ||
|
||
import os | ||
import shutil | ||
from pathlib import Path | ||
|
||
from django.core.management.base import BaseCommand | ||
from django.utils import timezone | ||
|
||
from cvat.apps.engine.models import Job, Project, Task | ||
|
||
|
||
class Command(BaseCommand):
    help = "Cleanup outdated export cache"

    def handle(self, *args, **options):
        """Remove legacy per-object "export_cache" directories for all
        projects, tasks and jobs created before the command was started,
        showing a simple inline progress bar per model."""

        def update_progress():
            # i, objects_count and progress_bar_len come from the enclosing scope
            progress = (i + 1) / objects_count
            done = int(progress_bar_len * progress)
            progress_bar = "#" * done + "-" * (progress_bar_len - done)
            self.stdout.write(f"\rProgress: |{progress_bar}| {progress:.0%}", ending="")

        now = timezone.now()
        # os.get_terminal_size() raises OSError when stdout is not attached to
        # a terminal (e.g. output piped or command run from cron) — fall back
        # to a conventional 80-column width in that case.
        try:
            terminal_width = os.get_terminal_size().columns
        except OSError:
            terminal_width = 80
        progress_bar_len = terminal_width // 2

        for Model in (Project, Task, Job):
            self.stdout.write(f"\nDeleting the export cache for {Model.__name__.lower()}s...")
            queryset = Model.objects.filter(created_date__lt=now)
            objects_count = queryset.count()
            if objects_count < 1:
                continue

            msg = (
                f"The {objects_count} folders are going to be checked"
                if objects_count > 1
                else "The 1 folder is going to be checked"
            )
            self.stdout.write(msg)

            for i, obj in enumerate(queryset.iterator()):
                update_progress()
                export_cache_dir = Path(obj.get_dirname()) / "export_cache"
                if export_cache_dir.exists():
                    shutil.rmtree(export_cache_dir)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
python manage.py runperiodicjob cron_export_cache_cleanup
TypeError: cleanup() missing 1 required positional argument: 'thread_class_path'
Probably, default args should be passed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
runperiodicjob
command was added this week. This command should support passing args rather than setting a default value for the `cleanup`
function, because this command should be executed only from the worker process (by the current design). If you think it would be useful to allow running this command outside a worker process, please provide reasons.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a periodic job, and we have a command to run periodic jobs manually. Probably, it should support such execution or clarify why it doesn't.