Skip to content

Commit

Permalink
Merge pull request #24 from ess-dmsc/DM-1979_support_more_types_for_h…
Browse files Browse the repository at this point in the history
…s00_data

Support more types for hs00 data
  • Loading branch information
matthew-d-jones authored Jul 16, 2020
2 parents 85341a3 + 1eaf86f commit 752440e
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 91 deletions.
186 changes: 98 additions & 88 deletions streaming_data_types/histogram_hs00.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import operator
import flatbuffers
import numpy
import streaming_data_types.fbschemas.histogram_hs00.ArrayFloat as ArrayFloat
import streaming_data_types.fbschemas.histogram_hs00.ArrayDouble as ArrayDouble
import streaming_data_types.fbschemas.histogram_hs00.ArrayUInt as ArrayUInt
import streaming_data_types.fbschemas.histogram_hs00.ArrayULong as ArrayULong
import streaming_data_types.fbschemas.histogram_hs00.DimensionMetaData as DimensionMetaData
import streaming_data_types.fbschemas.histogram_hs00.EventHistogram as EventHistogram
Expand All @@ -13,6 +15,17 @@
FILE_IDENTIFIER = b"hs00"


# Map flatbuffer Array union type -> reader CLASS (not instance).
# Storing classes means each lookup constructs a fresh object; the previous
# shared-singleton approach meant every caller got the same mutable object,
# so a later Init() could clobber data another caller was still reading.
_array_for_type = {
    Array.ArrayUInt: ArrayUInt.ArrayUInt,
    Array.ArrayULong: ArrayULong.ArrayULong,
    Array.ArrayFloat: ArrayFloat.ArrayFloat,
    Array.ArrayDouble: ArrayDouble.ArrayDouble,
}


def _create_array_object_for_type(array_type):
    """
    Return a new flatbuffer array reader object for the given Array union type.

    Unknown types fall back to an ArrayDouble reader (previous behaviour).

    :param array_type: a value from the generated Array union enum.
    :return: a freshly constructed reader, ready for Init().
    """
    return _array_for_type.get(array_type, ArrayDouble.ArrayDouble)()


