Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[22027] Use extra_data to propagate HW Constraints node data #58

Merged
merged 6 commits into from
Dec 12, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refs #22027: Fix and adapt python back-end node to Front-end requirements

Signed-off-by: eProsima <jesuspoderoso@eprosima.com>
JesusPoderoso committed Nov 14, 2024
commit 80a712d3a281f235ef796694cbdcb9003da6b9c1
36 changes: 22 additions & 14 deletions sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
import threading
import time
import signal
import sustainml_swig
import sys
from orchestrator_node import orchestrator_node, utils
from werkzeug.serving import make_server
@@ -30,27 +31,28 @@
# Flask server default route
@server.route('/')
def hello_world():
    """Return a JSON greeting that points the caller at the "/terminate" route.

    Returns:
        A ``(response, 200)`` tuple whose JSON body holds a single
        ``'message'`` entry.
    """
    # The payload is plain text (no HTML "<br>" suffix) and the key is spelled
    # 'message', consistent with the key used by the '/user_input' handler.
    return jsonify({'message': 'Hello world! Use "/terminate" route to stop Back-end node.'}), 200

# Send user input data to orchestrator
@server.route('/user_input', methods=['POST'])
def user_input():
    """Forward the posted JSON body to the orchestrator as user input.

    Returns:
        ``400`` with an ``'error'`` entry when ``send_user_input`` returns
        ``None``; otherwise ``200`` with a confirmation message and the task
        identifier serialised by ``utils.task_json``.
    """
    data = request.json
    # Send the input exactly once; the returned value doubles as the
    # success indicator (None means the input was not accepted).
    task_id = orchestrator.send_user_input(data)
    if task_id is None:
        return jsonify({'error': 'Invalid input data'}), 400
    return jsonify({'message': 'User input data sent successfully.',
                    'task_id': utils.task_json(task_id)}), 200

# Retrieve Node status methods
@server.route('/status', methods=['GET'])
def status():
    """Report the status of every node known to the orchestrator.

    Returns:
        A ``(response, 200)`` tuple; the ``'status'`` entry carries the value
        of ``orchestrator.get_all_status()`` as structured JSON rather than a
        pre-formatted string.
    """
    return jsonify({'status': orchestrator.get_all_status()}), 200

@server.route('/status', methods=['POST'])
def status_args():
    """Report the status of the node selected by the posted ``node_id``.

    The ``node_id`` entry is read from the JSON request body and handed to
    ``orchestrator.get_status`` unchanged (``None`` when the entry is absent).

    Returns:
        A ``(response, 200)`` tuple with the result under ``'status'``.
    """
    data = request.json
    node_id = data.get('node_id')
    return jsonify({'status': orchestrator.get_status(node_id)}), 200

# Retrieve Node results methods
@server.route('/results', methods=['GET'])
@@ -64,20 +66,26 @@ def results():
model = orchestrator.get_results(utils.node_id.ML_MODEL_PROVIDER.value, last_task_id)
hardware = orchestrator.get_results(utils.node_id.HW_PROVIDER.value, last_task_id)
carbontracker = orchestrator.get_results(utils.node_id.CARBONTRACKER.value, last_task_id)
json = {f'{utils.string_node(utils.node_id.APP_REQUIREMENTS.value)}': f'{app_req}',
f'{utils.string_node(utils.node_id.ML_MODEL_METADATA.value)}': f'{metadata}',
f'{utils.string_node(utils.node_id.HW_CONSTRAINTS.value)}': f'{constraints}',
f'{utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)}': f'{model}',
f'{utils.string_node(utils.node_id.HW_PROVIDER.value)}': f'{hardware}',
f'{utils.string_node(utils.node_id.CARBONTRACKER.value)}': f'{carbontracker}'}
task_json = {'problem_id': last_task_id.problem_id(), 'iteration_id': last_task_id.iteration_id()}
json = {utils.string_node(utils.node_id.APP_REQUIREMENTS.value): app_req,
utils.string_node(utils.node_id.ML_MODEL_METADATA.value): metadata,
utils.string_node(utils.node_id.HW_CONSTRAINTS.value): constraints,
utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value): model,
utils.string_node(utils.node_id.HW_PROVIDER.value): hardware,
utils.string_node(utils.node_id.CARBONTRACKER.value): carbontracker,
'task_id': task_json}
return jsonify(json), 200

