From 1c955154c26ff27a4cef8c66005cd861724a7586 Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Thu, 23 Jan 2025 01:31:52 +0300 Subject: [PATCH 01/11] Add dcas error handling function --- django_project/core/celery.py | 5 ++ .../0006_dcaserrorlog_error_type.py | 18 +++++ django_project/dcas/models/error_log.py | 13 ++++ django_project/dcas/queries.py | 16 ++++ django_project/dcas/tasks.py | 73 +++++++++++++++++++ 5 files changed, 125 insertions(+) create mode 100644 django_project/dcas/migrations/0006_dcaserrorlog_error_type.py create mode 100644 django_project/dcas/tasks.py diff --git a/django_project/core/celery.py b/django_project/core/celery.py index a6765986..2c134103 100644 --- a/django_project/core/celery.py +++ b/django_project/core/celery.py @@ -92,6 +92,11 @@ # Run everyday at 00:15 UTC 'schedule': crontab(minute='15', hour='00'), }, + 'log-farms-without-messages': { + 'task': 'log_farms_without_messages', + 'schedule': crontab(minute='0', hour='2'), # TODO time + 'args': ("parquet_file",) # TODO path to file + }, } diff --git a/django_project/dcas/migrations/0006_dcaserrorlog_error_type.py b/django_project/dcas/migrations/0006_dcaserrorlog_error_type.py new file mode 100644 index 00000000..276da5ce --- /dev/null +++ b/django_project/dcas/migrations/0006_dcaserrorlog_error_type.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.7 on 2025-01-22 22:28 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dcas', '0005_alter_gddmatrix_unique_together'), + ] + + operations = [ + migrations.AddField( + model_name='dcaserrorlog', + name='error_type', + field=models.CharField(choices=[('MISSING_MESSAGES', 'Missing Messages'), ('PROCESSING_FAILURE', 'Processing Failure'), ('OTHER', 'Other')], default='OTHER', help_text='The type of error encountered.', max_length=50), + ), + ] diff --git a/django_project/dcas/models/error_log.py b/django_project/dcas/models/error_log.py index 454c3d15..c0954663 100644 --- a/django_project/dcas/models/error_log.py +++ b/django_project/dcas/models/error_log.py @@ -13,6 +13,13 @@ from dcas.models.request import DCASRequest +class DCASErrorType(models.TextChoices): + """Enum for error types in DCAS ErrorLog.""" + MISSING_MESSAGES = "MISSING_MESSAGES", _("Missing Messages") + PROCESSING_FAILURE = "PROCESSING_FAILURE", _("Processing Failure") + OTHER = "OTHER", _("Other") + + class DCASErrorLog(models.Model): """Model to store farms that cannot be processed.""" @@ -25,6 +32,12 @@ class DCASErrorLog(models.Model): Farm, on_delete=models.CASCADE, help_text="The unique identifier of the farm that failed to process." ) + error_type = models.CharField( + max_length=50, + choices=DCASErrorType.choices, + default=DCASErrorType.OTHER, + help_text="The type of error encountered." + ) error_message = models.TextField( help_text="Details about why the farm could not be processed." ) diff --git a/django_project/dcas/queries.py b/django_project/dcas/queries.py index c3f84dad..6e45232b 100644 --- a/django_project/dcas/queries.py +++ b/django_project/dcas/queries.py @@ -269,3 +269,19 @@ def read_grid_data_crop_meta_parquet( df = conndb.sql(query).df() conndb.close() return df + + def get_farms_without_messages(parquet_path: str): + """Query the final Parquet file for farms with missing messages.""" + query = f""" + SELECT farm_id, crop_id + FROM read_parquet('{parquet_path}') + WHERE message IS NULL + AND message_2 IS NULL + AND message_3 IS NULL + AND message_4 IS NULL + AND message_5 IS NULL + """ + conn = duckdb.connect() + df = conn.sql(query).df() + conn.close() + return df diff --git a/django_project/dcas/tasks.py b/django_project/dcas/tasks.py new file mode 100644 index 00000000..fa16d757 --- /dev/null +++ b/django_project/dcas/tasks.py @@ -0,0 +1,73 @@ +# coding=utf-8 +''' +Tomorrow Now GAP. + +.. note:: DCAS tasks. +''' + +from celery import shared_task +import logging +from dcas.queries import DataQuery +from dcas.models import DCASErrorLog, DCASRequest, DCASErrorType +from gap.models.farm import Farm + +logger = logging.getLogger(__name__) + + +@shared_task(name='log_farms_without_messages') +def log_farms_without_messages(parquet_path: str, request_id: int): + ''' + Celery task to log farms without messages. + + :param parquet_path: Path to the final farm crop Parquet file + :type parquet_path: str + :param request_id: ID of the related DCASRequest + :type request_id: int + ''' + logger.info('Checking for farms without messages...') + + try: + # Query farms without messages + df = DataQuery.get_farms_without_messages(parquet_path) + + if df.empty: + logger.info('No farms found with missing messages.') + return + + # Retrieve the associated DCAS request + try: + dcas_request = DCASRequest.objects.get(id=request_id) + except DCASRequest.DoesNotExist: + logger.error(f'DCASRequest with ID {request_id} not found.') + return + + # Log missing messages in the database + error_logs = [] + for _, row in df.iterrows(): + try: + farm = Farm.objects.get(id=row['farm_id']) + except Farm.DoesNotExist: + logger.warning( + f'Farm ID {row['farm_id']} not found, skipping.' + ) + continue + + error_logs.append(DCASErrorLog( + request=dcas_request, + farm_id=farm, + error_type=DCASErrorType.MISSING_MESSAGES, + error_message=( + f'Farm {row['farm_id']} (Crop {row['crop_id']}) ' + f'has no advisory messages.' + ) + )) + + # Bulk insert to improve performance + if error_logs: + DCASErrorLog.objects.bulk_create(error_logs) + logger.info( + f'Logged {len(error_logs)} farms with missing messages.' + ) + + except Exception as e: + logger.error(f'Error processing missing messages: {str(e)}') From 63d5173c215a4cb302eba5807dcfd261a14de334 Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Thu, 23 Jan 2025 01:35:40 +0300 Subject: [PATCH 02/11] flake8 fixes --- django_project/dcas/tasks.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/django_project/dcas/tasks.py b/django_project/dcas/tasks.py index fa16d757..d6ca531a 100644 --- a/django_project/dcas/tasks.py +++ b/django_project/dcas/tasks.py @@ -1,9 +1,9 @@ # coding=utf-8 -''' +""" Tomorrow Now GAP. .. note:: DCAS tasks. -''' +""" from celery import shared_task import logging @@ -16,14 +16,14 @@ @shared_task(name='log_farms_without_messages') def log_farms_without_messages(parquet_path: str, request_id: int): - ''' + """ Celery task to log farms without messages. :param parquet_path: Path to the final farm crop Parquet file :type parquet_path: str :param request_id: ID of the related DCASRequest :type request_id: int - ''' + """ logger.info('Checking for farms without messages...') try: From de7d8ea74c8df1e7cb82ba61d5fa8ec32b30deeb Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Thu, 23 Jan 2025 01:36:18 +0300 Subject: [PATCH 03/11] fix flake8 --- django_project/dcas/models/error_log.py | 1 + 1 file changed, 1 insertion(+) diff --git a/django_project/dcas/models/error_log.py b/django_project/dcas/models/error_log.py index c0954663..d5382eda 100644 --- a/django_project/dcas/models/error_log.py +++ b/django_project/dcas/models/error_log.py @@ -15,6 +15,7 @@ class DCASErrorType(models.TextChoices): """Enum for error types in DCAS ErrorLog.""" + MISSING_MESSAGES = "MISSING_MESSAGES", _("Missing Messages") PROCESSING_FAILURE = "PROCESSING_FAILURE", _("Processing Failure") OTHER = "OTHER", _("Other") From b22dbbe510e748b2b0ad76d9d0ca326b6aa8667e Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Thu, 23 Jan 2025 14:59:10 +0300 Subject: [PATCH 04/11] Add test to filter farms without messages --- .../dcas/tests/test_pipeline_queries.py | 63 +++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/django_project/dcas/tests/test_pipeline_queries.py b/django_project/dcas/tests/test_pipeline_queries.py index d4eb4014..7e71523d 100644 --- a/django_project/dcas/tests/test_pipeline_queries.py +++ b/django_project/dcas/tests/test_pipeline_queries.py @@ -6,6 +6,7 @@ """ from mock import patch, MagicMock +import pandas as pd from dcas.tests.base import DCASPipelineBaseTest from dcas.queries import DataQuery @@ -26,3 +27,65 @@ def test_read_grid_data_crop_meta_parquet(self, mock_duckdb_connect): mock_duckdb_connect.assert_called_once() mock_conn.sql.assert_called_once() mock_conn.close.assert_called_once() + + @patch("dcas.queries.duckdb.connect") + def test_get_farms_without_messages(self, mock_duckdb_connect): + """Test retrieving farms with missing messages.""" + + # Mock DuckDB return DataFrame + mock_df = pd.DataFrame({ + 'farm_id': [1, 2, 3, 4], + 'crop_id': [101, 102, 103, 104], + 'message': ["MSG1", None, "MSG3", None], + 'message_2': [None, None, None, None], + 'message_3': [None, None, None, None], + 'message_4': [None, None, None, "MSG4"], + 'message_5': [None, None, None, None] + }) + + expected_df = mock_df[ + mock_df[ + [ + 'message', + 'message_2', + 'message_3', + 'message_4', + 'message_5' + ] + ].isnull().all(axis=1)][['farm_id', 'crop_id']] + + # Configure mock connection + mock_conn = mock_duckdb_connect.return_value + mock_conn.sql.return_value.df.return_value = expected_df + + # Call the function + result_df = DataQuery.get_farms_without_messages( + "/tmp/dcas/farm_crop.parquet" + ) + + self.assertIsInstance(result_df, pd.DataFrame) + self.assertEqual(len(result_df), len(expected_df)) + self.assertEqual(list(result_df.columns), ['farm_id', 'crop_id']) + + # Ensure the DataFrames are equal + pd.testing.assert_frame_equal(result_df, expected_df) + + # Ensure DuckDB was called correctly + expected_query = """ + SELECT farm_id, crop_id + FROM read_parquet('/tmp/dcas/farm_crop.parquet') + WHERE message IS NULL + AND message_2 IS NULL + AND message_3 IS NULL + AND message_4 IS NULL + AND message_5 IS NULL + """ + actual_query = mock_conn.sql.call_args[0][0] + + # Strip unnecessary spaces and line breaks before comparing + normalized_expected_query = " ".join(expected_query.split()) + normalized_actual_query = " ".join(actual_query.split()) + + self.assertEqual(normalized_actual_query, normalized_expected_query) + + mock_conn.close.assert_called_once() From c99f6849f9bef115dbd2283229f0a96b259356ff Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Thu, 23 Jan 2025 15:01:30 +0300 Subject: [PATCH 05/11] fix flake8 error --- django_project/dcas/tests/test_pipeline_queries.py | 1 - 1 file changed, 1 deletion(-) diff --git a/django_project/dcas/tests/test_pipeline_queries.py b/django_project/dcas/tests/test_pipeline_queries.py index 7e71523d..d1163427 100644 --- a/django_project/dcas/tests/test_pipeline_queries.py +++ b/django_project/dcas/tests/test_pipeline_queries.py @@ -31,7 +31,6 @@ def test_read_grid_data_crop_meta_parquet(self, mock_duckdb_connect): @patch("dcas.queries.duckdb.connect") def test_get_farms_without_messages(self, mock_duckdb_connect): """Test retrieving farms with missing messages.""" - # Mock DuckDB return DataFrame mock_df = pd.DataFrame({ 'farm_id': [1, 2, 3, 4], From 18f37f8feaf0d4560ffeb16321d51f1e59036a4a Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Thu, 23 Jan 2025 23:14:36 +0300 Subject: [PATCH 06/11] Fix celery task --- django_project/core/celery.py | 3 +-- django_project/dcas/tasks.py | 45 ++++++++++++++++++++--------------- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/django_project/core/celery.py b/django_project/core/celery.py index 2c134103..4475d98e 100644 --- a/django_project/core/celery.py +++ b/django_project/core/celery.py @@ -94,8 +94,7 @@ }, 'log-farms-without-messages': { 'task': 'log_farms_without_messages', - 'schedule': crontab(minute='0', hour='2'), # TODO time - 'args': ("parquet_file",) # TODO path to file + 'schedule': crontab(minute='0', hour='2'), }, } diff --git a/django_project/dcas/tasks.py b/django_project/dcas/tasks.py index d6ca531a..6331b886 100644 --- a/django_project/dcas/tasks.py +++ b/django_project/dcas/tasks.py @@ -7,26 +7,39 @@ from celery import shared_task import logging +from datetime import datetime, timedelta from dcas.queries import DataQuery from dcas.models import DCASErrorLog, DCASRequest, DCASErrorType +from dcas.outputs import DCASPipelineOutput from gap.models.farm import Farm logger = logging.getLogger(__name__) @shared_task(name='log_farms_without_messages') -def log_farms_without_messages(parquet_path: str, request_id: int): +def log_farms_without_messages(request_date=None): """ Celery task to log farms without messages. - :param parquet_path: Path to the final farm crop Parquet file - :type parquet_path: str - :param request_id: ID of the related DCASRequest - :type request_id: int + :param request_date: Date for the pipeline output + :type request_date: datetime.date """ + if request_date is None: + request_date = (datetime.utcnow() - timedelta(days=1)).date() logger.info('Checking for farms without messages...') try: + # Get the most recent DCAS request + dcas_request = DCASRequest.objects.filter( + request_date=request_date + ).latest('created_at') + + # Initialize pipeline output to get the directory path + dcas_output = DCASPipelineOutput(request_date) + parquet_path = dcas_output._get_directory_path( + dcas_output.DCAS_OUTPUT_DIR + '/*.parquet' + ) + # Query farms without messages df = DataQuery.get_farms_without_messages(parquet_path) @@ -34,13 +47,6 @@ def log_farms_without_messages(parquet_path: str, request_id: int): logger.info('No farms found with missing messages.') return - # Retrieve the associated DCAS request - try: - dcas_request = DCASRequest.objects.get(id=request_id) - except DCASRequest.DoesNotExist: - logger.error(f'DCASRequest with ID {request_id} not found.') - return - # Log missing messages in the database error_logs = [] for _, row in df.iterrows(): @@ -48,8 +54,7 @@ def log_farms_without_messages(parquet_path: str, request_id: int): farm = Farm.objects.get(id=row['farm_id']) except Farm.DoesNotExist: logger.warning( - f'Farm ID {row['farm_id']} not found, skipping.' - ) + f"Farm ID {row['farm_id']} not found, skipping.") continue error_logs.append(DCASErrorLog( @@ -57,8 +62,8 @@ def log_farms_without_messages(parquet_path: str, request_id: int): farm_id=farm, error_type=DCASErrorType.MISSING_MESSAGES, error_message=( - f'Farm {row['farm_id']} (Crop {row['crop_id']}) ' - f'has no advisory messages.' + f"Farm {row['farm_id']} (Crop {row['crop_id']}) " + f"has no advisory messages." ) )) @@ -66,8 +71,10 @@ def log_farms_without_messages(parquet_path: str, request_id: int): if error_logs: DCASErrorLog.objects.bulk_create(error_logs) logger.info( - f'Logged {len(error_logs)} farms with missing messages.' - ) + f"Logged {len(error_logs)} farms with missing messages.") + except DCASRequest.DoesNotExist: + logger.error( + f"No DCASRequest found for request_date {request_date}.") except Exception as e: - logger.error(f'Error processing missing messages: {str(e)}') + logger.error(f"Error processing missing messages: {str(e)}") From 342333b031defae919b2453f0197a7d1c285957a Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Sun, 26 Jan 2025 23:32:20 +0300 Subject: [PATCH 07/11] update query to use chuncks --- django_project/dcas/queries.py | 49 +++++++--- django_project/dcas/tasks.py | 72 ++++++++------- .../dcas/tests/test_pipeline_queries.py | 90 ++++++++----------- 3 files changed, 113 insertions(+), 98 deletions(-) diff --git a/django_project/dcas/queries.py b/django_project/dcas/queries.py index b1600f1e..b251b7d7 100644 --- a/django_project/dcas/queries.py +++ b/django_project/dcas/queries.py @@ -283,18 +283,41 @@ def read_grid_data_crop_meta_parquet( conndb.close() return df - def get_farms_without_messages(parquet_path: str): - """Query the final Parquet file for farms with missing messages.""" - query = f""" - SELECT farm_id, crop_id - FROM read_parquet('{parquet_path}') - WHERE message IS NULL - AND message_2 IS NULL - AND message_3 IS NULL - AND message_4 IS NULL - AND message_5 IS NULL + def get_farms_without_messages(parquet_path: str, chunk_size: int = 500): + """ + Fetch farms without advisory messages using chunked processing. + + :param parquet_path: Path to the final Parquet file. + :type parquet_path: str + :param chunk_size: Number of records per chunk (default: 500). + :type chunk_size: int + :return: Generator yielding Pandas DataFrames in chunks. + :rtype: Generator[pd.DataFrame] """ conn = duckdb.connect() - df = conn.sql(query).df() - conn.close() - return df + offset = 0 # Start at the beginning + + try: + while True: + query = f""" + SELECT farm_id, crop_id + FROM read_parquet('{parquet_path}') + WHERE message IS NULL + AND message_2 IS NULL + AND message_3 IS NULL + AND message_4 IS NULL + AND message_5 IS NULL + LIMIT {chunk_size} OFFSET {offset} + """ + df = conn.sql(query).df() + + if df.empty: + break # Stop when there are no more records + + yield df # Yield the chunk + offset += chunk_size # Move to the next batch + + except Exception as e: + print(f"Error querying Parquet: {str(e)}") + finally: + conn.close() diff --git a/django_project/dcas/tasks.py b/django_project/dcas/tasks.py index 6331b886..2121798f 100644 --- a/django_project/dcas/tasks.py +++ b/django_project/dcas/tasks.py @@ -17,16 +17,18 @@ @shared_task(name='log_farms_without_messages') -def log_farms_without_messages(request_date=None): +def log_farms_without_messages(request_date=None, chunk_size=1000): """ - Celery task to log farms without messages. + Celery task to log farms without messages using chunked queries. :param request_date: Date for the pipeline output :type request_date: datetime.date + :param chunk_size: Number of rows to process per iteration + :type chunk_size: int """ if request_date is None: request_date = (datetime.utcnow() - timedelta(days=1)).date() - logger.info('Checking for farms without messages...') + logger.info(f"Checking for farms without messages for {request_date}...") try: # Get the most recent DCAS request @@ -40,41 +42,45 @@ def log_farms_without_messages(request_date=None): dcas_output.DCAS_OUTPUT_DIR + '/*.parquet' ) - # Query farms without messages - df = DataQuery.get_farms_without_messages(parquet_path) + # Query farms without messages in chunks + for df_chunk in DataQuery.get_farms_without_messages( + parquet_path, chunk_size=chunk_size + ): + if df_chunk.empty: + logger.info( + "No farms found with missing messages in this chunk." + ) + continue - if df.empty: - logger.info('No farms found with missing messages.') - return + # Log missing messages in the database + error_logs = [] + for _, row in df_chunk.iterrows(): + try: + farm = Farm.objects.get(id=row['farm_id']) + except Farm.DoesNotExist: + logger.warning( + f"Farm ID {row['farm_id']} not found, skipping." + ) + continue - # Log missing messages in the database - error_logs = [] - for _, row in df.iterrows(): - try: - farm = Farm.objects.get(id=row['farm_id']) - except Farm.DoesNotExist: - logger.warning( - f"Farm ID {row['farm_id']} not found, skipping.") - continue + error_logs.append(DCASErrorLog( + request=dcas_request, + farm_id=farm, + error_type=DCASErrorType.MISSING_MESSAGES, + error_message=( + f"Farm {row['farm_id']} (Crop {row['crop_id']}) " + f"has no advisory messages." + ) + )) - error_logs.append(DCASErrorLog( - request=dcas_request, - farm_id=farm, - error_type=DCASErrorType.MISSING_MESSAGES, - error_message=( - f"Farm {row['farm_id']} (Crop {row['crop_id']}) " - f"has no advisory messages." + # Bulk insert logs per chunk to optimize database writes + if error_logs: + DCASErrorLog.objects.bulk_create(error_logs) + logger.info( + f"Logged {len(error_logs)} farms with missing messages." ) - )) - - # Bulk insert to improve performance - if error_logs: - DCASErrorLog.objects.bulk_create(error_logs) - logger.info( - f"Logged {len(error_logs)} farms with missing messages.") except DCASRequest.DoesNotExist: - logger.error( - f"No DCASRequest found for request_date {request_date}.") + logger.error(f"No DCASRequest found for request_date {request_date}.") except Exception as e: logger.error(f"Error processing missing messages: {str(e)}") diff --git a/django_project/dcas/tests/test_pipeline_queries.py b/django_project/dcas/tests/test_pipeline_queries.py index 573b8894..4cbbdc80 100644 --- a/django_project/dcas/tests/test_pipeline_queries.py +++ b/django_project/dcas/tests/test_pipeline_queries.py @@ -5,6 +5,7 @@ .. note:: Unit tests for DCAS Queries functions. """ +import re from mock import patch, MagicMock import pandas as pd from sqlalchemy import create_engine @@ -31,63 +32,48 @@ def test_read_grid_data_crop_meta_parquet(self, mock_duckdb_connect): mock_conn.close.assert_called_once() @patch("dcas.queries.duckdb.connect") - def test_get_farms_without_messages(self, mock_duckdb_connect): - """Test retrieving farms with missing messages.""" - # Mock DuckDB return DataFrame - mock_df = pd.DataFrame({ - 'farm_id': [1, 2, 3, 4], - 'crop_id': [101, 102, 103, 104], - 'message': ["MSG1", None, "MSG3", None], - 'message_2': [None, None, None, None], - 'message_3': [None, None, None, None], - 'message_4': [None, None, None, "MSG4"], - 'message_5': [None, None, None, None] - }) - - expected_df = mock_df[ - mock_df[ - [ - 'message', - 'message_2', - 'message_3', - 'message_4', - 'message_5' - ] - ].isnull().all(axis=1)][['farm_id', 'crop_id']] - - # Configure mock connection + def test_get_farms_without_messages_chunked(self, mock_duckdb_connect): + """Test retrieving farms with missing messages in chunks.""" + # Mock DuckDB return DataFrames (Simulating chunked retrieval) + chunk_1 = pd.DataFrame({'farm_id': [1, 2], 'crop_id': [101, 102]}) + chunk_2 = pd.DataFrame({'farm_id': [3, 4], 'crop_id': [103, 104]}) + + expected_chunks = [chunk_1, chunk_2] + + # Configure mock connection to return chunks in order mock_conn = mock_duckdb_connect.return_value - mock_conn.sql.return_value.df.return_value = expected_df + mock_conn.sql.return_value.df.side_effect = expected_chunks # Call the function - result_df = DataQuery.get_farms_without_messages( - "/tmp/dcas/farm_crop.parquet" + result_chunks = list( + DataQuery.get_farms_without_messages( + "/tmp/dcas/farm_crop.parquet", chunk_size=2 + ) ) - self.assertIsInstance(result_df, pd.DataFrame) - self.assertEqual(len(result_df), len(expected_df)) - self.assertEqual(list(result_df.columns), ['farm_id', 'crop_id']) - - # Ensure the DataFrames are equal - pd.testing.assert_frame_equal(result_df, expected_df) - - # Ensure DuckDB was called correctly - expected_query = """ - SELECT farm_id, crop_id - FROM read_parquet('/tmp/dcas/farm_crop.parquet') - WHERE message IS NULL - AND message_2 IS NULL - AND message_3 IS NULL - AND message_4 IS NULL - AND message_5 IS NULL - """ - actual_query = mock_conn.sql.call_args[0][0] - - # Strip unnecessary spaces and line breaks before comparing - normalized_expected_query = " ".join(expected_query.split()) - normalized_actual_query = " ".join(actual_query.split()) - - self.assertEqual(normalized_actual_query, normalized_expected_query) + # Ensure we receive correct number of chunks + self.assertEqual(len(result_chunks), len(expected_chunks)) + + # Validate each chunk + for result_df, expected_df in zip(result_chunks, expected_chunks): + pd.testing.assert_frame_equal(result_df, expected_df) + + # Check DuckDB Query + expected_query_pattern = re.compile( + r"SELECT farm_id, crop_id " + r"FROM read_parquet\('/tmp/dcas/farm_crop.parquet'\) " + r"WHERE message IS NULL " + r"AND message_2 IS NULL " + r"AND message_3 IS NULL " + r"AND message_4 IS NULL " + r"AND message_5 IS NULL" + r"(\s+LIMIT\s+\d+\s+OFFSET\s+\d+)?" + ) + + actual_query = " ".join(mock_conn.sql.call_args[0][0].split()) + + # Assert query structure matches, ignoring chunking additions + self.assertRegex(actual_query, expected_query_pattern) mock_conn.close.assert_called_once() From dd0325006b94f179d7b78aba43897cf90cf6ab10 Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Mon, 27 Jan 2025 01:20:36 +0300 Subject: [PATCH 08/11] Add errorlog export to csv/xlxs in admin page --- django_project/dcas/admin.py | 25 ++++-- django_project/dcas/models/error_log.py | 9 +++ django_project/dcas/resources.py | 45 +++++++++++ .../dcas/tests/test_errorlog_resource.py | 78 +++++++++++++++++++ 4 files changed, 151 insertions(+), 6 deletions(-) create mode 100644 django_project/dcas/resources.py create mode 100644 django_project/dcas/tests/test_errorlog_resource.py diff --git a/django_project/dcas/admin.py b/django_project/dcas/admin.py index a94a1a76..adc7e8a5 100644 --- a/django_project/dcas/admin.py +++ b/django_project/dcas/admin.py @@ -5,6 +5,8 @@ .. note:: Admin for DCAS Models """ +from import_export.admin import ExportMixin +from import_export_celery.admin_actions import create_export_job_action from django.contrib import admin from dcas.models import ( @@ -17,6 +19,7 @@ GDDConfig, GDDMatrix ) +from dcas.resources import DCASErrorLogResource class ConfigByCountryInline(admin.TabularInline): @@ -65,13 +68,23 @@ class DCASOutputAdmin(admin.ModelAdmin): @admin.register(DCASErrorLog) -class DCASErrorLogAdmin(admin.ModelAdmin): - """Admin page for DCASErrorLog.""" +class DCASErrorLogAdmin(ExportMixin, admin.ModelAdmin): + """Admin class for DCASErrorLog model.""" - list_display = ('logged_at', 'request', 'farm_id', 'error_message') - list_filter = ('request', 'farm_id') - search_fields = ('error_message',) - ordering = ('-logged_at',) + resource_class = DCASErrorLogResource + actions = [create_export_job_action] + + list_display = ( + "id", + "request_id", + "farm_id", + "error_type", + "error_message", + "logged_at", + ) + + search_fields = ("error_message", "farm_id", "request__id") + list_filter = ("error_type", "logged_at") # GDD Config and Matrix diff --git a/django_project/dcas/models/error_log.py b/django_project/dcas/models/error_log.py index d5382eda..2cd3e46a 100644 --- a/django_project/dcas/models/error_log.py +++ b/django_project/dcas/models/error_log.py @@ -53,3 +53,12 @@ class Meta: db_table = 'dcas_error_log' verbose_name = _('Error Log') ordering = ['-logged_at'] + + @classmethod + def export_resource_classes(cls): + """Export resource classes for import-export.""" + from dcas.resources import DCASErrorLogResource + + return { + "DCASErrorLog": ("DCASErrorLog Resource", DCASErrorLogResource) + } diff --git a/django_project/dcas/resources.py b/django_project/dcas/resources.py new file mode 100644 index 00000000..3b8fcc58 --- /dev/null +++ b/django_project/dcas/resources.py @@ -0,0 +1,45 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: Resource class for DCASErrorLog +""" + +from import_export.fields import Field +from import_export.resources import ModelResource +from dcas.models import DCASErrorLog + + +class DCASErrorLogResource(ModelResource): + """Resource class for DCASErrorLog.""" + + request_id = Field( + attribute="request__id", + column_name="Request ID" + ) + error_type = Field( + attribute="error_type", + column_name="Error Type" + ) + error_message = Field( + attribute="error_message", + column_name="Error Message" + ) + logged_at = Field( + attribute="logged_at", + column_name="Logged At" + ) + farm_unique_id = Field( + column_name="Farm ID", + attribute="farm_id__unique_id" + ) + + class Meta: + """Meta class for DCASErrorLogResource.""" + + model = DCASErrorLog + fields = [ + "id", "request_id", "farm_unique_id", + "error_type", "error_message", "logged_at" + ] + export_order = fields diff --git a/django_project/dcas/tests/test_errorlog_resource.py b/django_project/dcas/tests/test_errorlog_resource.py new file mode 100644 index 00000000..1e9816a8 --- /dev/null +++ b/django_project/dcas/tests/test_errorlog_resource.py @@ -0,0 +1,78 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: Unit tests for DCASErrorLogResource. +""" + +from django.test import TestCase +from django.contrib.gis.geos import Point +from tablib import Dataset +from dcas.models import DCASErrorLog, DCASErrorType, DCASRequest +from dcas.resources import DCASErrorLogResource +from gap.models.farm import Farm +from gap.models.common import Country + + +class TestDCASErrorLogResource(TestCase): + """Test case for DCASErrorLogResource.""" + + def setUp(self): + """Set up test data.""" + self.country = Country.objects.create( + name="Test Country", + iso_a3="TST" + ) + self.farm = Farm.objects.create( + unique_id="FARM123", + geometry=Point(0.0, 0.0) + ) + self.request = DCASRequest.objects.create( + requested_at="2024-01-01T00:00:00Z", + country=self.country + ) + + self.error_log = DCASErrorLog.objects.create( + request=self.request, + farm_id=self.farm, + error_type=DCASErrorType.MISSING_MESSAGES, + error_message="Test missing message error" + ) + + self.resource = DCASErrorLogResource() + + def test_resource_fields(self): + """Ensure the resource includes correct fields.""" + expected_fields = { + "id", "Request ID", "Farm ID", + "Error Type", "Error Message", "Logged At" + } + actual_fields = { + field.column_name for field in self.resource.get_export_fields() + } + expected_fields = { + "Request ID", "Farm ID", "Error Type", + "Error Message", "Logged At", "id" + } + + self.assertSetEqual(actual_fields, expected_fields) + + def test_export_data(self): + """Test exporting error logs.""" + dataset = self.resource.export(DCASErrorLog.objects.all()) + self.assertIsInstance(dataset, Dataset) + self.assertEqual(len(dataset.dict), 1) + + exported_data = dataset.dict[0] + self.assertEqual( + exported_data["Request ID"], str(self.request.id) + ) + self.assertEqual( + exported_data["Farm ID"], self.farm.unique_id + ) + self.assertEqual( + exported_data["Error Type"], DCASErrorType.MISSING_MESSAGES + ) + self.assertEqual( + exported_data["Error Message"], "Test missing message error" + ) From 1d4ddea31f38f45e70f46b4f8dd4176793e66efc Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Mon, 27 Jan 2025 10:45:47 +0300 Subject: [PATCH 09/11] rm celery for dcas tasks --- django_project/core/celery.py | 4 ---- django_project/dcas/tasks.py | 20 ++++++++------------ 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/django_project/core/celery.py b/django_project/core/celery.py index 4475d98e..a6765986 100644 --- a/django_project/core/celery.py +++ b/django_project/core/celery.py @@ -92,10 +92,6 @@ # Run everyday at 00:15 UTC 'schedule': crontab(minute='15', hour='00'), }, - 'log-farms-without-messages': { - 'task': 'log_farms_without_messages', - 'schedule': crontab(minute='0', hour='2'), - }, } diff --git a/django_project/dcas/tasks.py b/django_project/dcas/tasks.py index 2121798f..ccded11e 100644 --- a/django_project/dcas/tasks.py +++ b/django_project/dcas/tasks.py @@ -17,27 +17,23 @@ @shared_task(name='log_farms_without_messages') -def log_farms_without_messages(request_date=None, chunk_size=1000): +def log_farms_without_messages(request_id=None, chunk_size=1000): """ Celery task to log farms without messages using chunked queries. - :param request_date: Date for the pipeline output - :type request_date: datetime.date + :param request_id: Id for the pipeline output + :type request_id: int :param chunk_size: Number of rows to process per iteration :type chunk_size: int """ - if request_date is None: - request_date = (datetime.utcnow() - timedelta(days=1)).date() - logger.info(f"Checking for farms without messages for {request_date}...") - try: # Get the most recent DCAS request - dcas_request = DCASRequest.objects.filter( - request_date=request_date - ).latest('created_at') + dcas_request = DCASRequest.objects.get( + id=request_id + ) # Initialize pipeline output to get the directory path - dcas_output = DCASPipelineOutput(request_date) + dcas_output = DCASPipelineOutput(request_id) parquet_path = dcas_output._get_directory_path( dcas_output.DCAS_OUTPUT_DIR + '/*.parquet' ) @@ -81,6 +77,6 @@ def log_farms_without_messages(request_date=None, chunk_size=1000): ) except DCASRequest.DoesNotExist: - logger.error(f"No DCASRequest found for request_date {request_date}.") + logger.error(f"No DCASRequest found for request_id {request_id}.") except Exception as e: logger.error(f"Error processing missing messages: {str(e)}") From 82793432396858c55066d62aca80522005c2f6dc Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Mon, 27 Jan 2025 10:53:14 +0300 Subject: [PATCH 10/11] fix flake8 (imported not used --- django_project/dcas/tasks.py | 1 - 1 file changed, 1 deletion(-) diff --git a/django_project/dcas/tasks.py b/django_project/dcas/tasks.py index ccded11e..8cd7a3f7 100644 --- a/django_project/dcas/tasks.py +++ b/django_project/dcas/tasks.py @@ -7,7 +7,6 @@ from celery import shared_task import logging -from datetime import datetime, timedelta from dcas.queries import DataQuery from dcas.models import DCASErrorLog, DCASRequest, DCASErrorType from dcas.outputs import DCASPipelineOutput From 0a5cce7684f5f052d09fff29f672328a495536b3 Mon Sep 17 00:00:00 2001 From: Jeff Osundwa Date: Tue, 28 Jan 2025 09:51:34 +0300 Subject: [PATCH 11/11] fix test on test_crop_plan --- django_project/gap_api/tests/test_crop_plan_api.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/django_project/gap_api/tests/test_crop_plan_api.py b/django_project/gap_api/tests/test_crop_plan_api.py index b2cd8a2e..11fa137c 100644 --- a/django_project/gap_api/tests/test_crop_plan_api.py +++ b/django_project/gap_api/tests/test_crop_plan_api.py @@ -119,7 +119,10 @@ def test_params_error(self): def test_correct(self): """Test correct.""" - response = self.request(reverse('api:v1:crop-plan')) + response = self.request( + reverse('api:v1:crop-plan') + + '?farm_ids=farm-1,farm-2' + ) self.assertEqual(response.status_code, 200) _data = response.data self.assertEqual(len(_data), 2)