def deserialise_hs00(buffer):
"""
Deserialise flatbuffer hs00 into a histogram.
Expand All @@ -25,12 +38,9 @@ def deserialise_hs00(buffer):

dims = []
for i in range(event_hist.DimMetadataLength()):
bins_fb = ArrayDouble.ArrayDouble()
if (
event_hist.DimMetadata(i).BinBoundariesType() == Array.ArrayUInt
or event_hist.DimMetadata(i).BinBoundariesType() == Array.ArrayULong
):
bins_fb = ArrayULong.ArrayULong()
bins_fb = _create_array_object_for_type(
event_hist.DimMetadata(i).BinBoundariesType()
)

# Get bins
bins_offset = event_hist.DimMetadata(i).BinBoundaries()
Expand All @@ -51,13 +61,7 @@ def deserialise_hs00(buffer):

metadata_timestamp = event_hist.LastMetadataTimestamp()

data_fb = ArrayDouble.ArrayDouble()
if (
event_hist.DataType() == Array.ArrayUInt
or event_hist.DataType() == Array.ArrayULong
):
data_fb = ArrayULong.ArrayULong()

data_fb = _create_array_object_for_type(event_hist.DataType())
data_offset = event_hist.Data()
data_fb.Init(data_offset.Bytes, data_offset.Pos)
shape = event_hist.CurrentShapeAsNumpy().tolist()
Expand All @@ -66,12 +70,7 @@ def deserialise_hs00(buffer):
# Get the errors
errors_offset = event_hist.Errors()
if errors_offset:
errors_fb = ArrayDouble.ArrayDouble()
if (
event_hist.DataType() == Array.ArrayUInt
or event_hist.DataType() == Array.ArrayULong
):
errors_fb = ArrayULong.ArrayULong()
errors_fb = _create_array_object_for_type(event_hist.ErrorsType())
errors_fb.Init(errors_offset.Bytes, errors_offset.Pos)
errors = errors_fb.ValueAsNumpy().reshape(shape)
else:
Expand All @@ -94,30 +93,7 @@ def _serialise_metadata(builder, length, edges, unit, label):
unit_offset = builder.CreateString(unit)
label_offset = builder.CreateString(label)

if isinstance(edges[0], int) or (
isinstance(edges, numpy.ndarray) and numpy.issubdtype(edges[0], numpy.int64)
):
bin_type = Array.ArrayULong
ArrayULong.ArrayULongStartValueVector(builder, len(edges))
# FlatBuffers builds arrays backwards
for x in reversed(edges):
builder.PrependUint64(x)
bins_vector = builder.EndVector(len(edges))
# Add the bins
ArrayULong.ArrayULongStart(builder)
ArrayULong.ArrayULongAddValue(builder, bins_vector)
bins_offset = ArrayULong.ArrayULongEnd(builder)
else:
bin_type = Array.ArrayDouble
ArrayDouble.ArrayDoubleStartValueVector(builder, len(edges))
# FlatBuffers builds arrays backwards
for x in reversed(edges):
builder.PrependFloat64(x)
bins_vector = builder.EndVector(len(edges))
# Add the bins
ArrayDouble.ArrayDoubleStart(builder)
ArrayDouble.ArrayDoubleAddValue(builder, bins_vector)
bins_offset = ArrayDouble.ArrayDoubleEnd(builder)
bins_offset, bin_type = _serialise_array(builder, len(edges), edges)

DimensionMetaData.DimensionMetaDataStart(builder)
DimensionMetaData.DimensionMetaDataAddLength(builder, length)
Expand All @@ -132,6 +108,9 @@ def serialise_hs00(histogram):
"""
Serialise a histogram as an hs00 FlatBuffers message.
If arrays are provided as numpy arrays with type np.uint32, np.uint64, np.float32
or np.float64 then type is preserved in output buffer.
:param histogram: A dictionary containing the histogram to serialise.
"""
source_offset = None
Expand Down Expand Up @@ -170,54 +149,13 @@ def serialise_hs00(histogram):

# Build the data
data_len = reduce(operator.mul, histogram["current_shape"], 1)
flattened_data = numpy.asarray(histogram["data"]).flatten()

if numpy.issubdtype(flattened_data[0], numpy.int64):
data_type = Array.ArrayULong
ArrayULong.ArrayULongStartValueVector(builder, data_len)
# FlatBuffers builds arrays backwards
for x in reversed(flattened_data):
builder.PrependUint64(x)
data_vector = builder.EndVector(data_len)
ArrayULong.ArrayULongStart(builder)
ArrayULong.ArrayULongAddValue(builder, data_vector)
data_offset = ArrayULong.ArrayULongEnd(builder)
else:
data_type = Array.ArrayDouble
ArrayDouble.ArrayDoubleStartValueVector(builder, data_len)
# FlatBuffers builds arrays backwards
for x in reversed(flattened_data):
builder.PrependFloat64(x)
data_vector = builder.EndVector(data_len)
ArrayDouble.ArrayDoubleStart(builder)
ArrayDouble.ArrayDoubleAddValue(builder, data_vector)
data_offset = ArrayDouble.ArrayDoubleEnd(builder)
data_offset, data_type = _serialise_array(builder, data_len, histogram["data"])

errors_offset = None
if "errors" in histogram:
if isinstance(histogram["errors"], numpy.ndarray):
flattened_data = histogram["errors"].flatten()
else:
flattened_data = numpy.asarray(histogram["errors"]).flatten()

if numpy.issubdtype(flattened_data[0], numpy.int64):
error_type = Array.ArrayULong
ArrayULong.ArrayULongStartValueVector(builder, data_len)
for x in reversed(flattened_data):
builder.PrependUint64(x)
errors = builder.EndVector(data_len)
ArrayULong.ArrayULongStart(builder)
ArrayULong.ArrayULongAddValue(builder, errors)
errors_offset = ArrayULong.ArrayULongEnd(builder)
else:
error_type = Array.ArrayDouble
ArrayDouble.ArrayDoubleStartValueVector(builder, data_len)
for x in reversed(flattened_data):
builder.PrependFloat64(x)
errors = builder.EndVector(data_len)
ArrayDouble.ArrayDoubleStart(builder)
ArrayDouble.ArrayDoubleAddValue(builder, errors)
errors_offset = ArrayDouble.ArrayDoubleEnd(builder)
errors_offset, error_type = _serialise_array(
builder, data_len, histogram["errors"]
)

# Build the actual buffer
EventHistogram.EventHistogramStart(builder)
Expand All @@ -244,3 +182,75 @@ def serialise_hs00(histogram):
buffer = builder.Output()
buffer[4:8] = FILE_IDENTIFIER
return bytes(buffer)


def _serialise_array(builder, data_len, data):
    """
    Serialise data using the flatbuffer array type matching its dtype.

    Explicitly supported numpy dtypes (uint32, uint64, float32, float64) are
    preserved exactly in the output buffer.

    :param builder: flatbuffers Builder to serialise into.
    :param data_len: number of elements to serialise.
    :param data: array-like of values; flattened to a 1D numpy array.
    :return: (offset of the serialised array, Array union type used).
    """
    flattened_data = numpy.asarray(data).flatten()

    # Carefully preserve explicitly supported types
    _exact_serialisers = (
        (numpy.uint32, _serialise_uint32),
        (numpy.uint64, _serialise_uint64),
        (numpy.float32, _serialise_float),
        (numpy.float64, _serialise_double),
    )
    for dtype, serialise in _exact_serialisers:
        if numpy.issubdtype(flattened_data.dtype, dtype):
            return serialise(builder, data_len, flattened_data)

    # Otherwise any remaining integer dtype is stored as uint64. Checking
    # numpy.integer rather than only numpy.int64 matters because the default
    # int dtype is platform-dependent (int32 on Windows), which previously
    # made integer data silently fall through to double.
    if numpy.issubdtype(flattened_data.dtype, numpy.integer):
        return _serialise_uint64(builder, data_len, flattened_data)

    # Last resort: serialise as double
    return _serialise_double(builder, data_len, flattened_data)


def _serialise_float(builder, data_len, flattened_data):
    """
    Write flattened_data into the buffer as an ArrayFloat (32-bit floats).

    :return: (offset of the serialised array, Array.ArrayFloat).
    """
    ArrayFloat.ArrayFloatStartValueVector(builder, data_len)
    # Values are prepended in reverse because FlatBuffers builds backwards
    for value in reversed(flattened_data):
        builder.PrependFloat32(value)
    values_vector = builder.EndVector(data_len)
    ArrayFloat.ArrayFloatStart(builder)
    ArrayFloat.ArrayFloatAddValue(builder, values_vector)
    return ArrayFloat.ArrayFloatEnd(builder), Array.ArrayFloat


def _serialise_double(builder, data_len, flattened_data):
    """
    Write flattened_data into the buffer as an ArrayDouble (64-bit floats).

    :return: (offset of the serialised array, Array.ArrayDouble).
    """
    ArrayDouble.ArrayDoubleStartValueVector(builder, data_len)
    # Values are prepended in reverse because FlatBuffers builds backwards
    for value in reversed(flattened_data):
        builder.PrependFloat64(value)
    values_vector = builder.EndVector(data_len)
    ArrayDouble.ArrayDoubleStart(builder)
    ArrayDouble.ArrayDoubleAddValue(builder, values_vector)
    return ArrayDouble.ArrayDoubleEnd(builder), Array.ArrayDouble


def _serialise_uint32(builder, data_len, flattened_data):
    """
    Write flattened_data into the buffer as an ArrayUInt (32-bit unsigned).

    :return: (offset of the serialised array, Array.ArrayUInt).
    """
    ArrayUInt.ArrayUIntStartValueVector(builder, data_len)
    # Values are prepended in reverse because FlatBuffers builds backwards
    for value in reversed(flattened_data):
        builder.PrependUint32(value)
    values_vector = builder.EndVector(data_len)
    ArrayUInt.ArrayUIntStart(builder)
    ArrayUInt.ArrayUIntAddValue(builder, values_vector)
    return ArrayUInt.ArrayUIntEnd(builder), Array.ArrayUInt


def _serialise_uint64(builder, data_len, flattened_data):
    """
    Write flattened_data into the buffer as an ArrayULong (64-bit unsigned).

    :return: (offset of the serialised array, Array.ArrayULong).
    """
    ArrayULong.ArrayULongStartValueVector(builder, data_len)
    # Values are prepended in reverse because FlatBuffers builds backwards
    for value in reversed(flattened_data):
        builder.PrependUint64(value)
    values_vector = builder.EndVector(data_len)
    ArrayULong.ArrayULongStart(builder)
    ArrayULong.ArrayULongAddValue(builder, values_vector)
    return ArrayULong.ArrayULongEnd(builder), Array.ArrayULong
78 changes: 75 additions & 3 deletions tests/test_hs00.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,26 @@
from streaming_data_types import SERIALISERS, DESERIALISERS


def create_test_data_with_type(numpy_type):
    """Build a small 1D test histogram whose arrays all use numpy_type."""
    boundaries = np.array([0, 1, 2, 3, 4, 5]).astype(numpy_type)
    counts = np.array([1, 2, 3, 4, 5]).astype(numpy_type)
    uncertainties = np.array([5, 4, 3, 2, 1]).astype(numpy_type)
    dim = {
        "length": 5,
        "unit": "m",
        "label": "some_label",
        "bin_boundaries": boundaries,
    }
    return {
        "source": "some_source",
        "timestamp": 123456,
        "current_shape": [5],
        "dim_metadata": [dim],
        "last_metadata_timestamp": 123456,
        "data": counts,
        "errors": uncertainties,
        "info": "info_string",
    }


class TestSerialisationHs00:
def _check_metadata_for_one_dimension(self, data, original_data):
assert np.array_equal(data["bin_boundaries"], original_data["bin_boundaries"])
Expand Down Expand Up @@ -151,7 +171,7 @@ def test_if_buffer_has_wrong_id_then_throws(self):
deserialise_hs00(buf)

def test_serialises_and_deserialises_hs00_message_correctly_for_int_array_data(
self
self,
):
"""
Round-trip to check what we serialise is what we get back.
Expand Down Expand Up @@ -190,8 +210,60 @@ def test_serialises_and_deserialises_hs00_message_correctly_for_int_array_data(
hist["last_metadata_timestamp"] == original_hist["last_metadata_timestamp"]
)

