diff --git a/README.md b/README.md index fb4416b..f9fda56 100644 --- a/README.md +++ b/README.md @@ -56,6 +56,9 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
duration (int) +
Name:timerlat duration seconds
Description:Duration of the session in seconds
Required:No
Type:int
+
enable-time-series (bool) +
Name:enable time series
Description:Enable collection of latency time series data
Required:No
Default (JSON encoded):
false
Type:bool
entries (int) @@ -77,6 +80,10 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
period (int)
Name:histogram entries
Description:Set the number of entries of the histogram (default 256)
Required:No
Type:int
Minimum:10
Maximum:9999999
+
Name:timerlat period
Description:Timerlat period in μs
Required:No
Type:int
+
time-series-resolution (float) + +
Name:time series resolution
Description:Minimum time in seconds between time series entries
Required:No
Default (JSON encoded):
1.0
Type:float
user-threads (bool) @@ -100,6 +107,9 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
duration (int)
Name:use user threads
Description:Use rtla user-space threads instead of kernel-space timerlat threads
Required:No
Type:bool
+
Name:timerlat duration seconds
Description:Duration of the session in seconds
Required:No
Type:int
+
enable-time-series (bool) +
Name:enable time series
Description:Enable collection of latency time series data
Required:No
Default (JSON encoded):
false
Type:bool
entries (int) @@ -121,6 +131,10 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
period (int)
Name:histogram entries
Description:Set the number of entries of the histogram (default 256)
Required:No
Type:int
Minimum:10
Maximum:9999999
+
Name:timerlat period
Description:Timerlat period in μs
Required:No
Type:int
+
time-series-resolution (float) + +
Name:time series resolution
Description:Minimum time in seconds between time series entries
Required:No
Default (JSON encoded):
1.0
Type:float
user-threads (bool) @@ -181,6 +195,29 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
Name:use user threads
Description:Use rtla user-space threads instead of kernel-space timerlat threads
Required:No
Type:bool
+ +
latency_time_series (map[string,list[reference[LatencyTimeSeries]]]) + + +
Name:latency time series
Description:Time series of latencies for each CPU and context
Required:No
Type:map[string,list[reference[LatencyTimeSeries]]]
+
+ Key type + +
Type:string
+
+
+
+ Value type + +
Type:list[reference[LatencyTimeSeries]]
+
+ List items + +
Type:reference[LatencyTimeSeries]
Referenced object:LatencyTimeSeries
+
+
+
+
stats_per_col (list[map[string,any]])
Name:statistics per column
Description:Latency statistics per captured column
Required:No
Type:list[map[string,any]]
@@ -237,6 +274,17 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
+ +
LatencyTimeSeries (object) + +
Type:object
Properties
latency_ns (int) + + +
Name:latency in ns
Description:CPU latency value in nanoseconds
Required:No
Type:int
+
timestamp (string) + +
Name:timestamp
Description:CPU latency timestamp
Required:No
Type:string
+
TimerlatOutput (object)
Type:object
Properties
latency_hist (list[map[string,int]]) @@ -262,6 +310,29 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
+ +
latency_time_series (map[string,list[reference[LatencyTimeSeries]]]) + + +
Name:latency time series
Description:Time series of latencies for each CPU and context
Required:No
Type:map[string,list[reference[LatencyTimeSeries]]]
+
+ Key type + +
Type:string
+
+
+
+ Value type + +
Type:list[reference[LatencyTimeSeries]]
+
+ List items + +
Type:reference[LatencyTimeSeries]
Referenced object:LatencyTimeSeries
+
+
+
+
stats_per_col (list[map[string,any]])
Name:statistics per column
Description:Latency statistics per captured column
Required:No
Type:list[map[string,any]]
diff --git a/arcaflow_plugin_rtla/rtla_plugin.py b/arcaflow_plugin_rtla/rtla_plugin.py index 4119dba..a724148 100644 --- a/arcaflow_plugin_rtla/rtla_plugin.py +++ b/arcaflow_plugin_rtla/rtla_plugin.py @@ -4,30 +4,19 @@ import re import sys import typing +import time from threading import Event +from datetime import datetime from arcaflow_plugin_sdk import plugin, predefined_schemas from rtla_schema import ( TimerlatInputParams, latency_stats_schema, + latency_timeseries_schema, TimerlatOutput, ErrorOutput, ) -def run_oneshot_cmd(command_list: list[str]) -> tuple[str, subprocess.CompletedProcess]: - try: - cmd_out = subprocess.check_output( - command_list, - stderr=subprocess.STDOUT, - text=True, - ) - except subprocess.CalledProcessError as err: - return "error", ErrorOutput( - f"{err.cmd[0]} failed with return code {err.returncode}:\n{err.output}" - ) - return "completed", cmd_out - - class StartTimerlatStep: def __init__(self, exit, finished_early): self.exit = exit @@ -66,7 +55,7 @@ def run_timerlat( timerlat_cmd.extend(params.to_flags()) try: - proc = subprocess.Popen( + timerlat_proc = subprocess.Popen( timerlat_cmd, start_new_session=True, stdout=subprocess.PIPE, @@ -78,9 +67,52 @@ def run_timerlat( f"{err.cmd[0]} failed with return code {err.returncode}:\n{err.output}" ) + if params.enable_time_series: + timeseries_dict = {} + last_uptimestamp = {} + trace_path = "/sys/kernel/debug/tracing/instances/timerlat_hist/trace_pipe" + + # A delay is needed before reading from the trace_path to ensure the file + # exists and data is streaming to the fifo. I've tried to avoid a sleep(), + # but none of the methods I've tested have worked. + wait_time = 0 + timeout_seconds = 5 + sleep_time = 0.5 + trace_file = None + while wait_time < timeout_seconds: + try: + trace_file = open(trace_path, "r") + break + except FileNotFoundError: + time.sleep(sleep_time) + wait_time += sleep_time + continue + + if not trace_file: + print("Unable to read tracer output; Skipping time series collection\n") + else: + timeseries_file = open("./timeseries_file", "w") + try: + timeseries_proc = subprocess.Popen( + ["cat"], + start_new_session=True, + stdin=trace_file, + stdout=timeseries_file, + stderr=subprocess.PIPE, + text=True, + ) + except subprocess.CalledProcessError as err: + return "error", ErrorOutput( + f"""{err.cmd[0]} failed with return code {err.returncode}:\n + {err.output}""" + ) + finally: + timeseries_file.close() + trace_file.close() + try: # Block here, waiting on the cancel signal - print("Gathering data... Use Ctrl-C to stop.") + print("Gathering data... Use Ctrl-C to stop.\n") self.exit.wait(params.duration) # Secondary block interrupt is via the KeyboardInterrupt exception. @@ -92,9 +124,90 @@ def run_timerlat( # In either the case of a keyboard interrupt or a cancel signal, we need to # send the SIGINT to the subprocess. if self.finished_early: - proc.send_signal(2) + timerlat_proc.send_signal(2) + + # Example time series output from the tracer (truncated) + # -0 [009] d. 123.769498: #1002 context irq timer_latency 458 ns + # <...>-625890 [009] .. 123.769499: #1002 context thread timer_latency 666 ns + # <...>-625890 [009] .. 123.769500: #1002 context user-ret timer_latency 499 ns + # -0 [002] d. 123.769528: #1003 context irq timer_latency 587 ns + # <...>-625883 [002] .. 123.769532: #1003 context thread timer_latency 712 ns + # <...>-625883 [002] .. 123.769534: #1003 context user-ret timer_latency 462 ns + + if params.enable_time_series and trace_file: + # Interrupt the time series collection process and capture the output + timeseries_proc.send_signal(2) + + # The calculation of the offset from uptime to current time certainly means + # that our time series is not 100% accurately aligned to the nanosecond, but + # the relative times will be accurate. We'll accept this as good enough. + uptime_offset = time.time() - time.monotonic() + + with open("./timeseries_file") as timeseries_output: + + if all(False for _ in timeseries_output): + print( + "No results reading tracer output; " + "Skipping time series collection\n" + ) + + for line in timeseries_output: + # The first object in the line is a string, and it is possible for + # it to have spaces in it, so we split first on the expected "[" + # deliniator for the CPU number field. + # FIXME: I can't be 100% sure that a "[" wont' appear in the string + # that is the first object of the line. + stripped_line = line.split("[") + line_list = stripped_line[1].split() + + # Because the tracer format is dependent on the underlying OS and + # cannot be controlled by the container, check the tracer output + # format and break gracefully if we don't recognize it + try: + cpu = int(line_list[0][:-1]) + uptimestamp = float(line_list[2][:-1]) + timestamp = ( + datetime.fromtimestamp(uptimestamp + uptime_offset) + .astimezone() + .isoformat() + ) + context = str(line_list[5]) + latency = int(line_list[7]) + + except (IndexError, ValueError) as error: + print( + "Unknown tracer format; Skipping time series collection: " + f"{error}\n{line}" + ) + timeseries_dict = {} + break + + # The trace collects for all CPUs, so skip any CPU we did not select + # via the input to the plugin + if params.cpus and int(cpu) not in params.cpus: + continue + # Create a separate time series for each CPU + context combination + cpu_context = f"cpu{cpu}_{context}" + if cpu_context not in timeseries_dict: + timeseries_dict[cpu_context] = [] + if cpu_context not in last_uptimestamp: + last_uptimestamp[cpu_context] = 0 + if last_uptimestamp[cpu_context] and ( + uptimestamp - last_uptimestamp[cpu_context] + < params.time_series_resolution + ): + continue + last_uptimestamp[cpu_context] = uptimestamp + timeseries_dict[cpu_context].append( + latency_timeseries_schema.unserialize( + { + "timestamp": timestamp, + "latency_ns": latency, + } + ) + ) - output, _ = proc.communicate() + timerlat_output = timerlat_proc.stdout.read() # The output from the `rtla timerlat hist` command is columnar in three # sections, plus headers. The first section is histogram data, which will @@ -138,7 +251,7 @@ def run_timerlat( is_time_unit = re.compile(r"# Time unit is (\w+)") - output_lines = iter(output.splitlines()) + output_lines = iter(timerlat_output.splitlines()) # Phase 1: Get the headers for line in output_lines: @@ -181,7 +294,7 @@ def run_timerlat( total_usr_latency[label] = line_list[3] # Provide the rtla command formatted data as debug output - print(output) + print(timerlat_output) return "success", TimerlatOutput( time_unit, @@ -194,6 +307,7 @@ def run_timerlat( if params.user_threads else None ), + (timeseries_dict if params.enable_time_series else None), ) diff --git a/arcaflow_plugin_rtla/rtla_schema.py b/arcaflow_plugin_rtla/rtla_schema.py index 6841a7c..640207b 100644 --- a/arcaflow_plugin_rtla/rtla_schema.py +++ b/arcaflow_plugin_rtla/rtla_schema.py @@ -67,6 +67,18 @@ class TimerlatInputParams: "Use rtla user-space threads instead of kernel-space timerlat threads" ), ] = None + enable_time_series: typing.Annotated[ + typing.Optional[bool], + schema.id("enable-time-series"), + schema.name("enable time series"), + schema.description("Enable collection of latency time series data"), + ] = False + time_series_resolution: typing.Annotated[ + typing.Optional[float], + schema.id("time-series-resolution"), + schema.name("time series resolution"), + schema.description("Minimum time in seconds between time series entries"), + ] = 1.0 def to_flags(self) -> str: return params_to_flags( @@ -110,6 +122,23 @@ class LatencyStats: latency_stats_schema = plugin.build_object_schema(LatencyStats) +@dataclass +class LatencyTimeSeries: + timestamp: typing.Annotated[ + str, + schema.name("timestamp"), + schema.description("CPU latency timestamp"), + ] = None + latency_ns: typing.Annotated[ + int, + schema.name("latency in ns"), + schema.description("CPU latency value in nanoseconds"), + ] = None + + +latency_timeseries_schema = plugin.build_object_schema(LatencyTimeSeries) + + @dataclass class TimerlatOutput: time_unit: typing.Annotated[ @@ -142,6 +171,11 @@ class TimerlatOutput: schema.name("total usr latency"), schema.description("Total user latency"), ] = None + latency_time_series: typing.Annotated[ + typing.Optional[typing.Dict[str, typing.List[LatencyTimeSeries]]], + schema.name("latency time series"), + schema.description("Time series of latencies for each CPU and context"), + ] = None @dataclass diff --git a/configs/timerlat_example.yaml b/configs/timerlat_example.yaml index 44cac7f..81e3384 100644 --- a/configs/timerlat_example.yaml +++ b/configs/timerlat_example.yaml @@ -1,4 +1,5 @@ duration: 5 +enable-time-series: True user-threads: True cpus: - 1