Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dcas error handling #385

Merged
merged 13 commits into from
Jan 28, 2025
25 changes: 19 additions & 6 deletions django_project/dcas/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
.. note:: Admin for DCAS Models
"""

from import_export.admin import ExportMixin
from import_export_celery.admin_actions import create_export_job_action
from django.contrib import admin

from dcas.models import (
Expand All @@ -17,6 +19,7 @@
GDDConfig,
GDDMatrix
)
from dcas.resources import DCASErrorLogResource


class ConfigByCountryInline(admin.TabularInline):
Expand Down Expand Up @@ -65,13 +68,23 @@ class DCASOutputAdmin(admin.ModelAdmin):


@admin.register(DCASErrorLog)
class DCASErrorLogAdmin(admin.ModelAdmin):
"""Admin page for DCASErrorLog."""
class DCASErrorLogAdmin(ExportMixin, admin.ModelAdmin):
"""Admin class for DCASErrorLog model."""

list_display = ('logged_at', 'request', 'farm_id', 'error_message')
list_filter = ('request', 'farm_id')
search_fields = ('error_message',)
ordering = ('-logged_at',)
resource_class = DCASErrorLogResource
actions = [create_export_job_action]

list_display = (
"id",
"request_id",
"farm_id",
"error_type",
"error_message",
"logged_at",
)

search_fields = ("error_message", "farm_id", "request__id")
list_filter = ("error_type", "logged_at")

# GDD Config and Matrix

Expand Down
18 changes: 18 additions & 0 deletions django_project/dcas/migrations/0006_dcaserrorlog_error_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.7 on 2025-01-22 22:28

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('dcas', '0005_alter_gddmatrix_unique_together'),
]

operations = [
migrations.AddField(
model_name='dcaserrorlog',
name='error_type',
field=models.CharField(choices=[('MISSING_MESSAGES', 'Missing Messages'), ('PROCESSING_FAILURE', 'Processing Failure'), ('OTHER', 'Other')], default='OTHER', help_text='The type of error encountered.', max_length=50),
),
]
23 changes: 23 additions & 0 deletions django_project/dcas/models/error_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@
from dcas.models.request import DCASRequest


class DCASErrorType(models.TextChoices):
"""Enum for error types in DCAS ErrorLog."""

MISSING_MESSAGES = "MISSING_MESSAGES", _("Missing Messages")
PROCESSING_FAILURE = "PROCESSING_FAILURE", _("Processing Failure")
OTHER = "OTHER", _("Other")


class DCASErrorLog(models.Model):
"""Model to store farms that cannot be processed."""

Expand All @@ -25,6 +33,12 @@
Farm, on_delete=models.CASCADE,
help_text="The unique identifier of the farm that failed to process."
)
error_type = models.CharField(
max_length=50,
choices=DCASErrorType.choices,
default=DCASErrorType.OTHER,
help_text="The type of error encountered."
)
error_message = models.TextField(
help_text="Details about why the farm could not be processed."
)
Expand All @@ -39,3 +53,12 @@
db_table = 'dcas_error_log'
verbose_name = _('Error Log')
ordering = ['-logged_at']

@classmethod
def export_resource_classes(cls):
"""Export resource classes for import-export."""
from dcas.resources import DCASErrorLogResource

Check warning on line 60 in django_project/dcas/models/error_log.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/models/error_log.py#L60

Added line #L60 was not covered by tests

return {

Check warning on line 62 in django_project/dcas/models/error_log.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/models/error_log.py#L62

Added line #L62 was not covered by tests
"DCASErrorLog": ("DCASErrorLog Resource", DCASErrorLogResource)
}
39 changes: 39 additions & 0 deletions django_project/dcas/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,42 @@
df = conndb.sql(query).df()
conndb.close()
return df

def get_farms_without_messages(parquet_path: str, chunk_size: int = 500):
"""
Fetch farms without advisory messages using chunked processing.

:param parquet_path: Path to the final Parquet file.
:type parquet_path: str
:param chunk_size: Number of records per chunk (default: 500).
:type chunk_size: int
:return: Generator yielding Pandas DataFrames in chunks.
:rtype: Generator[pd.DataFrame]
"""
conn = duckdb.connect()
offset = 0 # Start at the beginning

try:
while True:
query = f"""
SELECT farm_id, crop_id
FROM read_parquet('{parquet_path}')
WHERE message IS NULL
AND message_2 IS NULL
AND message_3 IS NULL
AND message_4 IS NULL
AND message_5 IS NULL
LIMIT {chunk_size} OFFSET {offset}
"""
df = conn.sql(query).df()

if df.empty:
break # Stop when there are no more records

Check warning on line 315 in django_project/dcas/queries.py

View check run for this annotation

Codecov / codecov/patch

django_project/dcas/queries.py#L315

Added line #L315 was not covered by tests

yield df # Yield the chunk
offset += chunk_size # Move to the next batch

except Exception as e:
print(f"Error querying Parquet: {str(e)}")
finally:
conn.close()
45 changes: 45 additions & 0 deletions django_project/dcas/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# coding=utf-8
"""
Tomorrow Now GAP.

