Skip to content

Commit

Permalink
Refactor Message for clarity
Browse files Browse the repository at this point in the history
  • Loading branch information
sharkovsky committed May 16, 2023
1 parent 276270c commit 4d7355c
Show file tree
Hide file tree
Showing 15 changed files with 233 additions and 319 deletions.
2 changes: 1 addition & 1 deletion fedbiomed/common/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ def emit(self, record: Any):
import fedbiomed.common.message as message

# verify the message content with Message validator
_ = message.NodeMessages.reply_create(msg)
_ = message.NodeMessages.format_outgoing_message(msg)
self._mqtt.publish(self._topic, json.dumps(msg))
except Exception: # pragma: no cover
# obviously cannot call logger here... (infinite loop) cannot also send the message to the researcher
Expand Down
287 changes: 99 additions & 188 deletions fedbiomed/common/message.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion fedbiomed/common/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def send_error(self, errnum: ErrorNumbers, extra_msg: str = "", researcher_id: s
)

# just check the syntax before sending
_ = message.NodeMessages.reply_create(msg)
_ = message.NodeMessages.format_outgoing_message(msg)
self._mqtt.publish("general/researcher", json.serialize_msg(msg))

def is_failed(self) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion fedbiomed/node/history_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def add_scalar(
"""

self.messaging.send_message(NodeMessages.reply_create({
self.messaging.send_message(NodeMessages.format_outgoing_message({
'node_id': environ['NODE_ID'],
'job_id': self.job_id,
'researcher_id': self.researcher_id,
Expand Down
37 changes: 19 additions & 18 deletions fedbiomed/node/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ def on_message(self, msg: dict, topic: str = None):
try:
# get the request from the received message (from researcher)
command = msg['command']
request = NodeMessages.request_create(msg).get_dict()
request = NodeMessages.format_incoming_message(msg).get_dict()
if command in ['train', 'secagg']:
# add training task to queue
self.add_task(request)
elif command == 'secagg-delete':
self._task_secagg_delete(NodeMessages.request_create(msg))
self._task_secagg_delete(NodeMessages.format_incoming_message(msg))
elif command == 'ping':
self.messaging.send_message(
NodeMessages.reply_create(
NodeMessages.format_outgoing_message(
{
'researcher_id': msg['researcher_id'],
'node_id': environ['NODE_ID'],
Expand All @@ -110,7 +110,7 @@ def on_message(self, msg: dict, topic: str = None):
if len(databases) != 0:
databases = self.dataset_manager.obfuscate_private_information(databases)
# FIXME: what happens if len(database) == 0
self.messaging.send_message(NodeMessages.reply_create(
self.messaging.send_message(NodeMessages.format_outgoing_message(
{'success': True,
'command': 'search',
'node_id': environ['NODE_ID'],
Expand All @@ -121,7 +121,7 @@ def on_message(self, msg: dict, topic: str = None):
# Get list of all datasets
databases = self.dataset_manager.list_my_data(verbose=False)
databases = self.dataset_manager.obfuscate_private_information(databases)
self.messaging.send_message(NodeMessages.reply_create(
self.messaging.send_message(NodeMessages.format_outgoing_message(
{'success': True,
'command': 'list',
'node_id': environ['NODE_ID'],
Expand All @@ -139,28 +139,29 @@ def on_message(self, msg: dict, topic: str = None):
else:
raise NotImplementedError('Command not found')
except decoder.JSONDecodeError:
resid = 'researcher_id' in msg.keys(
) and msg['researcher_id'] or 'unknown_researcher_id'
resid = msg.get('researcher_id', 'unknown_researcher_id')
self.send_error(ErrorNumbers.FB301,
extra_msg="Not able to deserialize the message",
researcher_id=resid)
except NotImplementedError:
resid = 'researcher_id' in msg.keys(
) and msg['researcher_id'] or 'unknown_researcher_id'
resid = msg.get('researcher_id', 'unknown_researcher_id')
self.send_error(ErrorNumbers.FB301,
extra_msg=f"Command `{command}` is not implemented",
researcher_id=resid)
except KeyError:
# FIXME: this error could be raised for other missing keys (eg
# researcher_id, ....)
resid = 'researcher_id' in msg.keys(
) and msg['researcher_id'] or 'unknown_researcher_id'
resid = msg.get('researcher_id', 'unknown_researcher_id')
self.send_error(ErrorNumbers.FB301,
extra_msg="'command' property was not found",
researcher_id=resid)
except FedbiomedMessageError: # Message was not properly formatted
resid = msg.get('researcher_id', 'unknown_researcher_id')
self.send_error(ErrorNumbers.FB301,
extra_msg='Message was not properly formatted',
researcher_id=resid)
except TypeError: # Message was not serializable
resid = 'researcher_id' in msg.keys(
) and msg['researcher_id'] or 'unknown_researcher_id'
resid = msg.get('researcher_id', 'unknown_researcher_id')
self.send_error(ErrorNumbers.FB301,
extra_msg='Message was not serializable',
researcher_id=resid)
Expand Down Expand Up @@ -271,7 +272,7 @@ def parser_task_train(self, msg: TrainRequest) -> Union[Round, None]:
# condition above is likely to be false
logger.error('Did not found proper data in local datasets ' +
f'on node={environ["NODE_ID"]}')
self.messaging.send_message(NodeMessages.reply_create(
self.messaging.send_message(NodeMessages.format_outgoing_message(
{'command': "error",
'node_id': environ['NODE_ID'],
'researcher_id': researcher_id,
Expand Down Expand Up @@ -311,12 +312,12 @@ def task_manager(self):
logger.debug('[TASKS QUEUE] Item:' + str(item_print))
try:

item = NodeMessages.request_create(item)
item = NodeMessages.format_incoming_message(item)
command = item.get_param('command')
except Exception as e:
# send an error message back to network if something wrong occured
self.messaging.send_message(
NodeMessages.reply_create(
NodeMessages.format_outgoing_message(
{
'command': 'error',
'extra_msg': str(e),
Expand Down Expand Up @@ -348,7 +349,7 @@ def task_manager(self):
# send an error message back to network if something
# wrong occured
self.messaging.send_message(
NodeMessages.reply_create(
NodeMessages.format_outgoing_message(
{
'command': 'error',
'extra_msg': str(e),
Expand Down Expand Up @@ -385,7 +386,7 @@ def reply(self, msg: dict):
"""

try:
reply = NodeMessages.reply_create(
reply = NodeMessages.format_outgoing_message(
{'node_id': environ['ID'],
**msg}
).get_dict()
Expand Down
2 changes: 1 addition & 1 deletion fedbiomed/node/round.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ def _send_round_reply(
if not success:
logger.error(message)

return NodeMessages.reply_create({'node_id': environ['NODE_ID'],
return NodeMessages.format_outgoing_message({'node_id': environ['NODE_ID'],
'job_id': self.job_id,
'researcher_id': self.researcher_id,
'command': 'train',
Expand Down
4 changes: 2 additions & 2 deletions fedbiomed/node/training_plan_security_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ def reply_training_plan_approval_request(self, msg: dict, messaging: Messaging):
reply['success'] = False

# Send training plan approval acknowledge answer to researcher
messaging.send_message(NodeMessages.reply_create(reply).get_dict())
messaging.send_message(NodeMessages.format_outgoing_message(reply).get_dict())

def reply_training_plan_status_request(self, msg: dict, messaging: Messaging):
"""Returns requested training plan file status {approved, rejected, pending}
Expand Down Expand Up @@ -719,7 +719,7 @@ def reply_training_plan_status_request(self, msg: dict, messaging: Messaging):
f'file. {msg["training_plan_url"]} , {e}'}
# finally:
# # Send check training plan status answer to researcher
messaging.send_message(NodeMessages.reply_create(reply).get_dict())
messaging.send_message(NodeMessages.format_outgoing_message(reply).get_dict())

return

Expand Down
22 changes: 11 additions & 11 deletions fedbiomed/researcher/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,19 @@ def on_message(self, msg: Dict[str, Any], topic: str):
if topic == "general/logger":
#
# forward the treatment to node_log_handling() (same thread)
self.print_node_log_message(ResearcherMessages.reply_create(msg).get_dict())
self.print_node_log_message(ResearcherMessages.format_incoming_message(msg).get_dict())
elif topic == "general/researcher":
#
# *Reply messages (SearchReply, TrainReply) added to the TaskQueue
self.queue.add(ResearcherMessages.reply_create(msg).get_dict())
self.queue.add(ResearcherMessages.format_incoming_message(msg).get_dict())

# we may trap FedbiomedTaskQueueError here then queue full
# but what can we do except of quitting ?

elif topic == "general/monitoring":
if self._monitor_message_callback is not None:
# Pass message to Monitor's on message handler
self._monitor_message_callback(ResearcherMessages.reply_create(msg).get_dict())
self._monitor_message_callback(ResearcherMessages.format_incoming_message(msg).get_dict())
else:
logger.error("message received on wrong topic (" + topic + ") - IGNORING")

Expand Down Expand Up @@ -148,7 +148,7 @@ def send_message(self, msg: dict, client: str = None, add_sequence: bool = False
msg['sequence'] = sequence

self.messaging.send_message(
ResearcherMessages.request_create(msg).get_dict(),
ResearcherMessages.format_outgoing_message(msg).get_dict(),
client=client)
return sequence

Expand Down Expand Up @@ -260,18 +260,18 @@ def search(self, tags: tuple, nodes: list = None) -> dict:
logger.info(f'Searching dataset with data tags: {tags} on specified nodes: {nodes}')
for node in nodes:
self.messaging.send_message(
ResearcherMessages.request_create({'tags': tags,
ResearcherMessages.format_outgoing_message({'tags': tags,
'researcher_id': environ['RESEARCHER_ID'],
"command": "search"}
).get_dict(),
).get_dict(),
client=node)
else:
logger.info(f'Searching dataset with data tags: {tags} for all nodes')
self.messaging.send_message(
ResearcherMessages.request_create({'tags': tags,
ResearcherMessages.format_outgoing_message({'tags': tags,
'researcher_id': environ['RESEARCHER_ID'],
"command": "search"}
).get_dict())
).get_dict())

data_found = {}
for resp in self.get_responses(look_for_commands=['search']):
Expand Down Expand Up @@ -299,14 +299,14 @@ def list(self, nodes: list = None, verbose: bool = False) -> dict:
if nodes:
for node in nodes:
self.messaging.send_message(
ResearcherMessages.request_create({'researcher_id': environ['RESEARCHER_ID'],
ResearcherMessages.format_outgoing_message({'researcher_id': environ['RESEARCHER_ID'],
"command": "list"}
).get_dict(),
).get_dict(),
client=node)
logger.info(f'Listing datasets of given list of nodes : {nodes}')
else:
self.messaging.send_message(
ResearcherMessages.request_create({'researcher_id': environ['RESEARCHER_ID'],
ResearcherMessages.format_outgoing_message({'researcher_id': environ['RESEARCHER_ID'],
"command": "list"}).get_dict())
logger.info('Listing available datasets in all nodes... ')

Expand Down
2 changes: 1 addition & 1 deletion tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def setUp(self):
return_value={"file": environ['UPLOADS_URL']})
self.patcher3 = patch('fedbiomed.common.repository.Repository.download_file',
return_value=(True, environ['TMP_DIR']))
self.patcher4 = patch('fedbiomed.common.message.ResearcherMessages.request_create')
self.patcher4 = patch('fedbiomed.common.message.ResearcherMessages.format_outgoing_message')
self.patcher5 = patch('fedbiomed.researcher.job.atexit')


Expand Down
Loading

0 comments on commit 4d7355c

Please sign in to comment.