diff --git a/benchmark/benchmark.py b/benchmark/benchmark.py index e468945ed..ac4bf6d18 100644 --- a/benchmark/benchmark.py +++ b/benchmark/benchmark.py @@ -4,10 +4,11 @@ import logging import os from os import path +import signal import subprocess from subprocess import Popen import tempfile -from typing import Optional +from typing import List, Optional import urllib.request import hydra @@ -237,6 +238,81 @@ def _get_ec2_instance_id() -> Optional[str]: return instance_id +class ResourceMonitoring(): + def __init__(self, with_bwm: bool): + """Resource monitoring setup. + + with_bwm: Whether to start bandwidth monitor tool `bwm-ng`. Optional because it's not available + in the default AL2023 distro so you have to install it first.""" + self.mpstat_process = None + self.bwm_ng_process = None + self.with_bwm = with_bwm + self.output_files = [] + + def _start(self) -> None: + log.debug("Starting resource monitors...") + self.mpstat_process = self._start_mpstat() + if self.with_bwm: + self.bwm_ng_process = self._start_bwm_ng() + + def _close(self) -> None: + log.debug("Shutting down resource monitors...") + for process in [self.mpstat_process, self.bwm_ng_process]: + self._stop_resource_monitor(process) + + for output_file in self.output_files: + self._close_output_file(output_file) + + def _close_output_file(self, output_file): + try: + output_file.close() + except Exception: + log.error("Error closing {output_file}:", exc_info=True) + + def _stop_resource_monitor(self, process): + try: + if process: + process.send_signal(signal.SIGINT) + process.wait() + except Exception: + log.error("Error shutting down monitoring:", exc_info=True) + + def _start_monitor_with_builtin_repeat(self, process_args: List[str], output_file) -> any: + """Start process_args with output to output_file. + + Used for starting processes in the background to do monitoring; good for tools that repeat the + measurement themselves so only need to be started once, and that can write their output to stdout. + """ + f = open(output_file, 'w') + self.output_files.append(f) + log.debug(f"Starting monitoring tool {''.join(process_args)}") + return subprocess.Popen(process_args, stdout=f) + + def _start_mpstat(self) -> any: + return self._start_monitor_with_builtin_repeat([ + "/usr/bin/mpstat", + "-P", "ALL", # cores + "-o", "JSON", + "1", # interval + ], 'mpstat.json') + + def _start_bwm_ng(self) -> any: + """Starts bwm-ng, which probably needs to be installed. + + https://www.gropp.org/?id=projects&sub=bwm-ng""" + return self._start_monitor_with_builtin_repeat([ + '/usr/local/bin/bwm-ng', + '-o', 'csv' + ], 'bwm-ng.csv') + + @contextmanager + def managed(with_bwm=False): + resource = ResourceMonitoring(with_bwm) + try: + resource._start() + yield resource + finally: + resource._close() @hydra.main(version_base=None, config_path="conf", config_name="config") def run_experiment(cfg: DictConfig) -> None: @@ -258,8 +334,8 @@ def run_experiment(cfg: DictConfig) -> None: metadata.update(mount_metadata) mount_dir = mount_metadata["mount_dir"] try: - # TODO: Add resource monitoring during FIO job - _run_fio(cfg, mount_dir) + with ResourceMonitoring.managed(cfg['with_bwm']): + _run_fio(cfg, mount_dir) metadata["success"] = True except Exception as e: log.error(f"Error running experiment: {e}") diff --git a/benchmark/conf/config.yaml b/benchmark/conf/config.yaml index ce987a65c..d2c9281a7 100644 --- a/benchmark/conf/config.yaml +++ b/benchmark/conf/config.yaml @@ -32,6 +32,9 @@ upload_checksums: !!null mountpoint_max_background: !!null mountpoint_congestion_threshold: !!null +# For monitoring network bandwidth +with_bwm: false + iterations: 1 hydra: