diff --git a/backend/Dockerfile b/backend/Dockerfile index e6871867..96248032 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -118,4 +118,4 @@ RUN chown -R app:app $APP_HOME # change to the app user USER app WORKDIR /home/app/ -ENTRYPOINT ["./backend/entrypoint.sh", "-p"] +ENTRYPOINT ["./backend/entrypoint.sh", "-p"] \ No newline at end of file diff --git a/backend/api/__init__.py b/backend/api/__init__.py index 5b4eda0b..890d2472 100644 --- a/backend/api/__init__.py +++ b/backend/api/__init__.py @@ -46,9 +46,8 @@ def create_app(debug: bool = False) -> Flask: from .resources.cell_id import Cell_Id from .resources.power_data import Power_Data from .resources.teros_data import Teros_Data - from .resources.power_data_protobuf import Power_Data_Protobuf - from .resources.teros_data_protobuf import Teros_Data_Protobuf - from .resources.sensor_data import Sensor_Data + from .resources.sensor_data_ttn import Measurement_Upink + from .resources.sensor_data_wifi import Measurement_Direct from .resources.cell import Cell from .resources.session import Session_r from .resources.users_data import User_Data @@ -61,11 +60,8 @@ def create_app(debug: bool = False) -> Flask: api.add_resource(Cell_Id, "/cell/id") api.add_resource(Power_Data, "/power/", "/power/") api.add_resource(Teros_Data, "/teros/", "/teros/") - api.add_resource( - Power_Data_Protobuf, "/power-proto", "/power-proto/" - ) - api.add_resource(Teros_Data_Protobuf, "/teros-proto", "/teros-proto/") - api.add_resource(Sensor_Data, "/sensor", "/sensor/") + api.add_resource(Measurement_Direct, "/sensor/") + api.add_resource(Measurement_Upink, "/ttn/") api.add_resource(Session_r, "/session") api.add_resource(User_Data, "/user") app.register_blueprint(auth, url_prefix="/api") diff --git a/backend/api/database/getters.py b/backend/api/database/getters.py deleted file mode 100644 index cd46ed46..00000000 --- a/backend/api/database/getters.py +++ /dev/null @@ -1,136 +0,0 @@ -"""Helper functions for getting data - 
-Notes ------ -All functions must have a `sess` argument so that multiple functions can be -called while retaining object data from queried objects. -""" - -from sqlalchemy import select, func - -from .models import power_data as PowerData -from .models import teros_data as TEROSData - - -def get_power_data(sess, cell_id, resample="hour"): - """Gets the power data for a given cell. Can be directly passed to - bokeh.ColumnDataSource. - - Parmaters - --------- - sess : sqlalchemy.orm.Session - Session to use - cell_id : int - Valid Cell.id - - Returns - ------- - dict - Dictionary of lists with keys named after columns of the table - { - 'timestamp': [], - 'v': [], - 'i': [], - 'p': [] - } - """ - - data = { - 'timestamp': [], - 'v': [], - 'i': [], - 'p': [], - } - - resampled = ( - select( - func.date_trunc(resample, PowerData.ts).label("ts"), - func.avg(PowerData.voltage).label("voltage"), - func.avg(PowerData.current).label("current") - ) - .where(PowerData.cell_id == cell_id) - .group_by(func.date_trunc(resample, PowerData.ts)) - .subquery() - ) - - adj_units = ( - select( - resampled.c.ts.label("ts"), - (resampled.c.voltage * 10e-9).label("voltage"), - (resampled.c.current * 10e-6).label("current") - ) - .subquery() - ) - - stmt = ( - select( - adj_units.c.ts.label("ts"), - adj_units.c.voltage.label("voltage"), - adj_units.c.current.label("current"), - (adj_units.c.voltage * adj_units.c.current).label("power") - ) - .order_by(adj_units.c.ts) - ) - - for row in sess.execute(stmt): - data["timestamp"].append(row.ts) - data["v"].append(row.voltage) - data["i"].append(row.current) - data["p"].append(row.power) - - return data - - -def get_teros_data(sess, cell_id, resample='hour'): - """Gets the TEROS-12 sensor data for a given cell. Returned dictionary can - be passed directly to bokeh.ColumnDataSource. - - Parmaters - --------- - s : sqlalchemy.orm.Session - Session to use - cell_id : int - Valid Cell.id - resample : str - Resample time frame. Defaults to hour. 
Valid options are - [microseconds, milliseconds, second, minute, hour, day, week, month, - quarter, year, decade, century, millennium]. - - Returns - ------- - dict - Dictionary of lists with keys named after columns of the table - { - 'timestamp': [], - 'vwc': [], - 'temp': [], - 'ec': [] - } - """ - - data = { - 'timestamp': [], - 'vwc': [], - 'temp': [], - 'ec': [] - } - - stmt = ( - select( - func.date_trunc(resample, TEROSData.ts).label("ts"), - func.avg(TEROSData.vwc).label("vwc"), - func.avg(TEROSData.temp).label("temp"), - func.avg(TEROSData.ec).label("ec") - ) - .where(TEROSData.cell_id == cell_id) - .group_by(func.date_trunc(resample, TEROSData.ts)) - .order_by(func.date_trunc(resample, TEROSData.ts)) - ) - - for row in sess.execute(stmt): - data['timestamp'].append(row.ts) - data['vwc'].append(row.vwc) - data['temp'].append(row.temp) - data['ec'].append(row.ec) - - return data diff --git a/backend/api/database/models/power_data.py b/backend/api/database/models/power_data.py index 87b3c8fa..a5e2359a 100644 --- a/backend/api/database/models/power_data.py +++ b/backend/api/database/models/power_data.py @@ -65,55 +65,8 @@ def add_protobuf_power_data(logger_id, cell_id, ts, v, i): db.session.commit() return power_data - def get_power_data( - cell_id, - resample="hour", - startTime=datetime.now() - relativedelta(months=1), - endTime=datetime.now(), - ): - """gets power data aggregated by attributes""" - data = [] - - resampled = ( - db.select( - db.func.date_trunc(resample, PowerData.ts).label("ts"), - db.func.avg(PowerData.voltage).label("voltage"), - db.func.avg(PowerData.current).label("current"), - ) - .where(PowerData.cell_id == cell_id) - # .filter((PowerData.ts > startTime) & (PowerData.ts < endTime)) - .group_by(db.func.date_trunc(resample, PowerData.ts)) - .subquery() - ) - - # adj_units = db.select( - # resampled.c.ts.label("ts"), - # (resampled.c.voltage * 1e3).label("voltage"), - # (resampled.c.current * 1e6).label("current"), - # ).subquery() - - 
stmt = db.select( - resampled.c.ts.label("ts"), - (resampled.c.voltage * 1e3).label("voltage"), - (resampled.c.current * 1e6).label("current"), - (resampled.c.voltage * resampled.c.current * 1e6).label("power"), - ).order_by(resampled.c.ts) - - for row in db.session.execute(stmt): - data.append( - { - "ts": row.ts, - "v": row.voltage, - "i": row.current, - "p": row.power, - } - ) - - return data - def get_power_data_obj( cell_id, - resample="hour", start_time=datetime.now() - relativedelta(months=1), end_time=datetime.now(), stream=False, @@ -125,45 +78,27 @@ def get_power_data_obj( "i": [], "p": [], } - if not stream: - resampled = ( - db.select( - db.func.date_trunc(resample, PowerData.ts).label("ts"), - db.func.avg(PowerData.voltage).label("voltage"), - db.func.avg(PowerData.current).label("current"), - ) - .where((PowerData.cell_id == cell_id)) - # .filter((PowerData.ts >= startTime, PowerData.ts <= endTime)) - .filter((PowerData.ts.between(start_time, end_time))) - .group_by(db.func.date_trunc(resample, PowerData.ts)) - .subquery() - ) - else: - resampled = ( - db.select( - PowerData.ts, - PowerData.voltage, - PowerData.current, - ) - .where((PowerData.cell_id == cell_id)) - .filter((PowerData.ts.between(start_time, end_time))) - .subquery() - ) - # adj_units = db.select( - # resampled.c.ts.label("ts"), - # resampled.c.voltage.label("voltage"), - # resampled.c.current.label("current"), - # ).subquery() + stmt = ( + db.select( + PowerData.ts.label("ts"), + PowerData.voltage.label("voltage"), + PowerData.current.label("current"), + ) + .where(PowerData.cell_id == cell_id) + .filter((PowerData.ts.between(start_time, end_time))) + .subquery() + ) - stmt = db.select( - resampled.c.ts.label("ts"), - (resampled.c.voltage * 1e3).label("voltage"), - (resampled.c.current * 1e6).label("current"), - (resampled.c.voltage * resampled.c.current * 1e6).label("power"), - ).order_by(resampled.c.ts) + # expected units are mV, uA, and uW + adj_units = db.select( + 
stmt.c.ts.label("ts"), + stmt.c.voltage.label("voltage") * 1e-3, + stmt.c.current.label("current") * 1e-6, + (stmt.c.voltage * stmt.c.current * 1e-6).label("power") + ).order_by(stmt.c.ts) - for row in db.session.execute(stmt): + for row in db.session.execute(adj_units): data["timestamp"].append(row.ts) data["v"].append(row.voltage) data["i"].append(row.current) diff --git a/backend/api/database/models/teros_data.py b/backend/api/database/models/teros_data.py index e17d04fe..1f3a2ef6 100644 --- a/backend/api/database/models/teros_data.py +++ b/backend/api/database/models/teros_data.py @@ -47,7 +47,7 @@ def add_teros_data(cell_name, ts, vwc, raw_vwc, temp, ec, water_pot): return teros_data @staticmethod - def add_protobuf_power_data(cell_id, ts, vwc, raw_vwc, temp, ec, water_pot): + def add_protobuf_teros_data(cell_id, ts, vwc, raw_vwc, temp, ec, water_pot): cur_cell = Cell.query.filter_by(id=cell_id).first() if cur_cell is None: return None @@ -64,66 +64,32 @@ def add_protobuf_power_data(cell_id, ts, vwc, raw_vwc, temp, ec, water_pot): db.session.commit() return teros_data - def get_teros_data(cell_id, resample="hour"): - """gets teros data aggregated by attributes""" - data = [] - - stmt = ( - db.select( - func.date_trunc(resample, TEROSData.ts).label("ts"), - func.avg(TEROSData.vwc).label("vwc"), - func.avg(TEROSData.temp).label("temp"), - func.avg(TEROSData.ec).label("ec"), - ) - .where(TEROSData.cell_id == cell_id) - .group_by(func.date_trunc(resample, TEROSData.ts)) - .order_by(func.date_trunc(resample, TEROSData.ts)) - ) - - for row in db.session.execute(stmt): - data.append( - { - "ts": row.ts, - "vwc": row.vwc, - "temp": row.temp, - "ec": row.ec, - } - ) - return data - def get_teros_data_obj( cell_id, - resample="hour", start_time=datetime.now() - relativedelta(months=1), end_time=datetime.now(), stream=False, ): """gets teros data as a list of objects""" - data = {"timestamp": [], "vwc": [], "temp": [], "ec": []} - if not stream: - stmt = ( - db.select( - 
func.date_trunc(resample, TEROSData.ts).label("ts"), - func.avg(TEROSData.vwc).label("vwc"), - func.avg(TEROSData.temp).label("temp"), - func.avg(TEROSData.ec).label("ec"), - ) - .where(TEROSData.cell_id == cell_id) - .filter((TEROSData.ts.between(start_time, end_time))) - .group_by(func.date_trunc(resample, TEROSData.ts)) - .order_by(func.date_trunc(resample, TEROSData.ts)) - ) - else: - stmt = ( - db.select( - TEROSData.ts, - TEROSData.vwc, - TEROSData.temp, - TEROSData.ec, - ) - .where(TEROSData.cell_id == cell_id) - .filter((TEROSData.ts.between(start_time, end_time))) + data = { + "timestamp": [], + "vwc": [], + "temp": [], + "ec": [] + } + + # VWC stored in decimal, converted to percentage + stmt = ( + db.select( + TEROSData.ts.label("ts"), + (TEROSData.vwc * 100).label("vwc"), + TEROSData.temp.label("temp"), + TEROSData.ec.label("ec"), ) + .where(TEROSData.cell_id == cell_id) + .filter((TEROSData.ts.between(start_time, end_time))) + .order_by(TEROSData.ts) + ) for row in db.session.execute(stmt): data["timestamp"].append(row.ts) diff --git a/backend/api/resources/power_data_protobuf.py b/backend/api/resources/power_data_protobuf.py deleted file mode 100644 index d155c6fe..00000000 --- a/backend/api/resources/power_data_protobuf.py +++ /dev/null @@ -1,46 +0,0 @@ -from flask import request, jsonify, make_response -from flask_restful import Resource -from ..database.schemas.power_data_schema import PowerDataSchema -from ..database.schemas.get_cell_data_schema import GetCellDataSchema -from ..database.schemas.p_input import PInput -from ..database.models.power_data import PowerData -from soil_power_sensor_protobuf import encode, decode - -from datetime import datetime - -power_schema = PowerDataSchema() -get_cell_data = GetCellDataSchema() -p_in = PInput() - - -class Power_Data_Protobuf(Resource): - def post(self): - meas_power = decode(request.data) - - new_pwr_data = PowerData.add_protobuf_power_data( - meas_power["loggerId"], - meas_power["cellId"], - 
datetime.fromtimestamp(meas_power["ts"]), - meas_power["voltage"], - meas_power["current"], - ) - if new_pwr_data is None: - # missing cell_id or logger_id in table - encoded_data = encode(success=False) - response = make_response(encoded_data) - response.headers["content-type"] = "text/octet-stream" - response.status_code = 500 - return response - encoded_data = encode(success=True) - response = make_response(encoded_data) - response.headers["content-type"] = "text/octet-stream" - response.status_code = 201 - return response - - def get(self, cell_id=0): - v_args = get_cell_data.load(request.args) - return jsonify( - PowerData.get_power_data_obj( - cell_id, start_time=v_args["startTime"], end_time=v_args["endTime"] - ) - ) diff --git a/backend/api/resources/sensor_data.py b/backend/api/resources/sensor_data.py deleted file mode 100644 index 789bf6ea..00000000 --- a/backend/api/resources/sensor_data.py +++ /dev/null @@ -1,49 +0,0 @@ -from flask import request, jsonify, make_response -from flask_restful import Resource -from ..database.schemas.data_schema import DataSchema -from ..database.schemas.get_sensor_data_schema import GetSensorDataSchema -from ..database.models.sensor import Sensor -from soil_power_sensor_protobuf import encode, decode - -from datetime import datetime - -data_schema = DataSchema() -get_sensor_data_schema = GetSensorDataSchema() - - -class Sensor_Data(Resource): - def post(self): - meas_sensor = decode(request.data) - - for measurement, data in meas_sensor["data"].items(): - res = Sensor.add_data( - meas_sensor["cellId"], - meas_sensor["type"], - measurement, - data, - meas_sensor["data_type"], - datetime.fromtimestamp(meas_sensor["ts"]), - ) - if res is None: - encoded_data = encode(success=False) - response = make_response(encoded_data) - response.headers["content-type"] = "text/octet-stream" - response.status_code = 500 - return response - encoded_data = encode(success=True) - response = make_response(encoded_data) - 
response.headers["content-type"] = "text/octet-stream" - response.status_code = 201 - return response - - def get(self): - v_args = get_sensor_data_schema.load(request.args) - sensor_obj = Sensor.get_sensor_data_obj( - cell_id=v_args["cellId"], - measurement=v_args["measurement"], - start_time=v_args["startTime"], - end_time=v_args["endTime"], - ) - if sensor_obj is None: - return {"msg": "sensor not found"}, 400 - return jsonify(sensor_obj) diff --git a/backend/api/resources/sensor_data_ttn.py b/backend/api/resources/sensor_data_ttn.py new file mode 100644 index 00000000..794ca8df --- /dev/null +++ b/backend/api/resources/sensor_data_ttn.py @@ -0,0 +1,51 @@ +"""Sensor endpoint for requests made with The Things Network webhooks + +The API endpoint handles uplink messages from The Things Network (TTN). The data +payload is decoded and inserted into the database. See the following for more +information on TTN integration: +- https://www.thethingsindustries.com/docs/integrations/webhooks/creating-webhooks/. +- https://www.thethingsindustries.com/docs/the-things-stack/concepts/data-formats/. + +TODO: +- Integrate downlinks to device to ack the data as successfully inserted into +the db and data can be cleared from local non-volatile storage. See +https://www.thethingsindustries.com/docs/integrations/webhooks/scheduling-downlinks/ + +Author: John Madden +""" + +import base64 + +from flask import request +from flask_restful import Resource + +from .util import process_measurement + + +class Measurement_Upink(Resource): + def post(self): + """Handles uplink POST requests from TTN + + Returns: + Response indicating success or failure. See util.process_measurement + for full description. 
+ """ + + content_type = request.headers.get("Content-Type") + + # get uplink json + if content_type == "application/json": + uplink_json = request.json + else: + raise ValueError("POST request must be application/json") + + # get payload + payload_str = uplink_json["data"]["uplink_message"]["frm_payload"] + payload = base64.b64decode(payload_str) + + resp = process_measurement(payload) + + # TODO: Add downlink messages to device + + # return json of measurement + return resp \ No newline at end of file diff --git a/backend/api/resources/sensor_data_wifi.py b/backend/api/resources/sensor_data_wifi.py new file mode 100644 index 00000000..8218d8fe --- /dev/null +++ b/backend/api/resources/sensor_data_wifi.py @@ -0,0 +1,43 @@ +"""Sensor endpoint for requests directly uploading data + +The measurement data is POSTed to this resource as protobuf encoded binary data +which is decoded into a dictionary containing measured values. The data is +inserted into the appropriate table in the database. A HTTP response is sent +back containing protobuf binary data with the response message indicating status +of data. + +Author: John Madden +""" + +from flask import request +from flask_restful import Resource + +from .util import process_measurement + + +class Measurement_Direct(Resource): + def post(self): + """Handles POST request containing binary data and return response + + The HTTP request is checked for appropriate Content-Type then the + measurement is decoded and inserted into the database. Both a HTTP and + binary response are returned. + + Returns: + Response indicating success or failure. See util.process_measurement + for full description. 
+ """ + + content_type = request.headers.get("Content-Type") + + # check for correct content type and get the raw payload + if content_type == "application/octet-stream": + # get raw binary payload + data = request.data + else: + raise ValueError("POST request must be application/octet-stream") + + # decode and insert into db + resp = process_measurement(data) + + return resp \ No newline at end of file diff --git a/backend/api/resources/teros_data_protobuf.py b/backend/api/resources/teros_data_protobuf.py deleted file mode 100644 index 568860fd..00000000 --- a/backend/api/resources/teros_data_protobuf.py +++ /dev/null @@ -1,47 +0,0 @@ -from flask import request, jsonify, make_response -from flask_restful import Resource -from ..database.schemas.teros_data_schema import TEROSDataSchema -from ..database.schemas.get_cell_data_schema import GetCellDataSchema -from ..database.schemas.t_input import TInput -from ..database.models.teros_data import TEROSData -from soil_power_sensor_protobuf import encode, decode - -from datetime import datetime - -teros_schema = TEROSDataSchema() -get_cell_data = GetCellDataSchema() -t_in = TInput() - - -class Teros_Data_Protobuf(Resource): - def post(self): - meas_teros = decode(request.data) - - new_pwr_data = TEROSData.add_protobuf_power_data( - meas_teros["cellId"], - datetime.fromtimestamp(meas_teros["ts"]), - meas_teros["vwcAdj"], - meas_teros["vwcRaw"], - meas_teros["temp"], - meas_teros["ec"], - ) - if new_pwr_data is None: - # missing cell_id or logger_id in table - encoded_data = encode(success=False) - response = make_response(encoded_data) - response.headers["content-type"] = "text/octet-stream" - response.status_code = 500 - return response - encoded_data = encode(success=True) - response = make_response(encoded_data) - response.headers["content-type"] = "text/octet-stream" - response.status_code = 201 - return response - - def get(self, cell_id=0): - v_args = get_cell_data.load(request.args) - return jsonify( - TEROSData.get_teros_data_obj( - 
cell_id, start_time=v_args["startTime"], end_time=v_args["endTime"] - ) - ) diff --git a/backend/api/resources/util.py b/backend/api/resources/util.py new file mode 100644 index 00000000..b294d12a --- /dev/null +++ b/backend/api/resources/util.py @@ -0,0 +1,68 @@ +"""Utility functions for interacting with the database + +author: John Madden +""" + +from datetime import datetime + +from flask import Response +from soil_power_sensor_protobuf import encode_response, decode_measurement + +from ..database.models.power_data import PowerData +from ..database.models.teros_data import TEROSData + + +def process_measurement(data : bytes): + """Process protobuf encoded measurement + + The byte string gets decoded through protobuf and inserted into the + associated table. Upon successful insertion, a 200 response is sent back. On + failure when the server does not know how to handle a measurement type, a + 501 response is sent. Both cases have serialized response messages sent + back. + + Args + data: Encoded measurement message + + Returns: + Flask response with status code and protobuf encoded response. 
+ """ + + # decode binary protobuf data + meas = decode_measurement(data) + + # power measurement + if meas["type"] == "power": + db_obj = PowerData.add_protobuf_power_data( + meas["loggerId"], + meas["cellId"], + datetime.fromtimestamp(meas["ts"]), + meas["data"]["voltage"], + meas["data"]["current"], + ) + + # teros12 measurement + elif meas["type"] == "teros12": + db_obj = TEROSData.add_protobuf_teros_data( + meas["cellId"], + datetime.fromtimestamp(meas["ts"]), + meas["data"]["vwcAdj"], + meas["data"]["vwcRaw"], + meas["data"]["temp"], + meas["data"]["ec"], + None, + ) + + # format response + resp = Response() + resp.content_type = "application/octet-stream" + # indicate a success with 200 + if db_obj is not None: + resp.status_code = 200 + resp.data = encode_response(True) + # indicate an error with 501 + else: + resp.status_code = 501 + resp.data = encode_response(False) + + return resp \ No newline at end of file diff --git a/backend/entrypoint.sh b/backend/entrypoint.sh index 2b394802..3839faf8 100755 --- a/backend/entrypoint.sh +++ b/backend/entrypoint.sh @@ -4,7 +4,7 @@ while getopts 'dp' FLAG do case "$FLAG" in - d) flask --app backend.wsgi --debug run -p 8000;; + d) flask --app backend.wsgi --debug run -h 0.0.0.0 -p 8000;; p) gunicorn -b :8000 backend.wsgi:handler;; esac done diff --git a/backend/migrate.sh b/backend/migrate.sh new file mode 100755 index 00000000..c93f3013 --- /dev/null +++ b/backend/migrate.sh @@ -0,0 +1,62 @@ +#!/bin/sh + +# Date: 02/11/24 +# Author: Aaron Wu +# This aggregates all the necessary alembic migration commands + +# Note: migration commands +# flask --app backend.api db init -d ./backend/api/migrations +# flask --app backend.api db stamp head -d ./backend/api/migrations +# flask --app backend.api db migrate -d ./backend/api/database/migrations +# flask --app backend.api db revision -d ./backend/api/database/migrations --autogenerate -m "updated db" +# flask --app backend.api db upgrade head -d 
./backend/api/database/migrations +# flask --app backend.api db downgrade -d ./backend/api/migrations +# flask --app backend.api db check -d ./backend/api/migrations + +USAGE="script usage: $(basename $0) [-u] [-m ] [-d ] [-c]" + +if (( $# == 0 )); then + echo $USAGE + exit 1 +fi + + +while getopts 'um:d:ch' FLAG +do + case "$FLAG" in + u) + # upgrades to latest alembic version + flask --app api db upgrade head -d api/migrations + ;; + m) + # Migrate database + msg="$OPTARG" + # read -p "Enter migrate message: " msg; + flask --app api db migrate -d api/migrations -m "$msg"; + flask --app api db upgrade head -d api/migrations + ;; + d) + # Downgrade to another alembic version + ver="$OPTARG" + # read -p "Enter downgrade version: " ver; + flask --app api db downgrade $ver -d api/migrations + ;; + c) + # Checks if database needs migration + flask --app api db check -d api/migrations + ;; + h) + # Usage help + echo $USAGE >&2 + exit 1 + ;; + ?) + # Invalid flag + echo $USAGE >&2 + exit 1 + ;; + esac +done + + + diff --git a/backend/requirements.txt b/backend/requirements.txt index ffeb9a8a..06ef420f 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -1,7 +1,6 @@ sqlalchemy alembic psycopg2-binary==2.9.5 -chirpstack-api==3.7.2 protobuf tqdm Flask