Skip to content

Commit

Permalink
Adding params to the schema and moving to an implicit parameter input…
Browse files Browse the repository at this point in the history
… model in the plugin (#12)

* adding params

* formatting

* process inputs implicitly instead of explicitly in the plugin

* formatting

* comment cleanup

* refactor and add support for validation checks (#13)

* refactor and add support for validation checks

* change inheritance order for optional output variables

* Fix typo

* formatting

Co-authored-by: Harshith u <[email protected]>
  • Loading branch information
dustinblack and Harshith-umesh authored Dec 12, 2022
1 parent 18254ce commit 572c899
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 50 deletions.
28 changes: 13 additions & 15 deletions arcaflow_plugin_sysbench/sysbench_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@
WorkloadResultsCpu,
WorkloadResultsMemory,
WorkloadError,
sysbench_cpu_input_schema,
sysbench_cpu_output_schema,
sysbench_cpu_results_schema,
sysbench_memory_input_schema,
sysbench_memory_output_schema,
sysbench_memory_results_schema,
)
Expand Down Expand Up @@ -80,12 +82,7 @@ def parse_output(output):

def run_sysbench(params, flags, operation):
try:
cmd = [
"sysbench",
"--threads=" + str(params.threads),
"--events=" + str(params.events),
"--time=" + str(params.time),
]
cmd = ["sysbench"]
cmd = cmd + flags + [operation, "run"]
process_out = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as error:
Expand Down Expand Up @@ -120,9 +117,11 @@ def RunSysbenchCpu(

print("==>> Running sysbench CPU workload ...")

cpu_flags = [
"--cpu-max-prime=" + str(params.cpu_max_prime),
]
serialized_params = sysbench_cpu_input_schema.serialize(params)

cpu_flags = []
for param, value in serialized_params.items():
cpu_flags.append(f"--{param}={value}")

try:
output, results = run_sysbench(params, cpu_flags, "cpu")
Expand Down Expand Up @@ -151,12 +150,11 @@ def RunSysbenchMemory(

print("==>> Running sysbench Memory workload ...")

memory_flags = [
"--memory-block-size=" + str(params.memory_block_size),
"--memory-total-size=" + str(params.memory_total_size),
"--memory-scope=" + str(params.memory_scope),
"--memory-oper=" + str(params.memory_oper),
]
serialized_params = sysbench_memory_input_schema.serialize(params)

memory_flags = []
for param, value in serialized_params.items():
memory_flags.append(f"--{param}={value}")

try:
output, results = run_sysbench(params, memory_flags, "memory")
Expand Down
237 changes: 207 additions & 30 deletions arcaflow_plugin_sysbench/sysbench_schema.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,35 @@
import enum
import typing
from dataclasses import dataclass
from arcaflow_plugin_sdk import plugin, schema
from arcaflow_plugin_sdk import plugin, schema, validation


class OnOff(enum.Enum):
ON = "on"
OFF = "off"


class RandType(enum.Enum):
UNIFORM = "uniform"
GAUSSIAN = "gaussian"
SPECIAL = "special"
PARETO = "pareto"


class SeqRnd(enum.Enum):
SEQ = "seq"
RND = "rnd"


class GlobalLocal(enum.Enum):
GLOBAL = "global"
LOCAL = "local"


class RWN(enum.Enum):
READ = "read"
WRITE = "write"
NONE = "none"


@dataclass
Expand All @@ -20,6 +49,116 @@ class CommonInputParameters:
schema.name("Time"),
schema.description("Limit for total execution time in seconds"),
] = 10
forced_shutdown: typing.Annotated[
typing.Optional[int],
schema.id("forced-shutdown"),
schema.name("Forced Shutdown Seconds"),
schema.description(
"Number of seconds to wait after the 'time' limit before forcing"
" shutdown, or exclude parameter to disable forced shutdown"
),
] = None
thread_stack_size: typing.Annotated[
typing.Optional[str],
schema.id("thread-stack-size"),
schema.name("Thread stack size"),
schema.description("size of stack per thread"),
] = "64K"
rate: typing.Annotated[
typing.Optional[int],
schema.name("Transaction rate"),
schema.description("average transactions rate. 0 for unlimited rate"),
] = 0
validate: typing.Annotated[
typing.Optional[OnOff],
schema.name("Validate"),
schema.description("perform validation checks where possible"),
] = OnOff.OFF
rand_type: typing.Annotated[
typing.Optional[RandType],
schema.id("rand-type"),
schema.name("Random Number Type"),
schema.description("Random numbers distribution"),
] = RandType.SPECIAL
rand_spec_iter: typing.Annotated[
typing.Optional[int],
schema.id("rand-spec-iter"),
schema.name("Rand spec iterations"),
schema.description("Number of iterations used for numbers generation"),
] = 12
rand_spec_pct: typing.Annotated[
typing.Optional[int],
schema.id("rand-spec-pct"),
schema.name("Rand spec percentage"),
schema.description(
"Percentage of values to be treated as 'special' (for special"
" distribution)"
),
] = 1
rand_spec_res: typing.Annotated[
typing.Optional[int],
schema.id("rand-spec-res"),
schema.name("Rand spec res"),
schema.description(
"Percentage of 'special' values to use (for special distribution)"
),
] = 75
rand_seed: typing.Annotated[
typing.Optional[int],
schema.id("rand-seed"),
schema.name("Rand seed"),
schema.description(
"seed for random number generator. When 0, the current time is"
" used as a RNG seed."
),
] = 0
rand_pareto_h: typing.Annotated[
typing.Optional[float],
schema.id("rand-pareto-h"),
schema.name("Rand pareto h"),
schema.description("parameter h for pareto distribution"),
] = 0.2
percentile: typing.Annotated[
typing.Optional[int],
validation.min(0),
validation.max(100),
schema.name("Percentile"),
schema.description(
"percentile to calculate in latency statistics (1-100)."
" Use the special value of 0 to disable percentile calculations"
),
] = 95


# Other common parameters to consider...

# Implementing report-interval would add periodic output to stdout and a new
# data format we would need to account for in the parse_output section
# of the plugin
# --report-interval=N periodically report intermediate
# statistics with a specified interval in seconds. 0 disables intermediate
# reports [0]

# Implementing report-checkpoints would dump to stdout a full run output at
# the checkpoint times, requiring additional processing in the parse_output
# section of the plugin
# --report-checkpoints=[LIST,...] dump full statistics and reset all
# counters at specified points in time. The argument is a list of
# comma-separated values representing the amount of time in seconds elapsed
# from start of test when report checkpoint(s) must be performed. Report
# checkpoints are off by default. []

# Implementing debug would add more verbose output ot stdout that we would
# need to adjust parse_output to process.
# --debug[=on|off] print more debugging info [off]

# Implementing verbosity changes the output and would require adjustments to
# the parse_output process.
# --verbosity=N verbosity level {5 - debug, 0 - only critical messages} [3]

# Implementing histogram significantly adds to the output, but this might be
# valuable information to capture, even by default.
# --histogram[=on|off] print latency histogram in report [off]


@dataclass
Expand All @@ -31,6 +170,7 @@ class SysbenchCpuInputParams(CommonInputParameters):

cpu_max_prime: typing.Annotated[
typing.Optional[int],
schema.id("cpu-max-prime"),
schema.name("CPU max prime"),
schema.description(
"The upper limit of the number of prime numbers generated"
Expand All @@ -47,24 +187,40 @@ class SysbenchMemoryInputParams(CommonInputParameters):

memory_block_size: typing.Annotated[
typing.Optional[str],
schema.id("memory-block-size"),
schema.name("Block Size"),
schema.description("size of memory block for test in KiB/MiB/GiB"),
] = "1KiB"
memory_total_size: typing.Annotated[
typing.Optional[str],
schema.id("memory-total-size"),
schema.name("Total Size"),
schema.description("Total size of data to transfer in GiB"),
] = "100G"
memory_scope: typing.Annotated[
typing.Optional[str],
typing.Optional[GlobalLocal],
schema.id("memory-scope"),
schema.name("Memory Scope"),
schema.description("Memory Access Scope(global/local)"),
] = "global"
] = GlobalLocal.GLOBAL
memory_oper: typing.Annotated[
typing.Optional[str],
typing.Optional[RWN],
schema.id("memory-oper"),
schema.name("Memory Operation"),
schema.description("Type of memory operation(write/read)"),
] = "write"
] = RWN.WRITE
memory_hugetlb: typing.Annotated[
typing.Optional[OnOff],
schema.id("memory-hugetlb"),
schema.name("Memory hugetlb"),
schema.description("Allocate memory from HugeTLB pool (on/off)"),
] = OnOff.OFF
memory_access_mode: typing.Annotated[
typing.Optional[SeqRnd],
schema.id("memory-access-mode"),
schema.name("Memory Access Mode"),
schema.description("memory access mode (seq,rnd)"),
] = SeqRnd.SEQ


@dataclass
Expand Down Expand Up @@ -134,10 +290,10 @@ class CPUmetrics:


@dataclass
class SysbenchMemoryOutputParams:
class SysbenchCommonOutputParams:
"""
This is the data structure for output
parameters returned by sysbench memory benchmark.
This is the data structure for common output
parameters returned by sysbench benchmarks.
"""

totaltime: typing.Annotated[
Expand All @@ -150,6 +306,25 @@ class SysbenchMemoryOutputParams:
schema.name("Total number of events"),
schema.description("Total number of events performed by the workload"),
]
Numberofthreads: typing.Annotated[
float,
schema.name("Number of threads"),
schema.description("Number of threads used by the workload"),
]
Validationchecks: typing.Annotated[
typing.Optional[str],
schema.name("Validation checks"),
schema.description("Validation on/off"),
] = "off"


@dataclass
class SysbenchMemoryOutput:
"""
This is the data structure for specific output
parameters returned by sysbench memory benchmark.
"""

blocksize: typing.Annotated[
str,
schema.name("Block size"),
Expand Down Expand Up @@ -185,40 +360,20 @@ class SysbenchMemoryOutputParams:
"Total number of operations performed by the memory workload"
),
]
Numberofthreads: typing.Annotated[
float,
schema.name("Number of threads"),
schema.description("Number of threads used by the workload"),
]


@dataclass
class SysbenchCpuOutputParams:
class SysbenchCpuOutput:
"""
This is the data structure for output
This is the data structure for specific output
parameters returned by sysbench cpu benchmark.
"""

totaltime: typing.Annotated[
float,
schema.name("Total time"),
schema.description("Total execution time of workload"),
]
totalnumberofevents: typing.Annotated[
float,
schema.name("Total number of events"),
schema.description("Total number of events performed by the workload"),
]
Primenumberslimit: typing.Annotated[
float,
schema.name("Prime numbers limit"),
schema.description("Number of prime numbers to use for CPU workload"),
]
Numberofthreads: typing.Annotated[
float,
schema.name("Number of threads"),
schema.description("Number of threads used by the workload"),
]


@dataclass
Expand Down Expand Up @@ -278,6 +433,24 @@ class SysbenchCpuResultParams:
]


@dataclass
class SysbenchMemoryOutputParams(
SysbenchCommonOutputParams, SysbenchMemoryOutput
):
"""
This is the data structure for all output
parameters returned by sysbench memory benchmark.
"""


@dataclass
class SysbenchCpuOutputParams(SysbenchCommonOutputParams, SysbenchCpuOutput):
"""
This is the data structure for all output
parameters returned by sysbench cpu benchmark.
"""


@dataclass
class WorkloadResultsCpu:
"""
Expand Down Expand Up @@ -348,6 +521,10 @@ class WorkloadError:
]


sysbench_cpu_input_schema = plugin.build_object_schema(SysbenchCpuInputParams)
sysbench_memory_input_schema = plugin.build_object_schema(
SysbenchMemoryInputParams
)
sysbench_cpu_output_schema = plugin.build_object_schema(
SysbenchCpuOutputParams
)
Expand Down
3 changes: 2 additions & 1 deletion configs/sysbench_cpu_example.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
threads: 2
events: 0
time: 15
cpu_max_prime: 12000
cpu-max-prime: 12000
validate: "on"
Loading

0 comments on commit 572c899

Please sign in to comment.