Skip to content

Commit

Permalink
Merge pull request ucb-art#8 from hyunjaekwon/add_lsf
Browse files Browse the repository at this point in the history
Add lsf
  • Loading branch information
ayan-biswas authored Feb 18, 2022
2 parents 2799c03 + 40a09ae commit 318b78f
Show file tree
Hide file tree
Showing 12 changed files with 328 additions and 38 deletions.
7 changes: 6 additions & 1 deletion run_scripts/dsn_cell.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import argparse

from pybag.enum import LogLevel

from bag.io import read_yaml
from bag.core import BagProject
from bag.util.misc import register_pdb_hook
Expand All @@ -37,15 +39,18 @@ def parse_options() -> argparse.Namespace:
help='Force simulation even if simulation netlist is unchanged')
parser.add_argument('-c', '--gen_sch', action='store_true', default=False,
help='Generate testbench schematics for debugging.')
parser.add_argument('-q', '--quiet', action='store_true', default=False,
help='Print only warning messages or above.')
args = parser.parse_args()
return args


def run_main(prj: BagProject, args: argparse.Namespace) -> None:
    """Read the design specs and run the cell design flow with the parsed options.

    Parameters
    ----------
    prj : BagProject
        the BagProject instance used to run the design flow.
    args : argparse.Namespace
        parsed command line arguments (specs, extract, force_sim,
        force_extract, gen_sch, quiet).
    """
    specs: Mapping[str, Any] = read_yaml(args.specs)

    # --quiet suppresses everything below warning level
    log_level = LogLevel.WARN if args.quiet else LogLevel.INFO
    DesignerBase.design_cell(prj, specs, extract=args.extract, force_sim=args.force_sim,
                             force_extract=args.force_extract, gen_sch=args.gen_sch, log_level=log_level)


