Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, events: reduce memory usage when batch deleting objects #12436

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions authentik/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
)
from authentik.events.system_tasks import SystemTask, TaskStatus, prefill_task
from authentik.lib.config import CONFIG
from authentik.lib.utils.db import qs_batch_iter
from authentik.root.celery import CELERY_APP

LOGGER = get_logger()
Expand All @@ -34,14 +35,14 @@
cls.objects.all().exclude(expiring=False).exclude(expiring=True, expires__gt=now())
)
amount = objects.count()
for obj in objects:
for obj in qs_batch_iter(objects):

Check warning on line 38 in authentik/core/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/core/tasks.py#L38

Added line #L38 was not covered by tests
obj.expire_action()
LOGGER.debug("Expired models", model=cls, amount=amount)
messages.append(f"Expired {amount} {cls._meta.verbose_name_plural}")
# Special case
amount = 0

for session in AuthenticatedSession.objects.all():
for session in qs_batch_iter(AuthenticatedSession.objects.all()):

Check warning on line 45 in authentik/core/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/core/tasks.py#L45

Added line #L45 was not covered by tests
match CONFIG.get("session_storage", "cache"):
case "cache":
cache_key = f"{KEY_PREFIX}{session.session_key}"
Expand Down
7 changes: 5 additions & 2 deletions authentik/events/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
TaskStatus,
)
from authentik.events.system_tasks import SystemTask, prefill_task
from authentik.lib.utils.db import qs_batch_iter
from authentik.policies.engine import PolicyEngine
from authentik.policies.models import PolicyBinding, PolicyEngineMode
from authentik.root.celery import CELERY_APP
Expand Down Expand Up @@ -129,7 +130,8 @@
"""cleanup events from gdpr_compliance"""
events = Event.objects.filter(user__pk=user_pk)
LOGGER.debug("GDPR cleanup, removing events from user", events=events.count())
events.delete()
for event in qs_batch_iter(events):
event.delete()

Check warning on line 134 in authentik/events/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/events/tasks.py#L134

Added line #L134 was not covered by tests


@CELERY_APP.task(bind=True, base=SystemTask)
Expand All @@ -138,6 +140,7 @@
"""Cleanup seen notifications and notifications whose event expired."""
notifications = Notification.objects.filter(Q(event=None) | Q(seen=True))
amount = notifications.count()
notifications.delete()
for notification in qs_batch_iter(notifications):
notification.delete()

Check warning on line 144 in authentik/events/tasks.py

View check run for this annotation

Codecov / codecov/patch

authentik/events/tasks.py#L143-L144

Added lines #L143 - L144 were not covered by tests
LOGGER.debug("Expired notifications", amount=amount)
self.set_status(TaskStatus.SUCCESSFUL, f"Expired {amount} Notifications")
22 changes: 22 additions & 0 deletions authentik/lib/utils/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""authentik database utilities"""

import gc

from django.db.models import QuerySet


def qs_batch_iter(qs: QuerySet, batch_size: int = 10_000, gc_collect: bool = True):
pk_iter = qs.values_list("pk", flat=True).order_by("pk").distinct().iterator()
eof = False
while not eof:
pk_buffer = []
i = 0
try:
while i < batch_size:
pk_buffer.append(pk_iter.next())
i += 1

Check warning on line 17 in authentik/lib/utils/db.py

View check run for this annotation

Codecov / codecov/patch

authentik/lib/utils/db.py#L17

Added line #L17 was not covered by tests
except StopIteration:
eof = True
yield from qs.filter(pk__in=pk_buffer).order_by("pk").iterator()
if gc_collect:
gc.collect()

Check warning on line 22 in authentik/lib/utils/db.py

View check run for this annotation

Codecov / codecov/patch

authentik/lib/utils/db.py#L19-L22

Added lines #L19 - L22 were not covered by tests
Loading