Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/use registry #385

Merged
merged 24 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/radical/utils/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ def wait_startup(self, uids=None, timeout=None):
self._log.debug('wait time: %s', nok)
break

time.sleep(0.05)
time.sleep(0.25)

if len(ok) != len(uids):
nok = [uid for uid in uids if uid not in ok]
Expand Down
1 change: 1 addition & 0 deletions src/radical/utils/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ def _ensure_handler(self):
p = self._path
n = self._name
for t in self._targets:

if t in ['0', 'null'] : h = logging.NullHandler()
elif t in ['-', '1', 'stdout']: h = ColorStreamHandler(sys.stdout)
elif t in ['=', '2', 'stderr']: h = ColorStreamHandler(sys.stderr)
Expand Down
21 changes: 14 additions & 7 deletions src/radical/utils/shell.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def sh_quote(data):

# ------------------------------------------------------------------------------
#
def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None):
def sh_callout(cmd, stdout=True, stderr=True,
shell=False, env=None, cwd=None):
'''
call a shell command, return `[stdout, stderr, retval]`.
'''
Expand All @@ -54,7 +55,8 @@ def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None):
if stderr : stderr = sp.PIPE
else : stderr = None

p = sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env)
p = sp.Popen(cmd, stdout=stdout, stderr=stderr,
shell=shell, env=env, cwd=cwd)

if not stdout and not stderr:
ret = p.wait()
Expand All @@ -67,7 +69,8 @@ def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None):

# ------------------------------------------------------------------------------
#
def sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None):
def sh_callout_bg(cmd, stdout=None, stderr=None,
shell=False, env=None, cwd=None):
'''
call a shell command in the background. Do not attempt to pipe STDOUT/ERR,
but only support writing to named files.
Expand All @@ -84,15 +87,15 @@ def sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None):
# convert string into arg list if needed
if not shell and is_string(cmd): cmd = shlex.split(cmd)

sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env)
sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env, cwd=cwd)

return


# ------------------------------------------------------------------------------
#
def sh_callout_async(cmd, stdin=True, stdout=True, stderr=True,
shell=False, env=None):
shell=False, env=None, cwd=None):
'''

