Skip to content

Commit

Permalink
Fix infinite lock for chunk preparing (#8769)
Browse files Browse the repository at this point in the history
<!-- Raise an issue to propose your change
(https://github.com/cvat-ai/cvat/issues).
It helps to avoid duplication of efforts from multiple independent
contributors.
Discuss your ideas with maintainers to be sure that changes will be
approved and merged.
Read the [Contribution guide](https://docs.cvat.ai/docs/contributing/).
-->

<!-- Provide a general summary of your changes in the Title above -->

### Motivation and context
<!-- Why is this change required? What problem does it solve? If it
fixes an open
issue, please link to the issue here. Describe your changes in detail,
add
screenshots. -->

### How has this been tested?
<!-- Please describe in detail how you tested your changes.
Include details of your testing environment, and the tests you ran to
see how your change affects other areas of the code, etc. -->

### Checklist
<!-- Go over all the following points, and put an `x` in all the boxes
that apply.
If an item isn't applicable for some reason, then ~~explicitly
strikethrough~~ the whole
line. If you don't do that, GitHub will show incorrect progress for the
pull request.
If you're unsure about any of these, don't hesitate to ask. We're here
to help! -->
- [x] I submit my changes into the `develop` branch
- [x] I have created a changelog fragment <!-- see top comment in
CHANGELOG.md -->
- [ ] I have updated the documentation accordingly
- [ ] I have added tests to cover my changes
- [ ] I have linked related issues (see [GitHub docs](https://help.github.com/en/github/managing-your-work-on-github/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword))
- [ ] I have increased versions of npm packages if it is necessary
  ([cvat-canvas](https://github.com/cvat-ai/cvat/tree/develop/cvat-canvas#versioning),
  [cvat-core](https://github.com/cvat-ai/cvat/tree/develop/cvat-core#versioning),
  [cvat-data](https://github.com/cvat-ai/cvat/tree/develop/cvat-data#versioning) and
  [cvat-ui](https://github.com/cvat-ai/cvat/tree/develop/cvat-ui#versioning))

### License

- [x] I submit _my code changes_ under the same [MIT License](https://github.com/cvat-ai/cvat/blob/develop/LICENSE) that covers the project.
  Feel free to contact the maintainers if that's a concern.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **New Features**
  - Introduced a new constant `CVAT_CHUNK_LOCK_TIMEOUT` to manage lock
    acquisition duration during chunk cache operations.

- **Improvements**
  - Enhanced concurrency handling in cache operations with a new locking
    mechanism.
  - Streamlined cache item creation by simplifying method logic and
    reducing redundancy.

- **Bug Fixes**
  - Updated method signatures to improve functionality and maintainability
    of the caching mechanism.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
azhavoro authored Dec 6, 2024
1 parent b5c0971 commit a091d15
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Fixed

- Possible endless lock acquisition for chunk preparation job
(<https://github.com/cvat-ai/cvat/pull/8769>)
42 changes: 23 additions & 19 deletions cvat/apps/engine/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,11 @@ def enqueue_create_chunk_job(
rq_job_id: str,
create_callback: Callback,
*,
blocking_timeout: int = 50,
rq_job_result_ttl: int = 60,
rq_job_failure_ttl: int = 3600 * 24 * 14, # 2 weeks
) -> rq.job.Job:
try:
with get_rq_lock_for_job(queue, rq_job_id, blocking_timeout=blocking_timeout):
with get_rq_lock_for_job(queue, rq_job_id):
rq_job = queue.fetch_job(rq_job_id)

if not rq_job or (
Expand Down Expand Up @@ -205,11 +204,13 @@ def _get_or_set_cache_item(
cache_item_ttl=cache_item_ttl,
)

def _get_queue(self) -> rq.Queue:
return django_rq.get_queue(self._QUEUE_NAME)
@classmethod
def _get_queue(cls) -> rq.Queue:
return django_rq.get_queue(cls._QUEUE_NAME)

def _make_queue_job_id(self, key: str) -> str:
return f"{self._QUEUE_JOB_PREFIX_TASK}{key}"
@classmethod
def _make_queue_job_id(cls, key: str) -> str:
return f"{cls._QUEUE_JOB_PREFIX_TASK}{key}"

@staticmethod
def _drop_return_value(func: Callable[..., DataWithMime], *args: Any, **kwargs: Any):
Expand All @@ -228,7 +229,15 @@ def _create_and_set_cache_item(
item = (item_data[0], item_data[1], cls._get_checksum(item_data_bytes), timestamp)
if item_data_bytes:
cache = cls._cache()
cache.set(key, item, timeout=cache_item_ttl or cache.default_timeout)
with get_rq_lock_for_job(
cls._get_queue(),
key,
):
cached_item = cache.get(key)
if cached_item is not None and timestamp <= cached_item[3]:
item = cached_item
else:
cache.set(key, item, timeout=cache_item_ttl or cache.default_timeout)

return item

Expand All @@ -239,22 +248,17 @@ def _create_cache_item(
*,
cache_item_ttl: Optional[int] = None,
) -> _CacheItem:

queue = self._get_queue()
rq_id = self._make_queue_job_id(key)

slogger.glob.info(f"Starting to prepare chunk: key {key}")
if _is_run_inside_rq():
with get_rq_lock_for_job(queue, rq_id, timeout=None, blocking_timeout=None):
item = self._create_and_set_cache_item(
key,
create_callback,
cache_item_ttl=cache_item_ttl,
)
item = self._create_and_set_cache_item(
key,
create_callback,
cache_item_ttl=cache_item_ttl,
)
else:
rq_job = enqueue_create_chunk_job(
queue=queue,
rq_job_id=rq_id,
queue=self._get_queue(),
rq_job_id=self._make_queue_job_id(key),
create_callback=Callback(
callable=self._drop_return_value,
args=[
Expand Down
5 changes: 4 additions & 1 deletion cvat/apps/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,11 @@ def get_rq_lock_by_user(queue: DjangoRQ, user_id: int, *, timeout: Optional[int]
)
return nullcontext()

def get_rq_lock_for_job(queue: DjangoRQ, rq_id: str, *, timeout: Optional[int] = 60, blocking_timeout: Optional[int] = None) -> Lock:
def get_rq_lock_for_job(queue: DjangoRQ, rq_id: str, *, timeout: int = 60, blocking_timeout: int = 50) -> Lock:
# lock timeout corresponds to the nginx request timeout (proxy_read_timeout)

assert timeout is not None
assert blocking_timeout is not None
return queue.connection.lock(
name=f'lock-for-job-{rq_id}'.lower(),
timeout=timeout,
Expand Down

0 comments on commit a091d15

Please sign in to comment.