Skip to content

Commit

Permalink
Refs #22027: Fix and adapt python back-end node to Front-end requirem…
Browse files Browse the repository at this point in the history
…ents

Signed-off-by: eProsima <[email protected]>
  • Loading branch information
JesusPoderoso committed Nov 7, 2024
1 parent edd093c commit 78df762
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 95 deletions.
36 changes: 22 additions & 14 deletions sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import threading
import time
import signal
import sustainml_swig
import sys
from orchestrator_node import orchestrator_node, utils
from werkzeug.serving import make_server
Expand All @@ -30,27 +31,28 @@
# Flask server default route
@server.route('/')
def hello_world():
return jsonify({'mesage': 'Hello world! Use "/terminate" route to stop Back-end node.<br>'}), 200
return jsonify({'mesage': 'Hello world! Use "/terminate" route to stop Back-end node.'}), 200

# Send user input data to orchestrator
@server.route('/user_input', methods=['POST'])
def user_input():
data = request.json
ret = orchestrator.send_user_input(data)
if not ret:
task_id = orchestrator.send_user_input(data)
if task_id is None:
return jsonify({'error': 'Invalid input data'}), 400
return jsonify({'message': 'User input data sent successfully.<br>'}), 200
return jsonify({'message': 'User input data sent successfully.',
'task_id': utils.task_json(task_id)}), 200

# Retrieve Node status methods
@server.route('/status', methods=['GET'])
def status():
return jsonify({'status': f'{orchestrator.get_all_status()}'}), 200
return jsonify({'status': orchestrator.get_all_status()}), 200

@server.route('/status', methods=['POST'])
def status_args():
data = request.json
node_id = data.get('node_id')
return jsonify({'status': f'{orchestrator.get_status(node_id)}'}), 200
return jsonify({'status': orchestrator.get_status(node_id)}), 200

# Retrieve Node results methods
@server.route('/results', methods=['GET'])
Expand All @@ -64,20 +66,26 @@ def results():
model = orchestrator.get_results(utils.node_id.ML_MODEL_PROVIDER.value, last_task_id)
hardware = orchestrator.get_results(utils.node_id.HW_PROVIDER.value, last_task_id)
carbontracker = orchestrator.get_results(utils.node_id.CARBONTRACKER.value, last_task_id)
json = {f'{utils.string_node(utils.node_id.APP_REQUIREMENTS.value)}': f'{app_req}',
f'{utils.string_node(utils.node_id.ML_MODEL_METADATA.value)}': f'{metadata}',
f'{utils.string_node(utils.node_id.HW_CONSTRAINTS.value)}': f'{constraints}',
f'{utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)}': f'{model}',
f'{utils.string_node(utils.node_id.HW_PROVIDER.value)}': f'{hardware}',
f'{utils.string_node(utils.node_id.CARBONTRACKER.value)}': f'{carbontracker}'}
task_json = {'problem_id': last_task_id.problem_id(), 'iteration_id': last_task_id.iteration_id()}
json = {utils.string_node(utils.node_id.APP_REQUIREMENTS.value): app_req,
utils.string_node(utils.node_id.ML_MODEL_METADATA.value): metadata,
utils.string_node(utils.node_id.HW_CONSTRAINTS.value): constraints,
utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value): model,
utils.string_node(utils.node_id.HW_PROVIDER.value): hardware,
utils.string_node(utils.node_id.CARBONTRACKER.value): carbontracker,
'task_id': task_json}
return jsonify(json), 200

@server.route('/results', methods=['POST'])
def results_args():
data = request.json
node_id = data.get('node_id')
task_id = data.get('task_id')
return jsonify({f'{utils.string_node(node_id)}': f'{orchestrator.get_results(node_id, task_id)}'}), 200
json_task = data.get('task_id')
if json_task is not None:
task_id = sustainml_swig.set_task_id(json_task.get('problem_id', 0), json_task.get('iteration_id', 0))
else:
task_id = None
return jsonify({utils.string_node(node_id): orchestrator.get_results(node_id, task_id)}), 200

