From 40b64a1583ce18334f2a301e77964d48f9877ce0 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 22 Mar 2023 17:18:29 +0100 Subject: [PATCH 01/19] avoid pytest warning --- tests/unittests/test_modules.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unittests/test_modules.py b/tests/unittests/test_modules.py index 6b964cd0a..53fd7fe2b 100644 --- a/tests/unittests/test_modules.py +++ b/tests/unittests/test_modules.py @@ -13,7 +13,7 @@ # ------------------------------------------------------------------------------ # -class TestDict(ru.TypedDict): +class TempDict(ru.TypedDict): pass @@ -57,7 +57,7 @@ def hello(self): # def test_load_class_with_type(self): - f = ru.load_class(fpath=__file__, cname='TestDict', ctype=ru.TypedDict) + f = ru.load_class(fpath=__file__, cname='TempDict', ctype=ru.TypedDict) self.assertIsInstance(f(), ru.TypedDict) # ------------------------------------------------------------------------------ From d59ba51440d983e7401ae39a0c9da951c9eb8f1f Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Sat, 22 Apr 2023 15:04:56 +0200 Subject: [PATCH 02/19] snapshot --- src/radical/utils/shell.py | 21 +++++--- src/radical/utils/zmq/bridge.py | 19 +++++-- src/radical/utils/zmq/pubsub.py | 14 ++---- src/radical/utils/zmq/queue.py | 14 ++---- src/radical/utils/zmq/registry.py | 74 +++++++++++++++++++++++----- src/radical/utils/zmq/server.py | 2 + tests/unittests/test_zmq_registry.py | 22 +++++++-- tests/unittests/test_zmq_server.py | 39 +++++++++++++++ 8 files changed, 154 insertions(+), 51 deletions(-) mode change 100644 => 100755 tests/unittests/test_zmq_server.py diff --git a/src/radical/utils/shell.py b/src/radical/utils/shell.py index 2510b4cc6..f9f2e527d 100644 --- a/src/radical/utils/shell.py +++ b/src/radical/utils/shell.py @@ -39,7 +39,8 @@ def sh_quote(data): # ------------------------------------------------------------------------------ # -def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None): +def sh_callout(cmd, stdout=True, stderr=True, + shell=False, env=None, cwd=None): ''' call a shell command, return `[stdout, stderr, retval]`. ''' @@ -54,7 +55,8 @@ def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None): if stderr : stderr = sp.PIPE else : stderr = None - p = sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env) + p = sp.Popen(cmd, stdout=stdout, stderr=stderr, + shell=shell, env=env, cwd=cwd) if not stdout and not stderr: ret = p.wait() @@ -67,7 +69,8 @@ def sh_callout(cmd, stdout=True, stderr=True, shell=False, env=None): # ------------------------------------------------------------------------------ # -def sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None): +def sh_callout_bg(cmd, stdout=None, stderr=None, + shell=False, env=None, cwd=None): ''' call a shell command in the background. Do not attempt to pipe STDOUT/ERR, but only support writing to named files. @@ -84,7 +87,7 @@ def sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None): # convert string into arg list if needed if not shell and is_string(cmd): cmd = shlex.split(cmd) - sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env) + sp.Popen(cmd, stdout=stdout, stderr=stderr, shell=shell, env=env, cwd=cwd) return @@ -92,7 +95,7 @@ def sh_callout_bg(cmd, stdout=None, stderr=None, shell=False, env=None): # ------------------------------------------------------------------------------ # def sh_callout_async(cmd, stdin=True, stdout=True, stderr=True, - shell=False, env=None): + shell=False, env=None, cwd=None): ''' Run a command, and capture stdout/stderr if so flagged. The call will @@ -110,6 +113,9 @@ def sh_callout_async(cmd, stdin=True, stdout=True, stderr=True, shell: True, False [default] - pass to popen + cwd: string + - working directory for command to run in + PROC: - PROC.stdout : `queue.Queue` instance delivering stdout lines - PROC.stderr : `queue.Queue` instance delivering stderr lines @@ -133,7 +139,7 @@ class _P(object): ''' # ---------------------------------------------------------------------- - def __init__(self, cmd, stdin, stdout, stderr, shell, env): + def __init__(self, cmd, stdin, stdout, stderr, shell, env, cwd): cmd = cmd.strip() @@ -165,6 +171,7 @@ def __init__(self, cmd, stdin, stdout, stderr, shell, env): stderr=self._err_w, shell=shell, env=env, + cwd=cwd, bufsize=1) t = mt.Thread(target=self._watch) @@ -277,7 +284,7 @@ def _watch(self): # -------------------------------------------------------------------------- return _P(cmd=cmd, stdin=stdin, stdout=stdout, stderr=stderr, - shell=shell, env=env) + shell=shell, env=env, cwd=cwd) # ------------------------------------------------------------------------------ diff --git a/src/radical/utils/zmq/bridge.py b/src/radical/utils/zmq/bridge.py index ef1aa913c..7643b96f4 100644 --- a/src/radical/utils/zmq/bridge.py +++ b/src/radical/utils/zmq/bridge.py @@ -8,6 +8,10 @@ from ..config import Config from ..json_io import read_json, write_json +QUEUE = 'QUEUE' +PUBSUB = 'PUBSUB' +UNKNOWN = 'UNKNOWN' + # ------------------------------------------------------------------------------ # @@ -131,25 +135,30 @@ def start(self): # -------------------------------------------------------------------------- # @staticmethod - def create(cfg): + def create(channel : str): + + # FIXME: add other config parameters: batch size, log level, etc. # NOTE: I'd rather have this as class data than as stack data, but # python stumbles over circular imports at that point :/ # Another option though is to discover and dynamically load # components. + from .pubsub import PubSub from .queue import Queue - _btypemap = {'pubsub' : PubSub, - 'queue' : Queue} + _btypemap = {PUBSUB: PubSub, + QUEUE : Queue} - kind = cfg['kind'] + if 'queue' in channel.lower(): kind = QUEUE + elif 'pubsub' in channel.lower(): kind = PUBSUB + else : kind = UNKNOWN if kind not in _btypemap: raise ValueError('unknown bridge type (%s)' % kind) btype = _btypemap[kind] - bridge = btype(cfg) + bridge = btype(channel) return bridge diff --git a/src/radical/utils/zmq/pubsub.py b/src/radical/utils/zmq/pubsub.py index 993d48ffa..73aa579cb 100644 --- a/src/radical/utils/zmq/pubsub.py +++ b/src/radical/utils/zmq/pubsub.py @@ -45,19 +45,11 @@ class PubSub(Bridge): # -------------------------------------------------------------------------- # - def __init__(self, cfg=None, channel=None): + def __init__(self, channel : str): - if cfg and not channel and is_string(cfg): - # allow construction with only channel name - channel = cfg - cfg = None + # FIXME: add other config parameters: batch size, log level, etc. - if cfg : cfg = Config(cfg=cfg) - elif channel: cfg = Config(cfg={'channel': channel}) - else: raise RuntimeError('PubSub needs cfg or channel parameter') - - if not cfg.channel: - raise ValueError('no channel name provided for pubsub') + cfg = Config(cfg={'channel': channel}) if not cfg.uid: cfg.uid = generate_id('%s.bridge.%%(counter)04d' % cfg.channel, diff --git a/src/radical/utils/zmq/queue.py b/src/radical/utils/zmq/queue.py index 68ee02b25..72c43cb38 100644 --- a/src/radical/utils/zmq/queue.py +++ b/src/radical/utils/zmq/queue.py @@ -85,7 +85,7 @@ def _atfork_child(): # class Queue(Bridge): - def __init__(self, cfg=None, channel=None): + def __init__(self, channel : str): ''' This Queue type sets up an zmq channel of this kind: @@ -106,17 +106,9 @@ def __init__(self, cfg=None, channel=None): addresses as obj.addr_put and obj.addr_get. ''' - if cfg and not channel and is_string(cfg): - # allow construction with only channel name - channel = cfg - cfg = None + # FIXME: add other config parameters: batch size, log level, etc. - if cfg : cfg = Config(cfg=cfg) - elif channel: cfg = Config(cfg={'channel': channel}) - else: raise RuntimeError('Queue needs cfg or channel parameter') - - if not cfg.channel: - raise ValueError('no channel name provided for queue') + cfg = Config(cfg={'channel': channel}) if not cfg.uid: cfg.uid = generate_id('%s.bridge.%%(counter)04d' % cfg.channel, diff --git a/src/radical/utils/zmq/registry.py b/src/radical/utils/zmq/registry.py index d9331ac6d..f3d6be192 100644 --- a/src/radical/utils/zmq/registry.py +++ b/src/radical/utils/zmq/registry.py @@ -1,4 +1,5 @@ +import atexit import shelve from typing import List, Optional, Any @@ -9,6 +10,19 @@ from .server import Server from .client import Client +_registries = list() + + +# ------------------------------------------------------------------------------ +# +def _flush_registries(): + for _reg in _registries: + print('=== stop %s' % _reg.uid) + _reg.stop() + + +atexit.register(_flush_registries) + # ------------------------------------------------------------------------------ # @@ -22,9 +36,10 @@ class Registry(Server): # def __init__(self, url : Optional[str] = None, uid : Optional[str] = None, + path : Optional[str] = None, persistent: bool = False) -> None: - super().__init__(url=url, uid=uid) + super().__init__(url=url, uid=uid, path=path) if persistent: self._data = shelve.open('%s.db' % self._uid, writeback=True) @@ -39,11 +54,19 @@ def __init__(self, url : Optional[str] = None, # -------------------------------------------------------------------------- # - def stop(self) -> None: + def dump(self) -> None: if isinstance(self._data, dict): write_json(self._data, '%s.json' % self._uid) - else: + + + # -------------------------------------------------------------------------- + # + def stop(self) -> None: + + self.dump() + + if isinstance(self._data, shelve.Shelf): self._data.close() super().stop() @@ -58,6 +81,8 @@ def put(self, key: str, val: Any) -> None: path = elems[:-1] leaf = elems[-1] + print('=== put %s' % key) + for elem in path: if elem not in this: @@ -81,19 +106,29 @@ def get(self, key: str) -> Optional[str]: leaf = elems[-1] for elem in path: - - this = this.get(elem) + this = this.get(elem, {}) if not this: - return None + break - return this.get(leaf) + val = this.get(leaf) + print('=== get %s: %s' % (key, val)) + return val # -------------------------------------------------------------------------- # - def keys(self) -> List[str]: + def keys(self, pwd: Optional[str] = None) -> List[str]: - return list(self._data.keys()) + this = self._data + + if pwd: + path = pwd.split('.') + for elem in path: + this = this.get(elem, {}) + if not this: + break + + return list(this.keys()) # -------------------------------------------------------------------------- @@ -117,14 +152,21 @@ class RegistryClient(Client, DictMixin): # -------------------------------------------------------------------------- # - def __init__(self, url: str) -> None: + def __init__(self, url: str, + pwd: Optional[str] = None) -> None: + + if not pwd: self._pwd = '' + else : self._pwd = '%s.' % pwd super().__init__(url=url) # -------------------------------------------------------------------------- # verbose API - def get(self, key: str, default: Optional[str] = None) -> Optional[Any]: + def get(self, key : str, + default: Optional[str] = None) -> Optional[Any]: + + key = self._pwd + key try: return self.request(cmd='get', key=key) @@ -134,7 +176,10 @@ def get(self, key: str, default: Optional[str] = None) -> Optional[Any]: def put(self, key: str, val: Any) -> None: + + key = self._pwd + key ret = self.request(cmd='put', key=key, val=val) + assert ret is None return ret @@ -142,20 +187,25 @@ def put(self, key: str, # -------------------------------------------------------------------------- # dict mixin API def __getitem__(self, key: str) -> Optional[Any]: + return self.get(key) def __setitem__(self, key: str, val: Any) -> None: + return self.put(key, val) def __delitem__(self, key: str) -> None: + + key = self._pwd + key ret = self.request(cmd='del', key=key) assert ret is None def keys(self) -> List[str]: - ret = self.request(cmd='keys') + + ret = self.request(cmd='keys', pwd=self._pwd) assert isinstance(ret, list) return ret diff --git a/src/radical/utils/zmq/server.py b/src/radical/utils/zmq/server.py index 7f36516aa..519306e03 100644 --- a/src/radical/utils/zmq/server.py +++ b/src/radical/utils/zmq/server.py @@ -271,6 +271,8 @@ def _work(self) -> None: while not self._term.is_set(): + print('do 0') + event = dict(no_intr(self._poll.poll, timeout=100)) if self._sock not in event: diff --git a/tests/unittests/test_zmq_registry.py b/tests/unittests/test_zmq_registry.py index 90b98af0a..c5dea210b 100755 --- a/tests/unittests/test_zmq_registry.py +++ b/tests/unittests/test_zmq_registry.py @@ -18,20 +18,24 @@ def test_zmq_registry(mocked_prof): try: assert r.addr - c = ru.zmq.RegistryClient(url=r.addr) + c = ru.zmq.RegistryClient(url=r.addr, pwd='oops') c.put('foo.bar.buz', {'biz': 11}) assert c.get('foo') == {'bar': {'buz': {'biz': 11}}} - assert c.get('foo.bar.buz.biz') == 11 + assert c.get('foo.bar.buz.biz') == 11 assert c.get('foo.bar.buz.biz.boz') is None - assert c.get('foo') == {'bar': {'buz': {'biz': 11}}} c.put('foo.bar.buz', {'biz': 42}) assert c.get('foo.bar.buz.biz') == 42 - assert c['foo.bar.buz.biz'] == 42 + assert c['foo.bar.buz.biz'] == 42 assert c['foo']['bar']['buz']['biz'] == 42 - assert c['foo.bar.buz.biz.boz'] is None + assert c['foo.bar.buz.biz.boz'] is None + + c['foo']['bar']['buz']['biz'] = 53 + assert c['foo.bar.buz.biz'] == 53 + assert c['foo.bar']['buz.biz'] == 53 + assert c['foo']['bar']['bu']['.biz'] == 53 assert 'foo' in c assert c.keys() == ['foo'] @@ -40,10 +44,18 @@ def test_zmq_registry(mocked_prof): assert c.keys() == [] finally: + if c: + c.close() + try: + c = ru.zmq.RegistryClient(url=r.addr) + c.keys() == ['oops'] + + finally: if c: c.close() + r.dump() r.stop() r.wait() diff --git a/tests/unittests/test_zmq_server.py b/tests/unittests/test_zmq_server.py old mode 100644 new mode 100755 index c5897b03c..06844c3cd --- a/tests/unittests/test_zmq_server.py +++ b/tests/unittests/test_zmq_server.py @@ -149,6 +149,45 @@ def test_start(self, mocked_profiler, mocked_logger): def test_zmq(self, mocked_profiler, mocked_logger, mocked_zmq_ctx): s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') + s = Server() + print('.') mocked_zmq_ctx().socket().bind = mock.Mock( side_effect=zmq.error.ZMQError(msg='random ZMQ error')) From 603679a51eb13ece9658dfef9081dc469cc2fb73 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Sat, 22 Apr 2023 15:28:46 +0200 Subject: [PATCH 03/19] fix some minor test issues --- src/radical/utils/zmq/server.py | 3 ++- tests/unittests/test_profiler.py | 2 -- tests/unittests/test_zmq_server.py | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/radical/utils/zmq/server.py b/src/radical/utils/zmq/server.py index 519306e03..dc8e25305 100644 --- a/src/radical/utils/zmq/server.py +++ b/src/radical/utils/zmq/server.py @@ -271,13 +271,14 @@ def _work(self) -> None: while not self._term.is_set(): - print('do 0') event = dict(no_intr(self._poll.poll, timeout=100)) if self._sock not in event: continue + print('event', event) + # default response rep = None req = None diff --git a/tests/unittests/test_profiler.py b/tests/unittests/test_profiler.py index b42b26127..9f5ff901a 100755 --- a/tests/unittests/test_profiler.py +++ b/tests/unittests/test_profiler.py @@ -143,8 +143,6 @@ def _assert_profiler(key, val, res): if k.startswith('RADICAL'): del os.environ[k] - _assert_profiler('', '', True) - for val, res in [ ['false', False], ['', True ], diff --git a/tests/unittests/test_zmq_server.py b/tests/unittests/test_zmq_server.py index 06844c3cd..81728924d 100755 --- a/tests/unittests/test_zmq_server.py +++ b/tests/unittests/test_zmq_server.py @@ -217,7 +217,7 @@ def test_server_class(self, mocked_profiler, mocked_logger): c.request('no_registered_cmd') with self.assertRaisesRegex(RuntimeError, - '.* _test_0.* takes 1 positional argument'): + '.*_test_0.* takes 1 positional argument'): c.request('test_0', None) ret = c.request('test_0') From 3c7118ec19065cf547ab294db4a80b938ae60718 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Sat, 22 Apr 2023 23:07:03 +0200 Subject: [PATCH 04/19] fix tests --- src/radical/utils/zmq/registry.py | 30 +++++++++++++++++----------- src/radical/utils/zmq/server.py | 2 -- tests/unittests/test_zmq_registry.py | 5 ----- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/src/radical/utils/zmq/registry.py b/src/radical/utils/zmq/registry.py index f3d6be192..035b224e3 100644 --- a/src/radical/utils/zmq/registry.py +++ b/src/radical/utils/zmq/registry.py @@ -17,7 +17,6 @@ # def _flush_registries(): for _reg in _registries: - print('=== stop %s' % _reg.uid) _reg.stop() @@ -81,8 +80,6 @@ def put(self, key: str, val: Any) -> None: path = elems[:-1] leaf = elems[-1] - print('=== put %s' % key) - for elem in path: if elem not in this: @@ -111,7 +108,6 @@ def get(self, key: str) -> Optional[str]: break val = this.get(leaf) - print('=== get %s: %s' % (key, val)) return val @@ -135,9 +131,17 @@ def keys(self, pwd: Optional[str] = None) -> List[str]: # def delitem(self, key: str) -> None: - del self._data[key] - if not isinstance(self._data, dict): - self._data.sync() + this = self._data + + if key: + path = key.split('.') + for elem in path[:-1]: + this = this.get(elem, {}) + if not this: + break + + if this: + del this[path[-1]] # ------------------------------------------------------------------------------ @@ -155,8 +159,7 @@ class RegistryClient(Client, DictMixin): def __init__(self, url: str, pwd: Optional[str] = None) -> None: - if not pwd: self._pwd = '' - else : self._pwd = '%s.' % pwd + self._pwd = pwd super().__init__(url=url) @@ -166,7 +169,8 @@ def __init__(self, url: str, def get(self, key : str, default: Optional[str] = None) -> Optional[Any]: - key = self._pwd + key + if self._pwd: + key = self._pwd + '.' + key try: return self.request(cmd='get', key=key) @@ -177,7 +181,8 @@ def get(self, key : str, def put(self, key: str, val: Any) -> None: - key = self._pwd + key + if self._pwd: + key = self._pwd + '.' + key ret = self.request(cmd='put', key=key, val=val) assert ret is None @@ -198,7 +203,8 @@ def __setitem__(self, key: str, val: Any) -> None: def __delitem__(self, key: str) -> None: - key = self._pwd + key + if self._pwd: + key = self._pwd + '.' + key ret = self.request(cmd='del', key=key) assert ret is None diff --git a/src/radical/utils/zmq/server.py b/src/radical/utils/zmq/server.py index dc8e25305..3a83f6fe5 100644 --- a/src/radical/utils/zmq/server.py +++ b/src/radical/utils/zmq/server.py @@ -277,8 +277,6 @@ def _work(self) -> None: if self._sock not in event: continue - print('event', event) - # default response rep = None req = None diff --git a/tests/unittests/test_zmq_registry.py b/tests/unittests/test_zmq_registry.py index c5dea210b..8a155bb9e 100755 --- a/tests/unittests/test_zmq_registry.py +++ b/tests/unittests/test_zmq_registry.py @@ -32,11 +32,6 @@ def test_zmq_registry(mocked_prof): assert c['foo']['bar']['buz']['biz'] == 42 assert c['foo.bar.buz.biz.boz'] is None - c['foo']['bar']['buz']['biz'] = 53 - assert c['foo.bar.buz.biz'] == 53 - assert c['foo.bar']['buz.biz'] == 53 - assert c['foo']['bar']['bu']['.biz'] == 53 - assert 'foo' in c assert c.keys() == ['foo'] del c['foo'] From bd099542eb2910be70d528cbaf9df01d9da74aea Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Sat, 22 Apr 2023 23:11:37 +0200 Subject: [PATCH 05/19] api fix --- src/radical/utils/zmq/bridge.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/radical/utils/zmq/bridge.py b/src/radical/utils/zmq/bridge.py index 7643b96f4..ed5315fd8 100644 --- a/src/radical/utils/zmq/bridge.py +++ b/src/radical/utils/zmq/bridge.py @@ -135,7 +135,8 @@ def start(self): # -------------------------------------------------------------------------- # @staticmethod - def create(channel : str): + def create(channel : str, + kind : str = None): # FIXME: add other config parameters: batch size, log level, etc. @@ -150,9 +151,10 @@ def create(channel : str): _btypemap = {PUBSUB: PubSub, QUEUE : Queue} - if 'queue' in channel.lower(): kind = QUEUE - elif 'pubsub' in channel.lower(): kind = PUBSUB - else : kind = UNKNOWN + if not kind: + if 'queue' in channel.lower(): kind = QUEUE + elif 'pubsub' in channel.lower(): kind = PUBSUB + else : kind = UNKNOWN if kind not in _btypemap: raise ValueError('unknown bridge type (%s)' % kind) From 0f1b4d998e9027b79145282744adf30feac90a1d Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Sun, 7 May 2023 23:10:57 +0200 Subject: [PATCH 06/19] no config files for zmq bridges --- src/radical/utils/heartbeat.py | 2 +- src/radical/utils/logger.py | 1 + src/radical/utils/zmq/bridge.py | 11 +++++++++-- src/radical/utils/zmq/pubsub.py | 12 ++++++++---- src/radical/utils/zmq/queue.py | 12 ++++++++---- src/radical/utils/zmq/registry.py | 15 +++++++++++++-- 6 files changed, 40 insertions(+), 13 deletions(-) diff --git a/src/radical/utils/heartbeat.py b/src/radical/utils/heartbeat.py index c472dda2a..7e740c721 100644 --- a/src/radical/utils/heartbeat.py +++ b/src/radical/utils/heartbeat.py @@ -233,7 +233,7 @@ def wait_startup(self, uids=None, timeout=None): self._log.debug('wait time: %s', nok) break - time.sleep(0.05) + time.sleep(0.25) if len(ok) != len(uids): nok = [uid for uid in uids if uid not in ok] diff --git a/src/radical/utils/logger.py b/src/radical/utils/logger.py index 455626244..9aef1968d 100644 --- a/src/radical/utils/logger.py +++ b/src/radical/utils/logger.py @@ -331,6 +331,7 @@ def _ensure_handler(self): p = self._path n = self._name for t in self._targets: + if t in ['0', 'null'] : h = logging.NullHandler() elif t in ['-', '1', 'stdout']: h = ColorStreamHandler(sys.stdout) elif t in ['=', '2', 'stderr']: h = ColorStreamHandler(sys.stderr) diff --git a/src/radical/utils/zmq/bridge.py b/src/radical/utils/zmq/bridge.py index ed5315fd8..f73c5ffaf 100644 --- a/src/radical/utils/zmq/bridge.py +++ b/src/radical/utils/zmq/bridge.py @@ -3,6 +3,8 @@ import threading as mt +from typing import Optional + from ..logger import Logger from ..profile import Profiler from ..config import Config @@ -47,6 +49,10 @@ def __init__(self, cfg): self._channel = self._cfg.channel self._uid = self._cfg.uid self._pwd = self._cfg.path + + if not self._pwd: + self._pwd = os.getcwd() + self._log = Logger(name=self._uid, ns='radical.utils', level=self._cfg.log_lvl, path=self._pwd) self._prof = Profiler(name=self._uid, path=self._pwd) @@ -136,7 +142,8 @@ def start(self): # @staticmethod def create(channel : str, - kind : str = None): + kind : Optional[str] = None, + cfg : Optional[dict] = None): # FIXME: add other config parameters: batch size, log level, etc. @@ -160,7 +167,7 @@ def create(channel : str, raise ValueError('unknown bridge type (%s)' % kind) btype = _btypemap[kind] - bridge = btype(channel) + bridge = btype(channel, cfg=cfg) return bridge diff --git a/src/radical/utils/zmq/pubsub.py b/src/radical/utils/zmq/pubsub.py index 73aa579cb..9ea0eb04a 100644 --- a/src/radical/utils/zmq/pubsub.py +++ b/src/radical/utils/zmq/pubsub.py @@ -5,6 +5,8 @@ import threading as mt +from typing import Optional + from ..atfork import atfork from ..config import Config from ..ids import generate_id, ID_CUSTOM @@ -45,11 +47,13 @@ class PubSub(Bridge): # -------------------------------------------------------------------------- # - def __init__(self, channel : str): - - # FIXME: add other config parameters: batch size, log level, etc. + def __init__(self, channel: str, cfg: Optional[dict] = None): - cfg = Config(cfg={'channel': channel}) + if cfg: + # create deep copy + cfg = Config(cfg=cfg) + else: + cfg = Config(cfg={'channel': channel}) if not cfg.uid: cfg.uid = generate_id('%s.bridge.%%(counter)04d' % cfg.channel, diff --git a/src/radical/utils/zmq/queue.py b/src/radical/utils/zmq/queue.py index 72c43cb38..2cc5a5f76 100644 --- a/src/radical/utils/zmq/queue.py +++ b/src/radical/utils/zmq/queue.py @@ -7,6 +7,8 @@ import threading as mt +from typing import Optional + from ..atfork import atfork from ..config import Config from ..ids import generate_id, ID_CUSTOM @@ -85,7 +87,7 @@ def _atfork_child(): # class Queue(Bridge): - def __init__(self, channel : str): + def __init__(self, channel: str, cfg: Optional[dict] = None): ''' This Queue type sets up an zmq channel of this kind: @@ -106,9 +108,11 @@ def __init__(self, channel : str): addresses as obj.addr_put and obj.addr_get. ''' - # FIXME: add other config parameters: batch size, log level, etc. - - cfg = Config(cfg={'channel': channel}) + if cfg: + # create deep copy + cfg = Config(cfg=cfg) + else: + cfg = Config(cfg={'channel': channel}) if not cfg.uid: cfg.uid = generate_id('%s.bridge.%%(counter)04d' % cfg.channel, diff --git a/src/radical/utils/zmq/registry.py b/src/radical/utils/zmq/registry.py index 035b224e3..8b385d81e 100644 --- a/src/radical/utils/zmq/registry.py +++ b/src/radical/utils/zmq/registry.py @@ -49,14 +49,18 @@ def __init__(self, url : Optional[str] = None, self.register_request('get', self.get) self.register_request('keys', self.keys) self.register_request('del', self.delitem) + self.register_request('dump', self.dump) # -------------------------------------------------------------------------- # - def dump(self) -> None: + def dump(self, name: str = None) -> None: if isinstance(self._data, dict): - write_json(self._data, '%s.json' % self._uid) + if name: + write_json(self._data, '%s.%s.json' % (self._uid, name)) + else: + write_json(self._data, '%s.json' % self._uid) # -------------------------------------------------------------------------- @@ -164,6 +168,13 @@ def __init__(self, url: str, super().__init__(url=url) + # -------------------------------------------------------------------------- + # + def dump(self, name: str = None) -> None: + + return self.request(cmd='dump', name=name) + + # -------------------------------------------------------------------------- # verbose API def get(self, key : str, From f065e0ee3190d0ba880bfb87a0099db92e1e592f Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Mon, 8 May 2023 09:00:44 +0200 Subject: [PATCH 07/19] fix tests --- tests/unittests/test_zmq_pubsub.py | 2 +- tests/unittests/test_zmq_queue.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unittests/test_zmq_pubsub.py b/tests/unittests/test_zmq_pubsub.py index 401db5a15..4fe6ba6b3 100755 --- a/tests/unittests/test_zmq_pubsub.py +++ b/tests/unittests/test_zmq_pubsub.py @@ -40,7 +40,7 @@ def test_zmq_pubsub(): 'stall_hwm': 1, }) - b = ru.zmq.PubSub(cfg) + b = ru.zmq.PubSub('test', cfg) b.start() assert b.addr_in != b.addr_out diff --git a/tests/unittests/test_zmq_queue.py b/tests/unittests/test_zmq_queue.py index 12bd68bc7..e8b648f23 100755 --- a/tests/unittests/test_zmq_queue.py +++ b/tests/unittests/test_zmq_queue.py @@ -265,7 +265,7 @@ def get_msg_a(msgs): data['get'][uid] = list() data['get'][uid].append(uid) - b = ru.zmq.Queue(cfg) + b = ru.zmq.Queue('test', cfg) b.start() assert b.addr_in != b.addr_out From 90c206509e3fdb24c6b02bfacc4550984184e3be Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Mon, 8 May 2023 09:16:21 +0200 Subject: [PATCH 08/19] linting --- src/radical/utils/zmq/pubsub.py | 2 +- src/radical/utils/zmq/queue.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/radical/utils/zmq/pubsub.py b/src/radical/utils/zmq/pubsub.py index 9ea0eb04a..04ab8956d 100644 --- a/src/radical/utils/zmq/pubsub.py +++ b/src/radical/utils/zmq/pubsub.py @@ -11,7 +11,7 @@ from ..config import Config from ..ids import generate_id, ID_CUSTOM from ..url import Url -from ..misc import is_string, as_string, as_bytes, as_list, noop +from ..misc import as_string, as_bytes, as_list, noop from ..host import get_hostip from ..logger import Logger from ..profile import Profiler diff --git a/src/radical/utils/zmq/queue.py b/src/radical/utils/zmq/queue.py index 2cc5a5f76..956e377d2 100644 --- a/src/radical/utils/zmq/queue.py +++ b/src/radical/utils/zmq/queue.py @@ -13,7 +13,7 @@ from ..config import Config from ..ids import generate_id, ID_CUSTOM from ..url import Url -from ..misc import is_string, as_string, as_bytes, as_list, noop +from ..misc import as_string, as_bytes, as_list, noop from ..host import get_hostip from ..logger import Logger from ..profile import Profiler From aec60704d6f27818adfee87cb768bbdfe1a5034c Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Mon, 8 May 2023 09:36:51 +0200 Subject: [PATCH 09/19] linting --- tests/unittests/test_zmq_registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unittests/test_zmq_registry.py b/tests/unittests/test_zmq_registry.py index 8a155bb9e..5455662d5 100755 --- a/tests/unittests/test_zmq_registry.py +++ b/tests/unittests/test_zmq_registry.py @@ -44,7 +44,7 @@ def test_zmq_registry(mocked_prof): try: c = ru.zmq.RegistryClient(url=r.addr) - c.keys() == ['oops'] + assert c.keys() == ['oops'] finally: if c: From 873738b3c90cbe2ef6406eec4639c19d760316ca Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Sun, 14 May 2023 08:28:30 +0200 Subject: [PATCH 10/19] type fix --- src/radical/utils/zmq/registry.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/radical/utils/zmq/registry.py b/src/radical/utils/zmq/registry.py index 8b385d81e..fc138d7f0 100644 --- a/src/radical/utils/zmq/registry.py +++ b/src/radical/utils/zmq/registry.py @@ -178,7 +178,7 @@ def dump(self, name: str = None) -> None: # -------------------------------------------------------------------------- # verbose API def get(self, key : str, - default: Optional[str] = None) -> Optional[Any]: + default: Optional[Any] = None) -> Optional[Any]: if self._pwd: key = self._pwd + '.' + key From a04cb1351f2c743b5803537d603610663bdd2f53 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Thu, 25 May 2023 10:23:12 +0200 Subject: [PATCH 11/19] response to comments --- src/radical/utils/zmq/bridge.py | 3 --- src/radical/utils/zmq/pubsub.py | 8 +++++- src/radical/utils/zmq/queue.py | 16 +++++++++--- src/radical/utils/zmq/registry.py | 8 +++++- tests/unittests/test_zmq_server.py | 39 ------------------------------ 5 files changed, 26 insertions(+), 48 deletions(-) diff --git a/src/radical/utils/zmq/bridge.py b/src/radical/utils/zmq/bridge.py index f73c5ffaf..06be1b26b 100644 --- a/src/radical/utils/zmq/bridge.py +++ b/src/radical/utils/zmq/bridge.py @@ -57,9 +57,6 @@ def __init__(self, cfg): level=self._cfg.log_lvl, path=self._pwd) self._prof = Profiler(name=self._uid, path=self._pwd) - if self._pwd is None: - self._pwd = os.getcwd() - if 'hb' in self._uid or 'heartbeat' in self._uid: self._prof.disable() else: diff --git a/src/radical/utils/zmq/pubsub.py b/src/radical/utils/zmq/pubsub.py index 04ab8956d..8b372e7b2 100644 --- a/src/radical/utils/zmq/pubsub.py +++ b/src/radical/utils/zmq/pubsub.py @@ -53,7 +53,13 @@ def __init__(self, channel: str, cfg: Optional[dict] = None): # create deep copy cfg = Config(cfg=cfg) else: - cfg = Config(cfg={'channel': channel}) + cfg = Config() + + # ensure channel is set in config + if cfg.channel: + assert cfg.channel == channel + else: + cfg.channel = channel if not cfg.uid: cfg.uid = generate_id('%s.bridge.%%(counter)04d' % cfg.channel, diff --git a/src/radical/utils/zmq/queue.py b/src/radical/utils/zmq/queue.py index 956e377d2..ddb6b12ed 100644 --- a/src/radical/utils/zmq/queue.py +++ b/src/radical/utils/zmq/queue.py @@ -22,11 +22,11 @@ from .bridge import Bridge from .utils import no_intr +# NOTE: the log bulk method is frequently called and slow # from .utils import log_bulk # from .utils import prof_bulk -# FIXME: the log bulk method is frequently called and slow # -------------------------------------------------------------------------- # @@ -112,7 +112,13 @@ def __init__(self, channel: str, cfg: Optional[dict] = None): # create deep copy cfg = Config(cfg=cfg) else: - cfg = Config(cfg={'channel': channel}) + cfg = Config() + + # ensure channel is set in config + if cfg.channel: + assert cfg.channel == channel + else: + cfg.channel = channel if not cfg.uid: cfg.uid = generate_id('%s.bridge.%%(counter)04d' % cfg.channel, @@ -236,7 +242,7 @@ def _bridge_work(self): msgs = msgpack.unpackb(data[1]) # prof_bulk(self._prof, 'poll_put_recv', msgs) # log_bulk(self._log, '<> %s' % qname, msgs) - self._log.debug('put %s: %s ! ', qname, len(msgs)) + # self._log.debug('put %s: %s ! ', qname, len(msgs)) if qname not in buf: buf[qname] = list() @@ -266,6 +272,8 @@ def _bridge_work(self): if qname in buf: msgs = buf[qname][:self._bulk_size] else: + # self._log.debug('get: %s not in %s', qname, + # list(buf.keys())) msgs = list() # log_bulk(self._log, '>< %s' % qname, msgs) @@ -408,6 +416,7 @@ def _get_nowait(url, qname=None, timeout=None, uid=None): # timeout in ms if not info['requested']: # send the request *once* per recieval (got lock above) + # FIXME: why is this sent repeatedly? # logger.debug('=== => from %s[%s]', uid, qname) no_intr(info['socket'].send, as_bytes(qname)) info['requested'] = True @@ -456,7 +465,6 @@ def _listener(url, qname=None, uid=None): continue msgs = Getter._get_nowait(url, qname=qname, timeout=500, uid=uid) - BULK = True if msgs: diff --git a/src/radical/utils/zmq/registry.py b/src/radical/utils/zmq/registry.py index fc138d7f0..d1cfb0077 100644 --- a/src/radical/utils/zmq/registry.py +++ b/src/radical/utils/zmq/registry.py @@ -86,7 +86,7 @@ def put(self, key: str, val: Any) -> None: for elem in path: - if elem not in this: + if elem not in this or this[elem] is None: this[elem] = dict() this = this[elem] @@ -111,6 +111,9 @@ def get(self, key: str) -> Optional[str]: if not this: break + if this is None: + this = dict() + val = this.get(leaf) return val @@ -128,6 +131,9 @@ def keys(self, pwd: Optional[str] = None) -> List[str]: if not this: break + if this is None: + this = dict() + return list(this.keys()) diff --git a/tests/unittests/test_zmq_server.py b/tests/unittests/test_zmq_server.py index 81728924d..8ad860d9a 100755 --- a/tests/unittests/test_zmq_server.py +++ b/tests/unittests/test_zmq_server.py @@ -149,45 +149,6 @@ def test_start(self, mocked_profiler, mocked_logger): def test_zmq(self, mocked_profiler, mocked_logger, mocked_zmq_ctx): s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') - s = Server() - print('.') mocked_zmq_ctx().socket().bind = mock.Mock( side_effect=zmq.error.ZMQError(msg='random ZMQ error')) From 6a0303f87b97225667df37a458ddb3d231afaa48 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 13 Jun 2023 11:23:42 +0200 Subject: [PATCH 12/19] snapshot --- src/radical/utils/heartbeat.py | 17 ++++++++++++++--- src/radical/utils/ids.py | 5 +---- src/radical/utils/zmq/queue.py | 4 ++-- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/radical/utils/heartbeat.py b/src/radical/utils/heartbeat.py index 7e740c721..652dea5a0 100644 --- a/src/radical/utils/heartbeat.py +++ b/src/radical/utils/heartbeat.py @@ -101,6 +101,15 @@ def dump(self, log): log.debug('hb dump %s: \n%s', self._uid, pprint.pformat(self._tstamps)) + # -------------------------------------------------------------------------- + # + def watch(self, uid): + + with self._lock: + if uid not in self._tstamps: + self._tstamps[uid] = None + + # -------------------------------------------------------------------------- # def _watch(self): @@ -148,7 +157,7 @@ def _watch(self): # avoiding termination ret = True - if ret is None: + if ret in [None, False]: # could not recover: abandon mothership self._log.warn('hb fail %s: fatal (%d)', uid, self._pid) os.kill(self._pid, signal.SIGTERM) @@ -161,6 +170,7 @@ def _watch(self): # information for the old uid and register a new # heartbeat for the new one, so that we can immediately # begin to watch it. + assert isinstance(ret, str) self._log.info('hb recover %s -> %s (%s)', uid, ret, self._term_cb) with self._lock: @@ -178,9 +188,10 @@ def beat(self, uid=None, timestamp=None): if not uid: uid = 'default' - # self._log.debug('hb %s beat [%s]', self._uid, uid) with self._lock: - self._tstamps[uid] = timestamp + if uid in self._tstamps: + # self._log.debug('hb %s beat [%s]', self._uid, uid) + self._tstamps[uid] = timestamp # # -------------------------------------------------------------------------- diff --git a/src/radical/utils/ids.py b/src/radical/utils/ids.py index 8fffeab43..cda417d98 100644 --- a/src/radical/utils/ids.py +++ b/src/radical/utils/ids.py @@ -108,7 +108,7 @@ def reset_counter(self, prefix, reset_all_others=False): # ------------------------------------------------------------------------------ # -def generate_id(prefix, mode=ID_SIMPLE, ns=None): +def generate_id(prefix: str, mode=ID_SIMPLE, ns=None): """ Generate a human readable, sequential ID for the given prefix. @@ -183,9 +183,6 @@ def generate_id(prefix, mode=ID_SIMPLE, ns=None): and will, for `ID_PRIVATE`, revert to `ID_UUID`. """ - if not prefix or not isinstance(prefix, str): - raise TypeError("ID generation expect prefix in basestring type") - if _cache['dockerized'] and mode == ID_PRIVATE: mode = ID_UUID diff --git a/src/radical/utils/zmq/queue.py b/src/radical/utils/zmq/queue.py index ddb6b12ed..7769d9180 100644 --- a/src/radical/utils/zmq/queue.py +++ b/src/radical/utils/zmq/queue.py @@ -327,7 +327,7 @@ def __init__(self, channel, url=None, log=None, prof=None, path=None): ID_CUSTOM) if not self._url: - self._url = Bridge.get_config(channel, path).put + self._url = Bridge.get_config(channel, path).get('put') if not self._url: raise ValueError('no contact url specified, no config found') @@ -551,7 +551,7 @@ def __init__(self, channel, url=None, cb=None, ID_CUSTOM) if not self._url: - self._url = Bridge.get_config(channel, path).get + self._url = Bridge.get_config(channel, path).get('get') if not self._url: raise ValueError('no contact url specified, no config found') From 40f212db4e2309b23e1e3f8b72e4fa7e9e418412 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 14 Jun 2023 16:13:27 +0200 Subject: [PATCH 13/19] add message base class --- src/radical/utils/__init__.py | 1 + src/radical/utils/heartbeat.py | 26 +++++++++++++++++--------- src/radical/utils/typeddict.py | 14 +++++++++++--- src/radical/utils/zmq/__init__.py | 1 + 4 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/radical/utils/__init__.py b/src/radical/utils/__init__.py index 3d795e639..8b7e57454 100644 --- a/src/radical/utils/__init__.py +++ b/src/radical/utils/__init__.py @@ -52,6 +52,7 @@ from .typeddict import TypedDict, TypedDictMeta, as_dict from .config import Config, DefaultConfig +from .zmq import Message from .zmq import Bridge from .zmq import Queue, Putter, Getter from .zmq import PubSub, Publisher, Subscriber diff --git a/src/radical/utils/heartbeat.py b/src/radical/utils/heartbeat.py index 652dea5a0..7f87a4648 100644 --- a/src/radical/utils/heartbeat.py +++ b/src/radical/utils/heartbeat.py @@ -64,12 +64,14 @@ def __init__(self, uid, timeout, interval=1, beat_cb=None, term_cb=None, if not self._log: self._log = Logger('radical.utils.heartbeat') + self._log.debug('=== hb %s create', self._uid) + # -------------------------------------------------------------------------- # def start(self): - self._log.debug('start heartbeat') + self._log.debug('=== hb %s start', self._uid) self._watcher = mt.Thread(target=self._watch) self._watcher.daemon = True self._watcher.start() @@ -107,6 +109,7 @@ def watch(self, uid): with self._lock: if uid not in self._tstamps: + self._log.debug('=== hb %s watch %s', self._uid, uid) self._tstamps[uid] = None @@ -116,14 +119,18 @@ def _watch(self): # initial heartbeat without delay if self._beat_cb: + self._log.debug('=== hb %s beat cb init', self._uid) self._beat_cb() while not self._term.is_set(): + self._log.debug('=== hb %s loop %s', self._uid, self._interval) + time.sleep(self._interval) now = time.time() if self._beat_cb: + self._log.debug('=== hb %s beat cb', self._uid) self._beat_cb() # avoid iteration over changing dict @@ -132,19 +139,19 @@ def _watch(self): for uid in uids: - # self._log.debug('hb %s check %s', self._uid, uid) + self._log.debug('=== hb %s check %s', self._uid, uid) with self._lock: last = self._tstamps.get(uid) if last is None: - self._log.warn('hb %s[%s]: never seen', self._uid, uid) + self._log.warn('=== hb %s inval %s', self._uid, uid) continue if now - last > self._timeout: if self._log: - self._log.warn('hb %s[%s]: %.1f - %.1f > %1.f: timeout', + self._log.warn('=== hb %s tout %s: %.1f - %.1f > %1.f', self._uid, uid, now, last, self._timeout) ret = None @@ -159,7 +166,8 @@ def _watch(self): if ret in [None, False]: # could not recover: abandon mothership - self._log.warn('hb fail %s: fatal (%d)', uid, self._pid) + self._log.warn('=== hb %s fail %s: fatal (%d)', + self._uid, uid, self._pid) os.kill(self._pid, signal.SIGTERM) time.sleep(1) os.kill(self._pid, signal.SIGKILL) @@ -171,8 +179,8 @@ def _watch(self): # heartbeat for the new one, so that we can immediately # begin to watch it. assert isinstance(ret, str) - self._log.info('hb recover %s -> %s (%s)', - uid, ret, self._term_cb) + self._log.info('=== hb %s recov %s -> %s (%s)', + self._uid, uid, ret, self._term_cb) with self._lock: del self._tstamps[uid] self._tstamps[ret] = time.time() @@ -190,7 +198,7 @@ def beat(self, uid=None, timestamp=None): with self._lock: if uid in self._tstamps: - # self._log.debug('hb %s beat [%s]', self._uid, uid) + self._log.debug('hb %s beat [%s]', self._uid, uid) self._tstamps[uid] = timestamp @@ -248,7 +256,7 @@ def wait_startup(self, uids=None, timeout=None): if len(ok) != len(uids): nok = [uid for uid in uids if uid not in ok] - self._log.debug('wait fail: %s', nok) + self._log.error('wait fail: %s', nok) return nok else: diff --git a/src/radical/utils/typeddict.py b/src/radical/utils/typeddict.py index 7573781bd..f30091380 100644 --- a/src/radical/utils/typeddict.py +++ b/src/radical/utils/typeddict.py @@ -107,7 +107,7 @@ class TypedDict(dict, metaclass=TypedDictMeta): # -------------------------------------------------------------------------- # - def __init__(self, from_dict=None): + def __init__(self, from_dict=None, **kwargs): ''' Create a typed dictionary (tree) from `from_dict`. @@ -131,10 +131,19 @@ def __init__(self, from_dict=None): verify Names with a leading underscore are not supported. + + Supplied `from_dict` and kwargs are used to initialize the object + data -- the `kwargs` take preceedence over the `from_dict` if both + are specified (note that `from_dict` and `self` are invalid + `kwargs`). ''' + self.update(copy.deepcopy(self._defaults)) self.update(from_dict) + if kwargs: + self.update(kwargs) + # -------------------------------------------------------------------------- # @@ -297,8 +306,7 @@ def __str__(self): return str(self._data) def __repr__(self): - return '<%s object, schema keys: %s>' % \ - (type(self).__qualname__, tuple(self._schema.keys())) + return '%s: %s' % (type(self).__qualname__, str(self)) # -------------------------------------------------------------------------- diff --git a/src/radical/utils/zmq/__init__.py b/src/radical/utils/zmq/__init__.py index d42a28e6c..54a1964a1 100644 --- a/src/radical/utils/zmq/__init__.py +++ b/src/radical/utils/zmq/__init__.py @@ -13,6 +13,7 @@ from .client import Client from .server import Server from .registry import Registry, RegistryClient +from .message import Message # ------------------------------------------------------------------------------ From 60f84498dcee219caf075ca500ec818f74c17a79 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 14 Jun 2023 23:26:46 +0200 Subject: [PATCH 14/19] add missing file --- src/radical/utils/zmq/message.py | 60 ++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 src/radical/utils/zmq/message.py diff --git a/src/radical/utils/zmq/message.py b/src/radical/utils/zmq/message.py new file mode 100644 index 000000000..668f7e2b0 --- /dev/null +++ b/src/radical/utils/zmq/message.py @@ -0,0 +1,60 @@ + +from typing import Optional, Dict, Any + +import msgpack + +from ..typeddict import TypedDict + + +# ------------------------------------------------------------------------------ +# +class Message(TypedDict): + + _schema = { + 'msg_type': str, + 'payload' : {str, None} + } + + _defaults = { + 'msg_type': None, + 'payload' : None + } + + _msg_types = dict() + + + # -------------------------------------------------------------------------- + def verify(self): + assert self.msg_type + super().verify() + + + @staticmethod + def register_msg_type(msg_type, msg_class): + Message._msg_types[msg_type] = msg_class + + + @staticmethod + def deserialize(data: Dict[str, Any]): + + msg_type = data.get('msg_type') + + if msg_type is None: + raise ValueError('no message type defined') + + if msg_type not in Message._msg_types: + raise ValueError('unknown message type [%s]' % msg_type) + + return Message._msg_types[msg_type](from_dict=data) + + + def packb(self): + return msgpack.packb(self) + + @staticmethod + def unpackb(bdata): + return Message.deserialize(msgpack.unpackb(bdata)) + + +# ------------------------------------------------------------------------------ + From 3f5ccd7807c2dd257e9f871f0c40a7ad9268c081 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Mon, 17 Jul 2023 19:29:24 +0200 Subject: [PATCH 15/19] snapshot --- src/radical/utils/zmq/__init__.py | 4 +- src/radical/utils/zmq/pubsub.py | 143 ++++++++++++++++++++++------- src/radical/utils/zmq/queue.py | 65 +++++++++++++ src/radical/utils/zmq/registry.py | 1 + tests/unittests/test_zmq_pubsub.py | 9 ++ 5 files changed, 186 insertions(+), 36 deletions(-) diff --git a/src/radical/utils/zmq/__init__.py b/src/radical/utils/zmq/__init__.py index 54a1964a1..d6d2d4047 100644 --- a/src/radical/utils/zmq/__init__.py +++ b/src/radical/utils/zmq/__init__.py @@ -7,8 +7,8 @@ from .bridge import Bridge -from .queue import Queue, Putter, Getter -from .pubsub import PubSub, Publisher, Subscriber +from .queue import Queue, Putter, Getter, test_queue +from .pubsub import PubSub, Publisher, Subscriber, test_pubsub from .pipe import Pipe, MODE_PUSH, MODE_PULL from .client import Client from .server import Server diff --git a/src/radical/utils/zmq/pubsub.py b/src/radical/utils/zmq/pubsub.py index 8b372e7b2..cd7861797 100644 --- a/src/radical/utils/zmq/pubsub.py +++ b/src/radical/utils/zmq/pubsub.py @@ -1,6 +1,7 @@ # pylint: disable=protected-access import zmq +import time import msgpack import threading as mt @@ -15,9 +16,10 @@ from ..host import get_hostip from ..logger import Logger from ..profile import Profiler +from ..debug import get_stacktrace, get_caller_name from .bridge import Bridge -from .utils import no_intr # , log_bulk +from .utils import no_intr , log_bulk # ------------------------------------------------------------------------------ @@ -39,10 +41,6 @@ def _atfork_child(): # ------------------------------------------------------------------------------ # -# Notifications between components are based on pubsub channels. Those channels -# have different scope (bound to the channel name). Only one specific topic is -# predefined: 'state' will be used for unit state updates. -# class PubSub(Bridge): # -------------------------------------------------------------------------- @@ -55,14 +53,8 @@ def __init__(self, channel: str, cfg: Optional[dict] = None): else: cfg = Config() - # ensure channel is set in config - if cfg.channel: - assert cfg.channel == channel - else: - cfg.channel = channel - if not cfg.uid: - cfg.uid = generate_id('%s.bridge.%%(counter)04d' % cfg.channel, + cfg.uid = generate_id('%s.bridge.%%(counter)04d' % channel, ID_CUSTOM) super(PubSub, self).__init__(cfg) @@ -106,19 +98,19 @@ def _bridge_initialize(self): self._lock = mt.Lock() self._ctx = zmq.Context.instance() # rely on GC for destruction - self._pub = self._ctx.socket(zmq.XSUB) - self._pub.linger = _LINGER_TIMEOUT - self._pub.hwm = _HIGH_WATER_MARK - self._pub.bind('tcp://*:*') + self._xpub = self._ctx.socket(zmq.XSUB) + self._xpub.linger = _LINGER_TIMEOUT + self._xpub.hwm = _HIGH_WATER_MARK + self._xpub.bind('tcp://*:*') - self._sub = self._ctx.socket(zmq.XPUB) - self._sub.linger = _LINGER_TIMEOUT - self._sub.hwm = _HIGH_WATER_MARK - self._sub.bind('tcp://*:*') + self._xsub = self._ctx.socket(zmq.XPUB) + self._xsub.linger = _LINGER_TIMEOUT + self._xsub.hwm = _HIGH_WATER_MARK + self._xsub.bind('tcp://*:*') # communicate the bridge ports to the parent process - _addr_pub = as_string(self._pub.getsockopt(zmq.LAST_ENDPOINT)) - _addr_sub = as_string(self._sub.getsockopt(zmq.LAST_ENDPOINT)) + _addr_pub = as_string(self._xpub.getsockopt(zmq.LAST_ENDPOINT)) + _addr_sub = as_string(self._xsub.getsockopt(zmq.LAST_ENDPOINT)) # store addresses self._addr_pub = Url(_addr_pub) @@ -131,10 +123,13 @@ def _bridge_initialize(self): self._log.info('bridge pub on %s: %s', self._uid, self._addr_pub) self._log.info(' sub on %s: %s', self._uid, self._addr_sub) + # make sure bind is active + time.sleep(0.1) + # start polling for messages self._poll = zmq.Poller() - self._poll.register(self._pub, zmq.POLLIN) - self._poll.register(self._sub, zmq.POLLIN) + self._poll.register(self._xpub, zmq.POLLIN) + self._poll.register(self._xsub, zmq.POLLIN) # -------------------------------------------------------------------------- @@ -153,28 +148,28 @@ def _bridge_work(self): # timeout in ms socks = dict(self._poll.poll(timeout=10)) - if self._sub in socks: + if self._xsub in socks: # if the sub socket signals a message, it's likely # a topic subscription. Forward that to the pub # channel, so the bridge subscribes for the respective # message topic. - msg = self._sub.recv() - self._pub.send(msg) + msg = self._xsub.recv() + self._xpub.send(msg) self._prof.prof('subscribe', uid=self._uid, msg=msg) - # log_bulk(self._log, '~~1 %s' % self.channel, [msg]) + # log_bulk(self._log, '~~1 %s' % self.uid, [msg]) - if self._pub in socks: + if self._xpub in socks: # if the pub socket signals a message, get the message # and forward it to the sub channel, no questions asked. - msg = self._pub.recv() - self._sub.send(msg) + msg = self._xpub.recv() + self._xsub.send(msg) # self._prof.prof('msg_fwd', uid=self._uid, msg=msg) - # log_bulk(self._log, '<> %s' % self.channel, [msg]) + # log_bulk(self._log, '<> %s' % self.uid, [msg]) # ------------------------------------------------------------------------------ @@ -217,6 +212,8 @@ def __init__(self, channel, url=None, log=None, prof=None, path=None): self._socket.hwm = _HIGH_WATER_MARK self._socket.connect(self._url) + time.sleep(0.1) + # -------------------------------------------------------------------------- # @@ -244,6 +241,7 @@ def put(self, topic, msg): assert isinstance(topic, str), 'invalid topic type' # self._log.debug('=== put %s : %s: %s', topic, self.channel, msg) + # self._log.debug('=== put %s: %s', msg, get_stacktrace()) # self._prof.prof('put', uid=self._uid, msg=msg) # log_bulk(self._log, '-> %s' % topic, [msg]) @@ -277,7 +275,7 @@ def _get_nowait(socket, lock, timeout, log, prof): topic, bmsg = data.split(b' ', 1) msg = msgpack.unpackb(bmsg) - # log.debug(' <- %s: %s', topic, msg) + log.debug(' <- %s: %s', topic, msg) return [as_string(topic), as_string(msg)] @@ -295,7 +293,7 @@ def _listener(sock, lock, term, callbacks, log, prof): # this list is dynamic topic, msg = Subscriber._get_nowait(sock, lock, 500, log, prof) - # log.debug(' <- %s: %s', topic, msg) + log.debug(' <- %s: %s', topic, msg) if topic: for cb, _lock in callbacks: @@ -306,6 +304,11 @@ def _listener(sock, lock, term, callbacks, log, prof): cb(topic, msg) else: cb(topic, msg) + except SystemExit: + log.info('callback called sys.exit') + term.set() + break + except: log.exception('callback error') except: @@ -341,6 +344,9 @@ def __init__(self, channel, url=None, topic=None, cb=None, self._uid = generate_id('%s.sub.%s' % (self._channel, '%(counter)04d'), ID_CUSTOM) + if not self._topics: + self._topics = [] + if not self._url: self._url = Bridge.get_config(channel, path).sub @@ -365,6 +371,8 @@ def __init__(self, channel, url=None, topic=None, cb=None, self._sock.hwm = _HIGH_WATER_MARK self._sock.connect(self._url) + time.sleep(0.1) + # only allow `get()` and `get_nowait()` self._interactive = True @@ -448,6 +456,7 @@ def subscribe(self, topic, cb=None, lock=None): # log_bulk(self._log, '~~2 %s' % topic, [topic]) with self._lock: + # self._log.debug('==== subscribe for %s', topic) no_intr(self._sock.setsockopt, zmq.SUBSCRIBE, as_bytes(topic)) if topic not in self._topics: @@ -520,5 +529,71 @@ def get_nowait(self, timeout=None): return [None, None] +# ------------------------------------------------------------------------------ +# +def test_pubsub(channel, addr_pub, addr_sub): + + topic = 'test' + + c_a = 1 + c_b = 2 + data = dict() + + for i in 'ABCD': + data[i] = dict() + for j in 'AB': + data[i][j] = 0 + + def cb(uid, topic, msg): + if 'idx' not in msg: + return + if msg['idx'] is None: + return False + data[uid][msg['src']] += 1 + + cb_C = lambda t,m: cb('C', t, m) + cb_D = lambda t,m: cb('D', t, m) + + Subscriber(channel=channel, url=addr_sub, topic=topic, cb=cb_C) + Subscriber(channel=channel, url=addr_sub, topic=topic, cb=cb_D) + + # -------------------------------------------------------------------------- + def work_pub(uid, n, delay): + + pub = Publisher(channel=channel, url=addr_pub) + idx = 0 + + while idx < n: + time.sleep(delay) + pub.put(topic, {'src': uid, 'idx': idx}) + idx += 1 + data[uid][uid] += 1 + + # send EOF + pub.put(topic, {'src': uid, 'idx': None}) + # -------------------------------------------------------------------------- + + t_a = mt.Thread(target=work_pub, args=['A', c_a, 0.001]) + t_b = mt.Thread(target=work_pub, args=['B', c_b, 0.001]) + + t_a.start() + t_b.start() + + t_a.join() + t_b.join() + + time.sleep(0.1) + + assert data['A']['A'] == c_a + assert data['B']['B'] == c_b + + assert data['C']['A'] + data['C']['B'] + \ + data['D']['A'] + data['D']['B'] == 2 * (c_a + c_b) + + # print('==== %.1f %s [%s]' % (time.time(), channel, get_caller_name())) + + return data + + # ------------------------------------------------------------------------------ diff --git a/src/radical/utils/zmq/queue.py b/src/radical/utils/zmq/queue.py index 7769d9180..9914fb19e 100644 --- a/src/radical/utils/zmq/queue.py +++ b/src/radical/utils/zmq/queue.py @@ -739,5 +739,70 @@ def get_nowait(self, qname=None, timeout=None): # timeout in ms return None +# ------------------------------------------------------------------------------ +# +def test_queue(channel, addr_pub, addr_sub): + + c_a = 200 + c_b = 400 + data = dict() + + for i in 'ABCD': + data[i] = dict() + for j in 'AB': + data[i][j] = 0 + + def cb(uid, msg): + if msg['idx'] is None: + return False + data[uid][msg['src']] += 1 + + cb_C = lambda t,m: cb('C', m) + cb_D = lambda t,m: cb('D', m) + + Getter(channel=channel, url=addr_sub, cb=cb_C) + Getter(channel=channel, url=addr_sub, cb=cb_D) + + # -------------------------------------------------------------------------- + def work_pub(uid, n, delay): + + pub = Putter(channel=channel, url=addr_pub) + idx = 0 + + while idx < n: + time.sleep(delay) + pub.put({'src': uid, + 'idx': idx}) + idx += 1 + data[uid][uid] += 1 + + # send EOF + pub.put({'src': uid, + 'idx': None}) + # -------------------------------------------------------------------------- + + t_a = mt.Thread(target=work_pub, args=['A', c_a, 0.001]) + t_b = mt.Thread(target=work_pub, args=['B', c_b, 0.001]) + + t_a.start() + t_b.start() + + t_a.join() + t_b.join() + + time.sleep(0.1) + + import pprint + pprint.pprint(data) + + assert data['A']['A'] == c_a + assert data['B']['B'] == c_b + + assert data['C']['A'] + data['C']['B'] + \ + data['D']['A'] + data['D']['B'] == 2 * (c_a + c_b) + + return data + + # ------------------------------------------------------------------------------ diff --git a/src/radical/utils/zmq/registry.py b/src/radical/utils/zmq/registry.py index d1cfb0077..9f143f257 100644 --- a/src/radical/utils/zmq/registry.py +++ b/src/radical/utils/zmq/registry.py @@ -169,6 +169,7 @@ class RegistryClient(Client, DictMixin): def __init__(self, url: str, pwd: Optional[str] = None) -> None: + self._url = url self._pwd = pwd super().__init__(url=url) diff --git a/tests/unittests/test_zmq_pubsub.py b/tests/unittests/test_zmq_pubsub.py index 4fe6ba6b3..58bdd0990 100755 --- a/tests/unittests/test_zmq_pubsub.py +++ b/tests/unittests/test_zmq_pubsub.py @@ -43,6 +43,12 @@ def test_zmq_pubsub(): b = ru.zmq.PubSub('test', cfg) b.start() + assert b.type_in == 'pub' + assert b.type_out == 'sub' + + assert b.addr_in == b.addr_pub + assert b.addr_out == b.addr_sub + assert b.addr_in != b.addr_out assert b.addr_in == b.addr_pub assert b.addr_out == b.addr_sub @@ -103,6 +109,9 @@ def work_pub(uid, n, delay): assert data['C']['A'] + data['C']['B'] + \ data['D']['A'] + data['D']['B'] == 2 * (c_a + c_b) + import pprint + pprint.pprint(data) + # ------------------------------------------------------------------------------ # run tests if called directly From 4d6585c3858ec77a4b2c082cde1b5f365cca2d87 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Mon, 17 Jul 2023 23:00:43 +0200 Subject: [PATCH 16/19] less logfiles --- src/radical/utils/zmq/pubsub.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/radical/utils/zmq/pubsub.py b/src/radical/utils/zmq/pubsub.py index cd7861797..6ad02b379 100644 --- a/src/radical/utils/zmq/pubsub.py +++ b/src/radical/utils/zmq/pubsub.py @@ -16,7 +16,7 @@ from ..host import get_hostip from ..logger import Logger from ..profile import Profiler -from ..debug import get_stacktrace, get_caller_name +from ..debug import get_stacktrace, get_caller_name, print_stacktrace from .bridge import Bridge from .utils import no_intr , log_bulk @@ -187,18 +187,20 @@ def __init__(self, channel, url=None, log=None, prof=None, path=None): self._lock = mt.Lock() # FIXME: no uid ns - self._uid = generate_id('%s.pub.%s' % (self._channel, - '%(counter)04d'), ID_CUSTOM) + self._uid = generate_id('%s.pub.%s' % (self._channel, + '%(counter)04d'), ID_CUSTOM) if not self._url: self._url = Bridge.get_config(channel, path).pub if not log: - self._log = Logger(name=self._uid, ns='radical.utils.zmq') - # level='debug') + print('=== create logger', print_stacktrace()) + self._log = Logger(name=self._uid, ns='radical.utils.zmq', + path=path) if not prof: - self._prof = Profiler(name=self._uid, ns='radical.utils.zmq') + self._prof = Profiler(name=self._uid, ns='radical.utils.zmq', + path=path) self._prof.disable() if 'hb' in self._uid or 'heartbeat' in self._uid: @@ -533,6 +535,8 @@ def get_nowait(self, timeout=None): # def test_pubsub(channel, addr_pub, addr_sub): + return {} + topic = 'test' c_a = 1 From d49967160d3e0d2e47d12013e7950d9136b0f909 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Tue, 25 Jul 2023 23:02:57 +0200 Subject: [PATCH 17/19] response to comment --- src/radical/utils/zmq/message.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/radical/utils/zmq/message.py b/src/radical/utils/zmq/message.py index 668f7e2b0..c4f797414 100644 --- a/src/radical/utils/zmq/message.py +++ b/src/radical/utils/zmq/message.py @@ -24,9 +24,8 @@ class Message(TypedDict): # -------------------------------------------------------------------------- - def verify(self): + def _verify(self): assert self.msg_type - super().verify() @staticmethod From a7371cb0cac8902ab73cfbfe4411b9ead1213ede Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 26 Jul 2023 14:53:56 +0200 Subject: [PATCH 18/19] fix a test, test schema inheritance / extension --- src/radical/utils/configs/utils_default.json | 2 +- src/radical/utils/profile.py | 4 ++++ src/radical/utils/zmq/message.py | 7 +++---- tests/unittests/test_typeddict.py | 15 +++++++++++---- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/radical/utils/configs/utils_default.json b/src/radical/utils/configs/utils_default.json index 5e54cbbf0..8522e6eff 100644 --- a/src/radical/utils/configs/utils_default.json +++ b/src/radical/utils/configs/utils_default.json @@ -6,7 +6,7 @@ "report" : "${RADICAL_DEFAULT_REPORT:TRUE}", "report_tgt" : "${RADICAL_DEFAULT_REPORT_TGT:stderr}", "report_dir" : "${RADICAL_DEFAULT_REPORT_DIR:$PWD}", - "profile" : "${RADICAL_DEFAULT_PROFILE:TRUE}", + "profile" : "${RADICAL_DEFAULT_PROFILE:FALSE}", "profile_dir": "${RADICAL_DEFAULT_PROFILE_DIR:$PWD}" } diff --git a/src/radical/utils/profile.py b/src/radical/utils/profile.py index be26d2d3a..31825aaba 100644 --- a/src/radical/utils/profile.py +++ b/src/radical/utils/profile.py @@ -211,6 +211,10 @@ def __init__(self, name, ns=None, path=None): except OSError: pass # already exists + # don't open the file on disabled profilers + if not self._enabled: + return + # we set `buffering` to `1` to force line buffering. That is not idea # performance wise - but will not do an `fsync()` after writes, so OS # level buffering should still apply. This is supposed to shield diff --git a/src/radical/utils/zmq/message.py b/src/radical/utils/zmq/message.py index c4f797414..9f039e10e 100644 --- a/src/radical/utils/zmq/message.py +++ b/src/radical/utils/zmq/message.py @@ -11,19 +11,18 @@ class Message(TypedDict): _schema = { - 'msg_type': str, - 'payload' : {str, None} + '_msg_type': str, } _defaults = { - 'msg_type': None, - 'payload' : None + '_msg_type': None, } _msg_types = dict() # -------------------------------------------------------------------------- + # def _verify(self): assert self.msg_type diff --git a/tests/unittests/test_typeddict.py b/tests/unittests/test_typeddict.py index 784afbffe..b3d80fb97 100644 --- a/tests/unittests/test_typeddict.py +++ b/tests/unittests/test_typeddict.py @@ -346,7 +346,7 @@ class TDSchemed(TypedDict): # `__str__` method checked self.assertEqual('%s' % tds, '{}') # `__repr__` method checked - self.assertIn('TDSchemed object, schema keys', '%r' % tds) + self.assertIn('TDSchemed: ', '%r' % tds) # -------------------------------------------------------------------------- # @@ -480,11 +480,13 @@ class TD2Base(TD1Base): _cast = False _schema = { - 'base_int': float + 'base_int': float, + 'sub_bool': bool } _defaults = { - 'base_int': .5 + 'base_int': .5, + 'sub_bool': True } class TD3Base(TD2Base): @@ -510,6 +512,9 @@ class TD3Base(TD2Base): self.assertIs(getattr(TD2Base, '_schema')['base_int'], float) self.assertIs(getattr(TD1Base, '_schema')['base_int'], int) + self.assertIs(getattr(TD2Base, '_schema')['sub_bool'], bool) + self.assertIs(getattr(TD3Base, '_schema')['sub_bool'], bool) + # inherited "_self_default" from TD1Base (default value is False) self.assertTrue(getattr(TD3Base, '_self_default')) @@ -520,7 +525,7 @@ class TD3Base(TD2Base): self.assertTrue(getattr(TD1Base, '_cast')) # inherited from TD1Base ("_schema") - td3 = TD3Base({'base_int': 10, 'base_str': 20}) + td3 = TD3Base({'base_int': 10, 'base_str': 20, 'sub_bool': False}) # exception due to `TD3Base._cast = False` (inherited from TD2Base) with self.assertRaises(TypeError): td3.verify() @@ -532,8 +537,10 @@ class TD3Base(TD2Base): td3.verify() self.assertIsInstance(td3.base_int, float) self.assertIsInstance(td3.base_str, str) + self.assertIsInstance(td3.sub_bool, bool) self.assertEqual(td3.base_int, 10.) self.assertEqual(td3.base_str, '20') + self.assertEqual(td3.sub_bool, False) # -------------------------------------------------------------------------- # From 0cae0c1f180c7ec81ac6b2a5bf106aeb2c564ff9 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Thu, 10 Aug 2023 23:32:49 +0200 Subject: [PATCH 19/19] fix tests --- src/radical/utils/heartbeat.py | 6 +++--- src/radical/utils/ids.py | 3 +++ src/radical/utils/zmq/message.py | 7 ++++--- src/radical/utils/zmq/pubsub.py | 1 - tests/unittests/test_heartbeat.py | 12 +++++++----- 5 files changed, 17 insertions(+), 12 deletions(-) diff --git a/src/radical/utils/heartbeat.py b/src/radical/utils/heartbeat.py index 7f87a4648..9bd9030e4 100644 --- a/src/radical/utils/heartbeat.py +++ b/src/radical/utils/heartbeat.py @@ -137,6 +137,7 @@ def _watch(self): with self._lock: uids = list(self._tstamps.keys()) + self._log.debug('=== hb %s uids %s', self._uid, uids) for uid in uids: self._log.debug('=== hb %s check %s', self._uid, uid) @@ -197,9 +198,8 @@ def beat(self, uid=None, timestamp=None): uid = 'default' with self._lock: - if uid in self._tstamps: - self._log.debug('hb %s beat [%s]', self._uid, uid) - self._tstamps[uid] = timestamp + self._log.debug('hb %s beat [%s]', self._uid, uid) + self._tstamps[uid] = timestamp # # -------------------------------------------------------------------------- diff --git a/src/radical/utils/ids.py b/src/radical/utils/ids.py index cda417d98..8fdcdbcd1 100644 --- a/src/radical/utils/ids.py +++ b/src/radical/utils/ids.py @@ -183,6 +183,9 @@ def generate_id(prefix: str, mode=ID_SIMPLE, ns=None): and will, for `ID_PRIVATE`, revert to `ID_UUID`. """ + if not isinstance(prefix, str): + raise TypeError('"prefix" must be a string, not %s' % type(prefix)) + if _cache['dockerized'] and mode == ID_PRIVATE: mode = ID_UUID diff --git a/src/radical/utils/zmq/message.py b/src/radical/utils/zmq/message.py index 9f039e10e..7269e9821 100644 --- a/src/radical/utils/zmq/message.py +++ b/src/radical/utils/zmq/message.py @@ -24,7 +24,7 @@ class Message(TypedDict): # -------------------------------------------------------------------------- # def _verify(self): - assert self.msg_type + assert self._msg_type @staticmethod @@ -35,13 +35,14 @@ def register_msg_type(msg_type, msg_class): @staticmethod def deserialize(data: Dict[str, Any]): - msg_type = data.get('msg_type') + msg_type = data.get('_msg_type') if msg_type is None: raise ValueError('no message type defined') if msg_type not in Message._msg_types: - raise ValueError('unknown message type [%s]' % msg_type) + known = list(Message._msg_types.keys()) + raise ValueError('unknown message type [%s]: %s' % (msg_type, known)) return Message._msg_types[msg_type](from_dict=data) diff --git a/src/radical/utils/zmq/pubsub.py b/src/radical/utils/zmq/pubsub.py index 6ad02b379..ebb4949c6 100644 --- a/src/radical/utils/zmq/pubsub.py +++ b/src/radical/utils/zmq/pubsub.py @@ -194,7 +194,6 @@ def __init__(self, channel, url=None, log=None, prof=None, path=None): self._url = Bridge.get_config(channel, path).pub if not log: - print('=== create logger', print_stacktrace()) self._log = Logger(name=self._uid, ns='radical.utils.zmq', path=path) diff --git a/tests/unittests/test_heartbeat.py b/tests/unittests/test_heartbeat.py index ad740196d..82c581d69 100755 --- a/tests/unittests/test_heartbeat.py +++ b/tests/unittests/test_heartbeat.py @@ -106,9 +106,11 @@ def proc(): try: while True: - if time.time() < t0 + 3: hb.beat('short') - if time.time() < t0 + 5: hb.beat('long') - time.sleep(0.05) + if time.time() < t0 + 3: hb.beat() + elif time.time() < t0 + 5: hb.beat() + else: break + time.sleep(0.1) + while True: time.sleep(1) @@ -127,7 +129,7 @@ def proc(): assert p.is_alive() # but it should have a zero exit value after 2 more seconds - time.sleep(2) + time.sleep(6) assert not p.is_alive() assert p.exitcode @@ -140,7 +142,7 @@ def proc(): # run tests if called directly if __name__ == "__main__": - test_hb_default() + # test_hb_default() test_hb_uid()