-
Here are some early results from last week for instance bootstrap scaling across 64 nodes with up to 48 brokers per node (sorted by the time to "ready", i.e. …
For lack of a better place to put it, here's the test script I used to generate the above results:

```python
#!/usr/bin/env python3
import argparse
import itertools
import math
import os
import sys
import time
import flux
import flux.job
import flux.job.watcher  # JobProgressBar (used in main())
import flux.uri
import flux.util  # help_formatter (used in parse_args())
from flux.resource import resource_list
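# InstanceBench submits one Flux subinstance with a given node count,
# brokers-per-node, and TBON topology, watches its eventlogs, and records
# a timestamp for each bootstrap phase (submit, alloc, start, shell.init,
# uri memo, ready) plus the shutdown time.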
class InstanceBench:
def __init__(
self,
flux_handle,
nnodes,
brokers_per_node=1,
topo="kary:2",
conf=None,
progress=None,
):
self.flux_handle = flux_handle
self.nnodes = nnodes
self.brokers_per_node = brokers_per_node
self.topo = topo
self.id = None
        self.t0 = None
        self.ts = None  # set by event callbacks, consumed by log()
        self.t_submit = None
        self.t_alloc = None
        self.t_start = None
self.t_uri = None
self.t_shell_init = None
self.t_ready = None
self.t_finish = None
self.child_handle = None
self.then_cb = None
self.then_args = []
self.then_kw_args = {}
self.progress = progress
self.size = nnodes * brokers_per_node
self.name = f"[N:{nnodes:<4d} SZ:{self.size:<4d} {self.topo:<8}]"
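        # -Sbroker.rc2_none=1 disables the rc2 (initial program) phase,
        # so the instance simply boots to "ready" and idles until the
        # shutdown.start RPC below is sent.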
broker_opts = ["-Sbroker.rc2_none=1"]
if topo is not None:
broker_opts.append(f"-Stbon.topo={topo}")
if conf is not None:
broker_opts.append("-c{{tmpdir}}/conf.json")
jobspec = flux.job.JobspecV1.from_command(
command=["flux", "broker", *broker_opts],
exclusive=True,
num_nodes=nnodes,
num_tasks=nnodes * brokers_per_node,
)
jobspec.setattr_shell_option("mpi", "none")
if conf is not None:
jobspec.add_file("conf.json", conf)
self.jobspec = jobspec
def log(self, msg):
ts = self.ts or (time.time() - self.t0)
print(f"{self.name}: {ts:6.3f}s: {msg}", file=sys.stderr, flush=True)
self.ts = None
def then(self, cb, *args, **kw_args):
self.then_cb = cb
self.then_args = args
self.then_kw_args = kw_args
def submit(self):
self.t0 = time.time()
flux.job.submit_async(self.flux_handle, self.jobspec).then(self.submit_cb)
return self
def submit_cb(self, future):
try:
self.id = future.get_id()
except OSError as exc:
print(exc, file=sys.stderr)
return
if self.progress:
job = flux.job.JobInfo(
{
"id": self.id,
"state": flux.constants.FLUX_JOB_STATE_SCHED,
"t_submit": time.time(),
}
)
self.progress.add_job(job)
flux.job.event_watch_async(self.flux_handle, self.id).then(self.bg_wait_cb)
def child_ready_cb(self, future):
future.get()
self.t_ready = time.time()
self.log("ready")
self.size = self.child_handle.attr_get("size")
self.topo = self.child_handle.attr_get("tbon.topo")
# Shutdown and report timing:
self.log("requesting shutdown")
shutdown = self.child_handle.rpc("shutdown.start", {"loglevel": 1})
def bg_wait_cb(self, future):
event = future.get_event()
if self.progress:
self.progress.process_event(self.id, event)
if not event:
# The job has unexpectedly exited since we're at the end
# of the eventlog. Run `flux job attach` since this will dump
# any errors or output, then raise an exception.
os.system(f"flux job attach {self.id} >&2")
raise OSError(f"{self.id}: unexpectedly exited")
self.ts = event.timestamp - self.t0
# self.log(f"{event.name}")
if event.name == "submit":
self.t_submit = event.timestamp
elif event.name == "alloc":
self.t_alloc = event.timestamp
elif event.name == "start":
self.t_start = event.timestamp
flux.job.event_watch_async(
self.flux_handle, self.id, eventlog="guest.exec.eventlog"
).then(self.shell_init_wait_cb)
elif event.name == "memo" and "uri" in event.context:
self.t_uri = event.timestamp
uri = str(flux.uri.JobURI(event.context["uri"]))
self.log(f"opening handle to {self.id}")
self.child_handle = flux.Flux(uri)
            # Set the main handle's reactor as the reactor for this child
            # handle so its events can be processed in the same loop:
self.child_handle.flux_set_reactor(self.flux_handle.get_reactor())
self.log("connected to child job")
# Wait for child instance to be ready:
self.child_handle.rpc("state-machine.wait").then(self.child_ready_cb)
elif event.name == "finish":
self.t_finish = event.timestamp
future.cancel(stop=True)
if self.then_cb is not None:
                self.then_cb(self, *self.then_args, **self.then_kw_args)
if self.progress:
# Notify ProgressBar that this job is done via a None event
self.progress.process_event(self.id, None)
def shell_init_wait_cb(self, future):
event = future.get_event()
if not event:
return
self.ts = event.timestamp - self.t0
self.log(f"exec.{event.name}")
if event.name == "shell.init":
self.t_shell_init = event.timestamp
future.cancel(stop=True)
def timing_header(self, file=sys.stdout):
print(
"%5s %5s %8s %8s %8s %8s %8s %8s %8s"
% (
"NODES",
"SIZE",
"TOPO",
"T_START",
"T_URI",
"T_INIT",
"T_READY",
"(TOTAL)",
"T_SHUTDN",
),
file=file,
)
def report_timing(self, file=sys.stdout):
print(
"%5s %5s %8s %8.3f %8.3f %8.3f %8.3f %8.3f %8.3f"
% (
self.nnodes,
self.size,
self.topo,
self.t_start - self.t_alloc,
self.t_uri - self.t_start,
self.t_shell_init - self.t_alloc,
self.t_ready - self.t_shell_init,
self.t_ready - self.t_alloc,
self.t_finish - self.t_ready,
),
file=file,
)
def generate_values(end):
"""
    Generate a list of powers of 2 (starting at 1), up to and including
    `end`. If `end` is not a power of 2, it is appended so that it is
    always present.
    The list is returned in reverse order (largest values first).
"""
    stop = int(math.log2(end)) + 1
    values = [1 << i for i in range(stop)]
    if end not in values:
        values.append(end)
    # list.reverse() reverses in place and returns None, so reverse
    # first, then return the list:
    values.reverse()
    return values
def parse_args():
parser = argparse.ArgumentParser(
prog="instance-timing", formatter_class=flux.util.help_formatter()
)
parser.add_argument(
"-N",
"--max-nodes",
metavar="N",
type=int,
default=None,
help="Scale up to N nodes by powers of two",
)
parser.add_argument(
"-B",
"--max-brokers-per-node",
type=int,
metavar="N",
default=1,
help="Run powers of 2 brokers-per-node up to N",
)
parser.add_argument(
"--topo",
metavar="TOPO,...",
type=str,
default="kary:2",
help="add one or more tbon.topo values to test",
)
parser.add_argument(
"-L",
"--log-file",
metavar="FILE",
help="log results to FILE in addition to stdout",
)
return parser.parse_args()
def get_max_nnodes(flux_handle):
"""
    Get the maximum number of nodes available in the default queue, or
    in the anonymous queue if no queues are configured.
    """
    resources = resource_list(flux_handle).get()
try:
config = flux_handle.rpc("config.get").get()
defaultq = config["policy"]["jobspec"]["defaults"]["system"]["queue"]
constraint = config["queues"][defaultq]["requires"]
avail = resources["up"].copy_constraint({"properties": constraint})
except KeyError:
avail = resources["up"]
return avail.nnodes
def print_results(instances, ofile=sys.stdout):
instances[0].timing_header(ofile)
for ib in instances:
ib.report_timing(ofile)
def main():
args = parse_args()
args.topo = args.topo.split(",")
h = flux.Flux()
if not args.max_nodes:
args.max_nodes = get_max_nnodes(h)
nnodes = generate_values(args.max_nodes)
bpn = generate_values(args.max_brokers_per_node)
inputs = itertools.product(nnodes, bpn, args.topo)
progress = flux.job.watcher.JobProgressBar(h)
progress.start()
instances = []
for i in inputs:
instances.append(
InstanceBench(
h, i[0], brokers_per_node=i[1], topo=i[2], progress=progress
).submit()
)
h.reactor_run()
print_results(instances)
if args.log_file:
with open(args.log_file, "w") as ofile:
print_results(instances, ofile=ofile)
if __name__ == "__main__":
main()
# vi: ts=4 sw=4 expandtab
```
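A note on usage (my assumption, inferred from `parse_args()` and `get_max_nnodes()` above): the script is run from inside a parent Flux instance, e.g. `./instance-timing.py --max-nodes=64 --max-brokers-per-node=48 --topo=kary:2,kary:0,binomial --log-file=results.dat`, and sweeps powers of two up to the given node and brokers-per-node counts for each listed topology.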
-
On Friday, I was able to get 1040 nodes on dane. This system has 112 cores per node, for a total of 116,480 cores. All these tests were run with the default mvapich2 on our systems.
Here are the raw results with 3 different instance topologies from the MPI …
Here's a quick plot of the results. If I have time, I might try splitting out the data by processes per node as well. Edit: I realized that the …
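Since the `--log-file` output keeps the fixed-width columns printed by `report_timing()` above, splitting the data by topology or by brokers per node is mostly a groupby. A minimal sketch, assuming a hypothetical `results.dat` log file and pandas/matplotlib:

```python
# Sketch only (my assumption): parses a file produced with --log-file,
# whose whitespace-separated columns are NODES SIZE TOPO T_START T_URI
# T_INIT T_READY (TOTAL) T_SHUTDN, as printed by timing_header() above.
# The filename "results.dat" is hypothetical.
import matplotlib.pyplot as plt
import pandas as pd

df = pd.read_csv("results.dat", sep=r"\s+")
df["BPN"] = df["SIZE"] // df["NODES"]  # brokers (processes) per node

# One curve per (topology, brokers-per-node) pair: total bootstrap time
# (the "(TOTAL)" column, i.e. t_ready - t_alloc) versus node count.
for (topo, bpn), grp in df.groupby(["TOPO", "BPN"]):
    grp = grp.sort_values("NODES")
    plt.plot(grp["NODES"], grp["(TOTAL)"], marker="o", label=f"{topo} x{bpn}")

plt.xscale("log", base=2)
plt.xlabel("nodes")
plt.ylabel("time to ready (s)")
plt.legend()
plt.savefig("instance-timing.png")
```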
-
@nhanford was asking about numbers for running OpenMPI at scale. Unfortunately, it's not quite as large an experiment, since I wasn't able to get 1040 nodes to work.
Note that currently we have to use …
-
We have an opportunity to do some scaling tests on the dane cluster (~1300 available nodes).
This discussion is for recording what we'd like to test and the results. The current plan is to test:
- `tbon.topo` values `kary:2`, `kary:256`, ..., `kary:0`, and `binomial` (see the sketch after this list)
- `srun`-launched MPI
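For context, the topology under test is selected via the `-Stbon.topo=` broker attribute, exactly as the test script in the first comment does. A minimal standalone sketch of launching one such subinstance (the node count and `binomial` choice here are purely illustrative):

```python
# Minimal sketch, mirroring how InstanceBench builds its jobspec above.
# The 4-node size and "binomial" topology are illustrative assumptions.
import flux
import flux.job

jobspec = flux.job.JobspecV1.from_command(
    command=["flux", "broker", "-Sbroker.rc2_none=1", "-Stbon.topo=binomial"],
    exclusive=True,
    num_nodes=4,
    num_tasks=4,
)
jobspec.setattr_shell_option("mpi", "none")
print(flux.job.submit(flux.Flux(), jobspec))
```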