From a091d15bcbeb5b2b47359bd12a44fcc5492a02e3 Mon Sep 17 00:00:00 2001 From: Andrey Zhavoronkov Date: Fri, 6 Dec 2024 15:52:38 +0300 Subject: [PATCH] Fix infinite lock for chunk preparing (#8769) ### Motivation and context ### How has this been tested? ### Checklist - [x] I submit my changes into the `develop` branch - [x] I have created a changelog fragment - [ ] I have updated the documentation accordingly - [ ] I have added tests to cover my changes - [ ] I have linked related issues (see [GitHub docs]( https://help.github.com/en/github/managing-your-work-on-github/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword)) - [ ] I have increased versions of npm packages if it is necessary ([cvat-canvas](https://github.com/cvat-ai/cvat/tree/develop/cvat-canvas#versioning), [cvat-core](https://github.com/cvat-ai/cvat/tree/develop/cvat-core#versioning), [cvat-data](https://github.com/cvat-ai/cvat/tree/develop/cvat-data#versioning) and [cvat-ui](https://github.com/cvat-ai/cvat/tree/develop/cvat-ui#versioning)) ### License - [x] I submit _my code changes_ under the same [MIT License]( https://github.com/cvat-ai/cvat/blob/develop/LICENSE) that covers the project. Feel free to contact the maintainers if that's a concern. ## Summary by CodeRabbit - **New Features** - Introduced a new constant `CVAT_CHUNK_LOCK_TIMEOUT` to manage lock acquisition duration during chunk cache operations. - **Improvements** - Enhanced concurrency handling in cache operations with a new locking mechanism. - Streamlined cache item creation by simplifying method logic and reducing redundancy. - **Bug Fixes** - Updated method signatures to improve functionality and maintainability of the caching mechanism. --- ...303_andrey_fix_infinite_lock_for_chunks.md | 4 ++ cvat/apps/engine/cache.py | 42 ++++++++++--------- cvat/apps/engine/utils.py | 5 ++- 3 files changed, 31 insertions(+), 20 deletions(-) create mode 100644 changelog.d/20241206_104303_andrey_fix_infinite_lock_for_chunks.md diff --git a/changelog.d/20241206_104303_andrey_fix_infinite_lock_for_chunks.md b/changelog.d/20241206_104303_andrey_fix_infinite_lock_for_chunks.md new file mode 100644 index 000000000000..7f11306a7cef --- /dev/null +++ b/changelog.d/20241206_104303_andrey_fix_infinite_lock_for_chunks.md @@ -0,0 +1,4 @@ +### Fixed + +- Possible endless lock acquisition for chunk preparation job + () diff --git a/cvat/apps/engine/cache.py b/cvat/apps/engine/cache.py index 30122a1826f0..0ecd7fcc010c 100644 --- a/cvat/apps/engine/cache.py +++ b/cvat/apps/engine/cache.py @@ -84,12 +84,11 @@ def enqueue_create_chunk_job( rq_job_id: str, create_callback: Callback, *, - blocking_timeout: int = 50, rq_job_result_ttl: int = 60, rq_job_failure_ttl: int = 3600 * 24 * 14, # 2 weeks ) -> rq.job.Job: try: - with get_rq_lock_for_job(queue, rq_job_id, blocking_timeout=blocking_timeout): + with get_rq_lock_for_job(queue, rq_job_id): rq_job = queue.fetch_job(rq_job_id) if not rq_job or ( @@ -205,11 +204,13 @@ def _get_or_set_cache_item( cache_item_ttl=cache_item_ttl, ) - def _get_queue(self) -> rq.Queue: - return django_rq.get_queue(self._QUEUE_NAME) + @classmethod + def _get_queue(cls) -> rq.Queue: + return django_rq.get_queue(cls._QUEUE_NAME) - def _make_queue_job_id(self, key: str) -> str: - return f"{self._QUEUE_JOB_PREFIX_TASK}{key}" + @classmethod + def _make_queue_job_id(cls, key: str) -> str: + return f"{cls._QUEUE_JOB_PREFIX_TASK}{key}" @staticmethod def _drop_return_value(func: Callable[..., DataWithMime], *args: Any, **kwargs: Any): @@ -228,7 +229,15 @@ def _create_and_set_cache_item( item = (item_data[0], item_data[1], cls._get_checksum(item_data_bytes), timestamp) if item_data_bytes: cache = cls._cache() - cache.set(key, item, timeout=cache_item_ttl or cache.default_timeout) + with get_rq_lock_for_job( + cls._get_queue(), + key, + ): + cached_item = cache.get(key) + if cached_item is not None and timestamp <= cached_item[3]: + item = cached_item + else: + cache.set(key, item, timeout=cache_item_ttl or cache.default_timeout) return item @@ -239,22 +248,17 @@ def _create_cache_item( *, cache_item_ttl: Optional[int] = None, ) -> _CacheItem: - - queue = self._get_queue() - rq_id = self._make_queue_job_id(key) - slogger.glob.info(f"Starting to prepare chunk: key {key}") if _is_run_inside_rq(): - with get_rq_lock_for_job(queue, rq_id, timeout=None, blocking_timeout=None): - item = self._create_and_set_cache_item( - key, - create_callback, - cache_item_ttl=cache_item_ttl, - ) + item = self._create_and_set_cache_item( + key, + create_callback, + cache_item_ttl=cache_item_ttl, + ) else: rq_job = enqueue_create_chunk_job( - queue=queue, - rq_job_id=rq_id, + queue=self._get_queue(), + rq_job_id=self._make_queue_job_id(key), create_callback=Callback( callable=self._drop_return_value, args=[ diff --git a/cvat/apps/engine/utils.py b/cvat/apps/engine/utils.py index 72cb52eb5168..59409ceb69dd 100644 --- a/cvat/apps/engine/utils.py +++ b/cvat/apps/engine/utils.py @@ -210,8 +210,11 @@ def get_rq_lock_by_user(queue: DjangoRQ, user_id: int, *, timeout: Optional[int] ) return nullcontext() -def get_rq_lock_for_job(queue: DjangoRQ, rq_id: str, *, timeout: Optional[int] = 60, blocking_timeout: Optional[int] = None) -> Lock: +def get_rq_lock_for_job(queue: DjangoRQ, rq_id: str, *, timeout: int = 60, blocking_timeout: int = 50) -> Lock: # lock timeout corresponds to the nginx request timeout (proxy_read_timeout) + + assert timeout is not None + assert blocking_timeout is not None return queue.connection.lock( name=f'lock-for-job-{rq_id}'.lower(), timeout=timeout,