Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/use registry #385

Merged
merged 24 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/radical/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
from .typeddict import TypedDict, TypedDictMeta, as_dict
from .config import Config, DefaultConfig

from .zmq import Message
from .zmq import Bridge
from .zmq import Queue, Putter, Getter
from .zmq import PubSub, Publisher, Subscriber
Expand Down
2 changes: 1 addition & 1 deletion src/radical/utils/configs/utils_default.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"report" : "${RADICAL_DEFAULT_REPORT:TRUE}",
"report_tgt" : "${RADICAL_DEFAULT_REPORT_TGT:stderr}",
"report_dir" : "${RADICAL_DEFAULT_REPORT_DIR:$PWD}",
"profile" : "${RADICAL_DEFAULT_PROFILE:TRUE}",
"profile" : "${RADICAL_DEFAULT_PROFILE:FALSE}",
"profile_dir": "${RADICAL_DEFAULT_PROFILE_DIR:$PWD}"
}

41 changes: 30 additions & 11 deletions src/radical/utils/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ def __init__(self, uid, timeout, interval=1, beat_cb=None, term_cb=None,
if not self._log:
self._log = Logger('radical.utils.heartbeat')

self._log.debug('=== hb %s create', self._uid)


# --------------------------------------------------------------------------
#
def start(self):

self._log.debug('start heartbeat')
self._log.debug('=== hb %s start', self._uid)
self._watcher = mt.Thread(target=self._watch)
self._watcher.daemon = True
self._watcher.start()
Expand Down Expand Up @@ -101,41 +103,56 @@ def dump(self, log):
log.debug('hb dump %s: \n%s', self._uid, pprint.pformat(self._tstamps))


# --------------------------------------------------------------------------
#
def watch(self, uid):

with self._lock:
if uid not in self._tstamps:
self._log.debug('=== hb %s watch %s', self._uid, uid)
self._tstamps[uid] = None


# --------------------------------------------------------------------------
#
def _watch(self):

# initial heartbeat without delay
if self._beat_cb:
self._log.debug('=== hb %s beat cb init', self._uid)
self._beat_cb()

while not self._term.is_set():

self._log.debug('=== hb %s loop %s', self._uid, self._interval)

time.sleep(self._interval)
now = time.time()

if self._beat_cb:
self._log.debug('=== hb %s beat cb', self._uid)
self._beat_cb()

# avoid iteration over changing dict
with self._lock:
uids = list(self._tstamps.keys())

self._log.debug('=== hb %s uids %s', self._uid, uids)
for uid in uids:

# self._log.debug('hb %s check %s', self._uid, uid)
self._log.debug('=== hb %s check %s', self._uid, uid)

with self._lock:
last = self._tstamps.get(uid)

if last is None:
self._log.warn('hb %s[%s]: never seen', self._uid, uid)
self._log.warn('=== hb %s inval %s', self._uid, uid)
continue

if now - last > self._timeout:

if self._log:
self._log.warn('hb %s[%s]: %.1f - %.1f > %1.f: timeout',
self._log.warn('=== hb %s tout %s: %.1f - %.1f > %1.f',
self._uid, uid, now, last, self._timeout)

ret = None
Expand All @@ -148,9 +165,10 @@ def _watch(self):
# avoiding termination
ret = True

if ret is None:
if ret in [None, False]:
# could not recover: abandon mothership
self._log.warn('hb fail %s: fatal (%d)', uid, self._pid)
self._log.warn('=== hb %s fail %s: fatal (%d)',
self._uid, uid, self._pid)
os.kill(self._pid, signal.SIGTERM)
time.sleep(1)
os.kill(self._pid, signal.SIGKILL)
Expand All @@ -161,8 +179,9 @@ def _watch(self):
# information for the old uid and register a new
# heartbeat for the new one, so that we can immediately
# begin to watch it.
self._log.info('hb recover %s -> %s (%s)',
uid, ret, self._term_cb)
assert isinstance(ret, str)
self._log.info('=== hb %s recov %s -> %s (%s)',
self._uid, uid, ret, self._term_cb)
with self._lock:
del self._tstamps[uid]
self._tstamps[ret] = time.time()
Expand All @@ -178,8 +197,8 @@ def beat(self, uid=None, timestamp=None):
if not uid:
uid = 'default'

# self._log.debug('hb %s beat [%s]', self._uid, uid)
with self._lock:
self._log.debug('hb %s beat [%s]', self._uid, uid)
self._tstamps[uid] = timestamp


Expand Down Expand Up @@ -233,11 +252,11 @@ def wait_startup(self, uids=None, timeout=None):
self._log.debug('wait time: %s', nok)
break

time.sleep(0.05)
time.sleep(0.25)

if len(ok) != len(uids):
nok = [uid for uid in uids if uid not in ok]
self._log.debug('wait fail: %s', nok)
self._log.error('wait fail: %s', nok)
return nok

else:
Expand Down
6 changes: 3 additions & 3 deletions src/radical/utils/ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def reset_counter(self, prefix, reset_all_others=False):

# ------------------------------------------------------------------------------
#
def generate_id(prefix, mode=ID_SIMPLE, ns=None):
def generate_id(prefix: str, mode=ID_SIMPLE, ns=None):
"""
Generate a human readable, sequential ID for the given prefix.

Expand Down Expand Up @@ -183,8 +183,8 @@ def generate_id(prefix, mode=ID_SIMPLE, ns=None):
and will, for `ID_PRIVATE`, revert to `ID_UUID`.
"""

if not prefix or not isinstance(prefix, str):
raise TypeError("ID generation expect prefix in basestring type")
if not isinstance(prefix, str):
raise TypeError('"prefix" must be a string, not %s' % type(prefix))

if _cache['dockerized'] and mode == ID_PRIVATE:
mode = ID_UUID
Expand Down
1 change: 1 addition & 0 deletions src/radical/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ def _ensure_handler(self):
p = self._path
n = self._name
for t in self._targets:

if t in ['0', 'null'] : h = logging.NullHandler()
elif t in ['-', '1', 'stdout']: h = ColorStreamHandler(sys.stdout)
elif t in ['=', '2', 'stderr']: h = ColorStreamHandler(sys.stderr)
Expand Down
4 changes: 4 additions & 0 deletions src/radical/utils/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ def __init__(self, name, ns=None, path=None):
except OSError:
pass # already exists

# don't open the file on disabled profilers
if not self._enabled:
return

# we set `buffering` to `1` to force line buffering. That is not idea
# performance wise - but will not do an `fsync()` after writes, so OS
# level buffering should still apply. This is supposed to shield
Expand Down
21 changes: 14 additions & 7 deletions src/radical/utils/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def sh_quote(data):

# ------------------------------------------------------------------------------
#
def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None):
def sh_callout(cmd, stdout=True, stderr=True,
shell=False, env=None, cwd=None):
'''
call a shell command, return `[stdout, stderr, retval]`.
'''
Expand All @@ -54,7 +55,8 @@ def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None):
if stderr : stderr = sp.PIPE
else : stderr = None

p = sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env)
p = sp.Popen(cmd, stdout=stdout, stderr=stderr,
shell=shell, env=env, cwd=cwd)

if not stdout and not stderr:
ret = p.wait()
Expand All @@ -67,7 +69,8 @@ def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None):

# ------------------------------------------------------------------------------
#
def sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None):
def sh_callout_bg(cmd, stdout=None, stderr=None,
shell=False, env=None, cwd=None):
'''
call a shell command in the background. Do not attempt to pipe STDOUT/ERR,
but only support writing to named files.
Expand All @@ -84,15 +87,15 @@ def sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None):
# convert string into arg list if needed
if not shell and is_string(cmd): cmd = shlex.split(cmd)

sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env)
sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env, cwd=cwd)

return


# ------------------------------------------------------------------------------
#
def sh_callout_async(cmd, stdin=True, stdout=True, stderr=True,
shell=False, env=None):
shell=False, env=None, cwd=None):
'''

Run a command, and capture stdout/stderr if so flagged. The call will
Expand All @@ -110,6 +113,9 @@ def sh_callout_async(cmd, stdin=True, stdout=True, stderr=True,
shell: True, False [default]
- pass to popen

cwd: string
- working directory for command to run in

PROC:
- PROC.stdout : `queue.Queue` instance delivering stdout lines
- PROC.stderr : `queue.Queue` instance delivering stderr lines
Expand All @@ -133,7 +139,7 @@ class _P(object):
'''

# ----------------------------------------------------------------------
def __init__(self, cmd, stdin, stdout, stderr, shell, env):
def __init__(self, cmd, stdin, stdout, stderr, shell, env, cwd):

cmd = cmd.strip()

Expand Down Expand Up @@ -165,6 +171,7 @@ def __init__(self, cmd, stdin, stdout, stderr, shell, env):
stderr=self._err_w,
shell=shell,
env=env,
cwd=cwd,
bufsize=1)

t = mt.Thread(target=self._watch)
Expand Down Expand Up @@ -277,7 +284,7 @@ def _watch(self):
# --------------------------------------------------------------------------

return _P(cmd=cmd, stdin=stdin, stdout=stdout, stderr=stderr,
shell=shell, env=env)
shell=shell, env=env, cwd=cwd)


# ------------------------------------------------------------------------------
Expand Down
14 changes: 11 additions & 3 deletions src/radical/utils/typeddict.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class TypedDict(dict, metaclass=TypedDictMeta):

# --------------------------------------------------------------------------
#
def __init__(self, from_dict=None):
def __init__(self, from_dict=None, **kwargs):
'''
Create a typed dictionary (tree) from `from_dict`.

Expand All @@ -131,10 +131,19 @@ def __init__(self, from_dict=None):
verify

Names with a leading underscore are not supported.

Supplied `from_dict` and kwargs are used to initialize the object
data -- the `kwargs` take preceedence over the `from_dict` if both
are specified (note that `from_dict` and `self` are invalid
`kwargs`).
'''

self.update(copy.deepcopy(self._defaults))
self.update(from_dict)

if kwargs:
self.update(kwargs)


# --------------------------------------------------------------------------
#
Expand Down Expand Up @@ -297,8 +306,7 @@ def __str__(self):
return str(self._data)

def __repr__(self):
return '<%s object, schema keys: %s>' % \
(type(self).__qualname__, tuple(self._schema.keys()))
return '%s: %s' % (type(self).__qualname__, str(self))


# --------------------------------------------------------------------------
Expand Down
5 changes: 3 additions & 2 deletions src/radical/utils/zmq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@


from .bridge import Bridge
from .queue import Queue, Putter, Getter
from .pubsub import PubSub, Publisher, Subscriber
from .queue import Queue, Putter, Getter, test_queue
from .pubsub import PubSub, Publisher, Subscriber, test_pubsub
from .pipe import Pipe, MODE_PUSH, MODE_PULL
from .client import Client
from .server import Server
from .registry import Registry, RegistryClient
from .message import Message


# ------------------------------------------------------------------------------
Expand Down
31 changes: 23 additions & 8 deletions src/radical/utils/zmq/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@

import threading as mt

from typing import Optional

from ..logger import Logger
from ..profile import Profiler
from ..config import Config
from ..json_io import read_json, write_json

QUEUE = 'QUEUE'
PUBSUB = 'PUBSUB'
UNKNOWN = 'UNKNOWN'


# ------------------------------------------------------------------------------
#
Expand Down Expand Up @@ -43,13 +49,14 @@ def __init__(self, cfg):
self._channel = self._cfg.channel
self._uid = self._cfg.uid
self._pwd = self._cfg.path

if not self._pwd:
self._pwd = os.getcwd()
mtitov marked this conversation as resolved.
Show resolved Hide resolved

self._log = Logger(name=self._uid, ns='radical.utils',
level=self._cfg.log_lvl, path=self._pwd)
self._prof = Profiler(name=self._uid, path=self._pwd)

if self._pwd is None:
self._pwd = os.getcwd()

if 'hb' in self._uid or 'heartbeat' in self._uid:
self._prof.disable()
else:
Expand Down Expand Up @@ -131,25 +138,33 @@ def start(self):
# --------------------------------------------------------------------------
#
@staticmethod
def create(cfg):
def create(channel : str,
kind : Optional[str] = None,
cfg : Optional[dict] = None):

# FIXME: add other config parameters: batch size, log level, etc.

# NOTE: I'd rather have this as class data than as stack data, but
# python stumbles over circular imports at that point :/
# Another option though is to discover and dynamically load
# components.

from .pubsub import PubSub
from .queue import Queue

_btypemap = {'pubsub' : PubSub,
'queue' : Queue}
_btypemap = {PUBSUB: PubSub,
QUEUE : Queue}

kind = cfg['kind']
if not kind:
if 'queue' in channel.lower(): kind = QUEUE
elif 'pubsub' in channel.lower(): kind = PUBSUB
else : kind = UNKNOWN

if kind not in _btypemap:
raise ValueError('unknown bridge type (%s)' % kind)

btype = _btypemap[kind]
bridge = btype(cfg)
bridge = btype(channel, cfg=cfg)

return bridge

Expand Down
Loading