From efe579340facca272c16eaa2f077a159815d102e Mon Sep 17 00:00:00 2001 From: Flan Date: Tue, 31 Oct 2017 13:48:37 -0500 Subject: [PATCH 1/2] async stuff --- rickshaw/hyperion.py | 0 rickshaw/node_server.py | 61 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) create mode 100644 rickshaw/hyperion.py create mode 100644 rickshaw/node_server.py diff --git a/rickshaw/hyperion.py b/rickshaw/hyperion.py new file mode 100644 index 0000000..e69de29 diff --git a/rickshaw/node_server.py b/rickshaw/node_server.py new file mode 100644 index 0000000..8cffcb9 --- /dev/null +++ b/rickshaw/node_server.py @@ -0,0 +1,61 @@ +"""Asynchonous job creation for rickshaw for use on HPC systems.""" + +import asyncio +import concurrent.futures +from asyncio.subprocess import create_subprocess_exec +from argparse import ArgumentParser + +def make_parser(): + """Makes the argument parser for the rickshaw node server.""" + p = ArgumentParser("rickshaw-node-server", description="Rickshaw Node Server CLI") + p.add_argument("--debug", action='store_true', default=False, + dest='debug', help="runs the server in debug mode.") + p.add_argument("-p", "--nproc", type=int, default=1, dest="nproc", + help="Number of processes") + p.add_argument("-s", "--nsim", type=int, default=1, dest="nsim", + help="Number of simulations to run") + p.add_argument("-n", "--name", type=str, default="cyclus", dest="name", + help="Node name") + p.add_argument("-f", "--format", type=str, default="h5", dest="format", + help="The format of output file, h5 or sqlite") + return p + +async def run_sim(output_q, filename): + inputfile = "" + p = await create_subprocess_exec("cyclus", "-o", filename, "-i", inputfile) + await p.wait() + await output_q.put(filename) + +async def run_sims(output_q, nsim): + i = 0 + pending_tasks = [] + while i < nsim: + while not output_q.empty() and i < nsim: + filename = await output_q.get() + sim_task = asyncio.ensure_future(run_sim(output_q, filename)) + pending_tasks.append(sim_task) + i += 1 + if len(pending_tasks) > 0: + done, pending_tasks = await asyncio.wait(pending_tasks, return_when=concurrent.futures.FIRST_COMPLETED) + pending_tasks = list(pending_tasks) + if len(pending_tasks) > 0: + await asyncio.wait(pending_tasks) + +def main(args=None): + p = make_parser() + ns = p.parse_args(args=args) + executor = concurrent.futures.ThreadPoolExecutor(max_workers=ns.nproc) + loop = asyncio.get_event_loop() + output_q = asyncio.Queue() + for i in range(ns.nproc): + output_q.put_nowait("{name}-{i:03}.{format}".format(name=ns.name, i=i, format=ns.format)) + if ns.debug: + loop.set_debug(True) + try: + loop.run_until_complete(run_sims(output_q, ns.nsim)) + finally: + if not loop.is_closed(): + loop.close() + +if __name__ == '__main__': + main() From 0974504ed927fc5271b039f20015e2e203571225 Mon Sep 17 00:00:00 2001 From: Flan Date: Wed, 1 Nov 2017 11:14:21 -0500 Subject: [PATCH 2/2] x --- rickshaw/deploy.py | 28 ++++++++ rickshaw/main.py | 156 ++++++++++++---------------------------- rickshaw/node_server.py | 26 ++++--- rickshaw/simspec.py | 28 +++++++- 4 files changed, 119 insertions(+), 119 deletions(-) diff --git a/rickshaw/deploy.py b/rickshaw/deploy.py index ffd28dd..c679d9a 100644 --- a/rickshaw/deploy.py +++ b/rickshaw/deploy.py @@ -5,6 +5,8 @@ import matplotlib.pyplot as plt; plt.rcdefaults() import numpy as np +from rickshaw import generate + def read_file(inputfile): dict = inputfile inst = dict['simulation']['region']['institution']['config']['DeployInst'] @@ -139,3 +141,29 @@ def test_schedule(inputfile, parameters): pgrow = demand_curve(pstart, rate, sumt) diff = calc_demand_error(pgrow, totalp) return diff + +def run_deploy(nsims, specific_spec): + i = 0; + min_diff = 1.0 + tempfile = {} + parameters = {} + while i < nsims: + try: + input_file = generate.generate(sim_spec=specific_spec) + if ns.v: + pprint(input_file) + jsonfile = str(i) + '.json' + diff = deploy.test_schedule(input_file, spec['parameters']) + if diff < min_diff: + min_diff = diff + tempfile = input_file + parameters = spec['parameters'] + if diff < 0.05: + with open(jsonfile, 'w') as jf: + json.dump(input_file, jf, indent=4) + except Exception as e: + i+=1 + with open('best.json', 'w') as jf: + json.dump(tempfile, jf, indent=4) + print('Best schedule match had a difference of: ' + str(min_diff)) + deploy.plot_total_power(tempfile, parameters) diff --git a/rickshaw/main.py b/rickshaw/main.py index 8acaa4d..7cc4221 100644 --- a/rickshaw/main.py +++ b/rickshaw/main.py @@ -1,11 +1,11 @@ """Main entry point for rickshaw""" -from argparse import ArgumentParser +import sys import os import subprocess import json import logging import traceback - +from argparse import ArgumentParser try: from pprintpp import pprint except ImportError: @@ -18,7 +18,7 @@ from rickshaw import deploy from rickshaw.generate import CYCLUS_EXECUTABLE -def main(args=None): +def make_parser(): p = ArgumentParser('rickshaw') p.add_argument('-n', dest='n', type=int, help='number of files to generate', default=None) @@ -32,57 +32,53 @@ def main(args=None): p.add_argument('-bn', dest='bn', type=int, help='number of nodes to run on if ran on blue waters', default=None) p.add_argument('-ppn', dest='ppn', type=int, help='number of processors per node for a blue waters run', default=None) p.add_argument('-d', dest = 'd', action="store_true", help='Build a deploy schedule to match the input file') + return p + +def run(specific_spec, ns, name): + try: + input_file = generate.generate(sim_spec=specific_spec) + except Exception as e: + message = traceback.format_exc() + logging.exception(message) + if ns.v: + pprint(input_file) + jsonfile = name + '.json' + try: + with open(jsonfile, 'w') as jf: + json.dump(input_file, jf, indent=4) + except Exception as e: + message = traceback.format_exc() + logging.exception(message) + try: + if ns.rs: + cmd = [CYCLUS_EXECUTABLE[:], jsonfile, '-o', ns.o +'.sqlite'] + logging.info(' '.join(cmd)) + out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, + universal_newlines=True) + if ns.rh: + cmd = [CYCLUS_EXECUTABLE[:], jsonfile, '-o', ns.o +'.h5'] + logging.info(' '.join(cmd)) + out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, + universal_newlines=True) + logging.info(out) + except Exception as e: + message = traceback.format_exc() + message += e.stdout + logging.exception(message) + +def main(args=None): + p = make_parser() ns = p.parse_args(args=args) - spec = {} input_file = "" if ns.i is not None: try: - ext = os.path.splitext(ns.i)[1] - if ext == '.json': - with open(ns.i) as jf: - spec = json.load(jf) - for k,v in simspec['niche_links'].items(): - spec['niche_links'][k] = set(v) - for k,v in simspec['archetypes'].items(): - spec['archetypes'][k] = set(v) - elif ext == '.py': - with open(ns.i) as pf: - py_str = pf.read() - spec = eval(py_str) - except: - print('Failed to parse richshaw input file, please verify file format') - pass + specific_spec = simspec.SimSpec.from_file(ns.i) + except Exception: + print('Simspec failed to build', file=sys.stderr) + else: + specific_spec = simspec.SimSpec() if ns.d: - i = 0; - min_diff = 1.0 - tempfile = {} - parameters = {} - while i < ns.n: - try: - specific_spec = simspec.SimSpec(spec) - except Exception: - print('Simspec failed to build') - try: - input_file = generate.generate(sim_spec=specific_spec) - if ns.v: - pprint(input_file) - jsonfile = str(i) + '.json' - diff = deploy.test_schedule(input_file, spec['parameters']) - if diff < min_diff: - min_diff = diff - tempfile = input_file - parameters = spec['parameters'] - if diff < 0.05: - with open(jsonfile, 'w') as jf: - json.dump(input_file, jf, indent=4) - except Exception as e: - message = traceback.format_exc() - logging.exception(message) - i+=1 - with open('best.json', 'w') as jf: - json.dump(tempfile, jf, indent=4) - print('Best schedule match had a difference of: ' + str(min_diff)) - deploy.plot_total_power(tempfile, parameters) + deploy.run_deploy(ns.n, specific_spec) return if ns.bn is not None: blue_waters.generate_scripts(ns.n, ns.ppn) @@ -96,68 +92,10 @@ def main(args=None): if ns.n is not None: i = 0 while i < ns.n: - try: - specific_spec = simspec.SimSpec(spec) - except Exception: - print('Simspec failed to build') - try: - input_file = generate.generate(sim_spec=specific_spec) - if ns.v: - pprint(input_file) - jsonfile = str(i) + '.json' - with open(jsonfile, 'w') as jf: - json.dump(input_file, jf, indent=4) - except Exception as e: - message = traceback.format_exc() - logging.exception(message) - try: - if ns.rs: - cmd = [CYCLUS_EXECUTABLE[:], jsonfile, '-o', ns.o +'.sqlite'] - logging.info(' '.join(cmd)) - out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, - universal_newlines=True) - if ns.rh: - cmd = [CYCLUS_EXECUTABLE[:], jsonfile, '-o', ns.o +'.h5'] - logging.info(' '.join(cmd)) - out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, - universal_newlines=True) - logging.info(out) - except Exception as e: - message = traceback.format_exc() - message += e.stdout - logging.exception(message) + run(specific_spec, ns, i) i += 1 else: - try: - specific_spec = simspec.SimSpec(spec) - input_file = generate.generate(sim_spec=specific_spec) - except Exception as e: - message = traceback.format_exc() - logging.exception(message) - if ns.v: - pprint(input_file) - jsonfile = ns.op + '.json' - try: - with open(jsonfile, 'w') as jf: - json.dump(input_file, jf, indent=4) - except Exception as e: - message = traceback.format_exc() - logging.exception(message) - try: - if ns.rs: - cmd = [CYCLUS_EXECUTABLE[:], jsonfile, '-o', ns.o +'.sqlite'] - logging.info(' '.join(cmd)) - out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, - universal_newlines=True) - if ns.rh: - cmd = [CYCLUS_EXECUTABLE[:], jsonfile, '-o', ns.o +'.h5'] - logging.info(' '.join(cmd)) - out = subprocess.check_output(cmd, stderr=subprocess.STDOUT, - universal_newlines=True) - logging.info(out) - except Exception as e: - message = traceback.format_exc() - logging.exception(message) + run(specific_spec, ns, ns.op) if __name__ == '__main__': diff --git a/rickshaw/node_server.py b/rickshaw/node_server.py index 8cffcb9..45776dd 100644 --- a/rickshaw/node_server.py +++ b/rickshaw/node_server.py @@ -1,10 +1,14 @@ """Asynchonous job creation for rickshaw for use on HPC systems.""" +import json import asyncio import concurrent.futures from asyncio.subprocess import create_subprocess_exec from argparse import ArgumentParser +from rickshaw.simspec import SimSpec +from rickshaw.generate import generate + def make_parser(): """Makes the argument parser for the rickshaw node server.""" p = ArgumentParser("rickshaw-node-server", description="Rickshaw Node Server CLI") @@ -18,21 +22,27 @@ def make_parser(): help="Node name") p.add_argument("-f", "--format", type=str, default="h5", dest="format", help="The format of output file, h5 or sqlite") + p.add_argument("-i", "--input", type=str, default=None, dest="i", + help="Input templating file for Rickshaw") return p -async def run_sim(output_q, filename): - inputfile = "" - p = await create_subprocess_exec("cyclus", "-o", filename, "-i", inputfile) - await p.wait() - await output_q.put(filename) +async def run_sim(output_q, filename, template): + try: + specific_spec = SimSpec(ni=False) if template is None else SimSpec.from_file(template) + input_dict = generate(sim_spec=specific_spec) + inputfile = json.dumps(input_dict) + p = await create_subprocess_exec("cyclus", "-o", filename, "-i", inputfile, "-f", "json") + await p.wait() + finally: + await output_q.put(filename) -async def run_sims(output_q, nsim): +async def run_sims(output_q, nsim, template): i = 0 pending_tasks = [] while i < nsim: while not output_q.empty() and i < nsim: filename = await output_q.get() - sim_task = asyncio.ensure_future(run_sim(output_q, filename)) + sim_task = asyncio.ensure_future(run_sim(output_q, filename, template)) pending_tasks.append(sim_task) i += 1 if len(pending_tasks) > 0: @@ -52,7 +62,7 @@ def main(args=None): if ns.debug: loop.set_debug(True) try: - loop.run_until_complete(run_sims(output_q, ns.nsim)) + loop.run_until_complete(run_sims(output_q, ns.nsim, ns.i)) finally: if not loop.is_closed(): loop.close() diff --git a/rickshaw/simspec.py b/rickshaw/simspec.py index 5b7e6c5..0008cb2 100644 --- a/rickshaw/simspec.py +++ b/rickshaw/simspec.py @@ -232,6 +232,7 @@ def read_input_def(obj, env): obj[v] = read_input_def(obj[v], env) return obj + class SimSpec(object): """ Manages any constraints placed on Rickshaw generation. @@ -275,7 +276,8 @@ class SimSpec(object): annotations : dict Container for archetype annotations. """ - def __init__(self, spec={}, ni=True): + def __init__(self, spec=None, ni=True): + spec = {} if spec is None else spec self.spec = copy.deepcopy(spec) self.customized = False self.control = choose_control() @@ -321,7 +323,29 @@ def __init__(self, spec={}, ni=True): self.facilities[obj['name']] = obj for key, value in self.facilities.items(): value = read_input_def(value, env) - self.facilities = self.facilities.values() + self.facilities = self.facilities.values() + + @classmethod + def from_file(cls, filename): + spec = {} + try: + ext = os.path.splitext(filename)[1] + if ext == '.json': + with open(filename) as jf: + spec = json.load(jf) + for k,v in simspec['niche_links'].items(): + spec['niche_links'][k] = set(v) + for k,v in simspec['archetypes'].items(): + spec['archetypes'][k] = set(v) + elif ext == '.py': + with open(filename) as pf: + py_str = pf.read() + spec = eval(py_str) + except: + print('Failed to parse richshaw input file, please verify file format') + pass + ss = cls(spec=spec) + return ss