Skip to content

Commit

Permalink
Refactor system event logger and monitor, add new metrics (Netflix#2065)
Browse files Browse the repository at this point in the history
* Emit graph info to event logger, add runtime start metric

* Remove graph_info logging from resume

* Simplify run and resume metrics

* Remove logger update context

* Refactor monitor and logger to not use update_context

* Address comments
  • Loading branch information
talsperre authored Oct 31, 2024
1 parent a37555b commit f222585
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 50 deletions.
27 changes: 27 additions & 0 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,20 @@ def resume(
if runtime.should_skip_clone_only_execution():
return

current._update_env(
{
"run_id": runtime.run_id,
}
)
_system_logger.log_event(
level="info",
module="metaflow.resume",
name="start",
payload={
"msg": "Resuming run",
},
)

with runtime.run_heartbeat():
if clone_only:
runtime.clone_original_run()
Expand Down Expand Up @@ -775,6 +789,19 @@ def run(
write_file(run_id_file, runtime.run_id)

obj.flow._set_constants(obj.graph, kwargs)
current._update_env(
{
"run_id": runtime.run_id,
}
)
_system_logger.log_event(
level="info",
module="metaflow.run",
name="start",
payload={
"msg": "Starting run",
},
)
runtime.print_workflow_info()
runtime.persist_constants()

Expand Down
20 changes: 1 addition & 19 deletions metaflow/system/system_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,11 @@ class SystemLogger(object):
def __init__(self):
self._logger = None
self._flow_name = None
self._context = {}
self._is_context_updated = False

def __del__(self):
if self._flow_name == "not_a_real_flow":
self.logger.terminate()

def update_context(self, context: Dict[str, Any]):
"""
Update the global context maintained by the system logger.
Parameters
----------
context : Dict[str, Any]
A dictionary containing the context to update.
"""
self._is_context_updated = True
self._context.update(context)

def init_system_logger(
self, flow_name: str, logger: "metaflow.event_logger.NullEventLogger"
):
Expand Down Expand Up @@ -71,7 +56,7 @@ def _debug(msg: str):
"false",
"",
):
print("system monitor: %s" % msg, file=sys.stderr)
print("system logger: %s" % msg, file=sys.stderr)

def log_event(
self, level: str, module: str, name: str, payload: Optional[Any] = None
Expand All @@ -96,8 +81,5 @@ def log_event(
"module": module,
"name": name,
"payload": payload if payload is not None else {},
"context": self._context,
"is_context_updated": self._is_context_updated,
}
)
self._is_context_updated = False
24 changes: 0 additions & 24 deletions metaflow/system/system_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,11 @@ class SystemMonitor(object):
def __init__(self):
self._monitor = None
self._flow_name = None
self._context = {}

def __del__(self):
if self._flow_name == "not_a_real_flow":
self.monitor.terminate()

def update_context(self, context: Dict[str, Any]):
"""
Update the global context maintained by the system monitor.
Parameters
----------
context : Dict[str, Any]
A dictionary containing the context to update.
"""
from metaflow.sidecar import Message, MessageTypes

self._context.update(context)
self.monitor.send(
Message(
MessageTypes.MUST_SEND,
{
"is_context_updated": True,
**self._context,
},
)
)

def init_system_monitor(
self, flow_name: str, monitor: "metaflow.monitor.NullMonitor"
):
Expand Down
11 changes: 4 additions & 7 deletions metaflow/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,6 @@ def clone_only(
"origin_run_id": origin_run_id,
"origin_task_id": origin_task_id,
}
_system_logger.update_context(task_payload)
_system_monitor.update_context(task_payload)

msg = "Cloning task from {}/{}/{}/{} to {}/{}/{}/{}".format(
self.flow.name,
Expand Down Expand Up @@ -545,9 +543,6 @@ def run_step(
"project_flow_name": current.get("project_flow_name"),
"trace_id": trace_id or None,
}

_system_logger.update_context(task_payload)
_system_monitor.update_context(task_payload)
start = time.time()
self.metadata.start_task_heartbeat(self.flow.name, run_id, step_name, task_id)
with self.monitor.measure("metaflow.task.duration"):
Expand Down Expand Up @@ -592,7 +587,8 @@ def run_step(
{
"parameter_names": self._init_parameters(
inputs[0], passdown=True
)
),
"graph_info": self.flow._graph_info,
}
)
else:
Expand All @@ -616,7 +612,8 @@ def run_step(
{
"parameter_names": self._init_parameters(
inputs[0], passdown=False
)
),
"graph_info": self.flow._graph_info,
}
)

Expand Down

0 comments on commit f222585

Please sign in to comment.