def test_serialise_and_deserialise_hs00_message_returns_uint32_type(self):
original_hist = create_test_data_with_type(np.uint32)

buf = serialise_hs00(original_hist)
hist = deserialise_hs00(buf)

assert np.issubdtype(
hist["dim_metadata"][0]["bin_boundaries"].dtype,
original_hist["dim_metadata"][0]["bin_boundaries"].dtype,
)
assert np.issubdtype(hist["data"].dtype, original_hist["data"].dtype)
assert np.issubdtype(hist["errors"].dtype, original_hist["errors"].dtype)

def test_serialise_and_deserialise_hs00_message_returns_uint64_type(self):
original_hist = create_test_data_with_type(np.uint64)

buf = serialise_hs00(original_hist)
hist = deserialise_hs00(buf)

assert np.issubdtype(
hist["dim_metadata"][0]["bin_boundaries"].dtype,
original_hist["dim_metadata"][0]["bin_boundaries"].dtype,
)
assert np.issubdtype(hist["data"].dtype, original_hist["data"].dtype)
assert np.issubdtype(hist["errors"].dtype, original_hist["errors"].dtype)

def test_serialise_and_deserialise_hs00_message_returns_float32_type(self):
original_hist = create_test_data_with_type(np.float32)

buf = serialise_hs00(original_hist)
hist = deserialise_hs00(buf)

