From 5bb4024d225d458e409ee7275b9238b9ebce1b06 Mon Sep 17 00:00:00 2001 From: Andre Merzky Date: Wed, 24 Jul 2024 23:44:45 +0200 Subject: [PATCH] add complete ru zmq examples --- examples/zmq/pubsub/ru_zmq_pubsub.py | 86 ++++++++++++++++++++++++++++ examples/zmq/queue/ru_zmq_queue.py | 83 +++++++++++++++++++++++++++ 2 files changed, 169 insertions(+) create mode 100755 examples/zmq/pubsub/ru_zmq_pubsub.py create mode 100755 examples/zmq/queue/ru_zmq_queue.py diff --git a/examples/zmq/pubsub/ru_zmq_pubsub.py b/examples/zmq/pubsub/ru_zmq_pubsub.py new file mode 100755 index 000000000..65b3d9621 --- /dev/null +++ b/examples/zmq/pubsub/ru_zmq_pubsub.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 + +import time + +import threading as mt + +import radical.utils as ru + +CHANNEL = 'test' +TOPIC = 'test' + + +# ------------------------------------------------------------------------------ +# +def put(channel, uid, url, n): + + put = ru.zmq.Putter(channel, url) + for i in range(n): + put.put(ru.as_bytes('%s: message %d' % (uid, i))) + print('%s: message %d' % (uid, i)) + time.sleep(0.1) + + put.put(ru.as_bytes('%s: STOP' % uid)) + + print('%s: done' % uid) + + +# ------------------------------------------------------------------------------ +# +def get(channel, uid, url): + + cont = True + get = ru.zmq.Getter(channel, url) + + while cont: + + msgs = get.get() + if not msgs: + continue + + print('%s: %s' % (uid, ru.as_string(msgs))) + + for msg in msgs: + if 'STOP' in ru.as_string(msg): + cont = False + + print('%s: done' % uid) + + +# ------------------------------------------------------------------------------ +# +def main(): + + bridge = ru.zmq.Queue(CHANNEL) + bridge.start() + + threads = list() + + # start some putters and getters + # NOTE: start more putters than getters for clean termination + for i in range(4): + t = mt.Thread(target=put, args=(CHANNEL, 'put.%d' % i, bridge.addr_put, 5)) + t.daemon = True + t.start() + threads.append(t) + + for i in range(3): + t = mt.Thread(target=get, args=(CHANNEL, 'get.%d' % i, bridge.addr_get)) + t.daemon = True + t.start() + threads.append(t) + + # wait for completion + for t in threads: + t.join() + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + main() + + +# ------------------------------------------------------------------------------ + diff --git a/examples/zmq/queue/ru_zmq_queue.py b/examples/zmq/queue/ru_zmq_queue.py new file mode 100755 index 000000000..aa64f243b --- /dev/null +++ b/examples/zmq/queue/ru_zmq_queue.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 + +import time + +import threading as mt + +import radical.utils as ru + +CHANNEL = 'channel' +TOPIC = 'test' + + +# ------------------------------------------------------------------------------ +# +def pub(uid, url, topic, n): + + pub = ru.zmq.Publisher(topic, url) + for i in range(n): + pub.put(topic, ru.as_bytes('%s: message %d' % (uid, i))) + print('%s: message %d' % (uid, i)) + time.sleep(0.1) + + pub.put(topic, ru.as_bytes('%s: STOP' % uid)) + + print('%s: done' % uid) + + +# ------------------------------------------------------------------------------ +# +def sub(uid, url, topic): + + cont = True + sub = ru.zmq.Subscriber(topic, url) + sub.subscribe(topic) + + while cont: + topic, msg = sub.get() + print('%s: %s' % (uid, ru.as_string(msg))) + + if 'STOP' in ru.as_string(msg): + cont = False + + print('%s: done' % uid) + + +# ------------------------------------------------------------------------------ +# +def main(): + + bridge = ru.zmq.PubSub(CHANNEL) + bridge.start() + + threads = list() + + + # start some subscribers and publishers + for i in range(3): + t = mt.Thread(target=sub, args=('sub.%d' % i, bridge.addr_sub, TOPIC)) + t.daemon = True + t.start() + threads.append(t) + + for i in range(4): + t = mt.Thread(target=pub, args=('pub.%d' % i, bridge.addr_pub, TOPIC, 5)) + t.daemon = True + t.start() + threads.append(t) + + + # wait for completion + for t in threads: + t.join() + + +# ------------------------------------------------------------------------------ +# +if __name__ == '__main__': + + main() + + +# ------------------------------------------------------------------------------ +