Skip to content

Commit

Permalink
add missing file, use new methods
Browse files Browse the repository at this point in the history
  • Loading branch information
andre-merzky committed Apr 5, 2024
1 parent 752a5e6 commit 9e38697
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 176 deletions.
3 changes: 1 addition & 2 deletions src/radical/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@
from .profile import PROF_KEY_MAX

from .json_io import read_json, read_json_str, write_json
from .json_io import parse_json, parse_json_str, metric_expand
from .json_io import register_json_class, dumps_json
from .json_io import parse_json, parse_json_str, dumps_json
from .which import which
from .tracer import trace, untrace
from .get_version import get_version
Expand Down
76 changes: 5 additions & 71 deletions src/radical/utils/json_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,8 @@
import re
import json

from .misc import as_string, ru_open


_json_classes = dict()

def register_json_class(cls, check, convert):

global _json_classes
_json_classes[cls.__name__] = [check, convert]


class _json_encoder(json.JSONEncoder):

def default(self, o):
for cls, (check, convert) in _json_classes.items():
if check(o):
return convert(o)
return super().default(o)
from .serialize import to_json, from_json
from .misc import as_string, ru_open


# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -78,8 +62,7 @@ def write_json(data, fname):
fname = tmp

with ru_open(fname, 'w') as f:
json.dump(data, f, sort_keys=True, indent=4, ensure_ascii=False,
cls=_json_encoder)
f.write(to_json(data))
f.write('\n')
f.flush()

