From 404fe834488537ea2f83ada6f4d0e0923278089e Mon Sep 17 00:00:00 2001 From: Danang Date: Mon, 9 Dec 2024 07:26:41 +0000 Subject: [PATCH] use x accel redirect on django with nginx (#294) * use x accel redirect on django with nginx * add user file model and uploader config * add task to clean UserFile * fix store to object storage * fix test * add tests * fix failed tests * fix test * fix user file path * add missing env to test yml --- deployment/docker-compose.test.yml | 2 + deployment/docker/Dockerfile | 1 + deployment/nginx/sites-enabled/default.conf | 32 ++ django_project/core/celery.py | 7 +- django_project/core/settings/project.py | 12 + django_project/core/tests/runner.py | 4 + django_project/gap/admin/preferences.py | 2 + ...ences_api_use_x_accel_redirect_and_more.py | 24 ++ django_project/gap/models/preferences.py | 26 ++ django_project/gap/providers/observation.py | 71 ++++- django_project/gap/utils/reader.py | 146 ++++++++- django_project/gap_api/admin.py | 18 +- .../gap_api/api_views/measurement.py | 183 +++++++++-- django_project/gap_api/apps.py | 2 +- django_project/gap_api/factories.py | 24 +- .../gap_api/migrations/0005_userfile.py | 28 ++ django_project/gap_api/models/__init__.py | 1 + django_project/gap_api/models/user_file.py | 99 ++++++ django_project/gap_api/tasks/__init__.py | 2 +- django_project/gap_api/tasks/cleanup.py | 28 ++ .../gap_api/tasks/cleanup_location.py | 19 -- .../gap_api/tests/test_location_api.py | 2 +- .../gap_api/tests/test_measurement_api.py | 10 +- .../gap_api/tests/test_user_file.py | 300 ++++++++++++++++++ 24 files changed, 973 insertions(+), 70 deletions(-) create mode 100644 django_project/gap/migrations/0041_preferences_api_use_x_accel_redirect_and_more.py create mode 100644 django_project/gap_api/migrations/0005_userfile.py create mode 100644 django_project/gap_api/models/user_file.py create mode 100644 django_project/gap_api/tasks/cleanup.py delete mode 100644 django_project/gap_api/tasks/cleanup_location.py create mode 100644 django_project/gap_api/tests/test_user_file.py diff --git a/deployment/docker-compose.test.yml b/deployment/docker-compose.test.yml index 5e66988e..42d736a7 100644 --- a/deployment/docker-compose.test.yml +++ b/deployment/docker-compose.test.yml @@ -64,6 +64,8 @@ services: - MINIO_AWS_ENDPOINT_URL=http://minio:9000/ - MINIO_AWS_BUCKET_NAME=tomorrownow - MINIO_AWS_DIR_PREFIX=dev/media + - MINIO_GAP_AWS_BUCKET_NAME=tngap-products + - MINIO_GAP_AWS_DIR_PREFIX=dev entrypoint: [ ] ports: # for django test server diff --git a/deployment/docker/Dockerfile b/deployment/docker/Dockerfile index 22e41042..501eb314 100644 --- a/deployment/docker/Dockerfile +++ b/deployment/docker/Dockerfile @@ -50,6 +50,7 @@ RUN groupadd --gid $USER_GID $USERNAME \ # ******************************************************** # * Anything else you want to do like clean up goes here * # ******************************************************** +RUN apt-get update && apt-get install -y gnupg2 # [Optional] Set the default user. Omit if you want to keep the default as root. 
USER $USERNAME diff --git a/deployment/nginx/sites-enabled/default.conf b/deployment/nginx/sites-enabled/default.conf index 9e2541c5..78202a88 100644 --- a/deployment/nginx/sites-enabled/default.conf +++ b/deployment/nginx/sites-enabled/default.conf @@ -49,6 +49,38 @@ server { expires 21d; # cache for 21 days } + location ~ ^/userfiles/(.*?)/(.*?)/(.*) { + internal; + # How to resolve remote URLs; you may want to update this depending + # on your setup. In our case it’s inside a Docker container with + # dnsmasq running. + resolver 127.0.0.11 ipv6=off; + + # Extract the remote URL parts + set $download_protocol $1; + set $download_host $2; + set $download_path $3; + # Reconstruct the remote URL + set $download_url $download_protocol://$download_host/$download_path; + # Headers for the remote server; unset Authorization and Cookie for security reasons. + proxy_set_header Host $download_host; + proxy_set_header Authorization ''; + proxy_set_header Cookie ''; + # Headers for the response: by using $upstream_http_... here we can inject + # other headers from Django; proxy_hide_header ensures the header from the + # remote server isn't passed through. + proxy_hide_header Content-Disposition; + add_header Content-Disposition $upstream_http_content_disposition; + # Stops the local disk from being written to (just forwards data through) + proxy_max_temp_file_size 0; + # Set timeout; the remote URL should remain valid for at least this long + proxy_read_timeout 3600s; + proxy_send_timeout 3600s; + # Proxy the remote file through to the client + proxy_pass $download_url$is_args$args; + + } + # Finally, send all non-media requests to the Django server. location / { uwsgi_pass django; diff --git a/django_project/core/celery.py b/django_project/core/celery.py index e6896816..a6765986 100644 --- a/django_project/core/celery.py +++ b/django_project/core/celery.py @@ -86,7 +86,12 @@ 'task': 'cleanup_user_locations', # Run every week at 00:00 UTC 'schedule': crontab(minute='0', hour='0', day_of_week='0'), - } + }, + 'cleanup-user-files': { + 'task': 'cleanup_user_files', + # Run every day at 00:15 UTC + 'schedule': crontab(minute='15', hour='00'), + }, } diff --git a/django_project/core/settings/project.py b/django_project/core/settings/project.py index 465d2934..6b348744 100644 --- a/django_project/core/settings/project.py +++ b/django_project/core/settings/project.py @@ -72,6 +72,18 @@ "django.contrib.staticfiles.storage.ManifestStaticFilesStorage" ) }, + "gap_products": { + "BACKEND": "storages.backends.s3.S3Storage", + "OPTIONS": { + "access_key": MINIO_AWS_ACCESS_KEY_ID, + "secret_key": MINIO_AWS_SECRET_ACCESS_KEY, + "bucket_name": os.environ.get("MINIO_GAP_AWS_BUCKET_NAME"), + "file_overwrite": False, + "max_memory_size": 500 * MB, # 500MB + "transfer_config": AWS_TRANSFER_CONFIG, + "endpoint_url": MINIO_AWS_ENDPOINT_URL + }, + } } STORAGE_DIR_PREFIX = os.environ.get("MINIO_AWS_DIR_PREFIX", "media") diff --git a/django_project/core/tests/runner.py b/django_project/core/tests/runner.py index 719cbad7..a47c8e68 100644 --- a/django_project/core/tests/runner.py +++ b/django_project/core/tests/runner.py @@ -5,6 +5,7 @@ .. note:: Test runner. 
""" +import os from django.conf import settings from django.test.runner import DiscoverRunner @@ -33,4 +34,7 @@ def setup_test_environment(self, **kwargs): """Prepare test env.""" CustomTestRunner.__disable_celery() create_s3_bucket(settings.MINIO_AWS_BUCKET_NAME) + create_s3_bucket( + os.environ.get("MINIO_GAP_AWS_BUCKET_NAME", "tngap-products") + ) super(CustomTestRunner, self).setup_test_environment(**kwargs) diff --git a/django_project/gap/admin/preferences.py b/django_project/gap/admin/preferences.py index a811a1f4..c25095f1 100644 --- a/django_project/gap/admin/preferences.py +++ b/django_project/gap/admin/preferences.py @@ -64,6 +64,8 @@ class PreferencesAdmin(admin.ModelAdmin): 'fields': ( 'dask_threads_num_api', 'api_log_batch_size', + 'api_use_x_accel_redirect', + 'user_file_uploader_config' ) } ), diff --git a/django_project/gap/migrations/0041_preferences_api_use_x_accel_redirect_and_more.py b/django_project/gap/migrations/0041_preferences_api_use_x_accel_redirect_and_more.py new file mode 100644 index 00000000..1871c46a --- /dev/null +++ b/django_project/gap/migrations/0041_preferences_api_use_x_accel_redirect_and_more.py @@ -0,0 +1,24 @@ +# Generated by Django 4.2.7 on 2024-12-08 15:55 + +from django.db import migrations, models +import gap.models.preferences + + +class Migration(migrations.Migration): + + dependencies = [ + ('gap', '0040_datasourcefilecache_size'), + ] + + operations = [ + migrations.AddField( + model_name='preferences', + name='api_use_x_accel_redirect', + field=models.BooleanField(default=True, help_text='When set to True, Django will send X-Accel-Redirect header to the NGINX to offload the download process to NGINX.'), + ), + migrations.AddField( + model_name='preferences', + name='user_file_uploader_config', + field=models.JSONField(blank=True, default=gap.models.preferences.user_file_uploader_config_default, help_text='Config for fsspec uploader to s3 for UserFile.', null=True), + ), + ] diff --git a/django_project/gap/models/preferences.py b/django_project/gap/models/preferences.py index 2f4e4161..f23e4ad3 100644 --- a/django_project/gap/models/preferences.py +++ b/django_project/gap/models/preferences.py @@ -59,6 +59,15 @@ def crop_plan_config_default() -> dict: } +def user_file_uploader_config_default() -> dict: + """Return dictionary for user file uploader config.""" + return { + 'max_concurrency': 2, + # upload chunk size to 500 MB + 'default_block_size': 500 * 1024 * 1024 + } + + class Preferences(SingletonModel): """Preference settings specifically for gap.""" @@ -134,6 +143,23 @@ class Preferences(SingletonModel): help_text='Number of API Request logs to be saved in a batch.' ) + # api use x-accel-redirect + api_use_x_accel_redirect = models.BooleanField( + default=True, + help_text=( + 'When set to True, Django will send X-Accel-Redirect header ' + 'to the NGINX to offload the download process to NGINX.' + ) + ) + + # UserFile Uploader s3 config + user_file_uploader_config = models.JSONField( + default=user_file_uploader_config_default, + blank=True, + null=True, + help_text='Config for fsspec uploader to s3 for UserFile.' 
+ ) + class Meta: # noqa: D106 verbose_name_plural = "preferences" diff --git a/django_project/gap/providers/observation.py b/django_project/gap/providers/observation.py index 57b8bffe..f994c5a4 100644 --- a/django_project/gap/providers/observation.py +++ b/django_project/gap/providers/observation.py @@ -15,6 +15,7 @@ from django.contrib.gis.geos import Polygon, Point from django.contrib.gis.db.models.functions import Distance, GeoFunc from typing import List, Tuple, Union +from fsspec.core import OpenFile from gap.models import ( Dataset, @@ -195,20 +196,54 @@ def to_csv_stream(self, suffix='.csv', separator=','): :rtype: bytes """ headers, _ = self._get_headers(use_station_id=True) - - # write headers - yield bytes(','.join(headers) + '\n', 'utf-8') - # get dataframe df_pivot = self._get_data_frame(use_station_id=True) + # write headers + yield bytes(separator.join(headers) + '\n', 'utf-8') + # Write the data in chunks for start in range(0, len(df_pivot), self.csv_chunk_size): chunk = df_pivot.iloc[start:start + self.csv_chunk_size] - yield chunk.to_csv(index=False, header=False, float_format='%g') + yield chunk.to_csv( + index=False, header=False, float_format='%g', + sep=separator + ) - def to_netcdf_stream(self): - """Generate NetCDF.""" + def to_csv(self, suffix='.csv', separator=',', **kwargs): + """Generate csv file to object storage.""" + headers, _ = self._get_headers(use_station_id=True) + + # get dataframe + df_pivot = self._get_data_frame(use_station_id=True) + + outfile: OpenFile = None + outfile, output = self._get_fsspec_remote_url( + suffix, mode='w', **kwargs + ) + + with outfile as tmp_file: + # write headers + write_headers = True + + # Write the data in chunks + for start in range(0, len(df_pivot), self.csv_chunk_size): + chunk = df_pivot.iloc[start:start + self.csv_chunk_size] + if write_headers: + chunk.columns = headers + + chunk.to_csv( + tmp_file.name, index=False, header=write_headers, + float_format='%g', + sep=separator, mode='a' + ) + + if write_headers: + write_headers = False + + return output + + def _get_xarray_dataset(self): time_col_exists = self.has_time_column # if time column exists, in netcdf we should use datetime @@ -234,7 +269,12 @@ def to_netcdf_stream(self): # Convert to xarray Dataset df_pivot.set_index(field_indices, inplace=True) - ds = df_pivot.to_xarray() + + return df_pivot.to_xarray() + + def to_netcdf_stream(self): + """Generate NetCDF.""" + ds = self._get_xarray_dataset() # write to netcdf with ( @@ -253,6 +293,21 @@ def to_netcdf_stream(self): break yield chunk + def to_netcdf(self, **kwargs): + """Generate netcdf file to object storage.""" + ds = self._get_xarray_dataset() + + outfile, output = self._get_fsspec_remote_url('.nc', **kwargs) + + with outfile as tmp_file: + x = ds.to_netcdf( + tmp_file.name, format='NETCDF4', engine='h5netcdf', + compute=False + ) + execute_dask_compute(x) + + return output + def _to_dict(self) -> dict: """Convert into dict. diff --git a/django_project/gap/utils/reader.py b/django_project/gap/utils/reader.py index 05f1e17b..ce4b0355 100644 --- a/django_project/gap/utils/reader.py +++ b/django_project/gap/utils/reader.py @@ -5,12 +5,15 @@ .. 
note:: Helper for reading dataset """ +import os import json import tempfile import dask +import uuid from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from typing import Union, List, Tuple +import fsspec import numpy as np import pytz @@ -427,6 +430,55 @@ def to_json(self) -> dict: return self._xr_dataset_to_dict() return self._to_dict() + def _get_s3_variables(self) -> dict: + """Get s3 env variables for product bucket. + + :return: Dictionary of S3 env vars + :rtype: dict + """ + prefix = 'MINIO' + keys = [ + 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY', + 'AWS_ENDPOINT_URL', 'AWS_REGION_NAME' + ] + results = {} + for key in keys: + results[key] = os.environ.get(f'{prefix}_{key}', '') + results['AWS_BUCKET_NAME'] = os.environ.get( + 'MINIO_GAP_AWS_BUCKET_NAME', '') + results['AWS_DIR_PREFIX'] = os.environ.get( + 'MINIO_GAP_AWS_DIR_PREFIX', '') + return results + + def _get_fsspec_remote_url(self, suffix, mode='wb', **kwargs): + # s3 variables to product bucket + s3 = self._get_s3_variables() + + output_url = ( + f's3://{s3["AWS_BUCKET_NAME"]}/{s3["AWS_DIR_PREFIX"]}' + ) + if not output_url.endswith('/'): + output_url += '/' + output_url += f'user_data/{uuid.uuid4().hex}{suffix}' + + outfile = fsspec.open( + f'simplecache::{output_url}', + mode=mode, + s3={ + 'key': s3.get('AWS_ACCESS_KEY_ID'), + 'secret': s3.get('AWS_SECRET_ACCESS_KEY'), + 'endpoint_url': s3.get('AWS_ENDPOINT_URL'), + # 'region_name': s3.get('AWS_REGION_NAME'), + 'max_concurrency': kwargs.get('max_concurrency', 2), + 'default_block_size': kwargs.get( + 'default_block_size', + 200 * 1024 * 1024 + ) + } + ) + + return outfile, output_url + def to_netcdf_stream(self): """Generate netcdf stream.""" with ( @@ -445,6 +497,19 @@ def to_netcdf_stream(self): break yield chunk + def to_netcdf(self, **kwargs): + """Generate netcdf file to object storage.""" + outfile, output = self._get_fsspec_remote_url('.nc', **kwargs) + + with outfile as tmp_file: + x = self.xr_dataset.to_netcdf( + tmp_file.name, format='NETCDF4', engine='h5netcdf', + compute=False + ) + execute_dask_compute(x) + + return output + def _get_chunk_indices(self, chunks): indices = [] start = 0 @@ -454,16 +519,7 @@ def _get_chunk_indices(self, chunks): start = stop return indices - def to_csv_stream(self, suffix='.csv', separator=','): - """Generate csv bytes stream. - - :param suffix: file extension, defaults to '.csv' - :type suffix: str, optional - :param separator: separator, defaults to ',' - :type separator: str, optional - :yield: bytes of csv file - :rtype: bytes - """ + def _get_dataset_for_csv(self): dim_order = [self.date_variable] reordered_cols = [ attribute.attribute.variable_name for attribute in self.attributes @@ -486,6 +542,21 @@ def to_csv_stream(self, suffix='.csv', separator=','): # rechunk dataset ds = self.xr_dataset.chunk(rechunk) + + return ds, dim_order, reordered_cols + + def to_csv_stream(self, suffix='.csv', separator=','): + """Generate csv bytes stream. 
+ + :param suffix: file extension, defaults to '.csv' + :type suffix: str, optional + :param separator: separator, defaults to ',' + :type separator: str, optional + :yield: bytes of csv file + :rtype: bytes + """ + ds, dim_order, reordered_cols = self._get_dataset_for_csv() + date_indices = self._get_chunk_indices( ds.chunksizes[self.date_variable] ) @@ -513,13 +584,64 @@ def to_csv_stream(self, suffix='.csv', separator=','): if write_headers: headers = dim_order + list(chunk_df.columns) - yield bytes(','.join(headers) + '\n', 'utf-8') + yield bytes( + separator.join(headers) + '\n', + 'utf-8' + ) write_headers = False yield chunk_df.to_csv( - index=True, header=False, float_format='%g' + index=True, header=False, float_format='%g', + sep=separator ) + def to_csv(self, suffix='.csv', separator=',', **kwargs): + """Generate csv file to object storage.""" + ds, dim_order, reordered_cols = self._get_dataset_for_csv() + + date_indices = self._get_chunk_indices( + ds.chunksizes[self.date_variable] + ) + lat_indices = self._get_chunk_indices(ds.chunksizes['lat']) + lon_indices = self._get_chunk_indices(ds.chunksizes['lon']) + write_headers = True + output = None + + # cannot use dask utils because to_dataframe is not returning + # delayed object + with dask.config.set( + pool=ThreadPoolExecutor(get_num_of_threads(is_api=True)) + ): + outfile, output = self._get_fsspec_remote_url( + suffix, mode='w', **kwargs + ) + + with outfile as tmp_file: + # iterate foreach chunk + for date_start, date_stop in date_indices: + for lat_start, lat_stop in lat_indices: + for lon_start, lon_stop in lon_indices: + slice_dict = { + self.date_variable: slice( + date_start, date_stop + ), + 'lat': slice(lat_start, lat_stop), + 'lon': slice(lon_start, lon_stop) + } + chunk = ds.isel(**slice_dict) + chunk_df = chunk.to_dataframe(dim_order=dim_order) + chunk_df = chunk_df[reordered_cols] + + chunk_df.to_csv( + tmp_file.name, index=True, mode='a', + header=write_headers, + float_format='%g', sep=separator + ) + if write_headers: + write_headers = False + + return output + class BaseDatasetReader: """Base class for Dataset Reader.""" diff --git a/django_project/gap_api/admin.py b/django_project/gap_api/admin.py index 4df229fb..1255ceb8 100644 --- a/django_project/gap_api/admin.py +++ b/django_project/gap_api/admin.py @@ -16,12 +16,14 @@ from rest_framework_tracking.admin import APIRequestLogAdmin from rest_framework_tracking.models import APIRequestLog as BaseAPIRequestLog +from core.utils.file import format_size from gap.models import DatasetType from gap_api.models import ( APIRequestLog, DatasetTypeAPIConfig, Location, - APIRateLimiter + APIRateLimiter, + UserFile ) @@ -218,7 +220,21 @@ class APIRateLimiterAdmin(admin.ModelAdmin): actions = (export_rate_limiter_as_json,) +class UserFileAdmin(admin.ModelAdmin): + """Admin class for UserFile.""" + + list_display = ('name', 'user', 'get_size', 'created_on',) + + def get_size(self, obj: UserFile): + """Get the size.""" + return format_size(obj.size) + + get_size.short_description = 'Size' + get_size.admin_order_field = 'size' + + admin.site.register(APIRequestLog, GapAPIRequestLogAdmin) admin.site.register(DatasetTypeAPIConfig, GapAPIDatasetTypeConfigAdmin) admin.site.register(Location, LocationAdmin) admin.site.register(APIRateLimiter, APIRateLimiterAdmin) +admin.site.register(UserFile, UserFileAdmin) diff --git a/django_project/gap_api/api_views/measurement.py b/django_project/gap_api/api_views/measurement.py index ad379ceb..1f1a85ed 100644 --- 
a/django_project/gap_api/api_views/measurement.py +++ b/django_project/gap_api/api_views/measurement.py @@ -5,6 +5,7 @@ .. note:: Measurement APIs """ +import os from datetime import date, datetime, time from typing import Dict @@ -21,13 +22,17 @@ from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework.views import APIView +from django.core.files.storage import storages +from storages.backends.s3boto3 import S3Boto3Storage +from urllib.parse import urlparse from gap.models import ( Dataset, DatasetObservationType, Attribute, DatasetAttribute, - DatasetType + DatasetType, + Preferences ) from gap.providers import get_reader_from_dataset from gap.utils.reader import ( @@ -37,7 +42,7 @@ BaseDatasetReader, DatasetReaderOutputType ) -from gap_api.models import DatasetTypeAPIConfig, Location +from gap_api.models import DatasetTypeAPIConfig, Location, UserFile from gap_api.serializers.common import APIErrorSerializer from gap_api.utils.helper import ApiTag from gap_api.mixins import GAPAPILoggingMixin, CounterSlidingWindowThrottle @@ -186,7 +191,7 @@ def _get_attribute_filter(self): :rtype: List[Attribute] """ attributes_str = self.request.GET.get('attributes') - attributes_str = attributes_str.split(',') + attributes_str = [a.strip() for a in attributes_str.split(',')] return Attribute.objects.filter(variable_name__in=attributes_str) def _get_date_filter(self, attr_name): @@ -333,45 +338,149 @@ def _read_data_as_json( data['results'].append(values) return data + def _get_accel_redirect_response( + self, presigned_url, file_name, content_type + ): + parse_result = urlparse(presigned_url) + response = Response( + status=200 + ) + url = presigned_url.replace( + f"{parse_result.scheme}://{parse_result.netloc}/", "" + ) + response['X-Accel-Redirect'] = ( + f'/userfiles/{parse_result.scheme}/{parse_result.netloc}/{url}' + ) + response['Content-Type'] = content_type + response['Content-Disposition'] = ( + f'attachment; filename="{file_name}"' + ) + return response + + def _read_data_as_netcdf( self, reader_dict: Dict[int, BaseDatasetReader], - start_dt: datetime, end_dt: datetime) -> Response: + start_dt: datetime, end_dt: datetime, + user_file: UserFile = None) -> Response: + # check if can use UserFile cache + if user_file: + cache_exist = user_file.find_in_cache() + if cache_exist: + return self._get_accel_redirect_response( + cache_exist.generate_url(), + os.path.basename(cache_exist.name), + 'application/x-netcdf' + ) + reader: BaseDatasetReader = list(reader_dict.values())[0] reader_value = self._read_data(reader) if reader_value.is_empty(): return None - response = StreamingHttpResponse( - reader_value.to_netcdf_stream(), - content_type='application/x-netcdf' - ) - response['Content-Disposition'] = 'attachment; filename="data.nc"' + + file_name = 'data.nc' + # check config for using X-Accel-Redirect + if user_file is None: + response = StreamingHttpResponse( + reader_value.to_netcdf_stream(), + content_type='application/x-netcdf' + ) + response['Content-Disposition'] = ( + f'attachment; filename="{file_name}"' + ) + else: + s3_storage: S3Boto3Storage = storages["gap_products"] + file_path = reader_value.to_netcdf( + max_concurrency=( + self._preferences.user_file_uploader_config.get( + 'max_concurrency') + ), + default_block_size=( + self._preferences.user_file_uploader_config.get( + 'default_block_size') + ) + ).replace( + f's3://{s3_storage.bucket.name}/', '' + ) + file_name = os.path.basename(file_path) + presigned_link = 
s3_storage.url(file_path) + response = self._get_accel_redirect_response( + presigned_link, file_name, 'application/x-netcdf' + ) + # store the user_file + user_file.name = file_path + user_file.size = s3_storage.size(file_path) + user_file.save() return response def _read_data_as_csv( self, reader_dict: Dict[int, BaseDatasetReader], - start_dt: datetime, end_dt: datetime) -> Response: + start_dt: datetime, end_dt: datetime, + suffix='.csv', separator=',', content_type='text/csv', + user_file: UserFile = None) -> Response: + # check if can use UserFile cache + if user_file: + cache_exist = user_file.find_in_cache() + if cache_exist: + return self._get_accel_redirect_response( + cache_exist.generate_url(), + os.path.basename(cache_exist.name), + content_type + ) + reader: BaseDatasetReader = list(reader_dict.values())[0] reader_value = self._read_data(reader) if reader_value.is_empty(): return None - response = StreamingHttpResponse( - reader_value.to_csv_stream(), content_type='text/csv') - response['Content-Disposition'] = 'attachment; filename="data.csv"' + + file_name = f'data{suffix}' + # check config for using X-Accel-Redirect + if user_file is None: + response = StreamingHttpResponse( + reader_value.to_csv_stream( + suffix=suffix, + separator=separator + ), + content_type=content_type + ) + response['Content-Disposition'] = ( + f'attachment; filename="{file_name}"' + ) + else: + s3_storage: S3Boto3Storage = storages["gap_products"] + file_path = reader_value.to_csv( + suffix=suffix, + separator=separator, + max_concurrency=( + self._preferences.user_file_uploader_config.get( + 'max_concurrency') + ), + default_block_size=( + self._preferences.user_file_uploader_config.get( + 'default_block_size') + ) + ).replace( + f's3://{s3_storage.bucket.name}/', '' + ) + file_name = os.path.basename(file_path) + presigned_link = s3_storage.url(file_path) + response = self._get_accel_redirect_response( + presigned_link, file_name, content_type + ) + # store the user_file + user_file.name = file_path + user_file.size = s3_storage.size(file_path) + user_file.save() + return response def _read_data_as_ascii( self, reader_dict: Dict[int, BaseDatasetReader], start_dt: datetime, end_dt: datetime) -> Response: - reader: BaseDatasetReader = list(reader_dict.values())[0] - reader_value = self._read_data(reader) - if reader_value.is_empty(): - return None - response = StreamingHttpResponse( - reader_value.to_csv_stream('.txt', '\t'), - content_type='text/ascii') - response['Content-Disposition'] = 'attachment; filename="data.txt"' - return response + return self._read_data_as_csv( + reader_dict, start_dt, end_dt, + suffix='.txt', separator='\t', content_type='text/ascii' + ) def validate_output_format( self, dataset: Dataset, product_type: str, @@ -465,6 +574,12 @@ def validate_date_range(self, product_filter, start_dt, end_dt): ) }) + def _get_request_params(self): + request_params = {} + for k, v in self.request.GET.items(): + request_params[k] = v + return request_params + def get_response_data(self) -> Response: """Read data from dataset. 
@@ -533,6 +648,21 @@ def get_response_data(self) -> Response: print(e) pass + # Check UserFile cache if x_accel_redirect is enabled + user_file: UserFile = None + if self._preferences.api_use_x_accel_redirect: + query_params = self._get_request_params() + query_params['attributes'] = [ + a.strip() for a in query_params['attributes'].split(',') + ] + query_params['geom_type'] = location.type + query_params['geometry'] = location.geometry.wkt + user_file = UserFile( + user=self.request.user, + name="", + query_params=query_params + ) + response = None if output_format == DatasetReaderOutputType.JSON: data_value = self._read_data_as_json( @@ -544,9 +674,12 @@ def get_response_data(self) -> Response: ) elif output_format == DatasetReaderOutputType.NETCDF: response = self._read_data_as_netcdf( - dataset_dict, start_dt, end_dt) + dataset_dict, start_dt, end_dt, user_file=user_file) elif output_format == DatasetReaderOutputType.CSV: - response = self._read_data_as_csv(dataset_dict, start_dt, end_dt) + response = self._read_data_as_csv( + dataset_dict, start_dt, end_dt, + user_file=user_file + ) elif output_format == DatasetReaderOutputType.ASCII: response = self._read_data_as_ascii(dataset_dict, start_dt, end_dt) else: @@ -588,4 +721,6 @@ ) def get(self, request, *args, **kwargs): """Fetch weather data by a single point or bounding box.""" + self._preferences = Preferences.load() + return self.get_response_data() diff --git a/django_project/gap_api/apps.py b/django_project/gap_api/apps.py index 1d5b33f6..4f3da1ed 100644 --- a/django_project/gap_api/apps.py +++ b/django_project/gap_api/apps.py @@ -16,4 +16,4 @@ class GapApiConfig(AppConfig): def ready(self): """App ready handler.""" - from gap_api.tasks import store_api_logs, cleanup_user_locations # noqa + from gap_api.tasks import store_api_logs, cleanup_user_locations, cleanup_user_files # noqa diff --git a/django_project/gap_api/factories.py b/django_project/gap_api/factories.py index 01d71ba0..36bab42b 100644 --- a/django_project/gap_api/factories.py +++ b/django_project/gap_api/factories.py @@ -9,7 +9,10 @@ from django.contrib.gis.geos import Polygon, MultiPolygon from core.factories import UserF -from gap_api.models import APIRequestLog, Location, APIRateLimiter +from gap_api.models import ( + APIRequestLog, Location, APIRateLimiter, + UserFile +) class APIRequestLogFactory(DjangoModelFactory): @@ -63,3 +66,22 @@ class Meta: # noqa minute_limit = 10 hour_limit = 100 day_limit = 1000 + + +class UserFileFactory(DjangoModelFactory): + """Factory class for UserFile model.""" + + class Meta: # noqa + model = UserFile + + user = factory.SubFactory(UserF) + name = 'dev/user_data/123.csv' + query_params = { + 'product': 'cbam_historical_analysis_bias_adjust', + 'attributes': ['max_temperature'], + 'start_date': '2020-01-01', + 'end_date': '2020-01-02', + 'output_type': 'csv', + 'geom_type': 'point', + 'geometry': 'POINT (1 1)' + } diff --git a/django_project/gap_api/migrations/0005_userfile.py b/django_project/gap_api/migrations/0005_userfile.py new file mode 100644 index 00000000..d9c898a7 --- /dev/null +++ b/django_project/gap_api/migrations/0005_userfile.py @@ -0,0 +1,28 @@ +# Generated by Django 4.2.7 on 2024-12-08 15:52 + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion + + +class Migration(migrations.Migration): + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('gap_api', '0004_apiratelimiter'), + ] + + 
operations = [ + migrations.CreateModel( + name='UserFile', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.CharField(help_text='File path to the storage.', max_length=512)), + ('created_on', models.DateTimeField(auto_now_add=True)), + ('query_params', models.JSONField(default=dict, help_text='Query parameters that generate the file.')), + ('query_hash', models.CharField(blank=True, editable=False, help_text='Hash that can be used to cache the query result.', max_length=512)), + ('size', models.IntegerField(default=0)), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)), + ], + ), + ] diff --git a/django_project/gap_api/models/__init__.py b/django_project/gap_api/models/__init__.py index 2ced82d3..3c071aab 100644 --- a/django_project/gap_api/models/__init__.py +++ b/django_project/gap_api/models/__init__.py @@ -2,3 +2,4 @@ from gap_api.models.api_config import * # noqa from gap_api.models.location import * # noqa from gap_api.models.rate_limiter import * # noqa +from gap_api.models.user_file import * # noqa diff --git a/django_project/gap_api/models/user_file.py b/django_project/gap_api/models/user_file.py new file mode 100644 index 00000000..ddb9d388 --- /dev/null +++ b/django_project/gap_api/models/user_file.py @@ -0,0 +1,99 @@ +# coding=utf-8 +""" +Tomorrow Now GAP API. + +.. note:: Models for User File +""" + +from django.db import models +from django.conf import settings +import hashlib +from django.core.files.storage import storages +from storages.backends.s3boto3 import S3Boto3Storage +from django.db.models.signals import post_delete +from django.dispatch import receiver + + +class UserFile(models.Model): + """Model to store user generated file from API request.""" + + user = models.ForeignKey( + settings.AUTH_USER_MODEL, + on_delete=models.CASCADE + ) + name = models.CharField( + max_length=512, + help_text='File path to the storage.' + ) + created_on = models.DateTimeField( + auto_now_add=True + ) + query_params = models.JSONField( + default=dict, + help_text='Query parameters that generate the file.' 
+ ) + query_hash = models.CharField( + max_length=512, + help_text='Hash that can be used to cache the query result.', + editable=False, + blank=True + ) + size = models.IntegerField(default=0) + + def _calculate_hash(self): + """Calculate hash from query params.""" + combined_str = "" + + # add product + combined_str += f'product:{self.query_params.get("product")}' + # add attributes + attributes = self.query_params.get("attributes") + combined_str += f'attributes:{",".join(attributes)}' + # add start_date + combined_str += f'start_date:{self.query_params.get("start_date")}' + # add start_time + combined_str += ( + f'start_time:{self.query_params.get("start_time", "-")}' + ) + # add end_date + combined_str += f'end_date:{self.query_params.get("end_date")}' + # add end_time + combined_str += f'end_time:{self.query_params.get("end_time", "-")}' + # add output_type + combined_str += f'output_type:{self.query_params.get("output_type")}' + # add query geom type + combined_str += f'geom_type:{self.query_params.get("geom_type")}' + # add wkt geom + combined_str += f'geometry:{self.query_params.get("geometry")}' + # add altitudes + combined_str += f'altitudes:{self.query_params.get("altitudes", "-")}' + + return hashlib.sha512(combined_str.encode()).hexdigest() + + def save(self, *args, **kwargs): + """Override the save method to calculate and set the hash.""" + if not self.query_hash: + self.query_hash = self._calculate_hash() + super().save(*args, **kwargs) + + def generate_url(self): + """Generate pre-signed url to the storage.""" + s3_storage: S3Boto3Storage = storages["gap_products"] + return s3_storage.url(self.name) + + def find_in_cache(self): + """Find UserFile object in cache.""" + if not self.query_hash: + self.query_hash = self._calculate_hash() + + return UserFile.objects.filter( + query_hash=self.query_hash + ).order_by('-created_on').first() + + +@receiver(post_delete, sender=UserFile) +def post_delete_user_file(sender, instance, **kwargs): + """Delete user file in s3 object storage.""" + s3_storage: S3Boto3Storage = storages["gap_products"] + if s3_storage.exists(instance.name): + s3_storage.delete(instance.name) diff --git a/django_project/gap_api/tasks/__init__.py b/django_project/gap_api/tasks/__init__.py index a6d11c44..b5b1324b 100644 --- a/django_project/gap_api/tasks/__init__.py +++ b/django_project/gap_api/tasks/__init__.py @@ -1,2 +1,2 @@ from gap_api.tasks.api_log import * # noqa -from gap_api.tasks.cleanup_location import * # noqa +from gap_api.tasks.cleanup import * # noqa diff --git a/django_project/gap_api/tasks/cleanup.py b/django_project/gap_api/tasks/cleanup.py new file mode 100644 index 00000000..455031fd --- /dev/null +++ b/django_project/gap_api/tasks/cleanup.py @@ -0,0 +1,28 @@ +# coding=utf-8 +""" +Tomorrow Now GAP API. + +.. 
note:: Tasks for Cleanup Expired Data +""" + +from celery import shared_task +from datetime import timedelta +from django.utils import timezone + +from gap_api.models import Location, UserFile + + +@shared_task(name="cleanup_user_locations") +def cleanup_user_locations(): + """Cleanup Expired Location.""" + Location.objects.filter( + expired_on__lte=timezone.now() + ).delete() + + +@shared_task(name="cleanup_user_files") +def cleanup_user_files(): + """Cleanup UserFile more than 2 days.""" + UserFile.objects.filter( + created_on__lte=(timezone.now() - timedelta(days=2)) + ).delete() diff --git a/django_project/gap_api/tasks/cleanup_location.py b/django_project/gap_api/tasks/cleanup_location.py deleted file mode 100644 index 8f864d9a..00000000 --- a/django_project/gap_api/tasks/cleanup_location.py +++ /dev/null @@ -1,19 +0,0 @@ -# coding=utf-8 -""" -Tomorrow Now GAP API. - -.. note:: Tasks for Cleanup Expired Location -""" - -from celery import shared_task -from django.utils import timezone - -from gap_api.models import Location - - -@shared_task(name="cleanup_user_locations") -def cleanup_user_locations(): - """Cleanup Expired Location.""" - Location.objects.filter( - expired_on__lte=timezone.now() - ).delete() diff --git a/django_project/gap_api/tests/test_location_api.py b/django_project/gap_api/tests/test_location_api.py index 2337b3ef..911f4ca4 100644 --- a/django_project/gap_api/tests/test_location_api.py +++ b/django_project/gap_api/tests/test_location_api.py @@ -17,7 +17,7 @@ from gap_api.models.location import Location from gap_api.api_views.location import LocationAPI from gap_api.factories import LocationFactory -from gap_api.tasks.cleanup_location import cleanup_user_locations +from gap_api.tasks.cleanup import cleanup_user_locations class TestAPILocation(BaseAPIViewTest): diff --git a/django_project/gap_api/tests/test_measurement_api.py b/django_project/gap_api/tests/test_measurement_api.py index 070bc17d..4cb7de7f 100644 --- a/django_project/gap_api/tests/test_measurement_api.py +++ b/django_project/gap_api/tests/test_measurement_api.py @@ -18,7 +18,7 @@ StationFactory, MeasurementFactory ) -from gap.models import DatasetAttribute, Dataset, DatasetType +from gap.models import DatasetAttribute, Dataset, DatasetType, Preferences from gap.utils.reader import ( DatasetReaderValue, DatasetTimelineValue, DatasetReaderInput, DatasetReaderOutputType, BaseDatasetReader, @@ -62,6 +62,13 @@ def get_data_values(self) -> DatasetReaderValue: class CommonMeasurementAPITest(BaseAPIViewTest): """Common class for Measurement API Test.""" + def setUp(self): + """Set the test class.""" + preferences = Preferences.load() + preferences.api_use_x_accel_redirect = False + preferences.save() + super().setUp() + def _get_measurement_request( self, lat=None, lon=None, bbox=None, attributes='max_temperature', @@ -389,6 +396,7 @@ def test_read_from_tahmo(self): ) response = view(request) self.assertEqual(response.status_code, 200) + print(response.headers) self.assertEqual(response['content-type'], 'text/csv') def test_validate_date_range(self): diff --git a/django_project/gap_api/tests/test_user_file.py b/django_project/gap_api/tests/test_user_file.py new file mode 100644 index 00000000..5e24f48c --- /dev/null +++ b/django_project/gap_api/tests/test_user_file.py @@ -0,0 +1,300 @@ +# coding=utf-8 +""" +Tomorrow Now GAP. + +.. note:: Unit tests for UserFile. 
+""" + +from datetime import timedelta, datetime +from typing import List, Tuple +from unittest.mock import patch +from django.utils import timezone +from django.core.files.base import ContentFile +from django.core.files.storage import storages +from storages.backends.s3boto3 import S3Boto3Storage +from django.contrib.gis.geos import Point + +from core.utils.s3 import remove_s3_folder +from gap.models import DatasetAttribute, Dataset, Preferences +from gap_api.models import UserFile +from gap_api.tasks.cleanup import cleanup_user_files +from gap.utils.reader import ( + DatasetReaderValue, + DatasetReaderInput, DatasetReaderOutputType, BaseDatasetReader, + LocationInputType +) +from gap_api.api_views.measurement import MeasurementAPI + +from gap.factories import MeasurementFactory, StationFactory +from gap.tests.ingestor.test_cbam_bias_adjust import mock_open_zarr_dataset +from gap_api.tests.test_measurement_api import CommonMeasurementAPITest +from gap_api.factories import UserFileFactory + + +class MockXArrayDatasetReader(BaseDatasetReader): + """Class to mock a dataset reader.""" + + def __init__( + self, dataset, attributes: List[DatasetAttribute], + location_input: DatasetReaderInput, start_date: datetime, + end_date: datetime, + output_type=DatasetReaderOutputType.JSON, + altitudes: Tuple[float, float] = None + ) -> None: + """Initialize MockDatasetReader class.""" + super().__init__( + dataset, attributes, location_input, + start_date, end_date, output_type) + + def get_data_values(self) -> DatasetReaderValue: + """Override data values with a mock object.""" + if self.location_input.type == LocationInputType.POLYGON: + p = Point(0, 0) + else: + p = self.location_input.point + return DatasetReaderValue( + mock_open_zarr_dataset(), + DatasetReaderInput.from_point(p), + self.attributes + ) + + +class TestUserFileAPI(CommonMeasurementAPITest): + """Test UserFile in the API.""" + + fixtures = [ + '2.provider.json', + '3.station_type.json', + '4.dataset_type.json', + '5.dataset.json', + '6.unit.json', + '7.attribute.json', + '8.dataset_attribute.json' + ] + + def setUp(self): + """Set the test class.""" + super().setUp() + preferences = Preferences.load() + preferences.api_use_x_accel_redirect = True + preferences.save() + + self.s3_storage: S3Boto3Storage = storages["gap_products"] + + def tearDown(self): + """Cleanup resources.""" + remove_s3_folder(self.s3_storage, 'dev/user_data') + super().tearDown() + + def test_cleanup(self): + """Test the cleanup logic.""" + f1 = UserFileFactory.create() + self.s3_storage.save(f1.name, ContentFile('echo')) + + f2 = UserFileFactory.create() + f2.name = 'dev/user_data/124.csv' + f2.created_on = timezone.now() - timedelta(days=15) + f2.save() + self.s3_storage.save(f2.name, ContentFile('echo')) + + cleanup_user_files() + + self.assertTrue( + UserFile.objects.filter( + id=f1.id + ).exists() + ) + self.assertFalse( + UserFile.objects.filter( + id=f2.id + ).exists() + ) + self.assertTrue(self.s3_storage.exists(f1.name)) + self.assertFalse(self.s3_storage.exists(f2.name)) + + @patch('gap_api.api_views.measurement.get_reader_from_dataset') + def test_api_netcdf_request(self, mocked_reader): + """Test generate to netcdf.""" + view = MeasurementAPI.as_view() + mocked_reader.return_value = MockXArrayDatasetReader + dataset = Dataset.objects.get( + type__variable_name='cbam_historical_analysis_bias_adjust' + ) + attribute1 = DatasetAttribute.objects.filter( + dataset=dataset, + attribute__variable_name='max_temperature' + ).first() + attribs = 
[attribute1.attribute.variable_name] + point = Point(x=26.9665, y=-12.5969) + request = self._get_measurement_request_point( + product='cbam_historical_analysis_bias_adjust', + attributes=','.join(attribs), + lat=point.y, lon=point.x, + start_dt='2023-01-01', + end_dt='2023-01-01', + output_type='netcdf' + ) + response = view(request) + self.assertEqual(response.status_code, 200) + mocked_reader.assert_called_once_with(attribute1.dataset) + self.assertIn('X-Accel-Redirect', response.headers) + self.assertTrue(UserFile.objects.filter( + user=self.superuser, + query_params__output_type='netcdf', + query_params__product='cbam_historical_analysis_bias_adjust', + query_params__geom_type='point', + query_params__geometry=point.wkt, + query_params__start_date='2023-01-01', + query_params__end_date='2023-01-01' + ).exists()) + + @patch('gap_api.api_views.measurement.get_reader_from_dataset') + def test_api_csv_request(self, mocked_reader): + """Test generate to csv.""" + view = MeasurementAPI.as_view() + mocked_reader.return_value = MockXArrayDatasetReader + dataset = Dataset.objects.get( + type__variable_name='cbam_historical_analysis_bias_adjust' + ) + attribute1 = DatasetAttribute.objects.filter( + dataset=dataset, + attribute__variable_name='max_temperature' + ).first() + attribs = [attribute1.attribute.variable_name] + point = Point(x=26.9665, y=-12.5969) + request = self._get_measurement_request_point( + product='cbam_historical_analysis_bias_adjust', + attributes=','.join(attribs), + lat=point.y, lon=point.x, + start_dt='2023-01-01', + end_dt='2023-01-01', + output_type='csv' + ) + response = view(request) + self.assertEqual(response.status_code, 200) + mocked_reader.assert_called_once_with(attribute1.dataset) + self.assertIn('X-Accel-Redirect', response.headers) + self.assertTrue(UserFile.objects.filter( + user=self.superuser, + query_params__output_type='csv', + query_params__product='cbam_historical_analysis_bias_adjust', + query_params__geom_type='point', + query_params__geometry=point.wkt, + query_params__start_date='2023-01-01', + query_params__end_date='2023-01-01' + ).exists()) + + @patch('gap_api.api_views.measurement.get_reader_from_dataset') + def test_api_cached_request(self, mocked_reader): + """Test cached UserFile.""" + f2 = UserFileFactory.create() + + view = MeasurementAPI.as_view() + mocked_reader.return_value = MockXArrayDatasetReader + dataset = Dataset.objects.get( + type__variable_name='cbam_historical_analysis_bias_adjust' + ) + attribute1 = DatasetAttribute.objects.filter( + dataset=dataset, + attribute__variable_name='max_temperature' + ).first() + attribs = [attribute1.attribute.variable_name] + request = self._get_measurement_request_point( + product='cbam_historical_analysis_bias_adjust', + attributes=','.join(attribs), + lat=1, lon=1, + start_dt='2020-01-01', + end_dt='2020-01-02', + output_type='csv' + ) + response = view(request) + self.assertEqual(response.status_code, 200) + mocked_reader.assert_called_once_with(attribute1.dataset) + self.assertIn('X-Accel-Redirect', response.headers) + self.assertIn(f2.name, response.headers['X-Accel-Redirect']) + + def test_api_observation_csv_request(self): + """Test Observation API to csv.""" + view = MeasurementAPI.as_view() + dataset = Dataset.objects.get(name='Tahmo Ground Observational') + p = Point(x=26.97, y=-12.56, srid=4326) + station = StationFactory.create( + geometry=p, + provider=dataset.provider + ) + attribute1 = DatasetAttribute.objects.filter( + dataset=dataset, + 
attribute__variable_name='min_relative_humidity' + ).first() + dt = datetime(2019, 11, 1, 0, 0, 0) + MeasurementFactory.create( + station=station, + dataset_attribute=attribute1, + date_time=dt, + value=100 + ) + attribs = [ + attribute1.attribute.variable_name + ] + request = self._get_measurement_request_point( + lat=p.y, + lon=p.x, + attributes=','.join(attribs), + product='tahmo_ground_observation', + output_type='csv', + start_dt=dt.date().isoformat(), + end_dt=dt.date().isoformat() + ) + response = view(request) + self.assertEqual(response.status_code, 200) + self.assertIn('X-Accel-Redirect', response.headers) + self.assertTrue(UserFile.objects.filter( + user=self.superuser, + query_params__output_type='csv', + query_params__product='tahmo_ground_observation', + query_params__geom_type='point', + query_params__geometry=p.wkt + ).exists()) + + def test_api_observation_netcdf_request(self): + """Test Observation API to netcdf.""" + view = MeasurementAPI.as_view() + dataset = Dataset.objects.get(name='Tahmo Ground Observational') + p = Point(x=26.97, y=-12.56, srid=4326) + station = StationFactory.create( + geometry=p, + provider=dataset.provider + ) + attribute1 = DatasetAttribute.objects.filter( + dataset=dataset, + attribute__variable_name='min_relative_humidity' + ).first() + dt = datetime(2019, 11, 1, 0, 0, 0) + MeasurementFactory.create( + station=station, + dataset_attribute=attribute1, + date_time=dt, + value=100 + ) + attribs = [ + attribute1.attribute.variable_name + ] + request = self._get_measurement_request_point( + lat=p.y, + lon=p.x, + attributes=','.join(attribs), + product='tahmo_ground_observation', + output_type='netcdf', + start_dt=dt.date().isoformat(), + end_dt=dt.date().isoformat() + ) + response = view(request) + self.assertEqual(response.status_code, 200) + self.assertIn('X-Accel-Redirect', response.headers) + self.assertTrue(UserFile.objects.filter( + user=self.superuser, + query_params__output_type='netcdf', + query_params__product='tahmo_ground_observation', + query_params__geom_type='point', + query_params__geometry=p.wkt + ).exists())
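
Note on the X-Accel-Redirect flow introduced in this patch: `_get_accel_redirect_response()` folds the presigned S3 URL into an internal `/userfiles/{scheme}/{host}/{path}` path, and the new NGINX `location ~ ^/userfiles/(.*?)/(.*?)/(.*)` block reconstructs the original URL before proxying it. A minimal round-trip sketch in Python, using a made-up MinIO presigned URL purely for illustration (the host, bucket and signature below are not taken from this patch):

    from urllib.parse import urlparse

    # Hypothetical presigned URL, e.g. what S3Boto3Storage.url() could return.
    presigned_url = (
        'http://minio:9000/tngap-products/dev/user_data/abc123.nc'
        '?X-Amz-Signature=deadbeef'
    )

    # Django side (mirrors _get_accel_redirect_response): drop scheme://host/
    # and build the internal path matched by the NGINX location regex.
    parsed = urlparse(presigned_url)
    rest = presigned_url.replace(f'{parsed.scheme}://{parsed.netloc}/', '')
    x_accel_redirect = f'/userfiles/{parsed.scheme}/{parsed.netloc}/{rest}'

    # NGINX side: the three capture groups become $download_protocol,
    # $download_host and $download_path, and proxy_pass uses the rebuilt URL.
    # (In NGINX the query string travels separately via $is_args$args; it is
    # kept attached here for brevity.)
    _, _, protocol, host, path = x_accel_redirect.split('/', 4)
    assert f'{protocol}://{host}/{path}' == presigned_url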
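
The new `UserFileAdmin.get_size` column relies on `core.utils.file.format_size`, which is not part of this diff. That helper is assumed to render a byte count as a human-readable string; a rough sketch of such a function (the actual implementation in `core.utils.file` may differ):

    def format_size(size_in_bytes: int) -> str:
        """Return a human-readable size, e.g. 1536 -> '1.50 KB'."""
        size = float(size_in_bytes)
        for unit in ('B', 'KB', 'MB', 'GB', 'TB'):
            if size < 1024 or unit == 'TB':
                return f'{size:.2f} {unit}'
            size /= 1024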