Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add get_schedule for asset schedules #1086

Open
wants to merge 2 commits into
base: feature/api/endpoint-for-scheduling-asset
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 207 additions & 5 deletions flexmeasures/api/v3_0/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,43 @@
from datetime import datetime, timedelta
import json

from flask import current_app
import isodate
from flask import current_app, url_for
from flask_classful import FlaskView, route
from flask_security import auth_required
from flask_json import as_json
from marshmallow import fields, ValidationError
from rq.job import Job, NoSuchJobError
from webargs.flaskparser import use_kwargs, use_args
from sqlalchemy import select, delete

from flexmeasures.auth.decorators import permission_required_for_context
from flexmeasures.data import db
from flexmeasures.data.models.user import Account
from flexmeasures.data.models.generic_assets import GenericAsset
from flexmeasures.data.schemas.generic_assets import (
GenericAssetSchema as AssetSchema,
GenericAssetIdField as AssetIdField,
)
from flexmeasures.data.queries.utils import simplify_index
from flexmeasures.data.schemas import AssetIdField
from flexmeasures.data.schemas.generic_assets import GenericAssetSchema as AssetSchema
from flexmeasures.data.schemas.scheduling import AssetTriggerSchema
from flexmeasures.data.schemas.times import AwareDateTimeField
from flexmeasures.data.services.scheduling import (
create_sequential_scheduling_job,
get_data_source_for_job,
)
from flexmeasures.data.services.utils import get_asset_or_sensor_from_ref
from flexmeasures.api.common.schemas.users import AccountIdField
from flexmeasures.api.common.responses import (
fallback_schedule_redirect,
invalid_flex_config,
request_processed,
unknown_schedule,
unrecognized_event,
)
from flexmeasures.api.common.utils.validators import (
optional_duration_accepted,
)
from flexmeasures.utils.coding_utils import flatten_unique
from flexmeasures.utils.time_utils import duration_isoformat
from flexmeasures.ui.utils.view_utils import set_session_variables