if __name__ == '__main__':
Expand Down
208 changes: 208 additions & 0 deletions src/bag/concurrent/lsf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
# BSD 3-Clause License
#
# Copyright (c) 2018, Regents of the University of California
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""This module define utility classes for launching subprocesses via IBM Load Sharing Facility (LSF).
"""

from typing import Optional, Sequence, Dict, Union, Any, List

import asyncio
import subprocess
from pathlib import Path
from concurrent.futures import CancelledError

from .core import SubProcessManager, FlowInfo


class LSFSubProcessManager(SubProcessManager):
    """A class that provides methods to run multiple subprocesses in parallel using IBM Load Sharing Facility (LSF).

    Commands are wrapped in ``bsub -K`` (submit the job and wait for it to
    complete), so each coroutine blocks until its LSF job finishes; overall
    concurrency is bounded by the semaphore inherited from SubProcessManager.

    Parameters
    ----------
    queue : str
        name of LSF queue to use for submitting jobs.
    options : Optional[List[str]]
        list of additional command line arguments to pass into the bsub command.
    max_workers : Optional[int]
        number of maximum allowed subprocesses.  If None, defaults to system
        CPU count.
    cancel_timeout : float
        Number of seconds to wait for a process to terminate once SIGTERM or
        SIGKILL is issued.  Defaults to 10 seconds.
    """

    def __init__(self, queue: str, options: Optional[List[str]] = None, max_workers: int = 0,
                 cancel_timeout: float = 10.0) -> None:
        # NOTE(review): _queue is not read by the submission coroutines below;
        # presumably intended for the scheduling utilities at the bottom of the
        # class (which currently take the queue as an explicit argument).
        self._queue = queue
        self._options = options or []
        super().__init__(max_workers, cancel_timeout)

    async def async_new_subprocess(self,
                                   args: Union[str, Sequence[str]],
                                   log: str,
                                   env: Optional[Dict[str, str]] = None,
                                   cwd: Optional[str] = None) -> Optional[int]:
        """A coroutine which starts a subprocess.

        If this coroutine is cancelled, it will shut down the subprocess gracefully using
        SIGTERM/SIGKILL, then raise CancelledError.

        Parameters
        ----------
        args : Union[str, Sequence[str]]
            command to run, as string or sequence of strings.
        log : str
            the log file name.
        env : Optional[Dict[str, str]]
            an optional dictionary of environment variables.  None to inherit from parent.
        cwd : Optional[str]
            the working directory.  None to inherit from parent.

        Returns
        -------
        retcode : Optional[int]
            the return code of the subprocess.
        """
        if isinstance(args, str):
            args = [args]

        # get log file name, make directory if necessary
        log_path = Path(log).resolve()
        log_path.parent.mkdir(parents=True, exist_ok=True)

        if cwd is not None:
            # make sure current working directory exists
            Path(cwd).mkdir(parents=True, exist_ok=True)

        main_cmd = " ".join(args)

        # -K: block until the job completes; -oo: (over)write job output to log_path.
        # The command is quoted so bsub receives it as a single argument.
        # NOTE(review): -oo makes LSF overwrite log_path, which would clobber the
        # 'command: ...' line written below -- confirm this is intended, and note
        # async_new_subprocess_flow uses '-o'/'-e' (append) instead.
        cmd_args = ['bsub', '-K', '-oo', str(log_path)] + self._options + [f'"{main_cmd}"']
        cmd = " ".join(cmd_args)

        async with self._semaphore:
            proc = None
            with open(log_path, 'w') as logf:
                logf.write(f'command: {cmd}\n')
                logf.flush()
                try:
                    # shell must be used to preserve paths and environment variables on compute host
                    proc = await asyncio.create_subprocess_shell(cmd, stdout=logf, stderr=subprocess.STDOUT, env=env,
                                                                 cwd=cwd)
                    retcode = await proc.wait()
                    return retcode
                except CancelledError as err:
                    # proc may still be None if cancellation happened before the
                    # subprocess was created; _kill_subprocess must tolerate that.
                    await self._kill_subprocess(proc)
                    raise err

    async def async_new_subprocess_flow(self, proc_info_list: Sequence[FlowInfo]) -> Any:
        """A coroutine which runs a series of subprocesses.

        If this coroutine is cancelled, it will shut down the current subprocess gracefully using
        SIGTERM/SIGKILL, then raise CancelledError.

        Parameters
        ----------
        proc_info_list : Sequence[FlowInfo]
            a list of processes to execute in series.  Each element is a tuple of:

            args : Union[str, Sequence[str]]
                command to run, as string or list of string arguments.
            log : str
                log file name.
            env : Optional[Dict[str, str]]
                environment variable dictionary.  None to inherit from parent.
            cwd : Optional[str]
                working directory path.  None to inherit from parent.
            vfun : Sequence[Callable[[Optional[int], str], Any]]
                a function to validate if it is ok to execute the next process.  The output of the
                last function is returned.  The first argument is the return code, the second
                argument is the log file name.

        Returns
        -------
        result : Any
            the return value of the last validate function.  None if validate function
            returns False.
        """
        num_proc = len(proc_info_list)
        if num_proc == 0:
            return None

        # hold the semaphore for the whole flow so the steps run back-to-back
        # as one unit of parallelism
        async with self._semaphore:
            for idx, (args, log, env, cwd, vfun) in enumerate(proc_info_list):
                if isinstance(args, str):
                    args = [args]

                # get log file name, make directory if necessary
                log_path = Path(log).resolve()
                log_path.parent.mkdir(parents=True, exist_ok=True)

                if cwd is not None:
                    # make sure current working directory exists
                    Path(cwd).mkdir(parents=True, exist_ok=True)

                main_cmd = " ".join(args)

                # NOTE(review): here stdout/stderr go to log_path via '-o'/'-e'
                # (append), while async_new_subprocess uses '-oo' (overwrite) --
                # confirm whether this difference is intentional.
                cmd_args = ['bsub', '-K', '-o', str(log_path), '-e', str(log_path)] + self._options + [f'"{main_cmd}"']
                cmd = " ".join(cmd_args)

                proc, retcode = None, None
                with open(log_path, 'w') as logf:
                    logf.write(f'command: {cmd}\n')
                    logf.flush()
                    try:
                        # shell must be used to preserve paths and environment variables on compute host
                        proc = await asyncio.create_subprocess_shell(cmd, stdout=logf, stderr=subprocess.STDOUT,
                                                                     env=env, cwd=cwd)
                        retcode = await proc.wait()
                    except CancelledError as err:
                        await self._kill_subprocess(proc)
                        raise err

                # run the validator on this step; return its output on the last
                # step, abort the flow early (return None) if it reports failure
                fun_output = vfun(retcode, str(log_path))
                if idx == num_proc - 1:
                    return fun_output
                elif not fun_output:
                    return None

    # Some utility functions that are currently unused; could be useful in the future for job scheduling
    @staticmethod
    def get_njobs_per_user(queue):
        # Query 'bqueues' and return the per-user job limit (the 'JL/U' column)
        # of the given queue.
        # NOTE(review): assumes the output is exactly two lines (header row +
        # one queue row); the two-name unpacking below fails otherwise.
        res = subprocess.Popen(f'bqueues {queue}', shell=True, stdout=subprocess.PIPE).communicate()[0]
        header, info = map(lambda s: s.decode('utf-8').split(), res.splitlines())
        return int(info[header.index('JL/U')])

    @staticmethod
    def get_njobs_running(queue):
        # Count currently running jobs in the given queue by line-counting
        # 'bjobs -r' output.
        res = subprocess.Popen(f'bjobs -r -q {queue} | wc -l', shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE).communicate()[0]
        num_lines = int(res.decode('utf-8').strip())
        # If there is a job running, there will be a header row, resulting in njobs + 1 lines in the prompt.
        # Otherwise, there will be 0 lines
        return max(num_lines - 1, 0)
8 changes: 6 additions & 2 deletions src/bag/interface/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@

from __future__ import annotations

from typing import Dict, Any, Sequence
from typing import Dict, Any, Sequence, Type

from pathlib import Path

from ..env import get_bag_work_dir
from ..io.file import write_file
from ..io.template import new_template_env_fs
from ..concurrent.core import SubProcessManager
from ..util.importlib import import_class

from .lef import LEFInterface

Expand All @@ -42,7 +43,10 @@ class AbstractInterface(LEFInterface):
def __init__(self, config: Dict[str, Any]) -> None:
    """Create a new AbstractInterface.

    Parameters
    ----------
    config : Dict[str, Any]
        the configuration dictionary; may contain 'mgr_class' / 'mgr_kwargs'
        to select a custom SubProcessManager implementation.
    """
    LEFInterface.__init__(self, config)

    # Resolve the subprocess-manager implementation from the config so that
    # alternative backends (e.g. an LSF-based manager) can be plugged in;
    # defaults to the local SubProcessManager.
    mgr_class: Type[SubProcessManager] = import_class(config.get('mgr_class', SubProcessManager))
    mgr_kwargs: Dict[str, Any] = config.get('mgr_kwargs', {})

    # abstract/LEF generation runs one job at a time
    self._manager: SubProcessManager = mgr_class(max_workers=1, **mgr_kwargs)
    self._temp_env_fs = new_template_env_fs()

def generate_lef(self, impl_lib: str, impl_cell: str, verilog_path: Path, lef_path: Path,
Expand Down
44 changes: 43 additions & 1 deletion src/bag/io/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
"""This module handles file related IO.
"""

from typing import TextIO, Any, Iterable, Union, Dict
from typing import TextIO, Any, Iterable, Union, Dict, Optional

import os
import time
Expand Down Expand Up @@ -140,6 +140,48 @@ def readlines_iter(fname: Union[str, Path]) -> Iterable[str]:
yield line


def is_valid_file(fname: Union[str, Path], ready_str: Optional[str], timeout: float, wait_intvl: float) -> bool:
    """Checks if given file is valid by seeing if it exists and optionally contains a string.

    Polls in steps of ``wait_intvl`` seconds, up to roughly ``timeout`` seconds
    total across both the existence check and the content check.

    Parameters
    ----------
    fname : Union[str, Path]
        the file name.
    ready_str : Optional[str]
        the string to check if file is ready.  None if we should only check to see if file exists.
    timeout : float
        Maximum amount of time to wait.
    wait_intvl : float
        Amount of time in between iterations to check if file is ready.

    Returns
    -------
    is_valid : bool
        True if file exists and optionally contains ready_str.  False if timed out.
    """
    path = Path(fname)
    # total number of sleep intervals allowed, shared by both phases below
    budget = timeout / wait_intvl
    attempts = 0

    # Phase 1: wait for the file to show up on disk.
    while not path.is_file():
        if attempts > budget:
            return False
        time.sleep(wait_intvl)
        attempts += 1

    # Phase 2 (only when a marker string is given): wait until the marker
    # appears in the file contents, re-reading the file each iteration.
    if ready_str is not None:
        while ready_str not in read_file(path):
            if attempts > budget:
                return False
            time.sleep(wait_intvl)
            attempts += 1

    return True


def read_yaml(fname: Union[str, Path]) -> Any:
"""Read the given file using YAML.
Expand Down
19 changes: 14 additions & 5 deletions src/bag/simulation/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
and retrieve results.
"""

from typing import Mapping, Any, Tuple, Union, Sequence
from typing import Mapping, Any, Tuple, Union, Sequence, Dict, Type

import abc
from pathlib import Path
Expand All @@ -57,6 +57,7 @@
from pybag.core import get_cdba_name_bits

from ..concurrent.core import SubProcessManager, batch_async_task
from ..util.importlib import import_class
from .data import SimNetlistInfo, SimData


Expand Down Expand Up @@ -170,9 +171,13 @@ class SimProcessManager(SimAccess, abc.ABC):
def __init__(self, tmp_dir: str, sim_config: Mapping[str, Any]) -> None:
    """Create a new SimProcessManager.

    Parameters
    ----------
    tmp_dir : str
        temporary directory for simulation files.
    sim_config : Mapping[str, Any]
        simulation configuration; may contain 'mgr_class' / 'mgr_kwargs' to
        select a custom SubProcessManager implementation, plus 'max_workers'
        and 'cancel_timeout_ms'.
    """
    SimAccess.__init__(self, tmp_dir, sim_config)

    # Resolve the subprocess-manager implementation from the config so that
    # alternative backends (e.g. an LSF-based manager) can be plugged in;
    # defaults to the local SubProcessManager.
    mgr_class: Type[SubProcessManager] = import_class(sim_config.get('mgr_class', SubProcessManager))
    mgr_kwargs: Dict[str, Any] = sim_config.get('mgr_kwargs', {})

    # config stores the timeout in milliseconds; the manager takes seconds
    cancel_timeout = sim_config.get('cancel_timeout_ms', 10000) / 1e3

    self._manager: SubProcessManager = mgr_class(max_workers=sim_config.get('max_workers', 0),
                                                 cancel_timeout=cancel_timeout, **mgr_kwargs)

@property
def manager(self) -> SubProcessManager:
Expand Down Expand Up @@ -227,9 +232,13 @@ class EmSimProcessManager(EmSimAccess, abc.ABC):
def __init__(self, tmp_dir: str, sim_config: Mapping[str, Any], **kwargs) -> None:
    """Create a new EmSimProcessManager.

    Parameters
    ----------
    tmp_dir : str
        temporary directory for simulation files.
    sim_config : Mapping[str, Any]
        simulation configuration; may contain 'mgr_class' / 'mgr_kwargs' to
        select a custom SubProcessManager implementation, plus 'max_workers'
        and 'cancel_timeout_ms'.
    **kwargs :
        additional keyword arguments forwarded to EmSimAccess.
    """
    EmSimAccess.__init__(self, tmp_dir, sim_config, **kwargs)

    # Resolve the subprocess-manager implementation from the config so that
    # alternative backends (e.g. an LSF-based manager) can be plugged in;
    # defaults to the local SubProcessManager.
    mgr_class: Type[SubProcessManager] = import_class(sim_config.get('mgr_class', SubProcessManager))
    mgr_kwargs: Dict[str, Any] = sim_config.get('mgr_kwargs', {})

    # config stores the timeout in milliseconds; the manager takes seconds
    cancel_timeout = sim_config.get('cancel_timeout_ms', 10000) / 1e3

    self._manager: SubProcessManager = mgr_class(max_workers=sim_config.get('max_workers', 0),
                                                 cancel_timeout=cancel_timeout, **mgr_kwargs)

@property
def manager(self) -> SubProcessManager:
Expand Down
5 changes: 1 addition & 4 deletions src/bag/simulation/design.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,7 @@ def design_cell(cls, prj: BagProject, specs: Mapping[str, Any], extract: bool =
precision: int = specs.get('precision', 6)

dsn_cls = cast(Type[DesignerBase], import_class(dsn_str))
if isinstance(root_dir, str):
root_path = Path(root_dir)
else:
root_path = root_dir
root_path = prj.get_root_path(root_dir)

dsn_options = dict(
extract=extract,
Expand Down
Loading

0 comments on commit 318b78f

Please sign in to comment.