integrate new run API
integrate with config
update integration tests
change github workflow to use the *dev* requirements
Rykath committed Oct 28, 2022
1 parent a7f3134 commit cc04ae2
Showing 37 changed files with 363 additions and 584 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/install-and-test.yml
@@ -32,10 +32,9 @@ jobs:
run: |
python -m pip install --upgrade pip
python -m pip install setuptools wheel
python -m pip install pytest pytest-cov
- name: Install package
run: |
USE_FORTRAN=1 python -m pip install -e .
USE_FORTRAN=1 python -m pip install -e .[dev]
- name: Test with pytest
run: |
pytest
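Note: the CI now installs the test tooling via the package's dev extra. The packaging change itself is not among the files shown here; a minimal sketch of what the extra presumably looks like in setup.py (names assumed, not part of this diff):

import setuptools

setuptools.setup(
    name="profit",
    # ... other metadata ...
    extras_require={
        # assumed contents, matching the pytest/pytest-cov line removed from the workflow
        "dev": ["pytest", "pytest-cov"],
    },
)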
29 changes: 29 additions & 0 deletions doc/dev_run_system.rst
@@ -0,0 +1,29 @@
.. _dev_run:

Development Notes: Run System
#############################

For an overview, requirements, and usage, see :ref:`run_system`.

Hierarchy
---------

The run system contains two major components: the *Runner* and many *Workers*. Additionally, the *Interface* connects these two layers.
*proFit* is started from some base directory (``base_dir``), which contains the configuration and simulation files. The generated data will be written to this directory.
The *Runner* can be configured to use a different temporary directory (``tmp_dir``) which will contain temporary files (e.g. used by the *Interface*, logs and the individual run directories).

The *Command-Worker* will create individual run directories within the temporary directory for each spawned *Worker* by copying a template directory and replacing template expressions with parameter values.

Most paths in the configuration, including the path to the template directory, are therefore relative to the base directory. Paths to *Interface* files and log files are relative to the temporary directory, and paths within the worker configuration (including *pre* and *post*) are relative to the individual run directories.

Environment variables
---------------------

Most of these environment variables are only set if required.

* ``PROFIT_BASE_DIR`` - absolute path to the base directory
* ``PROFIT_INCLUDES`` - JSON list of absolute paths to Python files which need to be imported (e.g. because they contain custom components)
* ``PROFIT_RUN_ID`` - the ``run_id`` to identify the *Worker*, set for each *Worker*
* ``PROFIT_ARRAY_ID`` - modifier to the ``run_id``, needed for batch computation on clusters
* ``PROFIT_WORKER`` - JSON configuration of the *Worker*
* ``PROFIT_INTERFACE`` - JSON configuration of the *Interface*
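How a spawned *Worker* process might consume these variables — a minimal sketch assuming the JSON payloads described above (the function name and the run-id arithmetic are illustrative, not the actual proFit bootstrap code):

import json
import os

def worker_context_from_env():
    """Illustrative: reconstruct a Worker's context from the PROFIT_* variables."""
    base_dir = os.environ["PROFIT_BASE_DIR"]
    run_id = int(os.environ["PROFIT_RUN_ID"])
    # assumption: the array id offsets the run id for cluster batch jobs
    run_id += int(os.environ.get("PROFIT_ARRAY_ID", 0))
    worker_config = json.loads(os.environ["PROFIT_WORKER"])
    interface_config = json.loads(os.environ["PROFIT_INTERFACE"])
    includes = json.loads(os.environ.get("PROFIT_INCLUDES", "[]"))
    return base_dir, run_id, worker_config, interface_config, includes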
22 changes: 11 additions & 11 deletions examples/mockup/mockup.py
@@ -9,22 +9,22 @@


def rosenbrock(x, y, a, b):
return (a - x)**2 + b * (y - x**2)**2
return (a - x) ** 2 + b * (y - x**2) ** 2


def func(r, u, v, a, b):
return rosenbrock((r - 0.5) + u - 5, 1 + 3 * (v - 0.6), a, b)


@Worker.register('mockup')
class Mockup(Worker):
def main(self):
class Mockup(Worker, label="mockup"):
def work(self):
self.interface.retrieve()
inputs = self.interface.input
names = self.interface.input.dtype.names
r = inputs['r'] if 'r' in names else 0.25
u = inputs['u'] if 'u' in names else 5
v = inputs['v'] if 'v' in names else 0.5
a = inputs['a'] if 'a' in names else 1
b = inputs['b'] if 'b' in names else 2
self.interface.output['f'] = func(r, u, v, a, b)
self.interface.done()
r = inputs["r"] if "r" in names else 0.25
u = inputs["u"] if "u" in names else 5
v = inputs["v"] if "v" in names else 0.5
a = inputs["a"] if "a" in names else 1
b = inputs["b"] if "b" in names else 2
self.interface.output["f"] = func(r, u, v, a, b)
self.interface.transmit()
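For reference, the new registration pattern in isolation — a minimal sketch assuming Worker is importable from profit.run (the Square class, its label, and the variable names are illustrative):

from profit.run import Worker  # assumed import path

class Square(Worker, label="square"):  # class keyword replaces @Worker.register('square')
    def work(self):  # work() replaces main()
        self.interface.retrieve()  # fetch inputs from the Runner
        x = self.interface.input["x"]  # illustrative input variable
        self.interface.output["f"] = x ** 2
        self.interface.transmit()  # transmit() replaces done()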
3 changes: 1 addition & 2 deletions examples/mockup/study/profit.yaml
@@ -6,9 +6,8 @@ variables:
f: Output
run:
runner:
class: local
class: fork
parallel: 4
sleep: 0
worker: mockup

ui:
2 changes: 1 addition & 1 deletion profit/al/active_learning.py
@@ -76,7 +76,7 @@ def update_run(self, candidates):
for idx, value in enumerate(values):
params_array[idx][key] = value
# Start batch
self.runner.spawn_array(params_array, blocking=True)
self.runner.spawn_array(params_array, wait=True)

def update_data(self):
"""Update the variables with the runner data."""
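The same keyword rename appears in several callers; a self-contained sketch of the new calling convention (the function name is illustrative):

def run_batch(runner, params_array):
    """Illustrative: spawn a batch of runs and block until all of them finish."""
    runner.spawn_array(params_array, wait=True)  # keyword was `blocking` before this commit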
8 changes: 2 additions & 6 deletions profit/al/mcmc_al.py
@@ -62,11 +62,7 @@ def __init__(self, runner, variables, reference_data, ntrain, warmup_cycles=defa
self.accepted = np.zeros((self.ntrain, self.ndim), dtype=bool)
self.log_random = np.log(np.random.random((self.ntrain, self.ndim)))

interface_class = self.runner.interface.__class__
RunnerInterface = interface_class(self.runner.run_config['interface'], ntrain * self.ndim + 1,
self.runner.base_config['input'], self.runner.base_config['output'],
logger_parent=self.runner.logger)
self.runner.interface = RunnerInterface
self.runner.interface.resize(ntrain * self.ndim + 1)

if delayed_acceptance:
from profit.sur import Surrogate
@@ -92,7 +88,7 @@ def warmup(self, save_intermediate=base_defaults['save_intermediate']):
if self.initial_points is None else self.initial_points

# TODO: Implement warmup with default AL model
"""
"""
if self.delayed_acceptance_surrogate:
from profit.al.default_al import DefaultAL
from profit.util.variable import OutputVariable
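The rebuild of the RunnerInterface above is replaced by an in-place resize; the method itself is not shown in this diff. A plausible sketch, assuming the interface buffers are numpy structured arrays (class and attribute names are assumptions):

import numpy as np

class InterfaceBuffers:
    """Illustrative stand-in for the RunnerInterface storage."""

    def __init__(self, size, input_dtype, output_dtype):
        self.input = np.zeros(size, dtype=input_dtype)
        self.output = np.zeros(size, dtype=output_dtype)

    @property
    def size(self):
        return self.input.size

    def resize(self, size):
        """Grow the buffers to `size` rows in place, keeping collected data."""
        if size <= self.size:
            return  # only grow, never shrink
        for name in ("input", "output"):
            old = getattr(self, name)
            new = np.zeros(size, dtype=old.dtype)
            new[: old.size] = old  # preserve already-collected samples
            setattr(self, name, new)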
2 changes: 1 addition & 1 deletion profit/al/simple_al.py
@@ -63,7 +63,7 @@ def warmup(self, save_intermediate=base_defaults['save_intermediate']):
rand = values[key][0]
params_array[idx][key] = rand

self.runner.spawn_array(params_array, blocking=True)
self.runner.spawn_array(params_array, wait=True)
self.update_data()

self.surrogate.train(self.variables.input[:self.nwarmup], self.variables.output[:self.nwarmup])
223 changes: 22 additions & 201 deletions profit/config.py
@@ -8,7 +8,7 @@
VALID_FORMATS = ('.yaml', '.py')

"""
yaml has to be configured to represent OrderedDict
see https://stackoverflow.com/questions/16782112/can-pyyaml-dump-dict-items-in-non-alphabetical-order
and https://stackoverflow.com/questions/5121931/in-python-how-can-you-load-yaml-mappings-as-ordereddicts
"""
@@ -282,12 +282,15 @@ def from_file(cls, filename=defaults.config_file):

def load_includes(self):
from profit.util import load_includes
import os
import json

if isinstance(self.include, str):
self.include = [self.include]

self.include = [path.abspath(path.join(self.base_dir, p)) for p in self.include]
load_includes(self.include)
os.environ["PROFIT_INCLUDES"] = json.dumps(self.include)


@BaseConfig.register("run")
@@ -321,204 +324,26 @@ def process_entries(self, base_config):
Default values from the global profit.defaults.py file are loaded.
"""
labels = {}
defaults = "run"

def process_entries(self, base_config):
"""Set paths and process entries of sub configs."""

if not path.isabs(self.log_path):
self.log_path = path.abspath(path.join(base_config.base_dir, self.log_path))

for key in self.labels:
getattr(self, key.lower()).process_entries(base_config)


@RunConfig.register("runner")
class RunnerConfig(AbstractConfig):
"""Base Runner config."""
labels = {}
defaults = None


@RunnerConfig.register("local")
class LocalRunnerConfig(RunnerConfig):
"""
Example:
.. code-block:: yaml
class: local
parallel: all # maximum number of simultaneous runs (for spawn array)
sleep: 0 # number of seconds to sleep while polling
fork: true # whether to spawn the worker via forking instead of a subprocess (via a shell)
"""
labels = {}
defaults = "run_runner_local"

def process_entries(self, base_config):
"""Converts `parallel: all` to number of available cpus"""
from os import sched_getaffinity
if self.parallel == 'all':
self.parallel = len(sched_getaffinity(0))


@RunnerConfig.register("slurm")
class SlurmRunnerConfig(RunnerConfig):
"""
Example:
.. code-block:: yaml
class: slurm
parallel: null # maximum number of simultaneous runs (for spawn array)
sleep: 0 # number of seconds to sleep while (internally) polling
poll: 60 # number of seconds between external polls (to catch failed runs), use with care!
path: slurm.bash # the path to the generated batch script (relative to the base directory)
custom: false # whether a custom batch script is already provided at 'path'
prefix: srun # prefix for the command
OpenMP: false # whether to set OMP_NUM_THREADS and OMP_PLACES
cpus: 1 # number of cpus (including hardware threads) to use (may specify 'all')
options: # (long) options to be passed to slurm: e.g. time, mem-per-cpu, account, constraint
job-name: profit
"""
labels = {}
defaults = "run_runner_slurm"

def process_entries(self, base_config):
"""Converts paths to absolute and check type of 'cpus'"""
# Convert path to absolute path
if not path.isabs(self.path):
self.path = path.abspath(path.join(base_config.base_dir, self.path))
# Check type of 'cpus'
if (type(self.cpus) is not int or self.cpus < 1) and self.cpus != 'all':
raise ValueError(f'config option "cpus" may only be a positive integer or "all" and not {self.cpus}')


@RunConfig.register("interface")
class InterfaceConfig(AbstractConfig):
"""Base runner interface config."""
labels = {}
defaults = None


@InterfaceConfig.register("memmap")
class MemmapInterfaceConfig(InterfaceConfig):
"""
Example:
.. code-block:: yaml
class: memmap
path: interface.npy # path to memory mapped interface file, relative to base directory
"""
labels = {}
defaults = "run_interface_memmap"

def process_entries(self, base_config):
"""Converts 'path' to absolute."""
if not path.isabs(self.path):
self.path = path.abspath(path.join(base_config.base_dir, self.path))


@InterfaceConfig.register("zeromq")
class ZeroMQInterfaceConfig(InterfaceConfig):
"""
Example:
.. code-block:: yaml
class: zeromq
transport: tcp # transport system used by zeromq
port: 9000 # port for the interface
address: null # override bind address used by zeromq
connect: null # override connect address used by zeromq
timeout: 2500 # zeromq polling timeout, in ms
retries: 3 # number of zeromq connection retries
retry-sleep: 1 # sleep between retries, in s
"""
labels = {}
defaults = "run_interface_zeromq"


@RunConfig.register("pre")
class PreConfig(AbstractConfig):
"""Base config for preprocessors."""
labels = {}
defaults = None


@PreConfig.register("template")
class TemplatePreConfig(PreConfig):
"""
Example:
.. code-block:: yaml
class: template
path: template # directory to copy from, relative to base directory
param_files: null # files in template which contain placeholders for variables, null means all files
# can be a filename or a list of filenames
"""
labels = {}
defaults = "run_pre_template"

def process_entries(self, base_config):
"""Convert 'path' to absolute and set 'param_files'."""
if not path.isabs(self.path):
self.path = path.abspath(path.join(base_config.base_dir, self.path))

if isinstance(self.param_files, str):
self.param_files = [self.param_files]


@RunConfig.register("post")
class PostConfig(AbstractConfig):
"""Base class for postprocessor configs."""
labels = {}
defaults = None


@PostConfig.register("json")
class JsonPostConfig(PostConfig):
"""
Example:
.. code-block:: yaml
class: json
path: stdout # file to read from, relative to the run directory
"""
labels = {}
defaults = "run_post_json"


@PostConfig.register("numpytxt")
class NumpytxtPostConfig(PostConfig):
"""
Example:
.. code-block:: yaml
class: numpytxt
path: stdout # file to read from, relative to the run directory
names: "f g" # list or string of output variables in order, default read from config/variables
options: # options which are passed on to numpy.genfromtxt() (fname & dtype are used internally)
deletechars: ""
"""
labels = {}
defaults = "run_post_numpytxt"

def process_entries(self, base_config):
"""Sets the included names of variables. The Keyword 'all' includes all variables."""
if isinstance(self.names, str):
self.names = list(base_config.output.keys()) if self.names == 'all' else self.names.split()

defaults = "run"

@PostConfig.register("hdf5")
class HDF5PostConfig(PostConfig):
"""
Example:
.. code-block:: yaml
def update(self, **entries):
"""Updates the attributes with user inputs. No warning is issued if the attribute set by the user is unknown.
class: hdf5
path: output.hdf5 # file to read from, relative to the run directory
"""
labels = {}
defaults = "run_post_hdf5"
Parameters:
entries (dict): User input of the config parameters.
"""
for name, value in entries.items():
if hasattr(self, name) or name in map(str.lower, self.labels):
attr = getattr(self, name, None)
if isinstance(attr, dict):
attr.update(value)
setattr(self, name, attr)
else:
setattr(self, name, value)
else:
setattr(self, name, value)
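A hypothetical call showing the merge semantics of the new generic update() (cfg stands for any config instance; the keys are made up):

cfg.update(
    runner={"class": "fork"},  # dict-valued attribute: merged into the existing dict
    log_path="log",            # plain attribute: overwritten
    my_custom_flag=True,       # unknown key: set silently, no warning issued
)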


@BaseConfig.register("fit")
@@ -628,8 +453,8 @@ def process_entries(self, base_config):
for encoders, select in [(self._input_encoders, input_select), (self._output_encoders, output_select)]:
if select is not None:
encoders.append({
"class": name,
"columns": select,
"class": name,
"columns": select,
"parameters": {k: float(v) for k, v in config.get("parameters", {})}
if not isinstance(config, str) else {},
})
@@ -738,10 +563,6 @@ class UIConfig(AbstractConfig):
defaults = "ui"


@RunnerConfig.register("default")
@InterfaceConfig.register("default")
@PreConfig.register("default")
@PostConfig.register("default")
@AcquisitionFunctionConfig.register("default")
class DefaultConfig(AbstractConfig):
"""Default config for all run sub configs which just updates the attributes with user entries."""