Expand All @@ -92,8 +75,7 @@ def dumps_json(data):
'''

return json.dumps(data, sort_keys=True, indent=4, ensure_ascii=False,
cls=_json_encoder)
return to_json(data)


# ------------------------------------------------------------------------------
Expand All @@ -116,7 +98,7 @@ def parse_json(json_str, filter_comments=True):
content += re.sub(r'^\s*#.*$', '', line)
content += '\n'

return json.loads(content)
return from_json(content)


# ------------------------------------------------------------------------------
Expand All @@ -129,53 +111,5 @@ def parse_json_str(json_str):
return as_string(parse_json(json_str))


# ------------------------------------------------------------------------------
#
def metric_expand(data):
'''
iterate through the given dictionary, and when encountering a key string of
the form `ru.XYZ` or `rp.ABC`, expand them to their actually defined values.
This the following dict::
{
"ru.EVENT" : "foo"
}
becomes::
{
2 : "foo"
}
'''

try : import radical.pilot as rp # noqa: F401
except: pass
try : import radical.saga as rs # noqa: F401
except: pass
try : import radical.utils as ru # noqa: F401
except: pass

if isinstance(data, str):

if data.count('.') == 1:
elems = data.split('.')
if len(elems[0]) == 2 and elems[0][0] == 'r':
try:
data = eval(data)
finally:

pass
return data

elif isinstance(data, list):
return [metric_expand(elem) for elem in data]

elif isinstance(data, dict):
return {metric_expand(k) : metric_expand(v) for k,v in data.items()}

else:
return data


# ------------------------------------------------------------------------------

147 changes: 147 additions & 0 deletions src/radical/utils/serialize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@

import json
import msgpack


class _CType:

def __init__(self, ctype, encode, decode):

self.ctype : type = ctype
self.encode: callable = encode
self.decode: callable = decode

_ctypes = dict()


# ------------------------------------------------------------------------------
#
def register_serialization(cls, encode, decode):
'''
register a class for json and msgpack serialization / deserialization.
Args:
cls (type): class type to register
encode (callable): converts class instance into encodable data structure
decode (callable): recreates the class instance from that data structure
'''

global _ctypes
_ctypes[cls.__name__] = _CType(cls, encode, decode)


# ------------------------------------------------------------------------------
#
class _json_encoder(json.JSONEncoder):
'''
internal methods to encode registered classes to json
'''
def default(self, obj):
for cname,methods in _ctypes.items():
if isinstance(obj, methods.ctype):
return {'__%s__' % cname: True,
'as_str' : methods.encode(obj)}
return super().default(obj)

# ------------------------------------------------------------------------------
#
def _json_decoder(obj):
'''
internal methods to decode registered classes from json
'''
for cname, methods in _ctypes.items():
if '__%s__' % cname in obj:
print('found %s' % cname)
return methods.decode(obj['as_str'])
return obj


# ------------------------------------------------------------------------------
#
def _msgpack_encoder(obj):
'''
internal methods to encode registered classes to msgpack
'''
for cname,methods in _ctypes.items():
if isinstance(obj, methods.ctype):
return {'__%s__' % cname: True, 'as_str': methods.encode(obj)}
return obj


# ------------------------------------------------------------------------------
#
def _msgpack_decoder(obj):
'''
internal methods to decode registered classes from msgpack
'''
for cname,methods in _ctypes.items():
if '__%s__' % cname in obj:
return methods.decode(obj['as_str'])
return obj


# ------------------------------------------------------------------------------
#
def to_json(data):
'''
convert data to json, using registered classes for serialization
Args:
data (object): data to be serialized
Returns:
str: json serialized data
'''
return json.dumps(data, sort_keys=True, indent=4, ensure_ascii=False,
cls=_json_encoder)


# ------------------------------------------------------------------------------
#
def from_json(data):
'''
convert json data to python data structures, using registered classes for
deserialization
Args:
data (str): json data to be deserialized
Returns:
object: deserialized data
'''
return json.loads(data, object_hook=_json_decoder)


# ------------------------------------------------------------------------------
#
def to_msgpack(data):
'''
convert data to msgpack, using registered classes for serialization
Args:
data (object): data to be serialized
Returns:
bytes: msgpack serialized data
'''
return msgpack.packb(data, default=_msgpack_encoder, use_bin_type=True)


# ------------------------------------------------------------------------------
#
def from_msgpack(data):
'''
convert msgpack data to python data structures, using registered classes for
deserialization
Args:
data (bytes): msgpack data to be deserialized
Returns:
object: deserialized data
'''
return msgpack.unpackb(data, object_hook=_msgpack_decoder, raw=False)


# ------------------------------------------------------------------------------

16 changes: 8 additions & 8 deletions src/radical/utils/zmq/client.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@

import zmq
import msgpack

from typing import Any

import threading as mt

from ..json_io import read_json
from ..misc import as_string
from .utils import no_intr, sock_connect
from ..json_io import read_json
from ..misc import as_string
from ..serialize import to_msgpack, from_msgpack
from .utils import no_intr, sock_connect


# ------------------------------------------------------------------------------
Expand Down Expand Up @@ -61,14 +61,14 @@ def url(self) -> str:
#
def request(self, cmd: str, *args: Any, **kwargs: Any) -> Any:

req = msgpack.packb({'cmd' : cmd,
'args' : args,
'kwargs': kwargs})
req = to_msgpack({'cmd' : cmd,
'args' : args,
'kwargs': kwargs})

no_intr(self._sock.send, req)

msg = no_intr(self._sock.recv)
res = as_string(msgpack.unpackb(msg))
res = as_string(from_msgpack(msg))

# FIXME: assert proper res structure

Expand Down
9 changes: 5 additions & 4 deletions src/radical/utils/zmq/message.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@

from typing import Dict, Any

import msgpack

from ..typeddict import TypedDict
from ..serialize import to_msgpack, from_msgpack, register_serialization


# ------------------------------------------------------------------------------
#
class Message(TypedDict):

# FIXME: register serialization methods for all message types

_schema = {
'_msg_type': str,
}
Expand Down Expand Up @@ -48,11 +49,11 @@ def deserialize(data: Dict[str, Any]):


def packb(self):
return msgpack.packb(self)
return to_msgpack(self)

@staticmethod
def unpackb(bdata):
return Message.deserialize(msgpack.unpackb(bdata))
return Message.deserialize(from_msgpack(bdata))


# ------------------------------------------------------------------------------
Expand Down
9 changes: 5 additions & 4 deletions src/radical/utils/zmq/pipe.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

import zmq
import msgpack

from ..serialize import to_msgpack, from_msgpack

MODE_PUSH = 'push'
MODE_PULL = 'pull'
Expand Down Expand Up @@ -121,7 +122,7 @@ def put(self, msg):
'''

assert self._mode == MODE_PUSH
self._sock.send(msgpack.packb(msg))
self._sock.send(to_msgpack(msg))


# --------------------------------------------------------------------------
Expand All @@ -132,7 +133,7 @@ def get(self):
'''

assert self._mode == MODE_PULL
return msgpack.unpackb(self._sock.recv())
return from_msgpack(self._sock.recv())


# --------------------------------------------------------------------------
Expand All @@ -150,7 +151,7 @@ def get_nowait(self, timeout: float = 0):
socks = dict(self._poller.poll(timeout=int(timeout * 1000)))

if self._sock in socks:
return msgpack.unpackb(self._sock.recv())
return from_msgpack(self._sock.recv())


# ------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 9e38697

Please sign in to comment.