From efa3ab4ddd57cf8f14e615825f7a73817657d147 Mon Sep 17 00:00:00 2001 From: Danang Date: Fri, 15 Nov 2024 10:16:38 +0000 Subject: [PATCH] Fix observation api response time (#261) * add index to measurement table * fix query measurements to_csv * refactor to_netcdf_stream * fix to json for observation data * update uwsgi harakiri value * fix tests * remove debug * update version --- deployment/docker/uwsgi.conf | 2 +- django_project/_version.txt | 2 +- django_project/gap/admin/preferences.py | 3 +- ...gap_measure_dataset_c4f74a_idx_and_more.py | 21 + .../0039_preferences_dask_threads_num_api.py | 18 + django_project/gap/models/measurement.py | 6 + django_project/gap/models/preferences.py | 8 + .../gap/providers/airborne_observation.py | 19 +- django_project/gap/providers/observation.py | 386 ++++++++++-------- .../gap/tests/providers/test_observation.py | 38 +- django_project/gap/tests/utils/test_netcdf.py | 5 +- django_project/gap/utils/dask.py | 10 +- django_project/gap/utils/reader.py | 96 ++++- .../gap_api/api_views/measurement.py | 2 +- .../tests/test_measurement_api_airborne.py | 20 +- 15 files changed, 400 insertions(+), 236 deletions(-) create mode 100644 django_project/gap/migrations/0038_measurement_gap_measure_dataset_c4f74a_idx_and_more.py create mode 100644 django_project/gap/migrations/0039_preferences_dask_threads_num_api.py diff --git a/deployment/docker/uwsgi.conf b/deployment/docker/uwsgi.conf index 68541f80..2b7d1e04 100644 --- a/deployment/docker/uwsgi.conf +++ b/deployment/docker/uwsgi.conf @@ -17,5 +17,5 @@ env = DJANGO_SETTINGS_MODULE=core.settings.prod #uid = 1000 #gid = 1000 memory-report = true -harakiri = 120 +harakiri = 600 diff --git a/django_project/_version.txt b/django_project/_version.txt index b0a12275..58682af4 100644 --- a/django_project/_version.txt +++ b/django_project/_version.txt @@ -1 +1 @@ -0.0.10 \ No newline at end of file +0.0.11 \ No newline at end of file diff --git a/django_project/gap/admin/preferences.py b/django_project/gap/admin/preferences.py index 5b4af85a..a811a1f4 100644 --- a/django_project/gap/admin/preferences.py +++ b/django_project/gap/admin/preferences.py @@ -60,8 +60,9 @@ class PreferencesAdmin(admin.ModelAdmin): } ), ( - 'Logging', { + 'API Config', { 'fields': ( + 'dask_threads_num_api', 'api_log_batch_size', ) } diff --git a/django_project/gap/migrations/0038_measurement_gap_measure_dataset_c4f74a_idx_and_more.py b/django_project/gap/migrations/0038_measurement_gap_measure_dataset_c4f74a_idx_and_more.py new file mode 100644 index 00000000..cd367760 --- /dev/null +++ b/django_project/gap/migrations/0038_measurement_gap_measure_dataset_c4f74a_idx_and_more.py @@ -0,0 +1,21 @@ +# Generated by Django 4.2.7 on 2024-11-14 19:48 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('gap', '0037_alter_collectorsession_ingestor_type_and_more'), + ] + + operations = [ + migrations.AddIndex( + model_name='measurement', + index=models.Index(fields=['dataset_attribute', 'date_time'], name='gap_measure_dataset_c4f74a_idx'), + ), + migrations.AddIndex( + model_name='measurement', + index=models.Index(fields=['station_history', 'dataset_attribute', 'date_time'], name='gap_measure_station_694206_idx'), + ), + ] diff --git a/django_project/gap/migrations/0039_preferences_dask_threads_num_api.py b/django_project/gap/migrations/0039_preferences_dask_threads_num_api.py new file mode 100644 index 00000000..6dd595f0 --- /dev/null +++ 
b/django_project/gap/migrations/0039_preferences_dask_threads_num_api.py @@ -0,0 +1,18 @@ +# Generated by Django 4.2.7 on 2024-11-15 06:32 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('gap', '0038_measurement_gap_measure_dataset_c4f74a_idx_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='preferences', + name='dask_threads_num_api', + field=models.IntegerField(default=2, help_text='Number of threads for dask parallel computation in API, higher number will use more memory.'), + ), + ] diff --git a/django_project/gap/models/measurement.py b/django_project/gap/models/measurement.py index 642bc2aa..e6f8d847 100644 --- a/django_project/gap/models/measurement.py +++ b/django_project/gap/models/measurement.py @@ -91,3 +91,9 @@ def __str__(self): class Meta: # noqa unique_together = ('station', 'dataset_attribute', 'date_time') + indexes = [ + models.Index(fields=['dataset_attribute', 'date_time']), + models.Index( + fields=['station_history', 'dataset_attribute', 'date_time'] + ), + ] diff --git a/django_project/gap/models/preferences.py b/django_project/gap/models/preferences.py index de5b6e45..2f4e4161 100644 --- a/django_project/gap/models/preferences.py +++ b/django_project/gap/models/preferences.py @@ -109,6 +109,14 @@ class Preferences(SingletonModel): ) ) + dask_threads_num_api = models.IntegerField( + default=2, + help_text=( + 'Number of threads for dask parallel computation in API, ' + 'higher number will use more memory.' + ) + ) + # ingestor config ingestor_config = models.JSONField( default=dict, diff --git a/django_project/gap/providers/airborne_observation.py b/django_project/gap/providers/airborne_observation.py index 27a33bc8..85139c57 100644 --- a/django_project/gap/providers/airborne_observation.py +++ b/django_project/gap/providers/airborne_observation.py @@ -8,6 +8,7 @@ from datetime import datetime +from django.db.models import F, QuerySet from django.contrib.gis.db.models.functions import Distance from django.contrib.gis.geos import Polygon, Point @@ -79,15 +80,23 @@ def _find_nearest_station_by_points(self): def get_measurements(self, start_date: datetime, end_date: datetime): """Return measurements.""" nearest_histories = self.get_nearest_stations() - if nearest_histories is None or len(nearest_histories) == 0: + if isinstance(nearest_histories, QuerySet): + nearest_histories = nearest_histories.filter( + date_time__gte=start_date, + date_time__lte=end_date + ) + if ( + nearest_histories is None or + self._get_count(nearest_histories) == 0 + ): return None - return Measurement.objects.select_related( - 'dataset_attribute', 'dataset_attribute__attribute', - 'station', 'station_history' + return Measurement.objects.annotate( + geom=F('station_history__geometry'), + alt=F('station_history__altitude') ).filter( date_time__gte=start_date, date_time__lte=end_date, dataset_attribute__in=self.attributes, station_history__in=nearest_histories - ).order_by('date_time', 'station', 'dataset_attribute') + ).order_by('date_time') diff --git a/django_project/gap/providers/observation.py b/django_project/gap/providers/observation.py index 1fd84efe..65e0cd49 100644 --- a/django_project/gap/providers/observation.py +++ b/django_project/gap/providers/observation.py @@ -5,31 +5,44 @@ .. 
note:: Observation Data Reader """ +import json +from _collections_abc import dict_values from datetime import datetime -import numpy as np import pandas as pd -import xarray as xr import tempfile -from django.db.models import Exists, OuterRef +from django.db.models import Exists, OuterRef, F, FloatField, QuerySet +from django.db.models.functions.datetime import TruncDate, TruncTime from django.contrib.gis.geos import Polygon, Point -from django.contrib.gis.db.models.functions import Distance -from typing import List, Tuple +from django.contrib.gis.db.models.functions import Distance, GeoFunc +from typing import List, Tuple, Union from gap.models import ( Dataset, DatasetAttribute, Station, - Measurement, - DatasetTimeStep, - DatasetObservationType + Measurement ) from gap.utils.reader import ( LocationInputType, DatasetReaderInput, - DatasetTimelineValue, BaseDatasetReader, DatasetReaderValue ) +from gap.utils.dask import execute_dask_compute + + +class ST_X(GeoFunc): + """Custom GeoFunc to extract lon.""" + + output_field = FloatField() + function = 'ST_X' + + +class ST_Y(GeoFunc): + """Custom GeoFunc to extract lat.""" + + output_field = FloatField() + function = 'ST_Y' class CSVBuffer: @@ -46,12 +59,13 @@ class ObservationReaderValue(DatasetReaderValue): date_variable = 'date' def __init__( - self, val: List[DatasetTimelineValue], + self, val: QuerySet, location_input: DatasetReaderInput, attributes: List[DatasetAttribute], start_date: datetime, end_date: datetime, - nearest_stations) -> None: + nearest_stations, + result_count) -> None: """Initialize ObservationReaderValue class. :param val: value that has been read @@ -61,160 +75,163 @@ def __init__( :param attributes: list of dataset attributes :type attributes: List[DatasetAttribute] """ - super().__init__(val, location_input, attributes) + super().__init__( + val, location_input, attributes, + result_count=result_count + ) self.start_date = start_date self.end_date = end_date self.nearest_stations = nearest_stations + self.attributes = sorted( + self.attributes, key=lambda x: x.attribute.id + ) - def to_csv_stream(self, suffix='.csv', separator=','): - """Generate csv bytes stream. + def _get_data_frame(self, use_separate_time_col=True) -> pd.DataFrame: + """Create a dataframe from query result. 
- :param suffix: file extension, defaults to '.csv' - :type suffix: str, optional - :param separator: separator, defaults to ',' - :type separator: str, optional - :yield: bytes of csv file - :rtype: bytes + :return: Data frame + :rtype: pd.DataFrame """ - dataset = self.attributes[0].dataset - time_col_exists = dataset.time_step != DatasetTimeStep.DAILY - alt_col_exists = ( - dataset.observation_type == - DatasetObservationType.UPPER_AIR_OBSERVATION + fields = { + 'date': ( + TruncDate('date_time') if use_separate_time_col else + F('date_time') + ), + 'loc_x': ST_X('geom'), + 'loc_y': ST_Y('geom'), + 'attr_id': F('dataset_attribute__attribute__id'), + } + field_index = [ + 'date' + ] + + # add time if time_step is not daily + if self.has_time_column and use_separate_time_col: + fields.update({ + 'time': TruncTime('date_time') + }) + field_index.append('time') + + # add lat and lon + field_index.extend(['loc_y', 'loc_x']) + + # add altitude if it's upper air observation + if self.has_altitude_column: + fields.update({ + 'loc_alt': F('alt') + }) + field_index.append('loc_alt') + + # annotate and select required fields only + measurements = self._val.annotate(**fields).values( + *(list(fields.keys()) + ['value']) ) + # Convert to DataFrame + df = pd.DataFrame(list(measurements)) + + # Pivot the data to make attributes columns + df = df.pivot_table( + index=field_index, + columns='attr_id', + values='value' + ).reset_index() + + # add other attributes + for attr in self.attributes: + if attr.attribute.id not in df.columns: + df[attr.attribute.id] = None + + # reorder columns + df = df[ + field_index + [attr.attribute.id for attr in self.attributes] + ] + + return df + + def _get_headers(self, use_separate_time_col=True): + """Get list of headers that allign with dataframce columns.""" headers = ['date'] # add time if time_step is not daily - if time_col_exists: + if self.has_time_column and use_separate_time_col: headers.append('time') # add lat and lon headers.extend(['lat', 'lon']) # add altitude if it's upper air observation - if alt_col_exists: + if self.has_altitude_column: headers.append('altitude') - # write headers + field_indices = [header for header in headers] + + # add headers for attr in self.attributes: headers.append(attr.attribute.variable_name) - yield bytes(','.join(headers) + '\n', 'utf-8') - for val in self.values: - data = [val.get_datetime_repr('%Y-%m-%d')] + return headers, field_indices + + def to_csv_stream(self, suffix='.csv', separator=','): + """Generate csv bytes stream. 
- # add time if time_step is not daily - if time_col_exists: - data.append(val.get_datetime_repr('%H:%M:%S')) + :param suffix: file extension, defaults to '.csv' + :type suffix: str, optional + :param separator: separator, defaults to ',' + :type separator: str, optional + :yield: bytes of csv file + :rtype: bytes + """ + headers, _ = self._get_headers() - # add lat and lon - data.extend([ - str(val.location.y), - str(val.location.x) - ]) + # write headers + yield bytes(','.join(headers) + '\n', 'utf-8') - # add altitude if it's upper air observation - if alt_col_exists: - data.append( - str(val.altitude) if val.altitude else '' - ) + # get dataframe + df_pivot = self._get_data_frame() - for attr in self.attributes: - var_name = attr.attribute.variable_name - if var_name in val.values: - data.append(str(val.values[var_name])) - else: - data.append('') - - # write row - yield bytes(','.join(data) + '\n', 'utf-8') - - def _get_date_array(self): - """Get date range from result values.""" - dataset = self.attributes[0].dataset - - first_dt = self.values[0].get_datetime() - last_dt = self.values[-1].get_datetime() - - if dataset.time_step == DatasetTimeStep.DAILY: - first_dt = first_dt.date() - last_dt = last_dt.date() - - return pd.date_range( - first_dt, last_dt, - freq=DatasetTimeStep.to_freq(dataset.time_step)) - - def _get_date_index( - self, date_array: pd.DatetimeIndex, datetime: datetime): - """Get date index from date_array.""" - dataset = self.attributes[0].dataset - dt = ( - datetime.replace(hour=0, minute=0, second=0, tzinfo=None) if - dataset.time_step == DatasetTimeStep.DAILY else datetime - ) - return date_array.get_loc(dt) + # Write the data in chunks + for start in range(0, len(df_pivot), self.csv_chunk_size): + chunk = df_pivot.iloc[start:start + self.csv_chunk_size] + yield chunk.to_csv(index=False, header=False, float_format='%g') def to_netcdf_stream(self): """Generate NetCDF.""" - # create date array - date_array = self._get_date_array() - - # sort lat and lon array - lat_array = set() - lon_array = set() - station: Station - for station in self.nearest_stations: - x = round(station.geometry.x, 5) - y = round(station.geometry.y, 5) - lon_array.add(x) - lat_array.add(y) - lat_array = sorted(lat_array) - lon_array = sorted(lon_array) - lat_array = pd.Index(lat_array, dtype='float64') - lon_array = pd.Index(lon_array, dtype='float64') - - # define the data variables - data_vars = {} - empty_shape = (len(date_array), len(lat_array), len(lon_array)) - for attr in self.attributes: - var = attr.attribute.variable_name - data_vars[var] = ( - ['date', 'lat', 'lon'], - np.empty(empty_shape) - ) + time_col_exists = self.has_time_column - # create the dataset - ds = xr.Dataset( - data_vars=data_vars, - coords={ - 'date': ('date', date_array), - 'lat': ('lat', lat_array), - 'lon': ('lon', lon_array) - } + # if time column exists, in netcdf we should use datetime + # instead of separating the date and time columns + headers, field_indices = self._get_headers( + use_separate_time_col=not time_col_exists ) - # assign values to the dataset - for val in self.values: - date_idx = self._get_date_index(date_array, val.get_datetime()) - loc = val.location - lat_idx = lat_array.get_loc(round(loc.y, 5)) - lon_idx = lon_array.get_loc(round(loc.x, 5)) + # get dataframe + df_pivot = self._get_data_frame( + use_separate_time_col=not time_col_exists + ) - for attr in self.attributes: - var_name = attr.attribute.variable_name - if var_name not in val.values: - continue - ds[var_name][date_idx, lat_idx, 
lon_idx] = ( - val.values[var_name] - ) + # rename columns + if time_col_exists: + headers[0] = 'time' + field_indices[0] = 'time' + df_pivot.columns = headers + + # convert date/datetime objects + date_coord = 'date' if not time_col_exists else 'time' + df_pivot[date_coord] = pd.to_datetime(df_pivot[date_coord]) + + # Convert to xarray Dataset + ds = df_pivot.set_index(field_indices).to_xarray() # write to netcdf with ( tempfile.NamedTemporaryFile( suffix=".nc", delete=True, delete_on_close=False) ) as tmp_file: - ds.to_netcdf( - tmp_file.name, format='NETCDF4', engine='netcdf4') + x = ds.to_netcdf( + tmp_file.name, format='NETCDF4', engine='netcdf4', + compute=False + ) + execute_dask_compute(x, is_api=True) with open(tmp_file.name, 'rb') as f: while True: chunk = f.read(self.chunk_size_in_bytes) @@ -222,6 +239,41 @@ def to_netcdf_stream(self): break yield chunk + def _to_dict(self) -> dict: + """Convert into dict. + + :return: Dictionary of metadata and data + :rtype: dict + """ + if ( + self.location_input is None or self._val is None or + self.count() == 0 + ): + return {} + + has_altitude = self.has_altitude_column + output = { + 'geometry': json.loads(self.location_input.geometry.json), + 'data': [] + } + + # get dataframe + df_pivot = self._get_data_frame( + use_separate_time_col=False + ) + for _, row in df_pivot.iterrows(): + values = {} + for attr in self.attributes: + values[attr.attribute.variable_name] = row[attr.attribute.id] + output['data'].append({ + 'datetime': row['date'].isoformat(timespec='seconds'), + 'values': values + }) + if has_altitude: + output['altitude'] = row['loc_alt'] + + return output + class ObservationDatasetReader(BaseDatasetReader): """Class to read observation ground observation data.""" @@ -249,9 +301,22 @@ def __init__( dataset, attributes, location_input, start_date, end_date, altitudes=altitudes ) - self.results: List[DatasetTimelineValue] = [] + self.results: QuerySet = QuerySet.none + self.result_count = 0 self.nearest_stations = None + def _get_count(self, values: Union[List, QuerySet]) -> int: + """Get count of a list of queryset. + + :param values: List or QuerySet object + :type values: Union[List, QuerySet] + :return: count + :rtype: int + """ + if isinstance(values, (list, dict_values,)): + return len(values) + return values.count() + def query_by_altitude(self, qs): """Query by altitude.""" altitudes = self.altitudes @@ -334,17 +399,21 @@ def get_nearest_stations(self): def get_measurements(self, start_date: datetime, end_date: datetime): """Return measurements data.""" self.nearest_stations = self.get_nearest_stations() - if self.nearest_stations is None or len(self.nearest_stations) == 0: + if ( + self.nearest_stations is None or + self._get_count(self.nearest_stations) == 0 + ): return - return Measurement.objects.select_related( - 'dataset_attribute', 'dataset_attribute__attribute', - 'station', 'station_history' + + return Measurement.objects.annotate( + geom=F('station__geometry'), + alt=F('station__altitude') ).filter( date_time__gte=start_date, date_time__lte=end_date, dataset_attribute__in=self.attributes, station__in=self.nearest_stations - ).order_by('date_time', 'station', 'dataset_attribute') + ).order_by('date_time') def read_historical_data(self, start_date: datetime, end_date: datetime): """Read historical data from dataset. 
@@ -355,53 +424,11 @@ def read_historical_data(self, start_date: datetime, end_date: datetime): :type end_date: datetime """ measurements = self.get_measurements(start_date, end_date) - if measurements is None or measurements.count() == 0: + self.result_count = measurements.count() + if measurements is None or self.result_count == 0: return - # final result, group by datetime - self.results = [] - - iter_dt = None - iter_loc = None - iter_alt = None - # group by location and date_time - dt_loc_val = {} - for measurement in measurements: - # if it has history, use history location - station_history = measurement.station_history - if station_history: - measurement_loc = station_history.geometry - measurement_alt = station_history.altitude - else: - measurement_loc = measurement.station.geometry - measurement_alt = measurement.station.altitude - - if iter_dt is None: - iter_dt = measurement.date_time - iter_loc = measurement_loc - iter_alt = measurement_alt - elif ( - iter_loc != measurement_loc or - iter_dt != measurement.date_time or - iter_alt != measurement_alt - ): - self.results.append( - DatasetTimelineValue( - iter_dt, dt_loc_val, iter_loc, iter_alt - ) - ) - iter_dt = measurement.date_time - iter_loc = measurement_loc - iter_alt = measurement_alt - dt_loc_val = {} - dt_loc_val[ - measurement.dataset_attribute.attribute.variable_name - ] = measurement.value - self.results.append( - DatasetTimelineValue( - iter_dt, dt_loc_val, iter_loc, iter_alt - ) - ) + self.results = measurements def get_data_values(self) -> DatasetReaderValue: """Fetch data values from dataset. @@ -411,5 +438,6 @@ def get_data_values(self) -> DatasetReaderValue: """ return ObservationReaderValue( self.results, self.location_input, self.attributes, - self.start_date, self.end_date, self.nearest_stations + self.start_date, self.end_date, self.nearest_stations, + self.result_count ) diff --git a/django_project/gap/tests/providers/test_observation.py b/django_project/gap/tests/providers/test_observation.py index fc47ac02..a875b316 100644 --- a/django_project/gap/tests/providers/test_observation.py +++ b/django_project/gap/tests/providers/test_observation.py @@ -26,8 +26,7 @@ ) from gap.utils.reader import ( DatasetReaderInput, - LocationInputType, - DatasetTimelineValue + LocationInputType ) @@ -137,7 +136,7 @@ def test_read_historical_data_empty(self): datetime(2010, 11, 1, 0, 0, 0), datetime(2010, 11, 1, 0, 0, 0)) data_value = reader.get_data_values() - self.assertEqual(len(data_value._val), 0) + self.assertEqual(data_value.count(), 0) def test_read_historical_data_by_point(self): """Test read by stations that has same point for different dataset.""" @@ -174,8 +173,9 @@ def test_read_historical_data_by_point(self): data_value = reader.get_data_values() # should return above measurement self.assertEqual(len(data_value._val), 1) + dict_value = data_value.to_json() self.assertEqual( - data_value.values[0].values['surface_air_temperature'], + dict_value['data'][0]['values']['surface_air_temperature'], measurement.value ) @@ -217,16 +217,30 @@ def test_read_historical_data_multiple_locations(self): def test_observation_to_netcdf_stream(self): """Test convert observation value to netcdf stream.""" - val = DatasetTimelineValue( - self.start_date, - { - self.dataset_attr.attribute.variable_name: 20 - }, - self.location_input.point + dt1 = datetime(2019, 11, 1, 0, 0, 0) + dt2 = datetime(2019, 11, 2, 0, 0, 0) + MeasurementFactory.create( + station=self.station, + dataset_attribute=self.dataset_attr, + date_time=dt1, + 
value=100 + ) + MeasurementFactory.create( + station=self.station, + dataset_attribute=self.dataset_attr, + date_time=dt2, + value=200 ) + reader = ObservationDatasetReader( + self.dataset, [self.dataset_attr], self.location_input, + dt1, dt2 + ) + qs = reader.get_measurements(dt1, dt2) reader_value = ObservationReaderValue( - [val], self.location_input, [self.dataset_attr], - self.start_date, self.end_date, [self.station]) + qs, self.location_input, [self.dataset_attr], + self.start_date, self.end_date, [self.station], + qs.count() + ) d = reader_value.to_netcdf_stream() res = list(d) self.assertIsNotNone(res) diff --git a/django_project/gap/tests/utils/test_netcdf.py b/django_project/gap/tests/utils/test_netcdf.py index 375bfb28..d912a6e8 100644 --- a/django_project/gap/tests/utils/test_netcdf.py +++ b/django_project/gap/tests/utils/test_netcdf.py @@ -261,8 +261,9 @@ def test_to_csv_stream(self): csv_stream = self.dataset_reader_value_xr.to_csv_stream() csv_data = list(csv_stream) self.assertIsNotNone(csv_data) - data = str(csv_data[0], encoding='utf-8').splitlines() - self.assertEqual(len(data), 41) + data = csv_data[1].splitlines() + # rows without header + self.assertEqual(len(data), 40) def test_to_netcdf_stream(self): """Test convert to netcdf.""" diff --git a/django_project/gap/utils/dask.py b/django_project/gap/utils/dask.py index b2f8e919..c3ff6a20 100644 --- a/django_project/gap/utils/dask.py +++ b/django_project/gap/utils/dask.py @@ -12,13 +12,19 @@ from gap.models import Preferences -def execute_dask_compute(x: Delayed): +def execute_dask_compute(x: Delayed, is_api=False): """Execute dask computation based on number of threads config. :param x: Dask delayed object :type x: Delayed + :param is_api: Whether the computation is in GAP API, default to False + :type is_api: bool """ - num_of_threads = Preferences.load().dask_threads_num + preferences = Preferences.load() + num_of_threads = ( + preferences.dask_threads_num_api if is_api else + preferences.dask_threads_num + ) if num_of_threads <= 0: # use everything x.compute() diff --git a/django_project/gap/utils/reader.py b/django_project/gap/utils/reader.py index d7c07cdc..f612b85d 100644 --- a/django_project/gap/utils/reader.py +++ b/django_project/gap/utils/reader.py @@ -12,6 +12,7 @@ import numpy as np import pytz +from django.db.models import QuerySet from django.contrib.gis.geos import ( Point, Polygon, MultiPolygon, GeometryCollection, MultiPoint, GEOSGeometry ) @@ -22,8 +23,11 @@ Attribute, Unit, Dataset, - DatasetAttribute + DatasetAttribute, + DatasetTimeStep, + DatasetObservationType ) +from gap.utils.dask import execute_dask_compute class DatasetVariable: @@ -263,15 +267,16 @@ class DatasetReaderValue: date_variable = 'date' chunk_size_in_bytes = 81920 # 80KB chunks + csv_chunk_size = 50000 def __init__( - self, val: Union[xrDataset, List[DatasetTimelineValue]], + self, val: Union[xrDataset, List[DatasetTimelineValue], QuerySet], location_input: DatasetReaderInput, - attributes: List[DatasetAttribute]) -> None: + attributes: List[DatasetAttribute], result_count = None) -> None: """Initialize DatasetReaderValue class. 
:param val: value that has been read - :type val: Union[xrDataset, List[DatasetTimelineValue]] + :type val: Union[xrDataset, List[DatasetTimelineValue], QuerySet] :param location_input: location input query :type location_input: DatasetReaderInput :param attributes: list of dataset attributes @@ -281,19 +286,31 @@ def __init__( self._is_xr_dataset = isinstance(val, xrDataset) self.location_input = location_input self.attributes = attributes + self._result_count = result_count self._post_init() def _post_init(self): """Rename source variable into attribute name.""" - if self.is_empty(): - return if not self._is_xr_dataset: return + if self.is_empty(): + return renamed_dict = {} for attr in self.attributes: renamed_dict[attr.source] = attr.attribute.variable_name self._val = self._val.rename(renamed_dict) + def _get_dataset(self) -> Dataset: + """Get dataset from attribute. + + :return: dataset object + :rtype: Dataset + """ + if len(self.attributes) > 0: + return self.attributes[0].dataset + + return None + @property def xr_dataset(self) -> xrDataset: """Return the value as xarray Dataset. @@ -312,6 +329,41 @@ def values(self) -> List[DatasetTimelineValue]: """ return self._val + @property + def has_time_column(self) -> bool: + """Check if the output has time column. + + :return: True if time column should exist + :rtype: bool + """ + dataset = self._get_dataset() + + return ( + dataset.time_step != DatasetTimeStep.DAILY if + dataset else False + ) + + @property + def has_altitude_column(self) -> bool: + """Check if the output has altitude column. + + :return: True if altitude column should exist + :rtype: bool + """ + dataset = self._get_dataset() + + return ( + dataset.observation_type == + DatasetObservationType.UPPER_AIR_OBSERVATION if + dataset else False + ) + + def count(self): + """Return the count for QuerySet.""" + if self._result_count is not None: + return self._result_count + return len(self.values) + def is_empty(self) -> bool: """Check if value is empty. @@ -320,7 +372,8 @@ def is_empty(self) -> bool: """ if self._val is None: return True - return len(self.values) == 0 + + return self.count() == 0 def _to_dict(self) -> dict: """Convert into dict. 
@@ -378,8 +431,11 @@ def to_netcdf_stream(self): tempfile.NamedTemporaryFile( suffix=".nc", delete=True, delete_on_close=False) ) as tmp_file: - self.xr_dataset.to_netcdf( - tmp_file.name, format='NETCDF4', engine='h5netcdf') + x = self.xr_dataset.to_netcdf( + tmp_file.name, format='NETCDF4', engine='h5netcdf', + compute=False + ) + execute_dask_compute(x) with open(tmp_file.name, 'rb') as f: while True: chunk = f.read(self.chunk_size_in_bytes) @@ -412,19 +468,15 @@ def to_csv_stream(self, suffix='.csv', separator=','): # reordered_cols.insert(0, 'ensemble') df = self.xr_dataset.to_dataframe(dim_order=dim_order) df_reordered = df[reordered_cols] - with ( - tempfile.NamedTemporaryFile( - suffix=suffix, delete=True, delete_on_close=False) - ) as tmp_file: - df_reordered.to_csv( - tmp_file.name, index=True, header=True, sep=separator, - mode='w', lineterminator='\n', float_format='%.4f') - with open(tmp_file.name, 'rb') as f: - while True: - chunk = f.read(self.chunk_size_in_bytes) - if not chunk: - break - yield chunk + + # write headers + headers = dim_order + list(df_reordered.columns) + yield bytes(','.join(headers) + '\n', 'utf-8') + + # Write the data in chunks + for start in range(0, len(df_reordered), self.csv_chunk_size): + chunk = df_reordered.iloc[start:start + self.csv_chunk_size] + yield chunk.to_csv(index=True, header=False, float_format='%g') class BaseDatasetReader: diff --git a/django_project/gap_api/api_views/measurement.py b/django_project/gap_api/api_views/measurement.py index 292798f2..f77eedf1 100644 --- a/django_project/gap_api/api_views/measurement.py +++ b/django_project/gap_api/api_views/measurement.py @@ -494,7 +494,7 @@ def get_response_data(self) -> Response: ) dataset_attributes = DatasetAttribute.objects.select_related( - 'dataset' + 'dataset', 'attribute' ).filter( attribute__in=attributes, dataset__is_internal_use=False, diff --git a/django_project/gap_api/tests/test_measurement_api_airborne.py b/django_project/gap_api/tests/test_measurement_api_airborne.py index 8c6183f1..a2bfa2f7 100644 --- a/django_project/gap_api/tests/test_measurement_api_airborne.py +++ b/django_project/gap_api/tests/test_measurement_api_airborne.py @@ -145,7 +145,7 @@ def test_read_point(self): self.assertEqual(response.status_code, 200) results = response.data['results'] self.assertEqual(len(results), 1) - self.assertEqual(results[0]['geometry']['coordinates'], [0.0, 0.0]) + self.assertEqual(results[0]['geometry']['coordinates'], [0, 0]) self.assertEqual(results[0]['altitude'], 1) self.assertEqual(len(results[0]['data']), 1) self.assertEqual( @@ -153,7 +153,7 @@ def test_read_point(self): ) self.assertEqual( results[0]['data'][0]['values'], - {'atmospheric_pressure': 100.0, 'temperature': 1.0} + {'atmospheric_pressure': 100, 'temperature': 1} ) # Getting lat lon 10,10 @@ -168,7 +168,7 @@ def test_read_point(self): self.assertEqual(response.status_code, 200) results = response.data['results'] self.assertEqual(len(results), 1) - self.assertEqual(results[0]['geometry']['coordinates'], [10.0, 10.0]) + self.assertEqual(results[0]['geometry']['coordinates'], [10, 10]) self.assertEqual(results[0]['altitude'], 2) self.assertEqual(len(results[0]['data']), 1) self.assertEqual( @@ -176,7 +176,7 @@ def test_read_point(self): ) self.assertEqual( results[0]['data'][0]['values'], - {'atmospheric_pressure': 200.0, 'temperature': 2.0} + {'atmospheric_pressure': 200, 'temperature': 2} ) def read_csv(self, response): @@ -221,12 +221,12 @@ def test_read_with_bbox(self): self.assertEqual(len(rows), 2) 
self.assertEqual( rows[0], - ['2000-02-01', '00:00:00', '10.0', '10.0', '2.0', '200.0', '2.0'] + ['2000-02-01', '00:00:00', '10', '10', '2', '200', '2'] ) self.assertEqual( rows[1], - ['2000-03-01', '00:00:00', '100.0', '100.0', '3.0', '300.0', - '3.0'] + ['2000-03-01', '00:00:00', '100', '100', '3', '300', + '3'] ) # Second request @@ -249,7 +249,7 @@ def test_read_with_bbox(self): self.assertEqual(len(rows), 1) self.assertEqual( rows[0], - ['2000-02-01', '00:00:00', '10.0', '10.0', '2.0', '200.0', '2.0'] + ['2000-02-01', '00:00:00', '10', '10', '2', '200', '2'] ) # Return data with altitude between 1.5-5, should return history 2 & 3 @@ -273,11 +273,11 @@ def test_read_with_bbox(self): self.assertEqual(len(rows), 2) self.assertEqual( rows[0], - ['2000-02-01', '00:00:00', '10.0', '10.0', '2.0', '200.0', '2.0'] + ['2000-02-01', '00:00:00', '10', '10', '2', '200', '2'] ) self.assertEqual( rows[1], - ['2000-03-01', '00:00:00', '100.0', '100.0', '3.0', '300.0', '3.0'] + ['2000-03-01', '00:00:00', '100', '100', '3', '300', '3'] ) def test_read_to_netcdf(self):
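
Note (not part of the applied diff): the response-time fix has three parts that are easy to miss when reading the hunks in isolation. First, two composite indexes are added to Measurement, (dataset_attribute, date_time) and (station_history, dataset_attribute, date_time), so the filtered queries in get_measurements() can be resolved from an index. Second, ObservationReaderValue no longer walks the queryset row by row in Python; it annotates the queryset with geom/alt and the attribute id, pivots the long-format rows into one column per attribute with pandas, and streams the CSV in 50,000-row chunks. Third, NetCDF writes go through execute_dask_compute(x, is_api=True), which uses the new dask_threads_num_api preference, and the uwsgi harakiri timeout is raised from 120 to 600 seconds as headroom for the largest exports.

The sketch below illustrates the pivot-and-stream pattern outside Django. It is a minimal illustration only: the function name stream_pivoted_csv, the rows input, and the header labels are hypothetical and do not appear in the patch.

    import pandas as pd

    def stream_pivoted_csv(rows, attribute_ids, chunk_size=50000):
        """Yield CSV bytes from long-format measurement rows.

        rows: iterable of dicts with keys date, loc_y, loc_x, attr_id and
        value, i.e. one record per measurement, similar to the annotated
        queryset built in ObservationReaderValue._get_data_frame.
        """
        df = pd.DataFrame(list(rows))
        # pivot to one column per attribute, one row per (date, lat, lon)
        df = df.pivot_table(
            index=['date', 'loc_y', 'loc_x'],
            columns='attr_id',
            values='value'
        ).reset_index()
        # make sure every requested attribute has a column, even if no data
        for attr_id in attribute_ids:
            if attr_id not in df.columns:
                df[attr_id] = None
        df = df[['date', 'loc_y', 'loc_x'] + list(attribute_ids)]
        # header first, then data in bounded-size chunks to cap memory use
        header = ['date', 'lat', 'lon'] + [str(a) for a in attribute_ids]
        yield (','.join(header) + '\n').encode('utf-8')
        for start in range(0, len(df), chunk_size):
            chunk = df.iloc[start:start + chunk_size]
            yield chunk.to_csv(
                index=False, header=False, float_format='%g'
            ).encode('utf-8')

Streaming fixed-size chunks keeps peak memory roughly proportional to chunk_size rather than to the full requested date range, which is why the patch can also drop the temporary-file round trip that the previous to_csv_stream in gap/utils/reader.py relied on.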