add latency timeseries output #8

Draft
wants to merge 16 commits into
base: main
71 changes: 71 additions & 0 deletions README.md
Expand Up @@ -56,6 +56,9 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
</details><details><summary>duration (<code>int</code>)</summary>
<table><tbody><tr><th>Name:</th><td>timerlat duration seconds</td></tr><tr><th>Description:</th><td width="500">Duration of the session in seconds</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>int</code></td>
</tr>
</tbody></table>
</details><details><summary>enable-time-series (<code>bool</code>)</summary>
<table><tbody><tr><th>Name:</th><td>enable time series</td></tr><tr><th>Description:</th><td width="500">Enable collection of latency time series data</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Default (JSON encoded):</th><td><pre><code>false</code></pre></td></tr><tr><th>Type:</th><td><code>bool</code></td></tr>
</tbody></table>
</details><details><summary>entries (<code>int</code>)</summary>
<table><tbody><tr><th>Name:</th><td>histogram entries</td></tr><tr><th>Description:</th><td width="500">Set the number of entries of the histogram (default 256)</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>int</code></td><tr><th>Minimum:</th><td>10</td></tr><tr><th>Maximum:</th><td>9999999</td></tr>
Expand All @@ -77,6 +80,10 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
</details><details><summary>period (<code>int</code>)</summary>
<table><tbody><tr><th>Name:</th><td>timerlat period</td></tr><tr><th>Description:</th><td width="500">Timerlat period in μs</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>int</code></td>
</tr>
</tbody></table>
</details><details><summary>time-series-resolution (<code>float</code>)</summary>
<table><tbody><tr><th>Name:</th><td>time series resolution</td></tr><tr><th>Description:</th><td width="500">Minimum time in seconds between time series entries</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Default (JSON encoded):</th><td><pre><code>1.0</code></pre></td></tr><tr><th>Type:</th><td><code>float</code></td>
</tr>
</tbody></table>
</details><details><summary>user-threads (<code>bool</code>)</summary>
<table><tbody><tr><th>Name:</th><td>use user threads</td></tr><tr><th>Description:</th><td width="500">Use rtla user-space threads instead of kernel-space timerlat threads</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>bool</code></td></tr>
Expand All @@ -100,6 +107,9 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
</details><details><summary>duration (<code>int</code>)</summary>
<table><tbody><tr><th>Name:</th><td>timerlat duration seconds</td></tr><tr><th>Description:</th><td width="500">Duration of the session in seconds</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>int</code></td>
</tr>
</tbody></table>
</details><details><summary>enable-time-series (<code>bool</code>)</summary>
<table><tbody><tr><th>Name:</th><td>enable time series</td></tr><tr><th>Description:</th><td width="500">Enable collection of latency time series data</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Default (JSON encoded):</th><td><pre><code>false</code></pre></td></tr><tr><th>Type:</th><td><code>bool</code></td></tr>
</tbody></table>
</details><details><summary>entries (<code>int</code>)</summary>
<table><tbody><tr><th>Name:</th><td>histogram entries</td></tr><tr><th>Description:</th><td width="500">Set the number of entries of the histogram (default 256)</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>int</code></td><tr><th>Minimum:</th><td>10</td></tr><tr><th>Maximum:</th><td>9999999</td></tr>
Expand All @@ -121,6 +131,10 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
</details><details><summary>period (<code>int</code>)</summary>
<table><tbody><tr><th>Name:</th><td>timerlat period</td></tr><tr><th>Description:</th><td width="500">Timerlat period in μs</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>int</code></td>
</tr>
</tbody></table>
</details><details><summary>time-series-resolution (<code>float</code>)</summary>
<table><tbody><tr><th>Name:</th><td>time series resolution</td></tr><tr><th>Description:</th><td width="500">Minimum time in seconds between time series entries</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Default (JSON encoded):</th><td><pre><code>1.0</code></pre></td></tr><tr><th>Type:</th><td><code>float</code></td>
</tr>
</tbody></table>
</details><details><summary>user-threads (<code>bool</code>)</summary>
<table><tbody><tr><th>Name:</th><td>use user threads</td></tr><tr><th>Description:</th><td width="500">Use rtla user-space threads instead of kernel-space timerlat threads</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>bool</code></td></tr>
Expand Down Expand Up @@ -181,6 +195,29 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
</tbody></table>
</details>
</td></tr></tr>
</tbody></table>
</details><details><summary>latency_time_series (<code>map[<code>string</code>,<code>list[<code>reference[LatencyTimeSeries]</code>]</code>]</code>)</summary>
<table><tbody><tr><th>Name:</th><td>latency time series</td></tr><tr><th>Description:</th><td width="500">Time series of latencies for each CPU and context</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>map[<code>string</code>,<code>list[<code>reference[LatencyTimeSeries]</code>]</code>]</code></td><tr><td colspan="2">
<details>
<summary>Key type</summary>
<table><tbody><tr><th>Type:</th><td><code>string</code></td></tr>
</tbody></table>
</details>
</td></tr>
<tr><td colspan="2">
<details>
<summary>Value type</summary>
<table><tbody><tr><th>Type:</th><td><code>list[<code>reference[LatencyTimeSeries]</code>]</code></td><tr><td colspan="2">
<details>
<summary>List items</summary>
<table><tbody><tr><th>Type:</th><td><code>reference[LatencyTimeSeries]</code></td><tr><th>Referenced object:</th><td>LatencyTimeSeries</td></tr></tr>
</tbody></table>
</details>
</td></tr></tr>
</tbody></table>
</details>
</td></tr>
</tr>
</tbody></table>
</details><details><summary>stats_per_col (<code>list[<code>map[<code>string</code>,<code>any</code>]</code>]</code>)</summary>
<table><tbody><tr><th>Name:</th><td>statistics per column</td></tr><tr><th>Description:</th><td width="500">Latency statistics per captured column</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>list[<code>map[<code>string</code>,<code>any</code>]</code>]</code></td><tr><td colspan="2">
Expand Down Expand Up @@ -237,6 +274,17 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
</tbody></table>
</details></td></tr>
</tr>
</tbody></table>
</details><details><summary>LatencyTimeSeries (<code>object</code>)</summary>
<table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>latency_ns (<code>int</code>)</summary>
<table><tbody><tr><th>Name:</th><td>latency in ns</td></tr><tr><th>Description:</th><td width="500">CPU latency value in nanoseconds</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>int</code></td>
</tr>
</tbody></table>
</details><details><summary>timestamp (<code>string</code>)</summary>
<table><tbody><tr><th>Name:</th><td>timestamp</td></tr><tr><th>Description:</th><td width="500">CPU latency timestamp</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>string</code></td></tr>
</tbody></table>
</details></td></tr>
</tr>
</tbody></table>
</details><details><summary>TimerlatOutput (<code>object</code>)</summary>
<table><tbody><tr><th>Type:</th><td><code>object</code></td><tr><th>Properties</th><td><details><summary>latency_hist (<code>list[<code>map[<code>string</code>,<code>int</code>]</code>]</code>)</summary>
Expand All @@ -262,6 +310,29 @@ Runs the RTLA Timerlat data collection and then processes the results into a mac
</tbody></table>
</details>
</td></tr></tr>
</tbody></table>
</details><details><summary>latency_time_series (<code>map[<code>string</code>,<code>list[<code>reference[LatencyTimeSeries]</code>]</code>]</code>)</summary>
<table><tbody><tr><th>Name:</th><td>latency time series</td></tr><tr><th>Description:</th><td width="500">Time series of latencies for each CPU and context</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>map[<code>string</code>,<code>list[<code>reference[LatencyTimeSeries]</code>]</code>]</code></td><tr><td colspan="2">
<details>
<summary>Key type</summary>
<table><tbody><tr><th>Type:</th><td><code>string</code></td></tr>
</tbody></table>
</details>
</td></tr>
<tr><td colspan="2">
<details>
<summary>Value type</summary>
<table><tbody><tr><th>Type:</th><td><code>list[<code>reference[LatencyTimeSeries]</code>]</code></td><tr><td colspan="2">
<details>
<summary>List items</summary>
<table><tbody><tr><th>Type:</th><td><code>reference[LatencyTimeSeries]</code></td><tr><th>Referenced object:</th><td>LatencyTimeSeries</td></tr></tr>
</tbody></table>
</details>
</td></tr></tr>
</tbody></table>
</details>
</td></tr>
</tr>
</tbody></table>
</details><details><summary>stats_per_col (<code>list[<code>map[<code>string</code>,<code>any</code>]</code>]</code>)</summary>
<table><tbody><tr><th>Name:</th><td>statistics per column</td></tr><tr><th>Description:</th><td width="500">Latency statistics per captured column</td></tr><tr><th>Required:</th><td>No</td></tr><tr><th>Type:</th><td><code>list[<code>map[<code>string</code>,<code>any</code>]</code>]</code></td><tr><td colspan="2">
Expand Down
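To illustrate the `latency_time_series` field documented above, here is a minimal sketch of its shape using plain dictionaries: a map from a string key to a list of `LatencyTimeSeries` entries, each carrying `timestamp` and `latency_ns`. The `cpu9_irq` style keys follow the `cpu{cpu}_{context}` naming used in the plugin change. The sample values and the `max_latency_per_series` helper are hypothetical and are not part of the plugin.

```python
import json

# Hypothetical sample data shaped like the documented output type:
# map[string, list[LatencyTimeSeries]]
latency_time_series = {
    "cpu9_irq": [
        {"timestamp": "2024-12-02T10:00:00+00:00", "latency_ns": 458},
        {"timestamp": "2024-12-02T10:00:01+00:00", "latency_ns": 471},
    ],
    "cpu9_thread": [
        {"timestamp": "2024-12-02T10:00:00+00:00", "latency_ns": 666},
    ],
}


def max_latency_per_series(series_map):
    """Return the largest latency_ns seen for each CPU/context key.

    Hypothetical helper for illustration only; empty series are skipped.
    """
    return {
        key: max(entry["latency_ns"] for entry in entries)
        for key, entries in series_map.items()
        if entries
    }


print(json.dumps(max_latency_per_series(latency_time_series), indent=2))
```

Because the field is not required, a consumer should probably also handle the case where it is absent, which is what happens when `enable-time-series` is left at its default of `false`.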
154 changes: 134 additions & 20 deletions arcaflow_plugin_rtla/rtla_plugin.py
Expand Up @@ -4,30 +4,19 @@
import re
import sys
import typing
import time
from threading import Event
from datetime import datetime
from arcaflow_plugin_sdk import plugin, predefined_schemas
from rtla_schema import (
TimerlatInputParams,
latency_stats_schema,
latency_timeseries_schema,
TimerlatOutput,
ErrorOutput,
)


def run_oneshot_cmd(command_list: list[str]) -> tuple[str, subprocess.CompletedProcess]:
try:
cmd_out = subprocess.check_output(
command_list,
stderr=subprocess.STDOUT,
text=True,
)
except subprocess.CalledProcessError as err:
return "error", ErrorOutput(
f"{err.cmd[0]} failed with return code {err.returncode}:\n{err.output}"
)
return "completed", cmd_out


class StartTimerlatStep:
def __init__(self, exit, finished_early):
self.exit = exit
Expand Down Expand Up @@ -66,7 +55,7 @@ def run_timerlat(
timerlat_cmd.extend(params.to_flags())

try:
proc = subprocess.Popen(
timerlat_proc = subprocess.Popen(
timerlat_cmd,
start_new_session=True,
stdout=subprocess.PIPE,
Expand All @@ -78,9 +67,52 @@ def run_timerlat(
f"{err.cmd[0]} failed with return code {err.returncode}:\n{err.output}"
)

if params.enable_time_series:
timeseries_dict = {}
last_uptimestamp = {}
trace_path = "/sys/kernel/debug/tracing/instances/timerlat_hist/trace_pipe"

# A delay is needed before reading from the trace_path to ensure the file
# exists and data is streaming to the fifo. I've tried to avoid a sleep(),
# but none of the methods I've tested have worked.
wait_time = 0
timeout_seconds = 5
sleep_time = 0.5
trace_file = None
while wait_time < timeout_seconds:
try:
trace_file = open(trace_path, "r")
break
except FileNotFoundError:
time.sleep(sleep_time)
wait_time += sleep_time
continue

if not trace_file:
print("Unable to read tracer output; Skipping time series collection\n")
else:
timeseries_file = open("./timeseries_file", "w")
try:
timeseries_proc = subprocess.Popen(
["cat"],
start_new_session=True,
stdin=trace_file,
stdout=timeseries_file,
stderr=subprocess.PIPE,
text=True,
)
except subprocess.CalledProcessError as err:
return "error", ErrorOutput(
f"""{err.cmd[0]} failed with return code {err.returncode}:\n
{err.output}"""
)
finally:
timeseries_file.close()
trace_file.close()

try:
# Block here, waiting on the cancel signal
print("Gathering data... Use Ctrl-C to stop.")
print("Gathering data... Use Ctrl-C to stop.\n")
self.exit.wait(params.duration)

# Secondary block interrupt is via the KeyboardInterrupt exception.
Expand All @@ -92,9 +124,90 @@ def run_timerlat(
# In either the case of a keyboard interrupt or a cancel signal, we need to
# send the SIGINT to the subprocess.
if self.finished_early:
proc.send_signal(2)
timerlat_proc.send_signal(2)

# Example time series output from the tracer (truncated)
# <idle>-0 [009] d. 123.769498: #1002 context irq timer_latency 458 ns
# <...>-625890 [009] .. 123.769499: #1002 context thread timer_latency 666 ns
# <...>-625890 [009] .. 123.769500: #1002 context user-ret timer_latency 499 ns
# <idle>-0 [002] d. 123.769528: #1003 context irq timer_latency 587 ns
# <...>-625883 [002] .. 123.769532: #1003 context thread timer_latency 712 ns
# <...>-625883 [002] .. 123.769534: #1003 context user-ret timer_latency 462 ns

if params.enable_time_series and trace_file:
# Interrupt the time series collection process and capture the output
timeseries_proc.send_signal(2)

# The calculation of the offset from uptime to current time certainly means
# that our time series is not 100% accurately aligned to the nanosecond, but
# the relative times will be accurate. We'll accept this as good enough.
uptime_offset = time.time() - time.monotonic()

with open("./timeseries_file") as timeseries_output:

if all(False for _ in timeseries_output):
print(
"No results reading tracer output; "
"Skipping time series collection\n"
)
Comment on lines +148 to +152
Contributor

As Dave mentioned earlier, this check is going to eat the first line of the timeseries output. You should put a comment here noting that, if it's actually intentional.

Member Author

I'm not following how you are seeing that the first line gets eaten.

>>> i = ["a", "b", "c", "d"]
>>> if all(False for _ in i):
...   print("bad juju")
... 
>>> for line in i:
...   print(line)
... 
a
b
c
d
>>> i = []
>>> if all(False for _ in i):
...   print("bad juju")
... 
bad juju
>>> for line in i:
...   print(line)
... 
>>> 

Contributor

@webbnh webbnh Dec 2, 2024

Try your experiment with i set to an iterator or a generator. With it set to a list, of course you see the first item when you check after the all() call, because the list state is unchanged. 😁

(Just to be blisteringly clear, reading a file line-by-line in a loop uses an iterator/generator.)
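
The difference described in this thread can be reproduced with a short sketch. It uses `io.StringIO` as a stand-in for the opened time series file, since both are their own iterators. The `first_line_or_none` helper is hypothetical and only shows one possible way to test for emptiness without dropping data; it is not what the PR does.

```python
import io

# A file-like object is its own iterator, like the opened timeseries file.
fake_file = io.StringIO("line1\nline2\nline3\n")

# all() pulls one item from the generator, sees False, and stops.
# The line that was pulled has been consumed from the underlying iterator.
is_empty = all(False for _ in fake_file)
remaining = [line.strip() for line in fake_file]
print(is_empty, remaining)


def first_line_or_none(lines):
    """Hypothetical helper: fetch the first line, or None if there is none."""
    return next(iter(lines), None)


fake_file = io.StringIO("line1\nline2\nline3\n")
first = first_line_or_none(fake_file)
if first is None:
    print("No results reading tracer output")
else:
    # Keep the first line and continue with the rest of the iterator.
    all_lines = [first.strip()] + [line.strip() for line in fake_file]
    print(all_lines)
```

With a list, each `for` loop starts a fresh iterator, so nothing appears to be lost. With a file object, the emptiness check and the later loop share one iterator, so the first line is gone by the time the loop runs.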


for line in timeseries_output:
# The first object in the line is a string, and it is possible for
# it to have spaces in it, so we split first on the expected "["
# deliniator for the CPU number field.
# FIXME: I can't be 100% sure that a "[" wont' appear in the string
Comment on lines +157 to +158
Contributor

"deliniator" ==> "delineator"
"wont'" ==> "won't"

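Regarding the FIXME in the commented lines: one possible way to reduce the risk of a stray "[" in the task name is to match the whole `[NNN] flags timestamp:` pattern instead of splitting on the first "[". The sketch below is an assumption-laden alternative, not the PR's approach. The regular expression is based only on the truncated example trace lines shown in this diff, so real tracer output may carry extra columns that it does not handle. `TRACE_LINE` and `parse_trace_line` are hypothetical names.

```python
import re

# Pattern derived from the truncated sample trace lines in this PR, e.g.:
#   <idle>-0 [009] d. 123.769498: #1002 context irq timer_latency 458 ns
TRACE_LINE = re.compile(
    r"\[(?P<cpu>\d+)\]\s+\S+\s+(?P<uptime>\d+\.\d+):\s+#\d+\s+"
    r"context\s+(?P<context>\S+)\s+timer_latency\s+(?P<latency>\d+)\s+ns"
)


def parse_trace_line(line):
    """Return (cpu, uptime_seconds, context, latency_ns), or None if no match."""
    match = TRACE_LINE.search(line)
    if match is None:
        return None
    return (
        int(match["cpu"]),
        float(match["uptime"]),
        match["context"],
        int(match["latency"]),
    )


sample = "<idle>-0 [009] d. 123.769498: #1002 context irq timer_latency 458 ns"
print(parse_trace_line(sample))

# A task name containing "[" does not confuse the match, because the CPU
# field must be all digits and be followed by the flags and timestamp.
odd_name = "my[task]-12 [002] .. 123.769532: #1003 context thread timer_latency 712 ns"
print(parse_trace_line(odd_name))
```

Returning `None` for an unrecognized line keeps the "break gracefully on unknown format" behavior available to the caller without relying on `IndexError` or `ValueError`.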
# that is the first object of the line.
stripped_line = line.split("[")
line_list = stripped_line[1].split()

# Because the tracer format is dependent on the underlying OS and
# cannot be controlled by the container, check the tracer output
# format and break gracefully if we don't recognize it
try:
cpu = int(line_list[0][:-1])
uptimestamp = float(line_list[2][:-1])
timestamp = (
datetime.fromtimestamp(uptimestamp + uptime_offset)
.astimezone()
.isoformat()
)
context = str(line_list[5])
latency = int(line_list[7])

except (IndexError, ValueError) as error:
print(
"Unknown tracer format; Skipping time series collection: "
f"{error}\n{line}"
)
timeseries_dict = {}
break

# The trace collects for all CPUs, so skip any CPU we did not select
# via the input to the plugin
if params.cpus and int(cpu) not in params.cpus:
continue
# Create a separate time series for each CPU + context combination
cpu_context = f"cpu{cpu}_{context}"
if cpu_context not in timeseries_dict:
timeseries_dict[cpu_context] = []
if cpu_context not in last_uptimestamp:
last_uptimestamp[cpu_context] = 0
if last_uptimestamp[cpu_context] and (
uptimestamp - last_uptimestamp[cpu_context]
< params.time_series_resolution
):
continue
last_uptimestamp[cpu_context] = uptimestamp
timeseries_dict[cpu_context].append(
latency_timeseries_schema.unserialize(
{
"timestamp": timestamp,
"latency_ns": latency,
}
)
)

output, _ = proc.communicate()
timerlat_output = timerlat_proc.stdout.read()

# The output from the `rtla timerlat hist` command is columnar in three
# sections, plus headers. The first section is histogram data, which will
Expand Down Expand Up @@ -138,7 +251,7 @@ def run_timerlat(

is_time_unit = re.compile(r"# Time unit is (\w+)")

output_lines = iter(output.splitlines())
output_lines = iter(timerlat_output.splitlines())

# Phase 1: Get the headers
for line in output_lines:
Expand Down Expand Up @@ -181,7 +294,7 @@ def run_timerlat(
total_usr_latency[label] = line_list[3]

# Provide the rtla command formatted data as debug output
print(output)
print(timerlat_output)

return "success", TimerlatOutput(
time_unit,
Expand All @@ -194,6 +307,7 @@ def run_timerlat(
if params.user_threads
else None
),
(timeseries_dict if params.enable_time_series else None),
)
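
The per-key throttling that `time_series_resolution` applies in this change can be sketched in isolation. This is a simplified illustration under stated assumptions: the `downsample` function and its tuple-based input are hypothetical, and it uses `None` rather than `0` as the "nothing kept yet" marker, which differs slightly from the diff.

```python
def downsample(samples, resolution):
    """Keep at most one sample per key every `resolution` seconds.

    samples: iterable of (key, uptime_seconds, latency_ns) tuples, assumed
    to be in time order per key. Hypothetical helper for illustration.
    """
    last_kept = {}
    series = {}
    for key, uptime, latency in samples:
        previous = last_kept.get(key)
        # Skip samples that arrive sooner than the requested resolution.
        if previous is not None and uptime - previous < resolution:
            continue
        last_kept[key] = uptime
        series.setdefault(key, []).append((uptime, latency))
    return series


samples = [
    ("cpu9_irq", 123.0, 458),
    ("cpu9_irq", 123.5, 500),  # dropped: only 0.5 s after the kept sample
    ("cpu2_irq", 123.25, 587),
    ("cpu9_irq", 124.0, 470),  # kept: a full 1.0 s after the kept sample
]
print(downsample(samples, resolution=1.0))
```

Tracking the last kept timestamp per key, rather than globally, means a busy CPU or context does not starve the others of entries.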