Run a command, and capture stdout/stderr if so flagged. The call will
Expand All @@ -110,6 +113,9 @@ def sh_callout_async(cmd, stdin=True, stdout=True, stderr=True,
shell: True, False [default]
- pass to popen

cwd: string
- working directory for command to run in

PROC:
- PROC.stdout : `queue.Queue` instance delivering stdout lines
- PROC.stderr : `queue.Queue` instance delivering stderr lines
Expand All @@ -133,7 +139,7 @@ class _P(object):
'''

# ----------------------------------------------------------------------
def __init__(self, cmd, stdin, stdout, stderr, shell, env):
def __init__(self, cmd, stdin, stdout, stderr, shell, env, cwd):

cmd = cmd.strip()

Expand Down Expand Up @@ -165,6 +171,7 @@ def __init__(self, cmd, stdin, stdout, stderr, shell, env):
stderr=self._err_w,
shell=shell,
env=env,
cwd=cwd,
bufsize=1)

t = mt.Thread(target=self._watch)
Expand Down Expand Up @@ -277,7 +284,7 @@ def _watch(self):
# --------------------------------------------------------------------------

return _P(cmd=cmd, stdin=stdin, stdout=stdout, stderr=stderr,
shell=shell, env=env)
shell=shell, env=env, cwd=cwd)


# ------------------------------------------------------------------------------
Expand Down
31 changes: 23 additions & 8 deletions src/radical/utils/zmq/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,17 @@

import threading as mt

from typing import Optional

from ..logger import Logger
from ..profile import Profiler
from ..config import Config
from ..json_io import read_json, write_json

QUEUE = 'QUEUE'
PUBSUB = 'PUBSUB'
UNKNOWN = 'UNKNOWN'


# ------------------------------------------------------------------------------
#
Expand Down Expand Up @@ -43,13 +49,14 @@ def __init__(self, cfg):
self._channel = self._cfg.channel
self._uid = self._cfg.uid
self._pwd = self._cfg.path

if not self._pwd:
self._pwd = os.getcwd()
mtitov marked this conversation as resolved.
Show resolved Hide resolved

self._log = Logger(name=self._uid, ns='radical.utils',
level=self._cfg.log_lvl, path=self._pwd)
self._prof = Profiler(name=self._uid, path=self._pwd)

if self._pwd is None:
self._pwd = os.getcwd()

if 'hb' in self._uid or 'heartbeat' in self._uid:
self._prof.disable()
else:
Expand Down Expand Up @@ -131,25 +138,33 @@ def start(self):
# --------------------------------------------------------------------------
#
@staticmethod
def create(cfg):
def create(channel : str,
kind : Optional[str] = None,
cfg : Optional[dict] = None):

# FIXME: add other config parameters: batch size, log level, etc.

# NOTE: I'd rather have this as class data than as stack data, but
# python stumbles over circular imports at that point :/
# Another option though is to discover and dynamically load
# components.

from .pubsub import PubSub
from .queue import Queue

_btypemap = {'pubsub' : PubSub,
'queue' : Queue}
_btypemap = {PUBSUB: PubSub,
QUEUE : Queue}

kind = cfg['kind']
if not kind:
if 'queue' in channel.lower(): kind = QUEUE
elif 'pubsub' in channel.lower(): kind = PUBSUB
else : kind = UNKNOWN

if kind not in _btypemap:
raise ValueError('unknown bridge type (%s)' % kind)

btype = _btypemap[kind]
bridge = btype(cfg)
bridge = btype(channel, cfg=cfg)

return bridge

Expand Down
26 changes: 14 additions & 12 deletions src/radical/utils/zmq/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

import threading as mt

from typing import Optional

from ..atfork import atfork
from ..config import Config
from ..ids import generate_id, ID_CUSTOM
from ..url import Url
from ..misc import is_string, as_string, as_bytes, as_list, noop
from ..misc import as_string, as_bytes, as_list, noop
from ..host import get_hostip
from ..logger import Logger
from ..profile import Profiler
Expand Down Expand Up @@ -45,19 +47,19 @@ class PubSub(Bridge):

# --------------------------------------------------------------------------
#
def __init__(self, cfg=None, channel=None):

if cfg and not channel and is_string(cfg):
# allow construction with only channel name
channel = cfg
cfg = None
def __init__(self, channel: str, cfg: Optional[dict] = None):

if cfg : cfg = Config(cfg=cfg)
elif channel: cfg = Config(cfg={'channel': channel})
else: raise RuntimeError('PubSub needs cfg or channel parameter')
if cfg:
# create deep copy
cfg = Config(cfg=cfg)
else:
cfg = Config()

if not cfg.channel:
raise ValueError('no channel name provided for pubsub')
# ensure channel is set in config
if cfg.channel:
assert cfg.channel == channel
else:
cfg.channel = channel

if not cfg.uid:
cfg.uid = generate_id('%s.bridge.%%(counter)04d' % cfg.channel,
Expand Down
34 changes: 19 additions & 15 deletions src/radical/utils/zmq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

import threading as mt

from typing import Optional

from ..atfork import atfork
from ..config import Config
from ..ids import generate_id, ID_CUSTOM
from ..url import Url
from ..misc import is_string, as_string, as_bytes, as_list, noop
from ..misc import as_string, as_bytes, as_list, noop
from ..host import get_hostip
from ..logger import Logger
from ..profile import Profiler
Expand All @@ -20,11 +22,11 @@
from .bridge import Bridge
from .utils import no_intr

# NOTE: the log bulk method is frequently called and slow
# from .utils import log_bulk
# from .utils import prof_bulk


# FIXME: the log bulk method is frequently called and slow

# --------------------------------------------------------------------------
#
Expand Down Expand Up @@ -85,7 +87,7 @@ def _atfork_child():
#
class Queue(Bridge):

def __init__(self, cfg=None, channel=None):
def __init__(self, channel: str, cfg: Optional[dict] = None):
'''
This Queue type sets up an zmq channel of this kind:

Expand All @@ -106,17 +108,17 @@ def __init__(self, cfg=None, channel=None):
addresses as obj.addr_put and obj.addr_get.
'''

if cfg and not channel and is_string(cfg):
# allow construction with only channel name
channel = cfg
cfg = None

if cfg : cfg = Config(cfg=cfg)
elif channel: cfg = Config(cfg={'channel': channel})
else: raise RuntimeError('Queue needs cfg or channel parameter')
if cfg:
# create deep copy
cfg = Config(cfg=cfg)
else:
cfg = Config()

if not cfg.channel:
raise ValueError('no channel name provided for queue')
# ensure channel is set in config
if cfg.channel:
assert cfg.channel == channel
else:
cfg.channel = channel

if not cfg.uid:
cfg.uid = generate_id('%s.bridge.%%(counter)04d' % cfg.channel,
Expand Down Expand Up @@ -240,7 +242,7 @@ def _bridge_work(self):
msgs = msgpack.unpackb(data[1])
# prof_bulk(self._prof, 'poll_put_recv', msgs)
# log_bulk(self._log, '<> %s' % qname, msgs)
self._log.debug('put %s: %s ! ', qname, len(msgs))
# self._log.debug('put %s: %s ! ', qname, len(msgs))

if qname not in buf:
buf[qname] = list()
Expand Down Expand Up @@ -270,6 +272,8 @@ def _bridge_work(self):
if qname in buf:
msgs = buf[qname][:self._bulk_size]
else:
# self._log.debug('get: %s not in %s', qname,
# list(buf.keys()))
msgs = list()

# log_bulk(self._log, '>< %s' % qname, msgs)
Expand Down Expand Up @@ -412,6 +416,7 @@ def _get_nowait(url, qname=None, timeout=None, uid=None): # timeout in ms
if not info['requested']:

# send the request *once* per recieval (got lock above)
# FIXME: why is this sent repeatedly?
# logger.debug('=== => from %s[%s]', uid, qname)
no_intr(info['socket'].send, as_bytes(qname))
info['requested'] = True
Expand Down Expand Up @@ -460,7 +465,6 @@ def _listener(url, qname=None, uid=None):
continue

msgs = Getter._get_nowait(url, qname=qname, timeout=500, uid=uid)

BULK = True
if msgs:

Expand Down
Loading