.. note:: Resource class for DCASErrorLog
"""

from import_export.fields import Field
from import_export.resources import ModelResource
from dcas.models import DCASErrorLog


class DCASErrorLogResource(ModelResource):
"""Resource class for DCASErrorLog."""

request_id = Field(
attribute="request__id",
column_name="Request ID"
)
error_type = Field(
attribute="error_type",
column_name="Error Type"
)
error_message = Field(
attribute="error_message",
column_name="Error Message"
)
logged_at = Field(
attribute="logged_at",
column_name="Logged At"
)
farm_unique_id = Field(
column_name="Farm ID",
attribute="farm_id__unique_id"
)

class Meta:
"""Meta class for DCASErrorLogResource."""

model = DCASErrorLog
fields = [
"id", "request_id", "farm_unique_id",
"error_type", "error_message", "logged_at"
]
export_order = fields
81 changes: 81 additions & 0 deletions django_project/dcas/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# coding=utf-8
"""
Tomorrow Now GAP.

.. note:: DCAS tasks.
"""

from celery import shared_task
import logging
from dcas.queries import DataQuery
from dcas.models import DCASErrorLog, DCASRequest, DCASErrorType
from dcas.outputs import DCASPipelineOutput
from gap.models.farm import Farm

logger = logging.getLogger(__name__)


@shared_task(name='log_farms_without_messages')
def log_farms_without_messages(request_id=None, chunk_size=1000):
"""
Celery task to log farms without messages using chunked queries.

:param request_id: Id for the pipeline output
:type request_id: int
:param chunk_size: Number of rows to process per iteration
:type chunk_size: int
"""
try:
# Get the most recent DCAS request
dcas_request = DCASRequest.objects.get(
id=request_id
)

# Initialize pipeline output to get the directory path
dcas_output = DCASPipelineOutput(request_id)
parquet_path = dcas_output._get_directory_path(
dcas_output.DCAS_OUTPUT_DIR + '/*.parquet'
)

# Query farms without messages in chunks
for df_chunk in DataQuery.get_farms_without_messages(
parquet_path, chunk_size=chunk_size
):
if df_chunk.empty:
logger.info(
"No farms found with missing messages in this chunk."
)
continue

# Log missing messages in the database
error_logs = []
for _, row in df_chunk.iterrows():
try:
farm = Farm.objects.get(id=row['farm_id'])
except Farm.DoesNotExist:
logger.warning(
f"Farm ID {row['farm_id']} not found, skipping."
)
continue

error_logs.append(DCASErrorLog(
request=dcas_request,
farm_id=farm,
error_type=DCASErrorType.MISSING_MESSAGES,
error_message=(
f"Farm {row['farm_id']} (Crop {row['crop_id']}) "
f"has no advisory messages."
)
))

# Bulk insert logs per chunk to optimize database writes
if error_logs:
DCASErrorLog.objects.bulk_create(error_logs)
logger.info(
f"Logged {len(error_logs)} farms with missing messages."
)

except DCASRequest.DoesNotExist:
logger.error(f"No DCASRequest found for request_id {request_id}.")
except Exception as e:
logger.error(f"Error processing missing messages: {str(e)}")
78 changes: 78 additions & 0 deletions django_project/dcas/tests/test_errorlog_resource.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# coding=utf-8
"""
Tomorrow Now GAP.

.. note:: Unit tests for DCASErrorLogResource.
"""

from django.test import TestCase
from django.contrib.gis.geos import Point
from tablib import Dataset
from dcas.models import DCASErrorLog, DCASErrorType, DCASRequest
from dcas.resources import DCASErrorLogResource
from gap.models.farm import Farm
from gap.models.common import Country


class TestDCASErrorLogResource(TestCase):
"""Test case for DCASErrorLogResource."""

def setUp(self):
"""Set up test data."""
self.country = Country.objects.create(
name="Test Country",
iso_a3="TST"
)
self.farm = Farm.objects.create(
unique_id="FARM123",
geometry=Point(0.0, 0.0)
)
self.request = DCASRequest.objects.create(
requested_at="2024-01-01T00:00:00Z",
country=self.country
)

self.error_log = DCASErrorLog.objects.create(
request=self.request,
farm_id=self.farm,
error_type=DCASErrorType.MISSING_MESSAGES,
error_message="Test missing message error"
)

self.resource = DCASErrorLogResource()

def test_resource_fields(self):
"""Ensure the resource includes correct fields."""
expected_fields = {
"id", "Request ID", "Farm ID",
"Error Type", "Error Message", "Logged At"
}
actual_fields = {
field.column_name for field in self.resource.get_export_fields()
}
expected_fields = {
"Request ID", "Farm ID", "Error Type",
"Error Message", "Logged At", "id"
}

self.assertSetEqual(actual_fields, expected_fields)

def test_export_data(self):
"""Test exporting error logs."""
dataset = self.resource.export(DCASErrorLog.objects.all())
self.assertIsInstance(dataset, Dataset)
self.assertEqual(len(dataset.dict), 1)

exported_data = dataset.dict[0]
self.assertEqual(
exported_data["Request ID"], str(self.request.id)
)
self.assertEqual(
exported_data["Farm ID"], self.farm.unique_id
)
self.assertEqual(
exported_data["Error Type"], DCASErrorType.MISSING_MESSAGES
)
self.assertEqual(
exported_data["Error Message"], "Test missing message error"
)
Loading
Loading