diff --git a/src/radical/utils/env.py b/src/radical/utils/env.py
index c6ac974a..a177f4df 100644
--- a/src/radical/utils/env.py
+++ b/src/radical/utils/env.py
@@ -389,7 +389,7 @@ def env_prep(environment : Optional[Dict[str,str]] = None,
             raise RuntimeError('error running "%s": %s' % (cmd, err))

         env = env_read_lines(out.split('\n'))
-      # os.unlink(tmp_name)
+        os.unlink(tmp_name)

         _env_cache[cache_md5] = env

@@ -402,7 +402,5 @@ def env_prep(environment : Optional[Dict[str,str]] = None,

     #
     # FIXME: files could also be cached and re-used (copied or linked)
-    if script_path:
-        env_write(script_path, env=env, unset=unset, pre_exec=pre_exec)

     return env
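For context, `env_prep` memoizes prepared environments in the module-level `_env_cache`, keyed by an MD5 over the request, so the shell round-trip above only happens on a cache miss. A minimal sketch of that caching pattern (the helper names below are hypothetical, not this module's API):

```python
import hashlib

# module-level cache keyed by a digest of the request -- mirrors the
# `_env_cache[cache_md5] = env` pattern in env_prep (names hypothetical)
_cache = dict()

def _cache_key(environment, unset, pre_exec):
    '''derive a stable MD5 key from an environment request'''
    parts = [sorted((environment or {}).items()),
             sorted(unset or []),
             sorted(pre_exec or [])]
    return hashlib.md5(repr(parts).encode('utf-8')).hexdigest()

def cached_env(environment=None, unset=None, pre_exec=None):
    '''return the cached env dict, computing it only on a cache miss'''
    key = _cache_key(environment, unset, pre_exec)
    if key not in _cache:
        # expensive path: env_prep spawns a shell here, runs `pre_exec`,
        # and parses the resulting `env` output into a dict
        _cache[key] = dict(environment or {})
    return _cache[key]
```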
diff --git a/src/radical/utils/flux.py b/src/radical/utils/flux.py
index 7d499e47..ace552f8 100644
--- a/src/radical/utils/flux.py
+++ b/src/radical/utils/flux.py
@@ -4,8 +4,6 @@
 import os
 import time
 import json
-import errno
-import queue

 from typing import Optional, List, Dict, Any, Callable

@@ -19,7 +17,7 @@
 from .logger  import Logger
 from .profile import Profiler
 from .modules import import_module
-from .misc    import as_list, ru_open
+from .misc    import ru_open
 from .host    import get_hostname
 from .debug   import get_stacktrace

@@ -38,19 +36,17 @@ def __init__(self,
                  uid  : str,
                  log  : Logger,
                  prof : Profiler) -> None:

-        self._uid       = uid
-        self._log       = log
-        self._prof      = prof
+        self._uid  = uid
+        self._log  = log
+        self._prof = prof

-        self._lock      = mt.RLock()
-        self._term      = mt.Event()
+        self._lock = mt.RLock()
+        self._term = mt.Event()

-        self._uri       = None
-        self._env       = None
-        self._proc      = None
-        self._watcher   = None
-        self._listener  = None
-        self._callbacks = list()
+        self._uri     = None
+        self._env     = None
+        self._proc    = None
+        self._watcher = None

         try:
             self._flux = import_module('flux')
@@ -61,7 +57,6 @@ def __init__(self,

             raise

-
    # --------------------------------------------------------------------------
    #
    @property
@@ -129,25 +124,16 @@ def _locked_start_service(self,
                               env: Optional[Dict[str,str]] = None
                              ) -> Optional[str]:

-        check = 'flux env; echo "OK"; while true; do echo "ok"; sleep 1; done'
-        start = 'flux start -o,-v,-S,log-filename=%s.log' % self._uid
-        cmd   = '/bin/bash -c "echo \\\"%s\\\" | %s"' % (check, start)
+        cmd = ['flux', 'start', 'bash', '-c', 'echo URI:$FLUX_URI && sleep inf']

-        penv = None
-        if env:
-            penv = {k:v for k,v in os.environ.items()}
-            for k,v in env.items():
-                penv[k] = v
-
-        flux_env  = dict()
-        flux_proc = sp.Popen(cmd, shell=True, env=penv,
-                             stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.STDOUT)
+        flux_proc = sp.Popen(cmd, encoding="utf-8",
+                             stdin=sp.DEVNULL, stdout=sp.PIPE, stderr=sp.PIPE)

+        flux_env = dict()
         while flux_proc.poll() is None:

             try:
                 line = flux_proc.stdout.readline()
-                line = bytes.decode(line, 'utf-8').strip()

             except Exception as e:
                 self._log.exception('flux service failed to start')
@@ -156,22 +142,23 @@ def _locked_start_service(self,
             if not line:
                 continue

-            self._log.debug('%s', line)
-
-            if line.startswith('export '):
-                k, v = line.split(' ', 1)[1].strip().split('=', 1)
-                flux_env[k] = v.strip('"')
-                self._log.debug('%s = %s' % (k, v.strip('"')))
+            self._log.debug('flux output: %s', line)

-            elif line == 'OK':
+            if line.startswith('URI:'):
+                flux_uri = line.split(':', 1)[1].strip()
+                flux_env['FLUX_URI'] = flux_uri
                 break

         if flux_proc.poll() is not None:
             raise RuntimeError('could not execute `flux start`')

+      # fr  = self._flux.uri.uri.FluxURIResolver()
+      # ret = fr.resolve('pid:%d' % flux_proc.pid)
+      # flux_env = {'FLUX_URI': ret}
+
         assert 'FLUX_URI' in flux_env, 'no FLUX_URI in env'

-        # make sure that the flux url can be reched from other hosts
+        # make sure that the flux url can be reached from other hosts
         # FIXME: this also routes local access via ssh which may slow comm
         flux_url      = Url(flux_env['FLUX_URI'])
         flux_url.host = get_hostname()
@@ -182,8 +169,6 @@ def _locked_start_service(self,
         self._uri  = flux_uri
         self._env  = flux_env
         self._proc = flux_proc
-        self._handles   = list()
-        self._executors = list()

         self._prof.prof('flux_started', msg=self._uid)

@@ -223,11 +208,8 @@ def close_service(self) -> None:
             if not self._proc:
                 raise RuntimeError('cannot kill flux from this process')

-            # terminate watcher and listener
-            self._term.set()
-
-            if self._listener: self._listener.join()
-            if self._watcher : self._watcher.join()
+            if self._watcher:
+                self._watcher.join()

             # terminate the service process
             # FIXME: send termination signal to flux for cleanup
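The new startup path parses the instance URI from the service's own stdout; the commented-out block above records an alternative that resolves it from the broker PID instead. A sketch of that resolver route, assuming the flux-core Python bindings are importable (the exact module path of `FluxURIResolver` varies between flux-core releases):

```python
import time
import subprocess

from flux.uri import FluxURIResolver   # flux-core Python bindings

# keep a Flux instance alive without having to parse its stdout
proc = subprocess.Popen(['flux', 'start', 'sleep', 'inf'])

# resolve the instance URI from the broker PID, as in the commented-out
# block; retry briefly since the broker needs a moment to come up
uri = None
for _ in range(30):
    try:
        uri = str(FluxURIResolver().resolve('pid:%d' % proc.pid))
        break
    except Exception:
        time.sleep(1)

print('FLUX_URI:', uri)
```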
@@ -249,18 +231,6 @@ class FluxHelper(object):
         update events for flux jobs known in that instance.
     '''

-    # list of reported flux events
-    _event_list = [
-                   'NEW',
-                   'DEPEND',
-                   'SCHED',
-                   'RUN',
-                   'CLEANUP',
-                   'INACTIVE',
-                  ]
-
-
    # --------------------------------------------------------------------------
    #
    def __init__(self) -> None:
@@ -317,12 +287,9 @@ def __init__(self) -> None:

         self._prof = Profiler(self._uid, ns='radical.utils')
         self._lock = mt.RLock()
-        self._term = mt.Event()

-        self._listener  = None
-        self._callbacks = list()
-        self._queue     = queue.Queue()
         self._exe    = None
+        self._handle = None

         self._handles   = list()  # TODO
         self._executors = list()  # TODO
@@ -354,14 +321,11 @@ def reset(self):

         with self._lock:

-            if self._listener:
-                self._term.set()
-
-            for handle in self._handles:
-                del handle
+            # drop all cached handles
+            self._handles.clear()

-            for exe in self._executors:
-                del exe
+            # drop all cached executors
+            self._executors.clear()

             self._exe    = None
             self._handle = None
@@ -427,18 +391,17 @@ def start_flux(self) -> None:
             if self._uri:
                 raise RuntimeError('service already connected: %s' % self._uri)

-            with ru_open(self._uid + '.dump', 'a') as fout:
-                fout.write('starting ' + str(os.getpid()) + '\n')
-                for l in get_stacktrace():
-                    fout.write(l + '\n')
-
             self._service = _FluxService(self._uid, self._log, self._prof)
             self._service.start_service()
-
             self._uri = self._service.check_service()
             self._env = self._service.env

+          # with ru_open(self._uid + '.dump', 'a') as fout:
+          #     fout.write('start flux pid %d: %s\n' % (os.getpid(), self._uri))
+          #     for l in get_stacktrace()[:-1]:
+          #         fout.write(l)
+
             self._setup()
@@ -452,11 +415,10 @@ def connect_flux(self, uri : Optional[str] = None) -> None:

         with self._lock:

             with ru_open(self._uid + '.dump', 'a') as fout:
-                fout.write('connecting ' + str(os.getpid()) + '\n')
+                fout.write('connect flux %d: %s\n' % (os.getpid(), uri))
                 for l in get_stacktrace():
                     fout.write(l + '\n')

-
             if self._uri:
                 raise RuntimeError('service already connected: %s' % self._uri)
@@ -478,146 +440,23 @@ def connect_flux(self, uri : Optional[str] = None) -> None:
    #
    def _setup(self):
        '''
-        Once a service is connected, create one handle and start a listener
-        thread on it to serve any registered callback
+        Once a service is connected, create a handle and executor
        '''

        with self._lock:

            assert self._uri, 'not initialized'

-            # start a listener thread so that we can serve callbacks
-            self._term.clear()
-            self._listener = mt.Thread(target=self._listen)
-            self._listener.daemon = True
-            self._listener.start()
-
            # create a executor and handle for job management
            self._exe    = self.get_executor()
            self._handle = self.get_handle()


-    # --------------------------------------------------------------------------
-    #
-    def register_callback(self,
-                          cb : Callable[[str, str, float, dict], None]
-                         ) -> None:
-        '''
-        Register a callable to be fired when a flux event is collected.  The
-        callable MUST have the following signature :
-
-            def cb(job_id     : str,    # job which triggered event
-                   event_name : str,    # name of event (usually job state)
-                   ts         : float,  # event creation timestamp
-                   context    : dict)   # event meta data
-
-        '''
-
-        with self._lock:
-
-            self._log.debug('register cb %s', cb)
-            self._callbacks.append(cb)
-
-
-    # --------------------------------------------------------------------------
-    #
-    def unregister_callback(self,
-                            cb : Callable[[str, str, float, dict], None]
-                           ) -> None:
-        '''
-        unregister a callback which previously was added via `register_callback`
-        '''
-
-        with self._lock:
-
-            self._log.debug('unregister cb %s', cb)
-            self._callbacks.remove(cb)
-
-
-
-    # ----------------------------------------------------------------------
-    #
-    def _listen(self) -> None:
-        '''
-        collect events from the connected Flux instance, and invoke any
-        registered callbacks for each event.
-
-        NOTE: we handle `INACTIVE` separately: we will wait for the respective
-              job to finish to ensure cleanup and stdio flush
-        '''
-
-        self._log.debug('listen for events')
-        handle = None
-        try:
-            handle = self.get_handle()
-            handle.event_subscribe('job-state')
-
-            while not self._term.is_set():
-
-                # FIXME: how can recv be timed out or interrupted after work
-                #        completed?
-                event = handle.event_recv()
-
-                if 'transitions' not in event.payload:
-                    self._log.warn('unexpected flux event: %s' %
-                                   event.payload)
-                    continue
-
-                transitions = as_list(event.payload['transitions'])
-
-                for event in transitions:
-
-                    self._log.debug('event: %s', event)
-                    job_id, event_name, ts = event
-
-                    if event_name not in self._event_list:
-                        # we are not interested in this event
-                        continue
-
-                    with self._lock:
-                        try:
-                            for cb in self._callbacks:
-                                context = dict()
-
-                                if event_name == 'INACTIVE':
-                                    context = self._flux_job.event_wait(
-                                        handle, job_id, "finish").context
-                                cb(job_id, event_name, ts, context)
-                        except:
-                            self._log.exception('cb error')
-
-
-        except OSError as e:
-
-            if e.errno == errno.EIO:
-                # flux terminated
-                self._log.info('connection lost, stop listening')
-                handle = None
-
-            else:
-                self._log.exception('Error in listener loop')
-
-
-        except Exception:
-
-            self._log.exception('Error in listener loop')
-
-
-        finally:
-
-            # disconnect from the Flux instance on any event collection errors
-            if handle:
-                handle.event_unsubscribe('job-state')
-                del handle
-
-            self.reset()
-
-
    # --------------------------------------------------------------------------
    #
    def submit_jobs(self,
                    specs: List[Dict[str, Any]],
-                    cb   : Optional[Callable[[str, str, float, dict], None]] = None
+                    cb   : Optional[Callable[[str, Any], None]] = None
                   ) -> Any:

        with self._lock:
@@ -627,45 +466,37 @@

             assert self._exe, 'no executor'

-            def jid_cb(fut, evt):
-                try:
-                    jid = fut.jobid(timeout=0.1)
-                    self._queue.put(jid)
-                except:
-                    self._log.exception('flux cb failed')
-                    self._queue.put(None)
-
-
+            futures = list()
             for spec in specs:

                 jobspec = json.dumps(spec)
-                fut = self._exe.submit(jobspec, waitable=False)
-                self._log.debug('submit: %s', fut)
-                fut.add_event_callback('submit', jid_cb)
+                fut = self._flux_job.submit_async(self._handle, jobspec)
+                futures.append(fut)
+
+            ids = list()
+            for fut in futures:
+
+                flux_id = fut.get_id()
+                ids.append(flux_id)
+                self._log.debug('submit: %s', flux_id)

                 if cb:
-                    def app_cb(fut, evt):
+                    def app_cb(fut, event, flux_id=flux_id):
                         try:
-                            jid = fut.jobid()
-                            cb(jid, evt.name, evt.timestamp, evt.context)
+                            cb(flux_id, event)
                         except:
                             self._log.exception('app cb failed')

                     for ev in [
                                'submit',
-                             # 'alloc',
+                               'alloc',
                                'start',
                                'finish',
                                'release',
                              # 'free',
-                               'clean',
+                             # 'clean',
                                'exception',
                               ]:
                         fut.add_event_callback(ev, app_cb)

-            ids = list()
-            for spec in specs:
-                ids.append(self._queue.get())
-
             self._log.debug('submitted: %s', ids)
             return ids
@@ -674,10 +505,9 @@ def app_cb(fut, evt):
    #
    def attach_jobs(self,
                    ids: List[int],
-                    cb : Optional[Callable[[str, str, float, dict], None]] = None
+                    cb : Optional[Callable[[int, Any], None]] = None
                   ) -> Any:

-        states = list()
        with self._lock:

            if not self._uri:
@@ -688,30 +518,27 @@ def attach_jobs(self,

             for flux_id in ids:

                 fut = self._exe.attach(flux_id)
-                states.append(fut.state())
                 self._log.debug('attach %s : %s', flux_id, fut)

                 if cb:
-                    def app_cb(fut, evt):
+                    def app_cb(fut, event, flux_id=flux_id):
                         try:
-                            cb(str(flux_id), evt.name, evt.timestamp, evt.context)
+                            cb(flux_id, event)
                         except:
                             self._log.exception('app cb failed')

                     for ev in [
                                'submit',
-                             # 'alloc',
+                               'alloc',
                                'start',
                                'finish',
                                'release',
                              # 'free',
-                               'clean',
+                             # 'clean',
                                'exception',
                               ]:
                         fut.add_event_callback(ev, app_cb)

-        return states
-

    # --------------------------------------------------------------------------
    #
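With the listener thread and `register_callback` machinery gone, callbacks now attach per job future and receive the job id plus the raw Flux event, i.e. `cb(flux_id, event)` instead of the old `(job_id, event_name, ts, context)` tuple. A hypothetical client-side sketch (the jobspec below is abbreviated and illustrative; real specs would normally be built with flux's jobspec helpers):

```python
from radical.utils.flux import FluxHelper

def state_cb(flux_id, event):
    # the raw Flux event object is passed through unmodified
    print('job %s: %s' % (flux_id, event))

fh = FluxHelper()
fh.start_flux()                              # start and connect an instance

# abbreviated Flux jobspec dict (illustrative only)
spec = {'version'   : 1,
        'resources' : [{'type' : 'slot', 'label': 'task', 'count': 1,
                        'with' : [{'type': 'core', 'count': 1}]}],
        'tasks'     : [{'command': ['/bin/true'], 'slot': 'task',
                        'count'  : {'per_slot': 1}}],
        'attributes': {'system': {'duration': 0}}}

ids = fh.submit_jobs([spec], cb=state_cb)    # returns the flux job ids
fh.attach_jobs(ids, cb=state_cb)             # re-register on existing jobs
```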
diff --git a/src/radical/utils/heartbeat.py b/src/radical/utils/heartbeat.py
index 4f0c5edd..bab73d4f 100644
--- a/src/radical/utils/heartbeat.py
+++ b/src/radical/utils/heartbeat.py
@@ -109,7 +109,7 @@ def watch(self, uid):

         with self._lock:

             if uid not in self._tstamps:
-              # self._log.debug('=== hb %s watch %s', self._uid, uid)
+              # self._log.debug('hb %s watch %s', self._uid, uid)
                 self._tstamps[uid] = None

@@ -119,41 +119,41 @@ def _watch(self):

         # initial heartbeat without delay
         if self._beat_cb:
-          # self._log.debug('=== hb %s beat cb init', self._uid)
+          # self._log.debug('hb %s beat cb init', self._uid)
            self._beat_cb()

         while not self._term.is_set():

-          # self._log.debug('=== hb %s loop %s', self._uid, self._interval)
+          # self._log.debug('hb %s loop %s', self._uid, self._interval)
            time.sleep(self._interval)
            now = time.time()

            if self._beat_cb:
-              # self._log.debug('=== hb %s beat cb', self._uid)
+              # self._log.debug('hb %s beat cb', self._uid)
                self._beat_cb()

            # avoid iteration over changing dict
            with self._lock:
                uids = list(self._tstamps.keys())

-          # self._log.debug('=== hb %s uids %s', self._uid, uids)
+          # self._log.debug('hb %s uids %s', self._uid, uids)

            for uid in uids:

-              # self._log.debug('=== hb %s check %s', self._uid, uid)
+              # self._log.debug('hb %s check %s', self._uid, uid)

                with self._lock:
                    last = self._tstamps.get(uid)

                if last is None:
-                   self._log.warn('=== hb %s inval %s', self._uid, uid)
+                   self._log.warn('hb %s inval %s', self._uid, uid)
                    continue

                if now - last > self._timeout:

-                   if self._log:
-                       self._log.warn('=== hb %s tout %s: %.1f - %.1f > %1.f',
-                                      self._uid, uid, now, last, self._timeout)
+                 # if self._log:
+                 #     self._log.warn('hb %s tout %s: %.1f - %.1f > %.1f',
+                 #                    self._uid, uid, now, last, self._timeout)

                    ret = None
                    if self._timeout:
@@ -167,7 +167,7 @@ def _watch(self):

                        if ret in [None, False]:
                            # could not recover: abandon mothership
-                           self._log.warn('=== hb %s fail %s: fatal (%d)',
+                           self._log.warn('hb %s fail %s: fatal (%d)',
                                           self._uid, uid, self._pid)
                            os.kill(self._pid, signal.SIGTERM)
                            time.sleep(1)
@@ -180,7 +180,7 @@ def _watch(self):
                        # heartbeat for the new one, so that we can immediately
                        # begin to watch it.
                        assert isinstance(ret, str)
-                       self._log.info('=== hb %s recov %s -> %s (%s)',
+                       self._log.info('hb %s recov %s -> %s (%s)',
                                       self._uid, uid, ret, self._term_cb)
                        with self._lock:
                            del self._tstamps[uid]
@@ -198,7 +198,7 @@ def beat(self, uid=None, timestamp=None):
             uid = 'default'

         with self._lock:
-          # self._log.debug('=== hb %s beat [%s]', self._uid, uid)
+          # self._log.debug('hb %s beat [%s]', self._uid, uid)
            self._tstamps[uid] = timestamp
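For reference, `_watch` invokes `beat_cb` every `interval` seconds, compares each watched uid's last beat against `timeout`, and on expiry calls `term_cb`; if that callback does not name a recovered component, the process is killed. A usage sketch; the constructor arguments are inferred from the attributes used above (`_interval`, `_timeout`, `_term_cb`) and may not match the exact signature:

```python
import time

from radical.utils.heartbeat import Heartbeat

def term_cb(uid):
    # return None/False to abandon the process (SIGTERM, see `_watch`),
    # or the uid of a restarted component to keep watching it
    return None

hb = Heartbeat('demo', timeout=10.0, interval=1.0, term_cb=term_cb)
hb.start()
hb.watch('worker.0000')

for _ in range(5):
    hb.beat('worker.0000')    # refresh the watched timestamp
    time.sleep(1)

hb.stop()
```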
diff --git a/src/radical/utils/zmq/utils.py b/src/radical/utils/zmq/utils.py
index 72c60a7f..4c39445b 100644
--- a/src/radical/utils/zmq/utils.py
+++ b/src/radical/utils/zmq/utils.py
@@ -133,11 +133,11 @@ def log_bulk(log, token, msgs):

     if isinstance(msgs[0], dict) and 'uid' in msgs[0]:

         for msg in msgs:
-            log.debug("=== %s: %s [%s]", token, msg['uid'], msg.get('state'))
+            log.debug("%s: %s [%s]", token, msg['uid'], msg.get('state'))

     else:
         for msg in msgs:
-            log.debug("=== %s: %s", token, str(msg)[0:32])
+            log.debug("%s: %s", token, str(msg)[0:32])

# ------------------------------------------------------------------------------
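After this cleanup, `log_bulk` emits one plain debug line per message. A small usage sketch, assuming `ru.Logger` accepts the `targets` and `level` arguments as shown:

```python
import radical.utils as ru

from radical.utils.zmq.utils import log_bulk

# a debug-level logger writing to stdout ('-' target)
log = ru.Logger('demo', targets=['-'], level='DEBUG')

msgs = [{'uid': 'task.0000', 'state': 'NEW'},
        {'uid': 'task.0001', 'state': 'DONE'}]

# logs one line per message, e.g. "put: task.0000 [NEW]" -- without the
# former '===' marker
log_bulk(log, 'put', msgs)
```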