Expand Down Expand Up @@ -516,3 +526,195 @@ def trigger_schedule(
response = dict(schedule=jobs[-1].id)
d, s = request_processed()
return dict(**response, **d), s

@route("/<id>/schedules/<uuid>", methods=["GET"])
@use_kwargs(
{
"asset": AssetIdField(data_key="id", status_if_not_found=404),
"job_id": fields.Str(data_key="uuid"),
},
location="path",
)
@optional_duration_accepted(
timedelta(hours=6)
) # todo: make this a Marshmallow field
@permission_required_for_context("read", ctx_arg_name="asset")
def get_schedule( # noqa: C901
self, asset: GenericAsset, job_id: str, duration: timedelta, **kwargs
):
"""Get a schedule from FlexMeasures for multiple devices.

.. :quickref: Schedule; Download schedule from the platform for multiple devices

**Optional fields**

- "duration" (6 hours by default; can be increased to plan further into the future)

**Example response**

This message contains a schedule indicating two devices to consume at various power
rates from 10am UTC onwards for a duration of 45 minutes.

.. sourcecode:: json

{
"schedule": [
{
"sensor": 1,
"values": [
2.15,
3,
2
],
"start": "2015-06-02T10:00:00+00:00",
"duration": "PT45M",
"unit": "MW"
},
{
"sensor": 2,
"values": [
2.15,
3,
2
],
"start": "2015-06-02T10:00:00+00:00",
"duration": "PT45M",
"unit": "MW"
}
]
}

:reqheader Authorization: The authentication token
:reqheader Content-Type: application/json
:resheader Content-Type: application/json
:status 200: PROCESSED
:status 400: INVALID_TIMEZONE, INVALID_DOMAIN, INVALID_UNIT, UNKNOWN_SCHEDULE, UNRECOGNIZED_CONNECTION_GROUP
:status 401: UNAUTHORIZED
:status 403: INVALID_SENDER
:status 405: INVALID_METHOD
:status 422: UNPROCESSABLE_ENTITY
"""

planning_horizon = min( # type: ignore
duration, current_app.config.get("FLEXMEASURES_PLANNING_HORIZON")
)

# Look up the scheduling job
connection = current_app.queues["scheduling"].connection

try: # First try the scheduling queue
job = Job.fetch(job_id, connection=connection)
except NoSuchJobError:
return unrecognized_event(job_id, "job")

scheduler_info = job.meta.get("scheduler_info", dict(scheduler=""))
scheduler_info_msg = f"{scheduler_info['scheduler']} was used."

if job.is_finished:
error_message = "A scheduling job has been processed with your job ID, but "

elif job.is_failed: # Try to inform the user on why the job failed
e = job.meta.get(
"exception",
Exception(
"The job does not state why it failed. "
"The worker may be missing an exception handler, "
"or its exception handler is not storing the exception as job meta data."
),
)
message = f"Scheduling job failed with {type(e).__name__}: {e}. {scheduler_info_msg}"

fallback_job_id = job.meta.get("fallback_job_id")

# redirect to the fallback schedule endpoint if the fallback_job_id
# is defined in the metadata of the original job
if fallback_job_id is not None:
return fallback_schedule_redirect(
message,
url_for(
"AssetAPI:get_schedule",
uuid=fallback_job_id,
id=asset.id,
_external=True,
),
)
else:
return unknown_schedule(message)

elif job.is_started:
return unknown_schedule(f"Scheduling job in progress. {scheduler_info_msg}")
elif job.is_queued:
return unknown_schedule(
f"Scheduling job waiting to be processed. {scheduler_info_msg}"
)
elif job.is_deferred:
try:
preferred_job = job.dependency
except NoSuchJobError:
return unknown_schedule(
f"Scheduling job waiting for unknown job to be processed. {scheduler_info_msg}"
)
return unknown_schedule(
f'Scheduling job waiting for {preferred_job.status} job "{preferred_job.id}" to be processed. {scheduler_info_msg}'
)
else:
return unknown_schedule(
f"Scheduling job has an unknown status. {scheduler_info_msg}"
)

overall_schedule_response = []
for child_job in job.fetch_dependencies():
Copy link
Contributor

@victorgarcia98 victorgarcia98 Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks that there's is quite a large overlap between this logic and the one found in the sensor get_schedule endpoint. I'd say to keep it like that for now given that this endpoint might still get some changes when we introduce the non-sequential algorithm.

sensor = get_asset_or_sensor_from_ref(child_job.kwargs["asset_or_sensor"])
schedule_start = child_job.kwargs["start"]

data_source = get_data_source_for_job(child_job)
if data_source is None:
return unknown_schedule(
error_message
+ f"no data source could be found for {data_source}. {scheduler_info_msg}"
)

power_values = sensor.search_beliefs(
event_starts_after=schedule_start,
event_ends_before=schedule_start + planning_horizon,
source=data_source,
most_recent_beliefs_only=True,
one_deterministic_belief_per_event=True,
)

sign = 1
if sensor.get_attribute("consumption_is_positive", True):
sign = -1

# For consumption schedules, positive values denote consumption. For the db, consumption is negative
consumption_schedule = sign * simplify_index(power_values)["event_value"]
if consumption_schedule.empty:
return unknown_schedule(
f"{error_message} the schedule was not found in the database. {scheduler_info_msg}"
)

# Update the planning window
resolution = sensor.event_resolution
start = consumption_schedule.index[0]
duration = min(
duration, consumption_schedule.index[-1] + resolution - start
)
consumption_schedule = consumption_schedule[
start : start + duration - resolution
]
sensor_schedule_response = dict(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this one include the scheduler_info as well? That way we can detect which schedules were fallback and which not.

sensor=sensor.id,
values=consumption_schedule.tolist(),
start=isodate.datetime_isoformat(start),
duration=duration_isoformat(duration),
unit=sensor.unit,
)
overall_schedule_response.append(sensor_schedule_response)

d, s = request_processed(scheduler_info_msg)
return (
dict(
scheduler_info=scheduler_info, schedule=overall_schedule_response, **d
),
s,
)
14 changes: 8 additions & 6 deletions flexmeasures/api/v3_0/tests/test_asset_schedules_fresh_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,16 @@ def test_asset_trigger_and_get_schedule(
scheduler_source = get_data_source_for_job(scheduling_job)
assert scheduler_source is not None

# try to retrieve the schedule through the /sensors/<id>/schedules/<job_id> [GET] api endpoint
# try to retrieve the schedule through the /assets/<id>/schedules/<job_id> [GET] api endpoint
get_schedule_response = client.get(
url_for(
"SensorAPI:get_schedule", id=sensor.id, uuid=scheduling_job.id
), # todo: use (last?) job_id from trigger response
url_for("AssetAPI:get_schedule", id=sensor.id, uuid=job_id),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can try to get the schedules right before letting the worker take the jobs (Line 85). Also, we could try it before the jobs are enqueued, to test the error response for a non-existent job.

query_string={"duration": "PT48H"},
)
print("Server responded with:\n%s" % get_schedule_response.json)
assert get_schedule_response.status_code == 200
# assert get_schedule_response.json["type"] == "GetDeviceMessageResponse"
assert len(get_schedule_response.json["values"]) == expected_length_of_schedule
assert len(get_schedule_response.json["schedule"]) == len(message["flex-model"])
assert get_schedule_response.json["schedule"][0]["sensor"] == sensor.id
assert (
len(get_schedule_response.json["schedule"][0]["values"])
== expected_length_of_schedule
)
3 changes: 3 additions & 0 deletions flexmeasures/data/services/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ def create_sequential_scheduling_job(
connection=current_app.queues["scheduling"].connection,
)

# Stand-in for MultiStorageScheduler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

I wonder if we should reflect in the output that the schedules were computed using a sequential approach.

job.meta["scheduler_info"] = dict(scheduler="A sequential job scheduling policy")

job_status = job.get_status(refresh=True)

jobs.append(job)
Expand Down
Loading