assert np.issubdtype(
hist["dim_metadata"][0]["bin_boundaries"].dtype,
original_hist["dim_metadata"][0]["bin_boundaries"].dtype,
)
assert np.issubdtype(hist["data"].dtype, original_hist["data"].dtype)
assert np.issubdtype(hist["errors"].dtype, original_hist["errors"].dtype)

def test_serialise_and_deserialise_hs00_message_returns_float64_type(self):
original_hist = create_test_data_with_type(np.float64)

buf = serialise_hs00(original_hist)
hist = deserialise_hs00(buf)

assert np.issubdtype(
hist["dim_metadata"][0]["bin_boundaries"].dtype,
original_hist["dim_metadata"][0]["bin_boundaries"].dtype,
)
assert np.issubdtype(hist["data"].dtype, original_hist["data"].dtype)
assert np.issubdtype(hist["errors"].dtype, original_hist["errors"].dtype)

def test_serialises_and_deserialises_hs00_message_correctly_when_float_input_is_not_ndarray(
self
self,
):
"""
Round-trip to check what we serialise is what we get back.
Expand Down Expand Up @@ -239,7 +311,7 @@ def test_serialises_and_deserialises_hs00_message_correctly_when_float_input_is_
)

def test_serialises_and_deserialises_hs00_message_correctly_when_int_input_is_not_ndarray(
self
self,
):
"""
Round-trip to check what we serialise is what we get back.
Expand Down

0 comments on commit 752440e

Please sign in to comment.