@server.route('/results', methods=['POST'])
def results_args():
    """Return the results of one node, optionally for a specific task.

    The JSON request body may carry ``node_id`` and a ``task_id`` object with
    ``problem_id`` / ``iteration_id`` entries (each defaulting to 0 when
    missing). When ``task_id`` is absent, ``None`` is passed on to
    ``orchestrator.get_results``.

    Returns:
        A ``(response, 200)`` tuple whose single entry is keyed by
        ``utils.string_node(node_id)``.
    """
    data = request.json
    node_id = data.get('node_id')
    json_task = data.get('task_id')

    # Convert the JSON task description into a task id object through the
    # SWIG helper; the raw dict is never forwarded to the orchestrator.
    task_id = None
    if json_task is not None:
        task_id = sustainml_swig.set_task_id(json_task.get('problem_id', 0),
                                             json_task.get('iteration_id', 0))

    return jsonify({utils.string_node(node_id): orchestrator.get_results(node_id, task_id)}), 200

# Flask server shutdown route
@server.route('/shutdown', methods=['GET'])
Original file line number Diff line number Diff line change
@@ -103,159 +103,165 @@ def get_last_task_id(self):
return self.handler_.last_task_id

def get_all_status(self):
    """Return the status of every node that has reported one.

    Returns:
        dict: maps ``utils.string_node(key)`` to ``utils.string_status(value)``
        for each entry of ``self.handler_.node_status_``; empty when there are
        no entries.
    """
    return {utils.string_node(key): utils.string_status(value)
            for key, value in self.handler_.node_status_.items()}

def get_status(self, node_id):
    """Return the status of one node, or of all nodes when ``node_id`` is None.

    Args:
        node_id: key into ``self.handler_.node_status_``, or ``None``.

    Returns:
        The result of ``get_all_status()`` when ``node_id`` is ``None``;
        otherwise the status string of that node, falling back to the
        INACTIVE status string when the node has no recorded status.
    """
    if node_id is None:
        return self.get_all_status()
    if node_id in self.handler_.node_status_:
        return utils.string_status(self.handler_.node_status_[node_id])
    return utils.string_status(utils.node_status.INACTIVE.value)

def get_app_requirements(self, task_id):
    """Wait for and return the application requirements results of a task.

    Blocks on ``self.handler_.condition`` until
    ``self.handler_.results_available`` reports results for ``task_id``.

    Args:
        task_id: task identifier exposing ``problem_id()`` / ``iteration_id()``.

    Returns:
        dict: ``{'Error': ...}`` when the node data cannot be retrieved,
        otherwise the task identifier plus the ``'app_requirements'`` entry.
    """
    node = utils.node_id.APP_REQUIREMENTS.value
    with self.handler_.condition:
        while not self.handler_.results_available(task_id, node):
            self.handler_.condition.wait()

    # retrieve node data
    node_data = sustainml_swig.get_app_requirements(self.node_, task_id)
    if node_data is None:
        return {'Error': f"Failed to get {utils.string_node(node)} data for task {utils.string_task(task_id)}"}

    # Parse data into json
    return {'task_id': utils.task_json(task_id),
            'app_requirements': utils.string_std_vector(node_data.app_requirements())}

def get_model_metadata(self, task_id):
    """Wait for and return the ML model metadata results of a task.

    Blocks on ``self.handler_.condition`` until
    ``self.handler_.results_available`` reports results for ``task_id``.

    Args:
        task_id: task identifier exposing ``problem_id()`` / ``iteration_id()``.

    Returns:
        dict: ``{'Error': ...}`` when the node data cannot be retrieved,
        otherwise the task identifier plus ``'keywords'`` and ``'metadata'``.
    """
    node = utils.node_id.ML_MODEL_METADATA.value
    with self.handler_.condition:
        while not self.handler_.results_available(task_id, node):
            self.handler_.condition.wait()

    # retrieve node data
    node_data = sustainml_swig.get_model_metadata(self.node_, task_id)
    if node_data is None:
        return {'Error': f"Failed to get {utils.string_node(node)} data for task {utils.string_task(task_id)}"}

    # Parse data into json
    return {'task_id': utils.task_json(task_id),
            'keywords': utils.string_std_vector(node_data.keywords()),
            'metadata': utils.string_std_vector(node_data.ml_model_metadata())}

def get_hw_constraints(self, task_id):
    """Wait for and return the hardware constraints results of a task.

    Blocks on ``self.handler_.condition`` until
    ``self.handler_.results_available`` reports results for ``task_id``.

    Args:
        task_id: task identifier exposing ``problem_id()`` / ``iteration_id()``.

    Returns:
        dict: ``{'Error': ...}`` when the node data cannot be retrieved,
        otherwise the task identifier plus ``'max_memory_footprint'`` and
        ``'hardware_required'``.
    """
    node = utils.node_id.HW_CONSTRAINTS.value
    with self.handler_.condition:
        while not self.handler_.results_available(task_id, node):
            self.handler_.condition.wait()

    # retrieve node data
    node_data = sustainml_swig.get_hw_constraints(self.node_, task_id)
    if node_data is None:
        return {'Error': f"Failed to get {utils.string_node(node)} data for task {utils.string_task(task_id)}"}

    # Parse data into json; the footprint is passed through as returned by
    # the node data accessor instead of being formatted into a string.
    return {'task_id': utils.task_json(task_id),
            'max_memory_footprint': node_data.max_memory_footprint(),
            'hardware_required': utils.string_std_vector(node_data.hardware_required())}

