From 97238d7d5e32a182ca84530e56d9ec8caf8996bb Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 5 Jun 2024 15:55:13 +0200 Subject: [PATCH 1/2] feature: add get_schedule for asset schedules Signed-off-by: F.N. Claessen --- flexmeasures/api/v3_0/assets.py | 212 +++++++++++++++++- .../tests/test_asset_schedules_fresh_db.py | 14 +- 2 files changed, 215 insertions(+), 11 deletions(-) diff --git a/flexmeasures/api/v3_0/assets.py b/flexmeasures/api/v3_0/assets.py index 637daea77..67a249de3 100644 --- a/flexmeasures/api/v3_0/assets.py +++ b/flexmeasures/api/v3_0/assets.py @@ -3,11 +3,13 @@ from datetime import datetime, timedelta import json -from flask import current_app +import isodate +from flask import current_app, url_for from flask_classful import FlaskView, route from flask_security import auth_required from flask_json import as_json from marshmallow import fields, ValidationError +from rq.job import Job, NoSuchJobError from webargs.flaskparser import use_kwargs, use_args from sqlalchemy import select, delete @@ -15,21 +17,29 @@ from flexmeasures.data import db from flexmeasures.data.models.user import Account from flexmeasures.data.models.generic_assets import GenericAsset -from flexmeasures.data.schemas.generic_assets import ( - GenericAssetSchema as AssetSchema, - GenericAssetIdField as AssetIdField, -) +from flexmeasures.data.queries.utils import simplify_index +from flexmeasures.data.schemas import AssetIdField +from flexmeasures.data.schemas.generic_assets import GenericAssetSchema as AssetSchema from flexmeasures.data.schemas.scheduling import AssetTriggerSchema from flexmeasures.data.schemas.times import AwareDateTimeField from flexmeasures.data.services.scheduling import ( create_sequential_scheduling_job, + get_data_source_for_job, ) +from flexmeasures.data.services.utils import get_asset_or_sensor_from_ref from flexmeasures.api.common.schemas.users import AccountIdField from flexmeasures.api.common.responses import ( + fallback_schedule_redirect, invalid_flex_config, request_processed, + unknown_schedule, + unrecognized_event, +) +from flexmeasures.api.common.utils.validators import ( + optional_duration_accepted, ) from flexmeasures.utils.coding_utils import flatten_unique +from flexmeasures.utils.time_utils import duration_isoformat from flexmeasures.ui.utils.view_utils import set_session_variables @@ -516,3 +526,195 @@ def trigger_schedule( response = dict(schedule=jobs[-1].id) d, s = request_processed() return dict(**response, **d), s + + @route("//schedules/", methods=["GET"]) + @use_kwargs( + { + "asset": AssetIdField(data_key="id", status_if_not_found=404), + "job_id": fields.Str(data_key="uuid"), + }, + location="path", + ) + @optional_duration_accepted( + timedelta(hours=6) + ) # todo: make this a Marshmallow field + @permission_required_for_context("read", ctx_arg_name="asset") + def get_schedule( # noqa: C901 + self, asset: GenericAsset, job_id: str, duration: timedelta, **kwargs + ): + """Get a schedule from FlexMeasures for multiple devices. + + .. :quickref: Schedule; Download schedule from the platform for multiple devices + + **Optional fields** + + - "duration" (6 hours by default; can be increased to plan further into the future) + + **Example response** + + This message contains a schedule indicating two devices to consume at various power + rates from 10am UTC onwards for a duration of 45 minutes. + + .. sourcecode:: json + + { + "schedule": [ + { + "sensor": 1, + "values": [ + 2.15, + 3, + 2 + ], + "start": "2015-06-02T10:00:00+00:00", + "duration": "PT45M", + "unit": "MW" + }, + { + "sensor": 2, + "values": [ + 2.15, + 3, + 2 + ], + "start": "2015-06-02T10:00:00+00:00", + "duration": "PT45M", + "unit": "MW" + } + ] + } + + :reqheader Authorization: The authentication token + :reqheader Content-Type: application/json + :resheader Content-Type: application/json + :status 200: PROCESSED + :status 400: INVALID_TIMEZONE, INVALID_DOMAIN, INVALID_UNIT, UNKNOWN_SCHEDULE, UNRECOGNIZED_CONNECTION_GROUP + :status 401: UNAUTHORIZED + :status 403: INVALID_SENDER + :status 405: INVALID_METHOD + :status 422: UNPROCESSABLE_ENTITY + """ + + planning_horizon = min( # type: ignore + duration, current_app.config.get("FLEXMEASURES_PLANNING_HORIZON") + ) + + # Look up the scheduling job + connection = current_app.queues["scheduling"].connection + + try: # First try the scheduling queue + job = Job.fetch(job_id, connection=connection) + except NoSuchJobError: + return unrecognized_event(job_id, "job") + + scheduler_info = job.meta.get("scheduler_info", dict(scheduler="")) + scheduler_info_msg = f"{scheduler_info['scheduler']} was used." + + if job.is_finished: + error_message = "A scheduling job has been processed with your job ID, but " + + elif job.is_failed: # Try to inform the user on why the job failed + e = job.meta.get( + "exception", + Exception( + "The job does not state why it failed. " + "The worker may be missing an exception handler, " + "or its exception handler is not storing the exception as job meta data." + ), + ) + message = f"Scheduling job failed with {type(e).__name__}: {e}. {scheduler_info_msg}" + + fallback_job_id = job.meta.get("fallback_job_id") + + # redirect to the fallback schedule endpoint if the fallback_job_id + # is defined in the metadata of the original job + if fallback_job_id is not None: + return fallback_schedule_redirect( + message, + url_for( + "AssetAPI:get_schedule", + uuid=fallback_job_id, + id=asset.id, + _external=True, + ), + ) + else: + return unknown_schedule(message) + + elif job.is_started: + return unknown_schedule(f"Scheduling job in progress. {scheduler_info_msg}") + elif job.is_queued: + return unknown_schedule( + f"Scheduling job waiting to be processed. {scheduler_info_msg}" + ) + elif job.is_deferred: + try: + preferred_job = job.dependency + except NoSuchJobError: + return unknown_schedule( + f"Scheduling job waiting for unknown job to be processed. {scheduler_info_msg}" + ) + return unknown_schedule( + f'Scheduling job waiting for {preferred_job.status} job "{preferred_job.id}" to be processed. {scheduler_info_msg}' + ) + else: + return unknown_schedule( + f"Scheduling job has an unknown status. {scheduler_info_msg}" + ) + + overall_schedule_response = [] + for child_job in job.fetch_dependencies(): + sensor = get_asset_or_sensor_from_ref(child_job.kwargs["asset_or_sensor"]) + schedule_start = child_job.kwargs["start"] + + data_source = get_data_source_for_job(child_job) + if data_source is None: + return unknown_schedule( + error_message + + f"no data source could be found for {data_source}. {scheduler_info_msg}" + ) + + power_values = sensor.search_beliefs( + event_starts_after=schedule_start, + event_ends_before=schedule_start + planning_horizon, + source=data_source, + most_recent_beliefs_only=True, + one_deterministic_belief_per_event=True, + ) + + sign = 1 + if sensor.get_attribute("consumption_is_positive", True): + sign = -1 + + # For consumption schedules, positive values denote consumption. For the db, consumption is negative + consumption_schedule = sign * simplify_index(power_values)["event_value"] + if consumption_schedule.empty: + return unknown_schedule( + f"{error_message} the schedule was not found in the database. {scheduler_info_msg}" + ) + + # Update the planning window + resolution = sensor.event_resolution + start = consumption_schedule.index[0] + duration = min( + duration, consumption_schedule.index[-1] + resolution - start + ) + consumption_schedule = consumption_schedule[ + start : start + duration - resolution + ] + sensor_schedule_response = dict( + sensor=sensor.id, + values=consumption_schedule.tolist(), + start=isodate.datetime_isoformat(start), + duration=duration_isoformat(duration), + unit=sensor.unit, + ) + overall_schedule_response.append(sensor_schedule_response) + + d, s = request_processed(scheduler_info_msg) + return ( + dict( + scheduler_info=scheduler_info, schedule=overall_schedule_response, **d + ), + s, + ) diff --git a/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py b/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py index 7919caace..8cfaca4a8 100644 --- a/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py +++ b/flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py @@ -101,14 +101,16 @@ def test_asset_trigger_and_get_schedule( scheduler_source = get_data_source_for_job(scheduling_job) assert scheduler_source is not None - # try to retrieve the schedule through the /sensors//schedules/ [GET] api endpoint + # try to retrieve the schedule through the /assets//schedules/ [GET] api endpoint get_schedule_response = client.get( - url_for( - "SensorAPI:get_schedule", id=sensor.id, uuid=scheduling_job.id - ), # todo: use (last?) job_id from trigger response + url_for("AssetAPI:get_schedule", id=sensor.id, uuid=job_id), query_string={"duration": "PT48H"}, ) print("Server responded with:\n%s" % get_schedule_response.json) assert get_schedule_response.status_code == 200 - # assert get_schedule_response.json["type"] == "GetDeviceMessageResponse" - assert len(get_schedule_response.json["values"]) == expected_length_of_schedule + assert len(get_schedule_response.json["schedule"]) == len(message["flex-model"]) + assert get_schedule_response.json["schedule"][0]["sensor"] == sensor.id + assert ( + len(get_schedule_response.json["schedule"][0]["values"]) + == expected_length_of_schedule + ) From 06bff04a1b23412aa9682ffcf9c25e1f5f600731 Mon Sep 17 00:00:00 2001 From: "F.N. Claessen" Date: Wed, 5 Jun 2024 16:08:16 +0200 Subject: [PATCH 2/2] fix: add scheduler info Signed-off-by: F.N. Claessen --- flexmeasures/data/services/scheduling.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flexmeasures/data/services/scheduling.py b/flexmeasures/data/services/scheduling.py index d1cb7cb0f..b8a48ed2a 100644 --- a/flexmeasures/data/services/scheduling.py +++ b/flexmeasures/data/services/scheduling.py @@ -312,6 +312,9 @@ def create_sequential_scheduling_job( connection=current_app.queues["scheduling"].connection, ) + # Stand-in for MultiStorageScheduler + job.meta["scheduler_info"] = dict(scheduler="A sequential job scheduling policy") + job_status = job.get_status(refresh=True) jobs.append(job)