# Flask server shutdown route
@server.route('/shutdown', methods=['GET'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,159 +103,165 @@ def get_last_task_id(self):
return self.handler_.last_task_id

def get_all_status(self):
output = ""
json_output = {}
for key, value in self.handler_.node_status_.items():
output += utils.string_node(key) + " node status " + utils.string_status(value) + "<br>"
if output == "":
output = "No nodes have reported their status yet.\n"
return output
json_output[utils.string_node(key)] = utils.string_status(value)
return json_output

def get_status(self, node_id):
if node_id in self.handler_.node_status_:
return utils.string_status(self.handler_.node_status_[node_id])
if node_id is None:
return self.get_all_status()
else:
return utils.string_status(utils.node_status.INACTIVE.value)
if node_id in self.handler_.node_status_:
return utils.string_status(self.handler_.node_status_[node_id])
else:
return utils.string_status(utils.node_status.INACTIVE.value)

def get_app_requirements(self, task_id, wait):
if wait:
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.APP_REQUIREMENTS.value):
self.handler_.condition.wait()
def get_app_requirements(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.APP_REQUIREMENTS.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_app_requirements(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.APP_REQUIREMENTS.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.APP_REQUIREMENTS.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
app_requirements_str_list = node_data.app_requirements()
json_output = {'app_requirements': f'{utils.string_std_vector(app_requirements_str_list)}<br>'}
json_output = {'task_id': task_json,
'app_requirements': utils.string_std_vector(app_requirements_str_list)}
return json_output

def get_model_metadata(self, task_id, wait):
if wait:
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.ML_MODEL_METADATA.value):
self.handler_.condition.wait()
def get_model_metadata(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.ML_MODEL_METADATA.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_model_metadata(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_METADATA.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_METADATA.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
keywords_str_list = node_data.keywords()
metadata_str_list = node_data.ml_model_metadata()
json_output = {'keywords': f'{utils.string_std_vector(keywords_str_list)}<br>',
'metadata': f'{utils.string_std_vector(metadata_str_list)}<br>'}
json_output = {'task_id': task_json,
'keywords': utils.string_std_vector(keywords_str_list),
'metadata': utils.string_std_vector(metadata_str_list)}
return json_output

def get_hw_constraints(self, task_id, wait):
if wait:
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.HW_CONSTRAINTS.value):
self.handler_.condition.wait()
def get_hw_constraints(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.HW_CONSTRAINTS.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_hw_constraints(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_CONSTRAINTS.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_CONSTRAINTS.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
max_value = node_data.max_memory_footprint()
required_hardware = node_data.hardware_required()
json_output = {'max_memory_footprint': f'{max_value}<br>',
'hardware_required': f'{utils.string_std_vector(required_hardware)}<br>'}
json_output = {'task_id': task_json,
'max_memory_footprint': max_value,
'hardware_required': utils.string_std_vector(required_hardware)}
return json_output

def get_ml_model_provider(self, task_id, wait):
if wait:
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.ML_MODEL_PROVIDER.value):
self.handler_.condition.wait()
def get_ml_model_provider(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.ML_MODEL_PROVIDER.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_model_provider(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
model = node_data.model()
model_path = node_data.model_path()
model_properties = node_data.model_properties()
model_properties_path = node_data.model_properties_path()
input_batch = node_data.input_batch()
target_latency = node_data.target_latency()
json_output = {'model': f'{model}<br>',
'model_path': f'{model_path}<br>',
'model_properties': f'{model_properties}<br>',
'model_properties_path': f'{model_properties_path}<br>',
'input_batch': f'{utils.string_std_vector(input_batch)}<br>',
'target_latency': f'{target_latency}<br>'}
json_output = {'task_id': task_json,
'model': model,
'model_path': model_path,
'model_properties': model_properties,
'model_properties_path': model_properties_path,
'input_batch': utils.string_std_vector(input_batch),
'target_latency': target_latency}
return json_output

def get_hw_provider(self, task_id, wait):
if wait:
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.HW_PROVIDER.value):
self.handler_.condition.wait()
def get_hw_provider(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.HW_PROVIDER.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_hw_provider(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_PROVIDER.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_PROVIDER.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
hw_description = node_data.hw_description()
power_consumption = node_data.power_consumption()
latency = node_data.latency()
memory_footprint_of_ml_model = node_data.memory_footprint_of_ml_model()
json_output = {'hw_description': f'{hw_description}<br>',
'power_consumption': f'{power_consumption}<br>',
'latency': f'{latency}<br>',
'memory_footprint_of_ml_model': f'{memory_footprint_of_ml_model}<br>'}
json_output = {'task_id': task_json,
'hw_description': hw_description,
'power_consumption': power_consumption,
'latency': latency,
'memory_footprint_of_ml_model': memory_footprint_of_ml_model}
return json_output

def get_carbontracker(self, task_id, wait):
if wait:
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.CARBONTRACKER.value):
self.handler_.condition.wait()
def get_carbontracker(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.CARBONTRACKER.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_carbontracker(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.CARBONTRACKER.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.CARBONTRACKER.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
carbon_footprint = node_data.carbon_footprint()
energy_consumption = node_data.energy_consumption()
carbon_intensity = node_data.carbon_intensity()
json_output = {'carbon_footprint': f'{carbon_footprint}<br>',
'energy_consumption': f'{energy_consumption}<br>',
'carbon_intensity': f'{carbon_intensity}<br>'}
json_output = {'task_id': task_json,
'carbon_footprint': carbon_footprint,
'energy_consumption': energy_consumption,
'carbon_intensity': carbon_intensity}
return json_output

def get_results(self, node_id, task_id):
wait = False
if task_id is None:
task_id = self.get_last_task_id()
wait = True

if node_id == utils.node_id.APP_REQUIREMENTS.value:
return self.get_app_requirements(task_id, wait)
return self.get_app_requirements(task_id)
elif node_id == utils.node_id.ML_MODEL_METADATA.value:
return self.get_model_metadata(task_id, wait)
return self.get_model_metadata(task_id)
elif node_id == utils.node_id.HW_CONSTRAINTS.value:
return self.get_hw_constraints(task_id, wait)
return self.get_hw_constraints(task_id)
elif node_id == utils.node_id.ML_MODEL_PROVIDER.value:
return self.get_ml_model_provider(task_id, wait)
return self.get_ml_model_provider(task_id)
elif node_id == utils.node_id.HW_PROVIDER.value:
return self.get_hw_provider(task_id, wait)
return self.get_hw_provider(task_id)
elif node_id == utils.node_id.CARBONTRACKER.value:
return self.get_carbontracker(task_id, wait)
return self.get_carbontracker(task_id)
else:
return utils.string_node(node_id) + " node does not have any results to show.<br>"
message = utils.string_node(node_id) + " node does not have any results to show."
return {'message': message, 'task_id': utils.task_json(task_id)}

def send_user_input(self, json_data):
pair = self.node_.prepare_new_task()
Expand Down Expand Up @@ -286,4 +292,8 @@ def send_user_input(self, json_data):
json_obj = utils.json_dict(extra_data)
data_array = np.frombuffer(json_obj.encode(), dtype=np.uint8)
user_input.extra_data(sustainml_swig.uint8_t_vector(data_array.tolist()))
return self.node_.start_task(task_id, user_input)

if self.node_.start_task(task_id, user_input):
return task_id
else:
return None
Original file line number Diff line number Diff line change
Expand Up @@ -59,21 +59,23 @@ def string_status(status):

def string_node(node):
if node == node_id.APP_REQUIREMENTS.value: # ID_APP_REQUIREMENTS
return "Application-level requirements"
return "APP_REQUIREMENTS"
elif node == node_id.CARBONTRACKER.value: # ID_CARBON_FOOTPRINT
return "Carbontracker"
return "CARBON_FOOTPRINT"
elif node == node_id.HW_CONSTRAINTS.value: # ID_HW_CONSTRAINTS
return "HW Constraints for inference"
return "HW_CONSTRAINTS"
elif node == node_id.HW_PROVIDER.value: # ID_HW_RESOURCES
return "HW Provider"
return "HW_RESOURCES"
elif node == node_id.ML_MODEL_METADATA.value: # ID_ML_MODEL_METADATA
return "ML Model Metadata"
return "ML_MODEL_METADATA"
elif node == node_id.ML_MODEL_PROVIDER.value: # ID_ML_MODEL
return "ML Model Provider"
elif node == node_id.ORCHESTRATOR.value: # ID_ORCHESTRATOR (MAX is ID 6)
return "Orchestrator"
return "ML_MODEL"
elif node == node_id.MAX.value: # MAX
return "MAX"
elif node == node_id.ORCHESTRATOR.value: # ID_ORCHESTRATOR
return "ORCHESTRATOR"
else:
return "Unknown node"
return "UNKNOWN"

def string_std_vector(vector):
output = ""
Expand All @@ -86,6 +88,9 @@ def string_std_vector(vector):
output += str(vector[i])
return output

def task_json(task_id):
return {"problem_id": task_id.problem_id(), "iteration_id": task_id.iteration_id()}

def string_task(task):
return "{" + str(task.problem_id()) + ", " + str(task.iteration_id()) + "}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,13 @@
}
return task_id;
}

types::TaskId* set_task_id(
const uint32_t& problem_id,
const uint32_t& iteration_id)
{
return new types::TaskId(problem_id, iteration_id);
}
%}

// Include the class interfaces
Expand Down

0 comments on commit 78df762

Please sign in to comment.