Skip to content

Commit

Permalink
wip: groups 1:1
Browse files Browse the repository at this point in the history
Signed-off-by: Kairo de Araujo <[email protected]>
  • Loading branch information
kairoaraujo committed Oct 20, 2024
1 parent ca394b6 commit 11e49c6
Showing 1 changed file with 39 additions and 31 deletions.
70 changes: 39 additions & 31 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,40 +168,48 @@ def bump_online_roles(expired: bool = False) -> None:
c = chain(_end_chain_callback.s(None, start_time))()
return

# if the LOCK_BEAT is already locked, it means that another task is running
# and we should skip this one
if repository._redis.lock("LOCK_BEAT").locked():
logging.info("LOCK_BEAT is already locked. Skipping bump_online_roles")
c = chain(_end_chain_callback.s(None, start_time))()
return

status_lock_targets = False
# Lock to avoid race conditions. See `LOCK_TIMEOUT` in the Worker
# development guide documentation.
try:
with repository._redis.lock("LOCK_TARGETS", repository._timeout):
chunks_size = 500
roles = repository.get_delegated_rolenames(expired=expired)
group_update_roles = _update_online_role.chunks(zip(roles), chunks_size).group()
# c = chain(
# group(_update_online_role.s(role) for role in roles)(),
# _update_snapshot_timestamp.s(),
# _end_chain_callback.s(start_time),
# )(queue="rstuf_internals")
c = chain(
group_update_roles,
_update_snapshot_timestamp.s(),
_end_chain_callback.s(start_time),
)(queue="rstuf_internals")
return c
except redis.exceptions.LockNotOwnedError:
# The LockNotOwnedError happens when the task exceeds the timeout,
# and another task owns the lock.
# If the task time out, the lock is released. If it doesn't finish
# properly, it will raise (fail) the task. Otherwise, the ignores
# the error because another task didn't lock it.
if status_lock_targets is False:
logging.error(
"The task to bump all online roles exceeded the timeout "
f"of {repository._timeout} seconds."
)
raise redis.exceptions.LockError(
f"RSTUF: Task exceed `LOCK_TIMEOUT` ({repository._timeout} "
"seconds)"
)
with repository._redis.lock("LOCK_BEAT"):
try:
with repository._redis.lock("LOCK_TARGETS", repository._timeout):
# chunks_size = 500
roles = repository.get_delegated_rolenames(expired=expired)
# group_update_roles = _update_online_role.chunks(zip(roles), chunks_size).group()
c = chain(
group(_update_online_role.s(role) for role in roles)(),
_update_snapshot_timestamp.s(),
_end_chain_callback.s(start_time),
)(queue="rstuf_internals")
# c = chain(
# group_update_roles,
# _update_snapshot_timestamp.s(),
# _end_chain_callback.s(start_time),
# )(queue="rstuf_internals")
return c
except redis.exceptions.LockNotOwnedError:
# The LockNotOwnedError happens when the task exceeds the timeout,
# and another task owns the lock.
# If the task time out, the lock is released. If it doesn't finish
# properly, it will raise (fail) the task. Otherwise, the ignores
# the error because another task didn't lock it.
if status_lock_targets is False:
logging.error(
"The task to bump all online roles exceeded the timeout "
f"of {repository._timeout} seconds."
)
raise redis.exceptions.LockError(
f"RSTUF: Task exceed `LOCK_TIMEOUT` ({repository._timeout} "
"seconds)"
)


def _publish_signals(
Expand Down

0 comments on commit 11e49c6

Please sign in to comment.