diff --git a/arcaflow_plugin_rtla/rtla_plugin.py b/arcaflow_plugin_rtla/rtla_plugin.py
index 4119dba..a724148 100644
--- a/arcaflow_plugin_rtla/rtla_plugin.py
+++ b/arcaflow_plugin_rtla/rtla_plugin.py
@@ -4,30 +4,19 @@
import re
import sys
import typing
+import time
from threading import Event
+from datetime import datetime
from arcaflow_plugin_sdk import plugin, predefined_schemas
from rtla_schema import (
TimerlatInputParams,
latency_stats_schema,
+ latency_timeseries_schema,
TimerlatOutput,
ErrorOutput,
)
-def run_oneshot_cmd(command_list: list[str]) -> tuple[str, subprocess.CompletedProcess]:
- try:
- cmd_out = subprocess.check_output(
- command_list,
- stderr=subprocess.STDOUT,
- text=True,
- )
- except subprocess.CalledProcessError as err:
- return "error", ErrorOutput(
- f"{err.cmd[0]} failed with return code {err.returncode}:\n{err.output}"
- )
- return "completed", cmd_out
-
-
class StartTimerlatStep:
def __init__(self, exit, finished_early):
self.exit = exit
@@ -66,7 +55,7 @@ def run_timerlat(
timerlat_cmd.extend(params.to_flags())
try:
- proc = subprocess.Popen(
+ timerlat_proc = subprocess.Popen(
timerlat_cmd,
start_new_session=True,
stdout=subprocess.PIPE,
@@ -78,9 +67,52 @@ def run_timerlat(
f"{err.cmd[0]} failed with return code {err.returncode}:\n{err.output}"
)
+ if params.enable_time_series:
+ timeseries_dict = {}
+ last_uptimestamp = {}
+ trace_path = "/sys/kernel/debug/tracing/instances/timerlat_hist/trace_pipe"
+
+ # A delay is needed before reading from the trace_path to ensure the file
+ # exists and data is streaming to the fifo. I've tried to avoid a sleep(),
+ # but none of the methods I've tested have worked.
+ wait_time = 0
+ timeout_seconds = 5
+ sleep_time = 0.5
+ trace_file = None
+ while wait_time < timeout_seconds:
+ try:
+ trace_file = open(trace_path, "r")
+ break
+ except FileNotFoundError:
+ time.sleep(sleep_time)
+ wait_time += sleep_time
+ continue
+
+ if not trace_file:
+ print("Unable to read tracer output; Skipping time series collection\n")
+ else:
+ timeseries_file = open("./timeseries_file", "w")
+ try:
+ timeseries_proc = subprocess.Popen(
+ ["cat"],
+ start_new_session=True,
+ stdin=trace_file,
+ stdout=timeseries_file,
+ stderr=subprocess.PIPE,
+ text=True,
+ )
+ except subprocess.CalledProcessError as err:
+ return "error", ErrorOutput(
+ f"""{err.cmd[0]} failed with return code {err.returncode}:\n
+ {err.output}"""
+ )
+ finally:
+ timeseries_file.close()
+ trace_file.close()
+
try:
# Block here, waiting on the cancel signal
- print("Gathering data... Use Ctrl-C to stop.")
+ print("Gathering data... Use Ctrl-C to stop.\n")
self.exit.wait(params.duration)
# Secondary block interrupt is via the KeyboardInterrupt exception.
@@ -92,9 +124,90 @@ def run_timerlat(
# In either the case of a keyboard interrupt or a cancel signal, we need to
# send the SIGINT to the subprocess.
if self.finished_early:
- proc.send_signal(2)
+ timerlat_proc.send_signal(2)
+
+ # Example time series output from the tracer (truncated)
+ # -0 [009] d. 123.769498: #1002 context irq timer_latency 458 ns
+ # <...>-625890 [009] .. 123.769499: #1002 context thread timer_latency 666 ns
+ # <...>-625890 [009] .. 123.769500: #1002 context user-ret timer_latency 499 ns
+ # -0 [002] d. 123.769528: #1003 context irq timer_latency 587 ns
+ # <...>-625883 [002] .. 123.769532: #1003 context thread timer_latency 712 ns
+ # <...>-625883 [002] .. 123.769534: #1003 context user-ret timer_latency 462 ns
+
+ if params.enable_time_series and trace_file:
+ # Interrupt the time series collection process and capture the output
+ timeseries_proc.send_signal(2)
+
+ # The calculation of the offset from uptime to current time certainly means
+ # that our time series is not 100% accurately aligned to the nanosecond, but
+ # the relative times will be accurate. We'll accept this as good enough.
+ uptime_offset = time.time() - time.monotonic()
+
+ with open("./timeseries_file") as timeseries_output:
+
+ if all(False for _ in timeseries_output):
+ print(
+ "No results reading tracer output; "
+ "Skipping time series collection\n"
+ )
+
+ for line in timeseries_output:
+ # The first object in the line is a string, and it is possible for
+ # it to have spaces in it, so we split first on the expected "["
+ # delimiter for the CPU number field.
+ # FIXME: I can't be 100% sure that a "[" won't appear in the string
+ # that is the first object of the line.
+ stripped_line = line.split("[")
+ line_list = stripped_line[1].split()
+
+ # Because the tracer format is dependent on the underlying OS and
+ # cannot be controlled by the container, check the tracer output
+ # format and break gracefully if we don't recognize it
+ try:
+ cpu = int(line_list[0][:-1])
+ uptimestamp = float(line_list[2][:-1])
+ timestamp = (
+ datetime.fromtimestamp(uptimestamp + uptime_offset)
+ .astimezone()
+ .isoformat()
+ )
+ context = str(line_list[5])
+ latency = int(line_list[7])
+
+ except (IndexError, ValueError) as error:
+ print(
+ "Unknown tracer format; Skipping time series collection: "
+ f"{error}\n{line}"
+ )
+ timeseries_dict = {}
+ break
+
+ # The trace collects for all CPUs, so skip any CPU we did not select
+ # via the input to the plugin
+ if params.cpus and int(cpu) not in params.cpus:
+ continue
+ # Create a separate time series for each CPU + context combination
+ cpu_context = f"cpu{cpu}_{context}"
+ if cpu_context not in timeseries_dict:
+ timeseries_dict[cpu_context] = []
+ if cpu_context not in last_uptimestamp:
+ last_uptimestamp[cpu_context] = 0
+ if last_uptimestamp[cpu_context] and (
+ uptimestamp - last_uptimestamp[cpu_context]
+ < params.time_series_resolution
+ ):
+ continue
+ last_uptimestamp[cpu_context] = uptimestamp
+ timeseries_dict[cpu_context].append(
+ latency_timeseries_schema.unserialize(
+ {
+ "timestamp": timestamp,
+ "latency_ns": latency,
+ }
+ )
+ )
- output, _ = proc.communicate()
+ timerlat_output = timerlat_proc.stdout.read()
# The output from the `rtla timerlat hist` command is columnar in three
# sections, plus headers. The first section is histogram data, which will
@@ -138,7 +251,7 @@ def run_timerlat(
is_time_unit = re.compile(r"# Time unit is (\w+)")
- output_lines = iter(output.splitlines())
+ output_lines = iter(timerlat_output.splitlines())
# Phase 1: Get the headers
for line in output_lines:
@@ -181,7 +294,7 @@ def run_timerlat(
total_usr_latency[label] = line_list[3]
# Provide the rtla command formatted data as debug output
- print(output)
+ print(timerlat_output)
return "success", TimerlatOutput(
time_unit,
@@ -194,6 +307,7 @@ def run_timerlat(
if params.user_threads
else None
),
+ (timeseries_dict if params.enable_time_series else None),
)
diff --git a/arcaflow_plugin_rtla/rtla_schema.py b/arcaflow_plugin_rtla/rtla_schema.py
index 6841a7c..640207b 100644
--- a/arcaflow_plugin_rtla/rtla_schema.py
+++ b/arcaflow_plugin_rtla/rtla_schema.py
@@ -67,6 +67,18 @@ class TimerlatInputParams:
"Use rtla user-space threads instead of kernel-space timerlat threads"
),
] = None
+ enable_time_series: typing.Annotated[
+ typing.Optional[bool],
+ schema.id("enable-time-series"),
+ schema.name("enable time series"),
+ schema.description("Enable collection of latency time series data"),
+ ] = False
+ time_series_resolution: typing.Annotated[
+ typing.Optional[float],
+ schema.id("time-series-resolution"),
+ schema.name("time series resolution"),
+ schema.description("Minimum time in seconds between time series entries"),
+ ] = 1.0
def to_flags(self) -> str:
return params_to_flags(
@@ -110,6 +122,23 @@ class LatencyStats:
latency_stats_schema = plugin.build_object_schema(LatencyStats)
+@dataclass
+class LatencyTimeSeries:
+ timestamp: typing.Annotated[
+ str,
+ schema.name("timestamp"),
+ schema.description("CPU latency timestamp"),
+ ] = None
+ latency_ns: typing.Annotated[
+ int,
+ schema.name("latency in ns"),
+ schema.description("CPU latency value in nanoseconds"),
+ ] = None
+
+
+latency_timeseries_schema = plugin.build_object_schema(LatencyTimeSeries)
+
+
@dataclass
class TimerlatOutput:
time_unit: typing.Annotated[
@@ -142,6 +171,11 @@ class TimerlatOutput:
schema.name("total usr latency"),
schema.description("Total user latency"),
] = None
+ latency_time_series: typing.Annotated[
+ typing.Optional[typing.Dict[str, typing.List[LatencyTimeSeries]]],
+ schema.name("latency time series"),
+ schema.description("Time series of latencies for each CPU and context"),
+ ] = None
@dataclass
diff --git a/configs/timerlat_example.yaml b/configs/timerlat_example.yaml
index 44cac7f..81e3384 100644
--- a/configs/timerlat_example.yaml
+++ b/configs/timerlat_example.yaml
@@ -1,4 +1,5 @@
duration: 5
+enable-time-series: True
user-threads: True
cpus:
- 1
|