Skip to content

Commit

Permalink
fix grid_crop_key (#386)
Browse files Browse the repository at this point in the history
* fix grid_crop_key

* fix using connection for loading pandas by query

* check for a test test that causes test_db is failed to remove

* fix tests
  • Loading branch information
danangmassandy authored Jan 23, 2025
1 parent 334d5b4 commit ac6cc22
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 163 deletions.
20 changes: 11 additions & 9 deletions django_project/dcas/partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,14 @@ def process_partition_seasonal_precipitation(

# calculate seasonal_precipitation
grid_column_list.remove('grid_id')
df['seasonal_precipitation'] = df[grid_column_list].sum(axis=1)
seasonal_precipitation_df = df[grid_column_list].sum(axis=1)
seasonal_precipitation_df.name = 'seasonal_precipitation'

# data cleanup
df = df.drop(columns=grid_column_list)

df = pd.concat([df, seasonal_precipitation_df], axis=1)

return df


Expand Down Expand Up @@ -224,11 +227,14 @@ def process_partition_growth_stage_precipitation(
)

grid_column_list.remove('grid_id')
df['growth_stage_precipitation'] = df[grid_column_list].sum(axis=1)
growth_stage_precipitation_df = df[grid_column_list].sum(axis=1)
growth_stage_precipitation_df.name = 'growth_stage_precipitation'

# data cleanup
df = df.drop(columns=grid_column_list)

df = pd.concat([df, growth_stage_precipitation_df], axis=1)

return df


Expand Down Expand Up @@ -317,18 +323,14 @@ def process_partition_farm_registry(
:return: merged dataframe
:rtype: pd.DataFrame
"""
grid_id_list = df['grid_id'].unique()
crop_id_list = df['crop_id'].unique()
crop_stage_type_list = df['crop_stage_type_id'].unique()

# read grid_data_df
grid_data_df = read_grid_crop_data(
parquet_file_path, grid_id_list,
crop_id_list, crop_stage_type_list
parquet_file_path,
df['grid_crop_key'].to_list()
)

grid_data_df = grid_data_df.drop(
columns=['__null_dask_index__', 'planting_date']
columns=['__null_dask_index__', 'planting_date', 'grid_crop_key']
)

# merge the df with grid_data
Expand Down
14 changes: 8 additions & 6 deletions django_project/dcas/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,12 @@ def load_grid_data(self) -> pd.DataFrame:
:return: DataFrame of Grid Data
:rtype: pd.DataFrame
"""
df = pd.read_sql_query(
self.data_query.grid_data_query(self.farm_registry_group),
con=self.conn_engine,
index_col=self.data_query.grid_id_index_col,
)
with self.conn_engine.connect() as conn:
df = pd.read_sql_query(
self.data_query.grid_data_query(self.farm_registry_group),
con=conn,
index_col=self.data_query.grid_id_index_col,
)

return self._merge_grid_data_with_config(df)

Expand Down Expand Up @@ -431,7 +432,8 @@ def _append_grid_crop_meta(self, farm_df_meta: pd.DataFrame):
# - growth_stage
meta = grid_crop_df_meta.drop(columns=[
'crop_id', 'crop_stage_type_id', 'planting_date',
'grid_id', 'planting_date_epoch', '__null_dask_index__'
'grid_id', 'planting_date_epoch', '__null_dask_index__',
'grid_crop_key'
])
# add growth_stage
meta = meta.assign(growth_stage=None)
Expand Down
43 changes: 29 additions & 14 deletions django_project/dcas/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
"""

import pandas as pd
from sqlalchemy import select, distinct, column, extract, func
from sqlalchemy import select, distinct, column, extract, func, cast
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.types import String as SqlString
from geoalchemy2.functions import ST_X, ST_Y, ST_Centroid
import duckdb

Expand Down Expand Up @@ -118,6 +119,11 @@ def _grid_data_with_crop_subquery(self, farm_registry_group):
'epoch',
func.DATE(self.farmregistry.c.growth_stage_start_date)
).label('prev_growth_stage_start_date'),
(
cast(self.farmregistry.c.crop_id, SqlString) + '_' +
cast(self.farmregistry.c.crop_stage_type_id, SqlString) + '_' +
cast(self.grid.c.id, SqlString)
).label('grid_crop_key')
).select_from(self.farmregistry).join(
self.farm, self.farmregistry.c.farm_id == self.farm.c.id
).join(
Expand All @@ -142,7 +148,8 @@ def grid_data_with_crop_query(self, farm_registry_group):
column('prev_growth_stage_id'),
column('prev_growth_stage_start_date'),
column('grid_id'),
column('planting_date_epoch')
column('planting_date_epoch'),
column('grid_crop_key')
).distinct().select_from(subquery)

def grid_data_with_crop_meta(self, farm_registry_group):
Expand All @@ -156,13 +163,15 @@ def grid_data_with_crop_meta(self, farm_registry_group):
column('crop_stage_type_id'), column('planting_date'),
column('prev_growth_stage_id'),
column('prev_growth_stage_start_date'),
column('grid_id'), column('planting_date_epoch')
column('grid_id'), column('planting_date_epoch'),
column('grid_crop_key')
).distinct().select_from(subquery)
df = pd.read_sql_query(
sql_query,
con=self.conn_engine,
index_col=self.grid_id_index_col,
)
with self.conn_engine.connect() as conn:
df = pd.read_sql_query(
sql_query,
con=conn,
index_col=self.grid_id_index_col,
)
df['prev_growth_stage_id'] = (
df['prev_growth_stage_id'].astype('Int64')
)
Expand Down Expand Up @@ -192,7 +201,12 @@ def _farm_registry_subquery(self, farm_registry_group):
self.farmregistry.c.id.label('registry_id'),
(self.crop.c.name + '_' + self.cropstagetype.c.name).label('crop'),
self.country.c.iso_a3.label('iso_a3'),
self.country.c.id.label('country_id')
self.country.c.id.label('country_id'),
(
cast(self.crop.c.id, SqlString) + '_' +
cast(self.cropstagetype.c.id, SqlString) + '_' +
cast(self.grid.c.id, SqlString)
).label('grid_crop_key'),
).select_from(self.farmregistry).join(
self.farm, self.farmregistry.c.farm_id == self.farm.c.id
).join(
Expand Down Expand Up @@ -230,11 +244,12 @@ def farm_registry_meta(self, farm_registry_group, request_date):
subquery = subquery.subquery('farm_data')

sql_query = select(subquery)
df = pd.read_sql_query(
sql_query,
con=self.conn_engine,
index_col=self.farmregistry_id_index_col,
)
with self.conn_engine.connect() as conn:
df = pd.read_sql_query(
sql_query,
con=conn,
index_col=self.farmregistry_id_index_col,
)

df = df.assign(
date=pd.Timestamp(request_date),
Expand Down
44 changes: 41 additions & 3 deletions django_project/dcas/tests/test_partitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
_merge_partition_gdd_config,
process_partition_farm_registry,
process_partition_seasonal_precipitation,
process_partition_other_params
process_partition_other_params,
process_partition_growth_stage_precipitation
)


Expand Down Expand Up @@ -48,7 +49,8 @@ def test_process_partition_farm_registry(self, mock_read_grid_data):
'crop_stage_type_id': [1, 2],
'planting_date_epoch': [5, 6],
'farm_id': [3, 4],
'growth_stage_id': [1, 1]
'growth_stage_id': [1, 1],
'grid_crop_key': ['2_1_1', '10_2_2']
})

mock_read_grid_data.return_value = pd.DataFrame({
Expand All @@ -58,7 +60,8 @@ def test_process_partition_farm_registry(self, mock_read_grid_data):
'planting_date_epoch': [5, 6, 7],
'temperature': [9, 8, 7],
'__null_dask_index__': [0, 1, 2],
'planting_date': ['2025-01-01', '2025-01-05', '2025-01-05']
'planting_date': ['2025-01-01', '2025-01-05', '2025-01-05'],
'grid_crop_key': ['2_1_1', '10_2_2', '7_7_7']
})

result_df = process_partition_farm_registry(
Expand Down Expand Up @@ -155,3 +158,38 @@ def test_process_partition_other_params(self, mock_read_grid_data):
result_df['p_pet'],
pd.Series([15, 16], name='p_pet')
)

@patch('dcas.partitions.read_grid_data')
def test_process_partition_growth_precipitation(
self, mock_read_grid_data
):
"""Test process_partition_growth_stage_precipitation."""
epoch_list = [1, 2]
df = pd.DataFrame({
'grid_id': [1, 2, 1],
'growth_stage_start_date': [1, 2, 1]
})

mock_read_grid_data.return_value = pd.DataFrame({
'grid_id': [1, 2],
'total_rainfall_1': [10, 12],
'total_rainfall_2': [13, 14]
})

result_df = process_partition_growth_stage_precipitation(
df, 'test.parquet', epoch_list
)
mock_read_grid_data.assert_called_once()
self.assertEqual(result_df.shape[0], 3)
pd.testing.assert_series_equal(
result_df['grid_id'],
pd.Series([1, 2, 1], name='grid_id')
)
pd.testing.assert_series_equal(
result_df['growth_stage_precipitation'],
pd.Series(
[23, 14, 23],
name='growth_stage_precipitation',
dtype='float64'
)
)
Loading

0 comments on commit ac6cc22

Please sign in to comment.