Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional bandwidth monitoring to benchmark.py #1289

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import logging
import os
from os import path
import signal
import subprocess
from subprocess import Popen
import tempfile
from typing import Optional
from typing import List, Optional
import urllib.request

import hydra
Expand Down Expand Up @@ -237,6 +238,81 @@ def _get_ec2_instance_id() -> Optional[str]:

return instance_id

class ResourceMonitoring():
def __init__(self, with_bwm: bool):
"""Resource monitoring setup.

with_bwm: Whether to start bandwidth monitor tool `bwm-ng`. Optional because it's not available
in the default AL2023 distro so you have to install it first."""
self.mpstat_process = None
self.bwm_ng_process = None
self.with_bwm = with_bwm
self.output_files = []

def _start(self) -> None:
log.debug("Starting resource monitors...")
self.mpstat_process = self._start_mpstat()
if self.with_bwm:
self.bwm_ng_process = self._start_bwm_ng()

def _close(self) -> None:
log.debug("Shutting down resource monitors...")
for process in [self.mpstat_process, self.bwm_ng_process]:
self._stop_resource_monitor(process)

for output_file in self.output_files:
self._close_output_file(output_file)

def _close_output_file(self, output_file):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: does this really need to be its own method?

try:
output_file.close()
except Exception:
log.error("Error closing {output_file}:", exc_info=True)

def _stop_resource_monitor(self, process):
try:
if process:
process.send_signal(signal.SIGINT)
process.wait()
except Exception:
log.error("Error shutting down monitoring:", exc_info=True)

def _start_monitor_with_builtin_repeat(self, process_args: List[str], output_file) -> any:
"""Start process_args with output to output_file.

Used for starting processes in the background to do monitoring; good for tools that repeat the
measurement themselves so only need to be started once, and that can write their output to stdout.
"""
f = open(output_file, 'w')
self.output_files.append(f)
log.debug(f"Starting monitoring tool {''.join(process_args)}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this will output the args but with no spaces :(

Suggested change
log.debug(f"Starting monitoring tool {''.join(process_args)}")
log.debug(f"Starting monitoring tool {' '.join(process_args)}")

return subprocess.Popen(process_args, stdout=f)

def _start_mpstat(self) -> any:
return self._start_monitor_with_builtin_repeat([
"/usr/bin/mpstat",
"-P", "ALL", # cores
"-o", "JSON",
"1", # interval
], 'mpstat.json')

def _start_bwm_ng(self) -> any:
"""Starts bwm-ng, which probably needs to be installed.

https://www.gropp.org/?id=projects&sub=bwm-ng"""
return self._start_monitor_with_builtin_repeat([
'/usr/local/bin/bwm-ng',
'-o', 'csv'
], 'bwm-ng.csv')

@contextmanager
def managed(with_bwm=False):
resource = ResourceMonitoring(with_bwm)
try:
resource._start()
yield resource
finally:
resource._close()

@hydra.main(version_base=None, config_path="conf", config_name="config")
def run_experiment(cfg: DictConfig) -> None:
Expand All @@ -258,8 +334,8 @@ def run_experiment(cfg: DictConfig) -> None:
metadata.update(mount_metadata)
mount_dir = mount_metadata["mount_dir"]
try:
# TODO: Add resource monitoring during FIO job
_run_fio(cfg, mount_dir)
with ResourceMonitoring.managed(cfg['with_bwm']):
_run_fio(cfg, mount_dir)
metadata["success"] = True
except Exception as e:
log.error(f"Error running experiment: {e}")
Expand Down
3 changes: 3 additions & 0 deletions benchmark/conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ upload_checksums: !!null
mountpoint_max_background: !!null
mountpoint_congestion_threshold: !!null

# For monitoring network bandwidth
with_bwm: false

iterations: 1

hydra:
Expand Down
Loading