diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 00000000..d9b869d6
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,150 @@
+docs/site/
+.vscode/
+*.code-workspace
+.idea/
+.DS_Store
+Pipfile
+Pipfile.lock
+pgadmin4/
+mkdocs_env/
+latest.dump
+
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+# For a library or package, you might want to ignore these files since the code is
+# intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
diff --git a/Dockerfile b/Dockerfile
index e3c4af3b..d7c1193d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -3,8 +3,12 @@ FROM python:3.8
 MAINTAINER Daniel Reeves "xdanielreeves@gmail.com"
 
 WORKDIR /
+
+ADD --chmod=755 https://astral.sh/uv/install.sh /install.sh
+RUN /install.sh && rm /install.sh
+
 COPY requirements.txt app/requirements.txt
-RUN pip install --no-cache-dir -r app/requirements.txt
+RUN /root/.cargo/bin/uv pip install --system --no-cache -r app/requirements.txt
 
 COPY ./ /home/
 WORKDIR /home/
@@ -13,6 +17,5 @@ ENV PYTHONPATH=/home
 EXPOSE 80
 
 CMD ["gunicorn", \
-    "-k", "egg:meinheld#gunicorn_worker", \
     "-c", "gunicorn_conf.py", \
     "app.main:create_app()"]
diff --git a/app/admin/views/data.py b/app/admin/views/data.py
index 99ce463b..5c7e5bbe 100644
--- a/app/admin/views/data.py
+++ b/app/admin/views/data.py
@@ -16,10 +16,12 @@
 from app.data.celery import celery_app
 from app.data.celery import combine_data_v1_task
 from app.data.celery import combine_data_v2_task
+from app.data.celery import combine_data_v3_task
 from app.data.celery import live_hobolink_data_task
 from app.data.celery import live_usgs_data_task
 from app.data.celery import predict_v1_task
 from app.data.celery import predict_v2_task
+from app.data.celery import predict_v3_task
 from app.data.celery import update_db_task
 from app.data.database import execute_sql
 from app.data.database import get_current_time
@@ -151,6 +153,17 @@ def source_usgs(self):
             data_source='usgs'
         ))
 
+    @expose('/csv/src/processed_data_v1_source')
+    def source_combine_data_v1(self):
+        async_result = combine_data_v1_task.s(
+            export_name='code_for_boston_export_90d',
+            days_ago=90).delay()
+        return redirect(url_for(
+            'admin_downloadview.csv_wait',
+            task_id=async_result.id,
+            data_source='combined'
+        ))
+
     @expose('/csv/src/processed_data_v2_source')
     def source_combine_data_v2(self):
         async_result = combine_data_v2_task.s(
@@ -162,9 +175,9 @@ def source_combine_data_v2(self):
             data_source='combined'
         ))
 
-    @expose('/csv/src/processed_data_v1_source')
-    def source_combine_data_v1(self):
-        async_result = combine_data_v1_task.s(
+    @expose('/csv/src/processed_data_v3_source')
+    def source_combine_data_v3(self):
+        async_result = combine_data_v3_task.s(
             export_name='code_for_boston_export_90d',
             days_ago=90).delay()
         return redirect(url_for(
@@ -173,6 +186,17 @@ def source_combine_data_v1(self):
             data_source='combined'
         ))
 
+    @expose('/csv/src/prediction_v1_source')
+    def source_prediction_v1(self):
+        async_result = predict_v1_task.s(
+            export_name='code_for_boston_export_90d',
+            days_ago=90).delay()
+        return redirect(url_for(
+            'admin_downloadview.csv_wait',
+            task_id=async_result.id,
+            data_source='prediction'
+        ))
+
     @expose('/csv/src/prediction_v2_source')
     def source_prediction_v2(self):
         async_result = predict_v2_task.s(
@@ -184,9 +208,9 @@ def source_prediction_v2(self):
             data_source='prediction'
         ))
 
-    @expose('/csv/src/prediction_v1_source')
-    def source_prediction_v1(self):
-        async_result = predict_v1_task.s(
+    @expose('/csv/src/prediction_v3_source')
+    def source_prediction_v3(self):
+        async_result = predict_v3_task.s(
             export_name='code_for_boston_export_90d',
             days_ago=90).delay()
        return redirect(url_for(
@@ -251,6 +275,17 @@ def sync_source_usgs(self):
             filename='usgs_source.csv'
         )
 
+    @expose('/csv/src_sync/processed_data_v1_source')
+    def sync_source_combine_data_v1(self):
+        df = combine_data_v1_task.run(
+            days_ago=90,
+            export_name='code_for_boston_export_90d'
+        )
+        return send_csv_attachment_of_dataframe(
+            df=pd.DataFrame(df),
+            filename='model_processed_data.csv'
+        )
+
     @expose('/csv/src_sync/processed_data_v2_source')
     def sync_source_combine_data_v2(self):
         df = combine_data_v2_task.run(
@@ -262,9 +297,9 @@ def sync_source_combine_data_v2(self):
             filename='model_processed_data.csv'
         )
 
-    @expose('/csv/src_sync/processed_data_v1_source')
-    def sync_source_combine_data_v1(self):
-        df = combine_data_v1_task.run(
+    @expose('/csv/src_sync/processed_data_v3_source')
+    def sync_source_combine_data_v3(self):
+        df = combine_data_v3_task.run(
             days_ago=90,
             export_name='code_for_boston_export_90d'
         )
@@ -273,6 +308,17 @@ def sync_source_combine_data_v1(self):
             filename='model_processed_data.csv'
         )
 
+    @expose('/csv/src_sync/prediction_v1_source')
+    def sync_source_prediction_v1(self):
+        df = predict_v1_task.run(
+            days_ago=90,
+            export_name='code_for_boston_export_90d'
+        )
+        return send_csv_attachment_of_dataframe(
+            df=pd.DataFrame(df),
+            filename='prediction_source.csv'
+        )
+
     @expose('/csv/src_sync/prediction_v2_source')
     def sync_source_prediction_v2(self):
         df = predict_v2_task.run(
@@ -284,9 +330,9 @@ def sync_source_prediction_v2(self):
             filename='prediction_source.csv'
         )
 
-    @expose('/csv/src_sync/prediction_v1_source')
-    def sync_source_prediction_v1(self):
-        df = predict_v1_task.run(
+    @expose('/csv/src_sync/prediction_v3_source')
+    def sync_source_prediction_v3(self):
+        df = predict_v3_task.run(
             days_ago=90,
             export_name='code_for_boston_export_90d'
         )
diff --git a/app/blueprints/api_v1.py b/app/blueprints/api_v1.py
index e4bb0251..495921fb 100644
--- a/app/blueprints/api_v1.py
+++ b/app/blueprints/api_v1.py
@@ -20,7 +20,7 @@
 from app.data.globals import cache
 from app.data.globals import reaches
 from app.data.globals import website_options
-from app.data.processing.predictive_models.v2 import MODEL_VERSION
+from app.data.processing.predictive_models.v3 import MODEL_YEAR
 
 bp = Blueprint('api', __name__, url_prefix='/api')
 
@@ -40,7 +40,7 @@ def predictive_model_api():
     selected_hours = max(selected_hours, 1)
 
     return jsonify({
-        'model_version': MODEL_VERSION,
+        'model_version': MODEL_YEAR,
         'time_returned': get_current_time(),
         'is_boating_season': website_options.boating_season,
         'model_outputs': [
diff --git a/app/data/celery.py b/app/data/celery.py
index 8083ef7d..18948fa8 100644
--- a/app/data/celery.py
+++ b/app/data/celery.py
@@ -75,8 +75,8 @@ def live_usgs_data_task(*args, **kwargs) -> RecordsType:
 
 @celery_app.task
 def combine_data_v1_task(*args, **kwargs) -> RecordsType:
-    from app.data.processing.core import combine_v2_job
-    df = combine_v2_job(*args, **kwargs)
+    from app.data.processing.core import combine_v1_job
+    df = combine_v1_job(*args, **kwargs)
     return df.to_dict(orient='records')
 
 
@@ -87,6 +87,13 @@ def combine_data_v2_task(*args, **kwargs) -> RecordsType:
     return df.to_dict(orient='records')
 
 
+@celery_app.task
+def combine_data_v3_task(*args, **kwargs) -> RecordsType:
+    from app.data.processing.core import combine_v3_job
+    df = combine_v3_job(*args, **kwargs)
+    return df.to_dict(orient='records')
+
+
 @celery_app.task
 def predict_v1_task(*args, **kwargs) -> RecordsType:
     from app.data.processing.core import predict_v1_job
@@ -101,6 +108,13 @@ def predict_v2_task(*args, **kwargs) -> RecordsType:
     return df.to_dict(orient='records')
 
 
+@celery_app.task
+def predict_v3_task(*args, **kwargs) -> RecordsType:
+    from app.data.processing.core import predict_v3_job
+    df = predict_v3_job(*args, **kwargs)
+    return df.to_dict(orient='records')
+
+
 @celery_app.task
 def update_db_task() -> None:
     from app.data.processing.core import update_db
@@ -127,9 +141,13 @@ def send_database_exports_task() -> None:
 # Down here, we define the types for the tasks to help the IDE.
 live_hobolink_data_task: WithAppContextTask
 live_usgs_data_task: WithAppContextTask
-combine_data_task: WithAppContextTask
+combine_data_v1_task: WithAppContextTask
+combine_data_v2_task: WithAppContextTask
+combine_data_v3_task: WithAppContextTask
 clear_cache_task: WithAppContextTask
-prediction_task: WithAppContextTask
+predict_v1_task: WithAppContextTask
+predict_v2_task: WithAppContextTask
+predict_v3_task: WithAppContextTask
 update_db_task: WithAppContextTask
 update_website_task: WithAppContextTask
 send_database_exports_task: WithAppContextTask
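For context on the hunks above: the admin views drive each versioned task two ways — queued through Celery with `.s(...).delay()` for the polling `csv_wait` download flow, and in-process with `.run(...)` for the `src_sync` routes. A minimal sketch using only names introduced in this diff (illustrative, not part of the PR):

```python
from app.data.celery import predict_v3_task

# Queued: returns an AsyncResult; the csv_wait view polls by its .id.
async_result = predict_v3_task.s(
    export_name='code_for_boston_export_90d',
    days_ago=90,
).delay()
print(async_result.id)

# In-process: runs the task body directly and returns the records dicts,
# which the src_sync routes wrap in pd.DataFrame before sending the CSV.
records = predict_v3_task.run(
    days_ago=90,
    export_name='code_for_boston_export_90d',
)
```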
diff --git a/app/data/processing/core.py b/app/data/processing/core.py
index 18c633c1..359eecaf 100644
--- a/app/data/processing/core.py
+++ b/app/data/processing/core.py
@@ -3,7 +3,10 @@
 about adding a little repetition and not worrying about async processing--
 both in service of simplifying the code for ease of maintenance.
 """
+from enum import Enum
+from functools import partial
 from typing import Optional
+from typing import Protocol
 
 import pandas as pd
 from flask import current_app
@@ -16,8 +19,6 @@
 from app.data.processing.hobolink import HOBOLINK_DEFAULT_EXPORT_NAME
 from app.data.processing.hobolink import HOBOLINK_ROWS_PER_HOUR
 from app.data.processing.hobolink import get_live_hobolink_data
-from app.data.processing.predictive_models import v1
-from app.data.processing.predictive_models import v2
 from app.data.processing.usgs import USGS_DEFAULT_DAYS_AGO
 from app.data.processing.usgs import USGS_ROWS_PER_HOUR
 from app.data.processing.usgs import get_live_usgs_data
@@ -42,58 +43,83 @@ def _write_to_db(
     )
 
 
-@mail_on_fail
-def combine_v1_job(
-    days_ago: int = USGS_DEFAULT_DAYS_AGO,
-    export_name: str = HOBOLINK_DEFAULT_EXPORT_NAME
-) -> pd.DataFrame:
-    df_usgs = get_live_usgs_data(days_ago=days_ago)
-    df_hobolink = get_live_hobolink_data(export_name=export_name)
-    df_combined = v1.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
-    return df_combined
+class ModelModule(Protocol):
+
+    MODEL_YEAR: str
+
+    def process_data(self, df_hobolink: pd.DataFrame, df_usgs: pd.DataFrame) -> pd.DataFrame:
+        ...
+
+    def all_models(self, df: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
+        ...
+
+
+class ModelVersion(str, Enum):
+    v1 = "v1"
+    v2 = "v2"
+    v3 = "v3"
+
+    def get_module(self) -> ModelModule:
+        if self == self.__class__.v1:
+            from app.data.processing.predictive_models import v1
+            return v1
+        elif self == self.__class__.v2:
+            from app.data.processing.predictive_models import v2
+            return v2
+        elif self == self.__class__.v3:
+            from app.data.processing.predictive_models import v3
+            return v3
+        else:
+            raise ValueError(f"Unclear what happened; {self} not supported")
+
+
+DEFAULT_MODEL_VERSION = ModelVersion.v1
 
 
 @mail_on_fail
-def combine_v2_job(
+def _combine_job(
     days_ago: int = USGS_DEFAULT_DAYS_AGO,
-    export_name: str = HOBOLINK_DEFAULT_EXPORT_NAME
+    export_name: str = HOBOLINK_DEFAULT_EXPORT_NAME,
+    model_version: ModelVersion = DEFAULT_MODEL_VERSION
 ) -> pd.DataFrame:
+    mod = model_version.get_module()
     df_usgs = get_live_usgs_data(days_ago=days_ago)
     df_hobolink = get_live_hobolink_data(export_name=export_name)
-    df_combined = v2.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
+    df_combined = mod.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
     return df_combined
 
 
-@mail_on_fail
-def predict_v1_job(
-    days_ago: int = USGS_DEFAULT_DAYS_AGO,
-    export_name: str = HOBOLINK_DEFAULT_EXPORT_NAME
-) -> pd.DataFrame:
-    df_usgs = get_live_usgs_data(days_ago=days_ago)
-    df_hobolink = get_live_hobolink_data(export_name=export_name)
-    df_combined = v1.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
-    df_predictions = v1.all_models(df_combined)
-    return df_predictions
+combine_v1_job = partial(_combine_job, model_version=ModelVersion.v1)
+combine_v2_job = partial(_combine_job, model_version=ModelVersion.v2)
+combine_v3_job = partial(_combine_job, model_version=ModelVersion.v3)
 
 
 @mail_on_fail
-def predict_v2_job(
+def _predict_job(
     days_ago: int = USGS_DEFAULT_DAYS_AGO,
-    export_name: str = HOBOLINK_DEFAULT_EXPORT_NAME
+    export_name: str = HOBOLINK_DEFAULT_EXPORT_NAME,
+    model_version: ModelVersion = DEFAULT_MODEL_VERSION
 ) -> pd.DataFrame:
+    mod = model_version.get_module()
     df_usgs = get_live_usgs_data(days_ago=days_ago)
     df_hobolink = get_live_hobolink_data(export_name=export_name)
-    df_combined = v2.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
-    df_predictions = v2.all_models(df_combined)
+    df_combined = mod.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
+    df_predictions = mod.all_models(df_combined)
     return df_predictions
 
 
+predict_v1_job = partial(_predict_job, model_version=ModelVersion.v1)
+predict_v2_job = partial(_predict_job, model_version=ModelVersion.v2)
+predict_v3_job = partial(_predict_job, model_version=ModelVersion.v3)
+
+
 @mail_on_fail
 def update_db() -> None:
+    mod = DEFAULT_MODEL_VERSION.get_module()
     df_usgs = get_live_usgs_data()
     df_hobolink = get_live_hobolink_data()
-    df_combined = v1.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
-    df_predictions = v1.all_models(df_combined)
+    df_combined = mod.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
+    df_predictions = mod.all_models(df_combined)
     hours = current_app.config['STORAGE_HOURS']
 
     try:
@@ -110,10 +136,11 @@ def update_db() -> None:
 
 @mail_on_fail
 def send_database_exports() -> None:
+    mod = DEFAULT_MODEL_VERSION.get_module()
     df_usgs = get_live_usgs_data(days_ago=90)
     df_hobolink = get_live_hobolink_data(export_name='code_for_boston_export_90d')
-    df_combined = v1.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
-    df_predictions = v1.all_models(df_combined)
+    df_combined = mod.process_data(df_hobolink=df_hobolink, df_usgs=df_usgs)
+    df_predictions = mod.all_models(df_combined)
 
     df_override_history = execute_sql('select * from override_history;')
     todays_date = get_current_time().strftime('%Y_%m_%d')
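The core.py refactor above collapses four near-identical job functions into one private job parameterized by a `ModelVersion` enum, with `functools.partial` bindings preserving the old public names. A minimal sketch of the resulting seam (assumes the `app` package is importable; not part of the PR):

```python
from app.data.processing.core import ModelVersion, combine_v3_job

# ModelVersion is a str-valued enum, so it round-trips from plain strings,
# and get_module() lazily imports the matching predictive_models module.
mod = ModelVersion("v3").get_module()
assert mod.MODEL_YEAR == '2024'

# The partial pre-binds model_version, so this call...
df = combine_v3_job(days_ago=90)
# ...behaves like _combine_job(days_ago=90, model_version=ModelVersion.v3).
```

One nice property of this design: `@mail_on_fail` wraps `_combine_job` and `_predict_job` once, so every partial inherits the failure-email behavior without re-decoration.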
diff --git a/app/data/processing/predictive_models/v1.py b/app/data/processing/predictive_models/v1.py
index d94a286c..d5da5767 100644
--- a/app/data/processing/predictive_models/v1.py
+++ b/app/data/processing/predictive_models/v1.py
@@ -15,7 +15,7 @@
 import pandas as pd
 
 
-MODEL_VERSION = '2020'
+MODEL_YEAR = '2020'
 
 SIGNIFICANT_RAIN = 0.2
 SAFETY_THRESHOLD = 0.65
@@ -112,7 +112,7 @@ def process_data(
         .fillna(df['time'].min())
     )
     df['days_since_sig_rain'] = (
-        (df['time'] - df['last_sig_rain']).dt.seconds / 60 / 60 / 24
+        (df['time'] - df['last_sig_rain']).dt.total_seconds() / 60 / 60 / 24
     )
 
     return df
diff --git a/app/data/processing/predictive_models/v2.py b/app/data/processing/predictive_models/v2.py
index d3de1e23..f4d6fce0 100644
--- a/app/data/processing/predictive_models/v2.py
+++ b/app/data/processing/predictive_models/v2.py
@@ -6,7 +6,7 @@
 import pandas as pd
 
 
-MODEL_VERSION = '2023'
+MODEL_YEAR = '2023'
 
 SIGNIFICANT_RAIN = 0.1
 
@@ -57,7 +57,7 @@ def process_data(
         .fillna(df_hobolink['time'].min())
     )
     df_hobolink['days_since_sig_rain'] = (
-        (df_hobolink['time'] - df_hobolink['_last_sig_rain']).dt.seconds / 60 / 60 / 24
+        (df_hobolink['time'] - df_hobolink['_last_sig_rain']).dt.total_seconds() / 60 / 60 / 24
     )
 
     # Now collapse the data.
@@ -121,7 +121,7 @@ def process_data(
         .fillna(df['time'].min())
     )
     df['days_since_sig_rain'] = (
-        (df['time'] - df['last_sig_rain']).dt.seconds / 60 / 60 / 24
+        (df['time'] - df['last_sig_rain']).dt.total_seconds() / 60 / 60 / 24
     )
 
     return df
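The `.dt.seconds` to `.dt.total_seconds()` change in both model modules fixes a real truncation bug: `.seconds` is only the seconds *component* of a `Timedelta` (0–86399), so whole days silently vanish from `days_since_sig_rain`. A minimal repro (illustrative, not part of the PR):

```python
import pandas as pd

delta = pd.Series([pd.Timedelta(days=2, hours=3)])
print(delta.dt.seconds.iloc[0])          # 10800 -- the 2 days are dropped
print(delta.dt.total_seconds().iloc[0])  # 183600.0 -- the full duration
```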
diff --git a/app/data/processing/predictive_models/v3.py b/app/data/processing/predictive_models/v3.py
new file mode 100644
index 00000000..65196362
--- /dev/null
+++ b/app/data/processing/predictive_models/v3.py
@@ -0,0 +1,266 @@
+# flake8: noqa: E501
+import numpy as np
+import pandas as pd
+
+
+MODEL_YEAR = '2024'
+
+SIGNIFICANT_RAIN = 0.1
+SAFETY_THRESHOLD = 0.65
+
+
+def sigmoid(ser: np.ndarray) -> np.ndarray:
+    return 1 / (1 + np.exp(-ser))
+
+
+def process_data(
+        df_hobolink: pd.DataFrame,
+        df_usgs: pd.DataFrame
+) -> pd.DataFrame:
+    """Combines the data from the Hobolink and the USGS into one table.
+
+    Args:
+        df_hobolink: Hobolink data
+        df_usgs: USGS NWIS data
+
+    Returns:
+        Cleaned dataframe.
+    """
+    df_hobolink = df_hobolink.copy()
+    df_usgs = df_usgs.copy()
+
+    # Cast to datetime type.
+    # When this comes from Celery, it might be a string.
+    df_hobolink['time'] = pd.to_datetime(df_hobolink['time'])
+    df_usgs['time'] = pd.to_datetime(df_usgs['time'])
+
+    # Convert all timestamps to hourly in preparation for aggregation.
+    df_usgs['time'] = df_usgs['time'].dt.floor('h')
+    df_hobolink['time'] = df_hobolink['time'].dt.floor('h')
+
+    # Now collapse the data.
+    # Take the mean measurements of everything except rain; rain is the sum
+    # within an hour. (HOBOlink devices record all rain seen in 10 minutes).
+    df_usgs = (
+        df_usgs
+        .groupby('time')
+        .mean()
+        .reset_index()
+    )
+    df_hobolink = (
+        df_hobolink
+        .groupby('time')
+        .agg({
+            'pressure': np.mean,
+            'par': np.mean,
+            'rain': np.sum,
+            'rh': np.mean,
+            'dew_point': np.mean,
+            'wind_speed': np.mean,
+            'gust_speed': np.mean,
+            'wind_dir': np.mean,
+            # 'water_temp': np.mean,
+            'air_temp': np.mean,
+        })
+        .reset_index()
+    )
+
+    # This is an outer join to include all the data (we collect more Hobolink
+    # data than USGS data). With that said, for the most recent value, we need
+    # to make sure one of the sources didn't update before the other one did.
+    # Note that usually Hobolink updates first.
+    df = df_hobolink.merge(right=df_usgs, how='left', on='time')
+    df = df.sort_values('time')
+    df = df.reset_index()
+
+    # Drop last row if either Hobolink or USGS is missing.
+    # We drop instead of `ffill()` because we want the model to output
+    # consistently each hour.
+    if df.iloc[-1, :][['stream_flow', 'rain']].isna().any():
+        df = df.drop(df.index[-1])
+
+    # The code from here on consists of feature transformations.
+
+    # Calculate rolling means
+    df['stream_flow_1d_mean'] = df['stream_flow'].rolling(24).mean()
+    df['pressure_2d_mean'] = df['pressure'].rolling(48).mean()
+
+    # Calculate rolling sums
+    df['rain_0_to_12h_sum'] = df['rain'].rolling(12).sum()
+
+    # Lastly, they measure the "time since last significant rain." Significant
+    # rain is defined as a cumulative sum of 0.1 in a 12-hour time period.
+    df['sig_rain'] = df['rain_0_to_12h_sum'] >= SIGNIFICANT_RAIN
+    df['last_sig_rain'] = (
+        df['time']
+        .where(df['sig_rain'])
+        .ffill()
+        .fillna(df['time'].min())
+    )
+    df['days_since_sig_rain'] = (
+        (df['time'] - df['last_sig_rain']).dt.total_seconds() / 60 / 60 / 24
+    )
+
+    return df
+
+
+def reach_2_model(df: pd.DataFrame, rows: int = None) -> pd.DataFrame:
+    """
+    1NBS:
+    𝑎 = 1.444 ∗ 10^2
+    𝑤 = −1.586 ∗ 10^−4
+    𝑥 = 4.785
+    𝑦 = −6.973
+    𝑧 = 1.137
+
+    Args:
+        df: Input data from `process_data()`
+        rows: (int) Number of rows to return.
+
+    Returns:
+        Outputs for model as a dataframe.
+    """
+    if rows is None:
+        df = df.copy()
+    else:
+        df = df.tail(n=rows).copy()
+
+    df['probability'] = sigmoid(
+        14.44
+        - 0.0001586 * df['stream_flow_1d_mean']
+        + 4.785 * df['pressure_2d_mean']
+        - 6.973 * df['rain_0_to_12h_sum']
+        + 1.137 * df['days_since_sig_rain']
+    )
+
+    df['safe'] = df['probability'] <= SAFETY_THRESHOLD
+    df['reach_id'] = 2
+
+    return df[['reach_id', 'time', 'probability', 'safe']]
+
+
+def reach_3_model(df: pd.DataFrame, rows: int = None) -> pd.DataFrame:
+    """
+    2LARZ:
+    𝑎 = −19.119085
+    𝑤 = −0.001841
+    𝑥 = 0.658676
+    𝑦 = −2.766888
+    𝑧 = 0.642593
+
+    Args:
+        df: Input data from `process_data()`
+        rows: (int) Number of rows to return.
+
+    Returns:
+        Outputs for model as a dataframe.
+    """
+    if rows is None:
+        df = df.copy()
+    else:
+        df = df.tail(n=rows).copy()
+
+    df['probability'] = sigmoid(
+        - 19.119085
+        - 0.001841 * df['stream_flow_1d_mean']
+        + 0.658676 * df['pressure_2d_mean']
+        - 2.766888 * df['rain_0_to_12h_sum']
+        + 0.642593 * df['days_since_sig_rain']
+    )
+
+    df['safe'] = df['probability'] <= SAFETY_THRESHOLD
+    df['reach_id'] = 3
+
+    return df[['reach_id', 'time', 'probability', 'safe']]
+
+
+def reach_4_model(df: pd.DataFrame, rows: int = None) -> pd.DataFrame:
+    """
+    3BU:
+    𝑎 = −23.96789
+    𝑤 = 0.00248
+    𝑥 = 0.83702
+    𝑦 = −5.34479
+    𝑧 = −0.02940
+
+    Args:
+        df: (pd.DataFrame) Input data from `process_data()`
+        rows: (int) Number of rows to return.
+
+    Returns:
+        Outputs for model as a dataframe.
+    """
+    if rows is None:
+        df = df.copy()
+    else:
+        df = df.tail(n=rows).copy()
+
+    df['probability'] = sigmoid(
+        - 23.96789
+        + 0.00248 * df['stream_flow_1d_mean']
+        + 0.83702 * df['pressure_2d_mean']
+        - 5.34479 * df['rain_0_to_12h_sum']
+        - 0.02940 * df['days_since_sig_rain']
+    )
+
+    df['safe'] = df['probability'] <= SAFETY_THRESHOLD
+    df['reach_id'] = 4
+
+    return df[['reach_id', 'time', 'probability', 'safe']]
+
+
+def reach_5_model(df: pd.DataFrame, rows: int = None) -> pd.DataFrame:
+    """
+    4LONG:
+    𝑎 = −395.24225
+    𝑤 = −0.03635
+    𝑥 = 13.67660
+    𝑦 = −19.65122
+    𝑧 = 11.64241
+
+    Args:
+        df: (pd.DataFrame) Input data from `process_data()`
+        rows: (int) Number of rows to return.
+
+    Returns:
+        Outputs for model as a dataframe.
+    """
+    if rows is None:
+        df = df.copy()
+    else:
+        df = df.tail(n=rows).copy()
+
+    df['probability'] = sigmoid(
+        - 395.24225
+        - 0.03635 * df['stream_flow_1d_mean']
+        + 13.67660 * df['pressure_2d_mean']
+        - 19.65122 * df['rain_0_to_12h_sum']
+        + 11.64241 * df['days_since_sig_rain']
+    )
+
+    df['safe'] = df['probability'] <= SAFETY_THRESHOLD
+    df['reach_id'] = 5
+
+    return df[['reach_id', 'time', 'probability', 'safe']]
+
+
+def all_models(df: pd.DataFrame, *args, **kwargs):
+    # Cast to datetime type.
+    # When this comes from Celery, it might be a string.
+    df['time'] = pd.to_datetime(df['time'])
+
+    out = pd.concat([
+        reach_2_model(df, *args, **kwargs),
+        reach_3_model(df, *args, **kwargs),
+        reach_4_model(df, *args, **kwargs),
+        reach_5_model(df, *args, **kwargs),
+    ], axis=0)
+    out = out.sort_values(['reach_id', 'time'])
+
+    # TODO:
+    # I'm a little worried that I had to add the below to make a test pass.
+    # I thought this part of the code was pretty settled by now, but guess
+    # not. I need to look into what's going on.
+
+    out = out.loc[out['probability'].notna(), :]
+    return out
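On the `all_models` TODO above: one mechanism that plausibly produces NaN probabilities is pandas' rolling-window default. `rolling(n)` uses `min_periods=n` unless told otherwise, so the first `n - 1` hourly rows of each rolling feature are NaN, and those NaNs propagate through the logistic formula into `probability`. A sketch of the behavior (illustrative, not part of the PR):

```python
import pandas as pd

rain = pd.Series([0.0, 0.1, 0.0, 0.2])
print(rain.rolling(3).sum())
# 0    NaN
# 1    NaN
# 2    0.1
# 3    0.3
# dtype: float64
```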
diff --git a/app/mail.py b/app/mail.py
index a9fff85c..73f1e8f5 100644
--- a/app/mail.py
+++ b/app/mail.py
@@ -1,6 +1,7 @@
 import io
 import traceback
 from functools import wraps
+from typing import TypeVar
 
 import pandas as pd
 from flask import current_app
@@ -9,6 +10,9 @@
 from flask_mail import Message
 
 
+T = TypeVar("T")
+
+
 class Mail(_Mail):
 
     def send(self, message: str):
@@ -53,7 +57,7 @@ def attach_dataframe(self, df: pd.DataFrame, filename: str) -> None:
         self.attach(filename, "text/csv", f.getvalue())
 
 
-def mail_on_fail(func: callable):
+def mail_on_fail(func: T) -> T:
     """Send an email when something fails. Use this as a decorator."""
     @wraps(func)
     def _wrap(*args, **kwargs):
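The typing change to `mail_on_fail` swaps the vague `callable` for a `TypeVar`, which tells type checkers that the decorator returns the same callable type it receives, so decorated functions keep their signatures in IDEs. A usage sketch with a hypothetical job function (not part of the PR):

```python
from app.mail import mail_on_fail

@mail_on_fail
def nightly_refresh(days_ago: int = 90) -> None:
    ...

# Because mail_on_fail is annotated (T) -> T, a checker still sees
# nightly_refresh as (days_ago: int = 90) -> None after decoration.
```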
+ """ + if rows is None: + df = df.copy() + else: + df = df.tail(n=rows).copy() + + df['probability'] = sigmoid( + - 23.96789 + + 0.00248 * df['stream_flow_1d_mean'] + + 0.83702 * df['pressure_2d_mean'] + - 5.34479 * df['rain_0_to_12h_sum'] + - 0.02940 * df['days_since_sig_rain'] + ) + + df['safe'] = df['probability'] <= SAFETY_THRESHOLD + df['reach_id'] = 4 + + return df[['reach_id', 'time', 'probability', 'safe']] + + +def reach_5_model(df: pd.DataFrame, rows: int = None) -> pd.DataFrame: + """ + 4LONG: + 𝑎 = −395.24225 + 𝑤 = −0.03635 + 𝑥 = 13.67660 + 𝑦 = −19.65122 + 𝑧 = 11.64241 + + Args: + df: (pd.DataFrame) Input data from `process_data()` + rows: (int) Number of rows to return. + + Returns: + Outputs for model as a dataframe. + """ + if rows is None: + df = df.copy() + else: + df = df.tail(n=rows).copy() + + df['probability'] = sigmoid( + - 395.24225 + - 0.03635 * df['stream_flow_1d_mean'] + + 13.67660 * df['pressure_2d_mean'] + - 19.65122 * df['rain_0_to_12h_sum'] + + 11.64241 * df['days_since_sig_rain'] + ) + + df['safe'] = df['probability'] <= SAFETY_THRESHOLD + df['reach_id'] = 5 + + return df[['reach_id', 'time', 'probability', 'safe']] + + +def all_models(df: pd.DataFrame, *args, **kwargs): + # Cast to datetime type. + # When this comes from Celery, it might be a string. + df['time'] = pd.to_datetime(df['time']) + + out = pd.concat([ + reach_2_model(df, *args, **kwargs), + reach_3_model(df, *args, **kwargs), + reach_4_model(df, *args, **kwargs), + reach_5_model(df, *args, **kwargs), + ], axis=0) + out = out.sort_values(['reach_id', 'time']) + + # TODO: + # I'm a little worried that I had to add the below to make a test pass. + # I thought this part of the code was pretty settled by now, but guess + # not. I need to look into what's going on. + + out = out.loc[out['probability'].notna(), :] + return out diff --git a/app/mail.py b/app/mail.py index a9fff85c..73f1e8f5 100644 --- a/app/mail.py +++ b/app/mail.py @@ -1,6 +1,7 @@ import io import traceback from functools import wraps +from typing import TypeVar import pandas as pd from flask import current_app @@ -9,6 +10,9 @@ from flask_mail import Message +T = TypeVar("T") + + class Mail(_Mail): def send(self, message: str): @@ -53,7 +57,7 @@ def attach_dataframe(self, df: pd.DataFrame, filename: str) -> None: self.attach(filename, "text/csv", f.getvalue()) -def mail_on_fail(func: callable): +def mail_on_fail(func: T) -> T: """Send an email when something fails. Use this as a decorator.""" @wraps(func) def _wrap(*args, **kwargs): diff --git a/app/templates/admin/download.html b/app/templates/admin/download.html index ac6dcbbc..f9a7f12e 100644 --- a/app/templates/admin/download.html +++ b/app/templates/admin/download.html @@ -41,6 +41,8 @@

Inactive sources

{% endblock %}
diff --git a/docs/src/development/predictive_models.md b/docs/src/development/predictive_models.md
index 9cfc1190..1a1c48be 100644
--- a/docs/src/development/predictive_models.md
+++ b/docs/src/development/predictive_models.md
@@ -24,11 +24,11 @@ The feature transformations the CRWA uses depends on the year of the model, so b
 ???+ note
     We use 28 days of HOBOlink data to process the model. For most features, we only need the last 48 hours worth of data to calculate the most recent value, however the last significant rainfall feature requires a lot of historic data because it is not technically bounded or transformed otherwise. This means that even when calculating 1 row of output data, i.e. the latest hour of data, we still need 28 days.
-    
+
     In the deployed model, if we do not see any significant rainfall in the last 28 days, we return the difference between the timestamp and the earliest time in the dataframe, `#!python df['time'].min()`. In this scenario, the data will no longer be temporally consistent: a calculation right now will have `28.0` for `'days_since_sig_rain'`, but 12 hours from now it will be 27.5. This is fine though because the model will basically never predict E. coli blooms with 28+ days since significant rain, even when the data is not censored.
-    
+
     Unfortunately there's no pretty way to implement `days_since_sig_rain`, so the Pandas code that does all of this is one of the more inscrutable parts of the codebase. Note that `'last_sig_rain'` is calculating the timestamp of the last significant rain, and `'days_since_sig_rain'` calculates the time delta and translates into days:
-    
+
     ```python
     df['sig_rain'] = df['rain_0_to_24h_sum'] >= SIGNIFICANT_RAIN
     df['last_sig_rain'] = (
@@ -38,7 +38,7 @@ The feature transformations the CRWA uses depends on the year of the model, so b
         .fillna(df['time'].min())
     )
     df['days_since_sig_rain'] = (
-        (df['time'] - df['last_sig_rain']).dt.seconds / 60 / 60 / 24
+        (df['time'] - df['last_sig_rain']).dt.total_seconds() / 60 / 60 / 24
     )
     ```
@@ -60,7 +60,7 @@ def reach_3_model(df: pd.DataFrame, rows: int = None) -> pd.DataFrame:
         a- rainfall sum 0-24 hrs
         b- rainfall sum 24-48 hr
         d- Days since last rain
-    
+
         Logistic model: 0.267*a + 0.1681*b - 0.02855*d + 0.5157
 
     Args:
@@ -94,11 +94,11 @@ def reach_3_model(df: pd.DataFrame, rows: int = None) -> pd.DataFrame:
 ???+ note
     This section covers making changes to the following:
-    
+
     - The coefficients for the models.
     - The safety threshold.
     - The model features.
-    
+
     If you want to do anything more complicated, such as adding a new source of information to the model, that is outside the scope of this document. To accomplish that, you'll need to do more sleuthing into the code to really understand it.
 
 ???+ note
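Taken together, the new surface area in this PR can be exercised end to end. A closing sketch (assumes live HOBOlink/USGS credentials and app config; illustrative, not part of the PR):

```python
from app.data.processing.core import combine_v3_job, predict_v3_job

# Feature table from the 2024 (v3) processing pipeline.
df_features = combine_v3_job(
    days_ago=90,
    export_name='code_for_boston_export_90d',
)

# Per-reach logistic-model outputs: reach_id, time, probability, safe.
df_preds = predict_v3_job(
    days_ago=90,
    export_name='code_for_boston_export_90d',
)
print(df_preds.tail())
```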