def get_ml_model_provider(self, task_id):
    """Wait for and return the ML model provider results of a task.

    Blocks on ``self.handler_.condition`` until
    ``self.handler_.results_available`` reports results for ``task_id``.

    Args:
        task_id: task identifier exposing ``problem_id()`` / ``iteration_id()``.

    Returns:
        dict: ``{'Error': ...}`` when the node data cannot be retrieved,
        otherwise the task identifier plus the model description entries.
    """
    node = utils.node_id.ML_MODEL_PROVIDER.value
    with self.handler_.condition:
        while not self.handler_.results_available(task_id, node):
            self.handler_.condition.wait()

    # retrieve node data
    node_data = sustainml_swig.get_model_provider(self.node_, task_id)
    if node_data is None:
        return {'Error': f"Failed to get {utils.string_node(node)} data for task {utils.string_task(task_id)}"}

    # Parse data into json; values are passed through as returned by the
    # node data accessors, only the input batch goes through string_std_vector.
    return {'task_id': utils.task_json(task_id),
            'model': node_data.model(),
            'model_path': node_data.model_path(),
            'model_properties': node_data.model_properties(),
            'model_properties_path': node_data.model_properties_path(),
            'input_batch': utils.string_std_vector(node_data.input_batch()),
            'target_latency': node_data.target_latency()}

def get_hw_provider(self, task_id):
    """Wait for and return the hardware provider results of a task.

    Blocks on ``self.handler_.condition`` until
    ``self.handler_.results_available`` reports results for ``task_id``.

    Args:
        task_id: task identifier exposing ``problem_id()`` / ``iteration_id()``.

    Returns:
        dict: ``{'Error': ...}`` when the node data cannot be retrieved,
        otherwise the task identifier plus the hardware description entries.
    """
    node = utils.node_id.HW_PROVIDER.value
    with self.handler_.condition:
        while not self.handler_.results_available(task_id, node):
            self.handler_.condition.wait()

    # retrieve node data
    node_data = sustainml_swig.get_hw_provider(self.node_, task_id)
    if node_data is None:
        return {'Error': f"Failed to get {utils.string_node(node)} data for task {utils.string_task(task_id)}"}

    # Parse data into json; values are passed through as returned by the
    # node data accessors.
    return {'task_id': utils.task_json(task_id),
            'hw_description': node_data.hw_description(),
            'power_consumption': node_data.power_consumption(),
            'latency': node_data.latency(),
            'memory_footprint_of_ml_model': node_data.memory_footprint_of_ml_model()}

def get_carbontracker(self, task_id):
    """Wait for and return the carbon tracker results of a task.

    Blocks on ``self.handler_.condition`` until
    ``self.handler_.results_available`` reports results for ``task_id``.

    Args:
        task_id: task identifier exposing ``problem_id()`` / ``iteration_id()``.

    Returns:
        dict: ``{'Error': ...}`` when the node data cannot be retrieved,
        otherwise the task identifier plus ``'carbon_footprint'``,
        ``'energy_consumption'`` and ``'carbon_intensity'``.
    """
    node = utils.node_id.CARBONTRACKER.value
    with self.handler_.condition:
        while not self.handler_.results_available(task_id, node):
            self.handler_.condition.wait()

    # retrieve node data
    node_data = sustainml_swig.get_carbontracker(self.node_, task_id)
    if node_data is None:
        return {'Error': f"Failed to get {utils.string_node(node)} data for task {utils.string_task(task_id)}"}

    # Parse data into json; values are passed through as returned by the
    # node data accessors.
    return {'task_id': utils.task_json(task_id),
            'carbon_footprint': node_data.carbon_footprint(),
            'energy_consumption': node_data.energy_consumption(),
            'carbon_intensity': node_data.carbon_intensity()}

