Skip to content

Commit

Permalink
Merge branch 'db-connection-errors' into 'main'
Browse files Browse the repository at this point in the history
Db connection errors

See merge request reportcreator/reportcreator!815
  • Loading branch information
MWedl committed Jan 2, 2025
2 parents fb99401 + 887d63a commit 63966e2
Show file tree
Hide file tree
Showing 10 changed files with 986 additions and 44 deletions.
2 changes: 1 addition & 1 deletion api/src/reportcreator_api/api_utils/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ async def get(self, request, *args, **kwargs):

if res.status_code == 200:
# Run periodic tasks
run_in_background(PeriodicTask.objects.run_all_pending_tasks())
run_in_background(PeriodicTask.objects.run_all_pending_tasks)()

# Memory cleanup of worker process
gc.collect()
Expand Down
16 changes: 11 additions & 5 deletions api/src/reportcreator_api/notifications/tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
from datetime import timedelta

import httpx
Expand All @@ -7,7 +8,7 @@
from django.core.serializers.json import DjangoJSONEncoder

from reportcreator_api.notifications.serializers import NotificationSpecSerializer
from reportcreator_api.tasks.models import periodic_task
from reportcreator_api.tasks.models import TaskStatus, periodic_task
from reportcreator_api.utils import license


Expand All @@ -31,8 +32,13 @@ async def fetch_notifications(task_info):
if not settings.NOTIFICATION_IMPORT_URL:
return

data = await fetch_notifications_request()
serializer = NotificationSpecSerializer(data=data, many=True)
serializer.is_valid(raise_exception=True)
await sync_to_async(serializer.save)()
try:
data = await fetch_notifications_request()
serializer = NotificationSpecSerializer(data=data, many=True)
serializer.is_valid(raise_exception=True)
await sync_to_async(serializer.save)()
return TaskStatus.SUCCESS
except httpx.TransportError as ex:
logging.warning(f'Failed to fetch notifications: {ex}. Check your internet connection.')
return TaskStatus.FAILED

2 changes: 1 addition & 1 deletion api/src/reportcreator_api/tasks/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def unregister(self, task: PeriodicTaskSpec):

def periodic_task(schedule: timedelta, id: str|None = None):
def inner(func):
periodic_task_registry.register( PeriodicTaskSpec(
periodic_task_registry.register(PeriodicTaskSpec(
id=id or f'{func.__module__}.{func.__name__}',
schedule=schedule,
func=func,
Expand Down
15 changes: 9 additions & 6 deletions api/src/reportcreator_api/tasks/querysets.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,19 @@ async def run_task(self, task_info):
try:
async with elasticapm.async_capture_span(task_info.id):
if iscoroutinefunction(task_info.spec.func):
await task_info.spec.func(task_info)
res = await task_info.spec.func(task_info)
else:
await sync_to_async(task_info.spec.func)(task_info)
task_info.model.status = TaskStatus.SUCCESS
task_info.model.last_success = timezone.now()
task_info.model.completed = task_info.model.last_success
res = await sync_to_async(task_info.spec.func)(task_info)
task_info.model.status = res if isinstance(res, TaskStatus) else TaskStatus.SUCCESS
except Exception:
logging.exception(f'Error while running periodic task "{task_info.id}"')
task_info.model.status = TaskStatus.FAILED
task_info.model.completed = timezone.now()

# Set completed time
task_info.model.completed = timezone.now()
if task_info.model.status == TaskStatus.SUCCESS:
task_info.model.last_success = task_info.model.completed

log.info(f'Completed periodic task "{task_info.id}" with status "{task_info.model.status}"')

await task_info.model.asave()
Expand Down
4 changes: 2 additions & 2 deletions api/src/reportcreator_api/tasks/rendering/render.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ async def weasyprint_start_process():

@log_timing(log_start=True)
async def weasyprint_render_to_pdf(proc, **kwargs) -> RenderStageResult:
@sync_to_async
@sync_to_async()
def encode_data():
return json.dumps(kwargs, cls=DjangoJSONEncoder).encode()

@sync_to_async
@sync_to_async()
def decode_data(stdout):
return RenderStageResult.from_dict(json.loads(stdout.decode()))

Expand Down
27 changes: 22 additions & 5 deletions api/src/reportcreator_api/utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import asyncio
import logging
import uuid
from datetime import date
from itertools import groupby
from typing import Any, Iterable, OrderedDict

from asgiref.sync import sync_to_async
from django.db import close_old_connections, connections
from django.utils import dateparse, timezone


Expand Down Expand Up @@ -137,8 +140,22 @@ def groupby_to_dict(data: dict, key) -> dict:


_background_tasks = set()
def run_in_background(coro):
task = asyncio.create_task(coro)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)

def run_in_background(func):
    """Wrap an async *func* so that calling it schedules a fire-and-forget task.

    The returned callable creates an :class:`asyncio.Task` running *func*,
    keeps a strong reference to it in the module-level ``_background_tasks``
    set (so the event loop cannot garbage-collect it mid-run), logs any
    exception instead of letting it propagate, and cleans up stale database
    connections once the task finishes.

    Returns:
        The wrapper callable. The wrapper itself returns the created
        ``asyncio.Task`` so callers may optionally await or cancel it
        (backward-compatible: previous callers ignored the return value).
    """
    def inner(*args, **kwargs):
        @sync_to_async()
        def task_finished():
            # Close stale/broken DB connections opened by the background task.
            # Never do this inside an atomic block — it would break the
            # enclosing transaction.
            if not connections['default'].in_atomic_block:
                close_old_connections()

        async def wrapper():
            try:
                await func(*args, **kwargs)
            except Exception:
                logging.exception(f'Error while running run_in_background({func.__name__})')
            finally:
                # Run connection cleanup even when the task failed.
                await task_finished()

        task = asyncio.create_task(wrapper())
        # Hold a strong reference until completion; discard afterwards so the
        # set does not grow without bound.
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
        # Expose the task handle to interested callers.
        return task
    return inner
Loading

0 comments on commit 63966e2

Please sign in to comment.