-
Notifications
You must be signed in to change notification settings - Fork 36
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: add get_schedule for asset schedules #1086
base: feature/api/endpoint-for-scheduling-asset
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,33 +3,43 @@ | |
from datetime import datetime, timedelta | ||
import json | ||
|
||
from flask import current_app | ||
import isodate | ||
from flask import current_app, url_for | ||
from flask_classful import FlaskView, route | ||
from flask_security import auth_required | ||
from flask_json import as_json | ||
from marshmallow import fields, ValidationError | ||
from rq.job import Job, NoSuchJobError | ||
from webargs.flaskparser import use_kwargs, use_args | ||
from sqlalchemy import select, delete | ||
|
||
from flexmeasures.auth.decorators import permission_required_for_context | ||
from flexmeasures.data import db | ||
from flexmeasures.data.models.user import Account | ||
from flexmeasures.data.models.generic_assets import GenericAsset | ||
from flexmeasures.data.schemas.generic_assets import ( | ||
GenericAssetSchema as AssetSchema, | ||
GenericAssetIdField as AssetIdField, | ||
) | ||
from flexmeasures.data.queries.utils import simplify_index | ||
from flexmeasures.data.schemas import AssetIdField | ||
from flexmeasures.data.schemas.generic_assets import GenericAssetSchema as AssetSchema | ||
from flexmeasures.data.schemas.scheduling import AssetTriggerSchema | ||
from flexmeasures.data.schemas.times import AwareDateTimeField | ||
from flexmeasures.data.services.scheduling import ( | ||
create_sequential_scheduling_job, | ||
get_data_source_for_job, | ||
) | ||
from flexmeasures.data.services.utils import get_asset_or_sensor_from_ref | ||
from flexmeasures.api.common.schemas.users import AccountIdField | ||
from flexmeasures.api.common.responses import ( | ||
fallback_schedule_redirect, | ||
invalid_flex_config, | ||
request_processed, | ||
unknown_schedule, | ||
unrecognized_event, | ||
) | ||
from flexmeasures.api.common.utils.validators import ( | ||
optional_duration_accepted, | ||
) | ||
from flexmeasures.utils.coding_utils import flatten_unique | ||
from flexmeasures.utils.time_utils import duration_isoformat | ||
from flexmeasures.ui.utils.view_utils import set_session_variables | ||
|
||
|
||
|
@@ -516,3 +526,195 @@ def trigger_schedule( | |
response = dict(schedule=jobs[-1].id) | ||
d, s = request_processed() | ||
return dict(**response, **d), s | ||
|
||
@route("/<id>/schedules/<uuid>", methods=["GET"]) | ||
@use_kwargs( | ||
{ | ||
"asset": AssetIdField(data_key="id", status_if_not_found=404), | ||
"job_id": fields.Str(data_key="uuid"), | ||
}, | ||
location="path", | ||
) | ||
@optional_duration_accepted( | ||
timedelta(hours=6) | ||
) # todo: make this a Marshmallow field | ||
@permission_required_for_context("read", ctx_arg_name="asset") | ||
def get_schedule( # noqa: C901 | ||
self, asset: GenericAsset, job_id: str, duration: timedelta, **kwargs | ||
): | ||
"""Get a schedule from FlexMeasures for multiple devices. | ||
|
||
.. :quickref: Schedule; Download schedule from the platform for multiple devices | ||
|
||
**Optional fields** | ||
|
||
- "duration" (6 hours by default; can be increased to plan further into the future) | ||
|
||
**Example response** | ||
|
||
This message contains a schedule indicating two devices to consume at various power | ||
rates from 10am UTC onwards for a duration of 45 minutes. | ||
|
||
.. sourcecode:: json | ||
|
||
{ | ||
"schedule": [ | ||
{ | ||
"sensor": 1, | ||
"values": [ | ||
2.15, | ||
3, | ||
2 | ||
], | ||
"start": "2015-06-02T10:00:00+00:00", | ||
"duration": "PT45M", | ||
"unit": "MW" | ||
}, | ||
{ | ||
"sensor": 2, | ||
"values": [ | ||
2.15, | ||
3, | ||
2 | ||
], | ||
"start": "2015-06-02T10:00:00+00:00", | ||
"duration": "PT45M", | ||
"unit": "MW" | ||
} | ||
] | ||
} | ||
|
||
:reqheader Authorization: The authentication token | ||
:reqheader Content-Type: application/json | ||
:resheader Content-Type: application/json | ||
:status 200: PROCESSED | ||
:status 400: INVALID_TIMEZONE, INVALID_DOMAIN, INVALID_UNIT, UNKNOWN_SCHEDULE, UNRECOGNIZED_CONNECTION_GROUP | ||
:status 401: UNAUTHORIZED | ||
:status 403: INVALID_SENDER | ||
:status 405: INVALID_METHOD | ||
:status 422: UNPROCESSABLE_ENTITY | ||
""" | ||
|
||
planning_horizon = min( # type: ignore | ||
duration, current_app.config.get("FLEXMEASURES_PLANNING_HORIZON") | ||
) | ||
|
||
# Look up the scheduling job | ||
connection = current_app.queues["scheduling"].connection | ||
|
||
try: # First try the scheduling queue | ||
job = Job.fetch(job_id, connection=connection) | ||
except NoSuchJobError: | ||
return unrecognized_event(job_id, "job") | ||
|
||
scheduler_info = job.meta.get("scheduler_info", dict(scheduler="")) | ||
scheduler_info_msg = f"{scheduler_info['scheduler']} was used." | ||
|
||
if job.is_finished: | ||
error_message = "A scheduling job has been processed with your job ID, but " | ||
|
||
elif job.is_failed: # Try to inform the user on why the job failed | ||
e = job.meta.get( | ||
"exception", | ||
Exception( | ||
"The job does not state why it failed. " | ||
"The worker may be missing an exception handler, " | ||
"or its exception handler is not storing the exception as job meta data." | ||
), | ||
) | ||
message = f"Scheduling job failed with {type(e).__name__}: {e}. {scheduler_info_msg}" | ||
|
||
fallback_job_id = job.meta.get("fallback_job_id") | ||
|
||
# redirect to the fallback schedule endpoint if the fallback_job_id | ||
# is defined in the metadata of the original job | ||
if fallback_job_id is not None: | ||
return fallback_schedule_redirect( | ||
message, | ||
url_for( | ||
"AssetAPI:get_schedule", | ||
uuid=fallback_job_id, | ||
id=asset.id, | ||
_external=True, | ||
), | ||
) | ||
else: | ||
return unknown_schedule(message) | ||
|
||
elif job.is_started: | ||
return unknown_schedule(f"Scheduling job in progress. {scheduler_info_msg}") | ||
elif job.is_queued: | ||
return unknown_schedule( | ||
f"Scheduling job waiting to be processed. {scheduler_info_msg}" | ||
) | ||
elif job.is_deferred: | ||
try: | ||
preferred_job = job.dependency | ||
except NoSuchJobError: | ||
return unknown_schedule( | ||
f"Scheduling job waiting for unknown job to be processed. {scheduler_info_msg}" | ||
) | ||
return unknown_schedule( | ||
f'Scheduling job waiting for {preferred_job.status} job "{preferred_job.id}" to be processed. {scheduler_info_msg}' | ||
) | ||
else: | ||
return unknown_schedule( | ||
f"Scheduling job has an unknown status. {scheduler_info_msg}" | ||
) | ||
|
||
overall_schedule_response = [] | ||
for child_job in job.fetch_dependencies(): | ||
sensor = get_asset_or_sensor_from_ref(child_job.kwargs["asset_or_sensor"]) | ||
schedule_start = child_job.kwargs["start"] | ||
|
||
data_source = get_data_source_for_job(child_job) | ||
if data_source is None: | ||
return unknown_schedule( | ||
error_message | ||
+ f"no data source could be found for {data_source}. {scheduler_info_msg}" | ||
) | ||
|
||
power_values = sensor.search_beliefs( | ||
event_starts_after=schedule_start, | ||
event_ends_before=schedule_start + planning_horizon, | ||
source=data_source, | ||
most_recent_beliefs_only=True, | ||
one_deterministic_belief_per_event=True, | ||
) | ||
|
||
sign = 1 | ||
if sensor.get_attribute("consumption_is_positive", True): | ||
sign = -1 | ||
|
||
# For consumption schedules, positive values denote consumption. For the db, consumption is negative | ||
consumption_schedule = sign * simplify_index(power_values)["event_value"] | ||
if consumption_schedule.empty: | ||
return unknown_schedule( | ||
f"{error_message} the schedule was not found in the database. {scheduler_info_msg}" | ||
) | ||
|
||
# Update the planning window | ||
resolution = sensor.event_resolution | ||
start = consumption_schedule.index[0] | ||
duration = min( | ||
duration, consumption_schedule.index[-1] + resolution - start | ||
) | ||
consumption_schedule = consumption_schedule[ | ||
start : start + duration - resolution | ||
] | ||
sensor_schedule_response = dict( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this one include the |
||
sensor=sensor.id, | ||
values=consumption_schedule.tolist(), | ||
start=isodate.datetime_isoformat(start), | ||
duration=duration_isoformat(duration), | ||
unit=sensor.unit, | ||
) | ||
overall_schedule_response.append(sensor_schedule_response) | ||
|
||
d, s = request_processed(scheduler_info_msg) | ||
return ( | ||
dict( | ||
scheduler_info=scheduler_info, schedule=overall_schedule_response, **d | ||
), | ||
s, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -101,14 +101,16 @@ def test_asset_trigger_and_get_schedule( | |
scheduler_source = get_data_source_for_job(scheduling_job) | ||
assert scheduler_source is not None | ||
|
||
# try to retrieve the schedule through the /sensors/<id>/schedules/<job_id> [GET] api endpoint | ||
# try to retrieve the schedule through the /assets/<id>/schedules/<job_id> [GET] api endpoint | ||
get_schedule_response = client.get( | ||
url_for( | ||
"SensorAPI:get_schedule", id=sensor.id, uuid=scheduling_job.id | ||
), # todo: use (last?) job_id from trigger response | ||
url_for("AssetAPI:get_schedule", id=sensor.id, uuid=job_id), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps we can try to get the schedules right before letting the worker take the jobs (Line 85). Also, before the jobs are enqueued to test the error of a non-existent job. |
||
query_string={"duration": "PT48H"}, | ||
) | ||
print("Server responded with:\n%s" % get_schedule_response.json) | ||
assert get_schedule_response.status_code == 200 | ||
# assert get_schedule_response.json["type"] == "GetDeviceMessageResponse" | ||
assert len(get_schedule_response.json["values"]) == expected_length_of_schedule | ||
assert len(get_schedule_response.json["schedule"]) == len(message["flex-model"]) | ||
assert get_schedule_response.json["schedule"][0]["sensor"] == sensor.id | ||
assert ( | ||
len(get_schedule_response.json["schedule"][0]["values"]) | ||
== expected_length_of_schedule | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -312,6 +312,9 @@ def create_sequential_scheduling_job( | |
connection=current_app.queues["scheduling"].connection, | ||
) | ||
|
||
# Stand-in for MultiStorageScheduler | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! I wonder if we should reflect that the output of the schedules used a sequential approach.. |
||
job.meta["scheduler_info"] = dict(scheduler="A sequential job scheduling policy") | ||
|
||
job_status = job.get_status(refresh=True) | ||
|
||
jobs.append(job) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks that there's is quite a large overlap between this logic and the one found in the sensor get_schedule endpoint. I'd say to keep it like that for now given that this endpoint might still get some changes when we introduce the non-sequential algorithm.