def get_results(self, node_id, task_id):
    """Return the results of the given node for the given task.

    Args:
        node_id: value compared against the ``utils.node_id`` members to
            select which getter to call.
        task_id: task identifier, or ``None`` to use ``get_last_task_id()``.

    Returns:
        dict: the selected getter's output, or a ``'message'`` / ``'task_id'``
        dictionary when ``node_id`` matches none of the result-producing nodes.
    """
    if task_id is None:
        task_id = self.get_last_task_id()

    # Each getter takes only the task id.
    if node_id == utils.node_id.APP_REQUIREMENTS.value:
        return self.get_app_requirements(task_id)
    if node_id == utils.node_id.ML_MODEL_METADATA.value:
        return self.get_model_metadata(task_id)
    if node_id == utils.node_id.HW_CONSTRAINTS.value:
        return self.get_hw_constraints(task_id)
    if node_id == utils.node_id.ML_MODEL_PROVIDER.value:
        return self.get_ml_model_provider(task_id)
    if node_id == utils.node_id.HW_PROVIDER.value:
        return self.get_hw_provider(task_id)
    if node_id == utils.node_id.CARBONTRACKER.value:
        return self.get_carbontracker(task_id)

    message = utils.string_node(node_id) + " node does not have any results to show."
    return {'message': message, 'task_id': utils.task_json(task_id)}

def send_user_input(self, json_data):
pair = self.node_.prepare_new_task()
@@ -286,4 +292,8 @@ def send_user_input(self, json_data):
json_obj = utils.json_dict(extra_data)
data_array = np.frombuffer(json_obj.encode(), dtype=np.uint8)
user_input.extra_data(sustainml_swig.uint8_t_vector(data_array.tolist()))
return self.node_.start_task(task_id, user_input)

if self.node_.start_task(task_id, user_input):
return task_id
else:
return None
Original file line number Diff line number Diff line change
@@ -59,21 +59,23 @@ def string_status(status):

def string_node(node):
    """Return the identifier string for a node id value.

    Args:
        node: value compared against the ``.value`` of each ``node_id`` member.

    Returns:
        str: the upper-case identifier for the matching node, or
        ``"UNKNOWN"`` when ``node`` matches none of them.
    """
    if node == node_id.APP_REQUIREMENTS.value:      # ID_APP_REQUIREMENTS
        return "APP_REQUIREMENTS"
    if node == node_id.CARBONTRACKER.value:         # ID_CARBON_FOOTPRINT
        return "CARBON_FOOTPRINT"
    if node == node_id.HW_CONSTRAINTS.value:        # ID_HW_CONSTRAINTS
        return "HW_CONSTRAINTS"
    if node == node_id.HW_PROVIDER.value:           # ID_HW_RESOURCES
        return "HW_RESOURCES"
    if node == node_id.ML_MODEL_METADATA.value:     # ID_ML_MODEL_METADATA
        return "ML_MODEL_METADATA"
    if node == node_id.ML_MODEL_PROVIDER.value:     # ID_ML_MODEL
        return "ML_MODEL"
    # MAX is tested before ORCHESTRATOR; keep this order.
    if node == node_id.MAX.value:                   # MAX
        return "MAX"
    if node == node_id.ORCHESTRATOR.value:          # ID_ORCHESTRATOR
        return "ORCHESTRATOR"
    return "UNKNOWN"

def string_std_vector(vector):
output = ""
@@ -86,6 +88,9 @@ def string_std_vector(vector):
output += str(vector[i])
return output

def task_json(task_id):
    """Convert a task identifier into a JSON-friendly dictionary.

    Args:
        task_id: object exposing ``problem_id()`` and ``iteration_id()``.

    Returns:
        dict: ``{"problem_id": ..., "iteration_id": ...}`` holding the values
        returned by the two accessors.
    """
    problem = task_id.problem_id()
    iteration = task_id.iteration_id()
    return dict(problem_id=problem, iteration_id=iteration)

def string_task(task):
    """Format a task identifier as ``"{problem_id, iteration_id}"``.

    Args:
        task: object exposing ``problem_id()`` and ``iteration_id()``.

    Returns:
        str: both values rendered with ``str`` and wrapped in braces.
    """
    problem = task.problem_id()
    iteration = task.iteration_id()
    # Doubled braces emit the literal "{" and "}" around the two values.
    return f"{{{problem!s}, {iteration!s}}}"

Original file line number Diff line number Diff line change
@@ -170,6 +170,13 @@
}
return task_id;
}

// Build a TaskId from its problem and iteration identifiers.
// Returns a pointer to an object allocated with `new`.
// NOTE(review): who releases the object depends on how the wrapper is
// declared on the SWIG side - confirm.
types::TaskId* set_task_id(
        const uint32_t& problem_id,
        const uint32_t& iteration_id)
{
    types::TaskId* const task_id = new types::TaskId(problem_id, iteration_id);
    return task_id;
}
%}

// Include the class interfaces