From 0018bec680b31130a559c3113d959f611d6dc6c3 Mon Sep 17 00:00:00 2001 From: Geish Miladinovic Date: Fri, 22 Oct 2021 14:41:30 +1100 Subject: [PATCH] added ev43 and fix int8 f142 serialise --- .pre-commit-config.yaml | 2 +- README.md | 1 + streaming_data_types/__init__.py | 3 + streaming_data_types/eventdata_ev43.py | 81 +++++++ .../eventdata_ev43/Event43Message.py | 223 ++++++++++++++++++ .../fbschemas/eventdata_ev43/__init__.py | 0 streaming_data_types/logdata_f142.py | 8 +- tests/test_ev43.py | 75 ++++++ tests/test_f142.py | 27 +++ 9 files changed, 415 insertions(+), 5 deletions(-) create mode 100644 streaming_data_types/eventdata_ev43.py create mode 100644 streaming_data_types/fbschemas/eventdata_ev43/Event43Message.py create mode 100644 streaming_data_types/fbschemas/eventdata_ev43/__init__.py create mode 100644 tests/test_ev43.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 038f378..a2b82a1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,7 +3,7 @@ repos: rev: stable hooks: - id: black - language_version: python3.7 + language_version: python3.6 - repo: https://github.com/pycqa/flake8 rev: 3.8.3 hooks: diff --git a/README.md b/README.md index 420bad7..aecdb09 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ https://github.com/ess-dmsc/streaming-data-types |6s4t|Run stop| |f142|Log data| |ev42|Event data| +|ev43|Event data from multiple pulses| |x5f2|Status messages| |tdct|Timestamps| |ep00|EPICS connection info| diff --git a/streaming_data_types/__init__.py b/streaming_data_types/__init__.py index 2ff37c1..d5e11f0 100644 --- a/streaming_data_types/__init__.py +++ b/streaming_data_types/__init__.py @@ -1,4 +1,5 @@ from streaming_data_types.eventdata_ev42 import deserialise_ev42, serialise_ev42 +from streaming_data_types.eventdata_ev43 import deserialise_ev43, serialise_ev43 from streaming_data_types.histogram_hs00 import deserialise_hs00, serialise_hs00 from streaming_data_types.logdata_f142 import deserialise_f142, serialise_f142 from streaming_data_types.nicos_cache_ns10 import deserialise_ns10, serialise_ns10 @@ -28,6 +29,7 @@ SERIALISERS = { "ev42": serialise_ev42, + "ev43": serialise_ev43, "hs00": serialise_hs00, "f142": serialise_f142, "ns10": serialise_ns10, @@ -47,6 +49,7 @@ DESERIALISERS = { "ev42": deserialise_ev42, + "ev43": deserialise_ev43, "hs00": deserialise_hs00, "f142": deserialise_f142, "ns10": deserialise_ns10, diff --git a/streaming_data_types/eventdata_ev43.py b/streaming_data_types/eventdata_ev43.py new file mode 100644 index 0000000..08b5e3b --- /dev/null +++ b/streaming_data_types/eventdata_ev43.py @@ -0,0 +1,81 @@ +from collections import namedtuple +import flatbuffers +import streaming_data_types.fbschemas.eventdata_ev43.Event43Message as Event43Message +from streaming_data_types.utils import check_schema_identifier +import numpy as np + + +FILE_IDENTIFIER = b"ev43" + + +EventData = namedtuple( + "EventData", + ( + "source_name", + "message_id", + "pulse_time", + "pulse_index", + "time_of_flight", + "detector_id", + ), +) + + +def deserialise_ev43(buffer): + """ + Deserialise FlatBuffer ev43. + + :param buffer: The FlatBuffers buffer. + :return: The deserialised data. + """ + check_schema_identifier(buffer, FILE_IDENTIFIER) + + event = Event43Message.Event43Message.GetRootAsEvent43Message(buffer, 0) + + return EventData( + event.SourceName().decode("utf-8"), + event.MessageId(), + event.PulseTimeAsNumpy(), + event.PulseIndexAsNumpy(), + event.TimeOfFlightAsNumpy(), + event.DetectorIdAsNumpy(), + ) + + +def serialise_ev43( + source_name, message_id, pulse_time, pulse_index, time_of_flight, detector_id +): + """ + Serialise event data as an ev43 FlatBuffers message. + + :param source_name: + :param message_id: + :param pulse_time: + :param pulse_index: + :param time_of_flight: + :param detector_id: + :return: + """ + builder = flatbuffers.Builder(1024) + builder.ForceDefaults(True) + + source = builder.CreateString(source_name) + + pulse_ts_data = builder.CreateNumpyVector(np.asarray(pulse_time).astype(np.uint64)) + pulse_ix_data = builder.CreateNumpyVector(np.asarray(pulse_index).astype(np.uint32)) + tof_data = builder.CreateNumpyVector(np.asarray(time_of_flight).astype(np.uint32)) + det_data = builder.CreateNumpyVector(np.asarray(detector_id).astype(np.uint32)) + + # Build the actual buffer + Event43Message.Event43MessageStart(builder) + Event43Message.Event43MessageAddPulseTime(builder, pulse_ts_data) + Event43Message.Event43MessageAddPulseIndex(builder, pulse_ix_data) + Event43Message.Event43MessageAddDetectorId(builder, det_data) + Event43Message.Event43MessageAddTimeOfFlight(builder, tof_data) + Event43Message.Event43MessageAddMessageId(builder, message_id) + Event43Message.Event43MessageAddSourceName(builder, source) + + data = Event43Message.Event43MessageEnd(builder) + + builder.Finish(data, file_identifier=FILE_IDENTIFIER) + return bytes(builder.Output()) diff --git a/streaming_data_types/fbschemas/eventdata_ev43/Event43Message.py b/streaming_data_types/fbschemas/eventdata_ev43/Event43Message.py new file mode 100644 index 0000000..ad0b58a --- /dev/null +++ b/streaming_data_types/fbschemas/eventdata_ev43/Event43Message.py @@ -0,0 +1,223 @@ +# automatically generated by the FlatBuffers compiler, do not modify + +# namespace: + +import flatbuffers +from flatbuffers.compat import import_numpy + +np = import_numpy() + + +class Event43Message(object): + __slots__ = ["_tab"] + + @classmethod + def GetRootAsEvent43Message(cls, buf, offset): + n = flatbuffers.encode.Get(flatbuffers.packer.uoffset, buf, offset) + x = Event43Message() + x.Init(buf, n + offset) + return x + + @classmethod + def Event43MessageBufferHasIdentifier(cls, buf, offset, size_prefixed=False): + return flatbuffers.util.BufferHasIdentifier( + buf, offset, b"\x65\x76\x34\x33", size_prefixed=size_prefixed + ) + + # Event43Message + def Init(self, buf, pos): + self._tab = flatbuffers.table.Table(buf, pos) + + # Event43Message + def SourceName(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(4)) + if o != 0: + return self._tab.String(o + self._tab.Pos) + return None + + # Event43Message + def MessageId(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(6)) + if o != 0: + return self._tab.Get( + flatbuffers.number_types.Uint64Flags, o + self._tab.Pos + ) + return 0 + + # Event43Message + def PulseTime(self, j): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(8)) + if o != 0: + a = self._tab.Vector(o) + return self._tab.Get( + flatbuffers.number_types.Uint64Flags, + a + flatbuffers.number_types.UOffsetTFlags.py_type(j * 8), + ) + return 0 + + # Event43Message + def PulseTimeAsNumpy(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(8)) + if o != 0: + return self._tab.GetVectorAsNumpy(flatbuffers.number_types.Uint64Flags, o) + return 0 + + # Event43Message + def PulseTimeLength(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(8)) + if o != 0: + return self._tab.VectorLen(o) + return 0 + + # Event43Message + def PulseTimeIsNone(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(8)) + return o == 0 + + # Event43Message + def PulseIndex(self, j): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(10)) + if o != 0: + a = self._tab.Vector(o) + return self._tab.Get( + flatbuffers.number_types.Uint32Flags, + a + flatbuffers.number_types.UOffsetTFlags.py_type(j * 4), + ) + return 0 + + # Event43Message + def PulseIndexAsNumpy(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(10)) + if o != 0: + return self._tab.GetVectorAsNumpy(flatbuffers.number_types.Uint32Flags, o) + return 0 + + # Event43Message + def PulseIndexLength(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(10)) + if o != 0: + return self._tab.VectorLen(o) + return 0 + + # Event43Message + def PulseIndexIsNone(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(10)) + return o == 0 + + # Event43Message + def TimeOfFlight(self, j): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(12)) + if o != 0: + a = self._tab.Vector(o) + return self._tab.Get( + flatbuffers.number_types.Uint32Flags, + a + flatbuffers.number_types.UOffsetTFlags.py_type(j * 4), + ) + return 0 + + # Event43Message + def TimeOfFlightAsNumpy(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(12)) + if o != 0: + return self._tab.GetVectorAsNumpy(flatbuffers.number_types.Uint32Flags, o) + return 0 + + # Event43Message + def TimeOfFlightLength(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(12)) + if o != 0: + return self._tab.VectorLen(o) + return 0 + + # Event43Message + def TimeOfFlightIsNone(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(12)) + return o == 0 + + # Event43Message + def DetectorId(self, j): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(14)) + if o != 0: + a = self._tab.Vector(o) + return self._tab.Get( + flatbuffers.number_types.Uint32Flags, + a + flatbuffers.number_types.UOffsetTFlags.py_type(j * 4), + ) + return 0 + + # Event43Message + def DetectorIdAsNumpy(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(14)) + if o != 0: + return self._tab.GetVectorAsNumpy(flatbuffers.number_types.Uint32Flags, o) + return 0 + + # Event43Message + def DetectorIdLength(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(14)) + if o != 0: + return self._tab.VectorLen(o) + return 0 + + # Event43Message + def DetectorIdIsNone(self): + o = flatbuffers.number_types.UOffsetTFlags.py_type(self._tab.Offset(14)) + return o == 0 + + +def Event43MessageStart(builder): + builder.StartObject(6) + + +def Event43MessageAddSourceName(builder, sourceName): + builder.PrependUOffsetTRelativeSlot( + 0, flatbuffers.number_types.UOffsetTFlags.py_type(sourceName), 0 + ) + + +def Event43MessageAddMessageId(builder, messageId): + builder.PrependUint64Slot(1, messageId, 0) + + +def Event43MessageAddPulseTime(builder, pulseTime): + builder.PrependUOffsetTRelativeSlot( + 2, flatbuffers.number_types.UOffsetTFlags.py_type(pulseTime), 0 + ) + + +def Event43MessageStartPulseTimeVector(builder, numElems): + return builder.StartVector(8, numElems, 8) + + +def Event43MessageAddPulseIndex(builder, pulseIndex): + builder.PrependUOffsetTRelativeSlot( + 3, flatbuffers.number_types.UOffsetTFlags.py_type(pulseIndex), 0 + ) + + +def Event43MessageStartPulseIndexVector(builder, numElems): + return builder.StartVector(4, numElems, 4) + + +def Event43MessageAddTimeOfFlight(builder, timeOfFlight): + builder.PrependUOffsetTRelativeSlot( + 4, flatbuffers.number_types.UOffsetTFlags.py_type(timeOfFlight), 0 + ) + + +def Event43MessageStartTimeOfFlightVector(builder, numElems): + return builder.StartVector(4, numElems, 4) + + +def Event43MessageAddDetectorId(builder, detectorId): + builder.PrependUOffsetTRelativeSlot( + 5, flatbuffers.number_types.UOffsetTFlags.py_type(detectorId), 0 + ) + + +def Event43MessageStartDetectorIdVector(builder, numElems): + return builder.StartVector(4, numElems, 4) + + +def Event43MessageEnd(builder): + return builder.EndObject() diff --git a/streaming_data_types/fbschemas/eventdata_ev43/__init__.py b/streaming_data_types/fbschemas/eventdata_ev43/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/streaming_data_types/logdata_f142.py b/streaming_data_types/logdata_f142.py index 993693c..94a3774 100644 --- a/streaming_data_types/logdata_f142.py +++ b/streaming_data_types/logdata_f142.py @@ -451,11 +451,11 @@ def _serialise_stringarray(builder: flatbuffers.Builder, data: np.ndarray, sourc _map_scalar_type_to_serialiser = { np.dtype("byte"): _serialise_byte, np.dtype("ubyte"): _serialise_ubyte, - np.dtype("int8"): _serialise_short, + np.dtype("int8"): _serialise_byte, np.dtype("int16"): _serialise_short, np.dtype("int32"): _serialise_int, np.dtype("int64"): _serialise_long, - np.dtype("uint8"): _serialise_ushort, + np.dtype("uint8"): _serialise_ubyte, np.dtype("uint16"): _serialise_ushort, np.dtype("uint32"): _serialise_uint, np.dtype("uint64"): _serialise_ulong, @@ -466,11 +466,11 @@ def _serialise_stringarray(builder: flatbuffers.Builder, data: np.ndarray, sourc _map_array_type_to_serialiser = { np.dtype("byte"): _serialise_bytearray, np.dtype("ubyte"): _serialise_ubytearray, - np.dtype("int8"): _serialise_shortarray, + np.dtype("int8"): _serialise_bytearray, np.dtype("int16"): _serialise_shortarray, np.dtype("int32"): _serialise_intarray, np.dtype("int64"): _serialise_longarray, - np.dtype("uint8"): _serialise_ushortarray, + np.dtype("uint8"): _serialise_ubytearray, np.dtype("uint16"): _serialise_ushortarray, np.dtype("uint32"): _serialise_uintarray, np.dtype("uint64"): _serialise_ulongarray, diff --git a/tests/test_ev43.py b/tests/test_ev43.py new file mode 100644 index 0000000..946d7ed --- /dev/null +++ b/tests/test_ev43.py @@ -0,0 +1,75 @@ +import numpy as np +import pytest +from streaming_data_types.eventdata_ev43 import serialise_ev43, deserialise_ev43 +from streaming_data_types import SERIALISERS, DESERIALISERS +from streaming_data_types.exceptions import WrongSchemaException + + +class TestSerialisationEv42: + def test_serialises_and_deserialises_ev43_message_correctly(self): + """ + Round-trip to check what we serialise is what we get back. + """ + original_entry = { + "source_name": "some_source", + "message_id": 123456, + "pulse_time": [567890, 568890], + "pulse_index": [0, 4], + "time_of_flight": [1, 2, 3, 4, 5, 6, 7, 8, 9], + "detector_id": [10, 20, 30, 40, 50, 60, 70, 80, 90], + } + + buf = serialise_ev43(**original_entry) + entry = deserialise_ev43(buf) + + assert entry.source_name == original_entry["source_name"] + assert entry.message_id == original_entry["message_id"] + assert np.array_equal(entry.pulse_time, original_entry["pulse_time"]) + assert np.array_equal(entry.pulse_index, original_entry["pulse_index"]) + assert np.array_equal(entry.time_of_flight, original_entry["time_of_flight"]) + assert np.array_equal(entry.detector_id, original_entry["detector_id"]) + + def test_serialises_and_deserialises_ev43_message_correctly_for_numpy_arrays(self): + """ + Round-trip to check what we serialise is what we get back. + """ + original_entry = { + "source_name": "some_source", + "message_id": 123456, + "pulse_time": np.array([567890, 568890]), + "pulse_index": np.array([0, 4]), + "time_of_flight": np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]), + "detector_id": np.array([10, 20, 30, 40, 50, 60, 70, 80, 90]), + } + + buf = serialise_ev43(**original_entry) + entry = deserialise_ev43(buf) + + assert entry.source_name == original_entry["source_name"] + assert entry.message_id == original_entry["message_id"] + assert np.array_equal(entry.pulse_time, original_entry["pulse_time"]) + assert np.array_equal(entry.pulse_index, original_entry["pulse_index"]) + assert np.array_equal(entry.time_of_flight, original_entry["time_of_flight"]) + assert np.array_equal(entry.detector_id, original_entry["detector_id"]) + + def test_if_buffer_has_wrong_id_then_throws(self): + original_entry = { + "source_name": "some_source", + "message_id": 123456, + "pulse_time": [567890, 568890], + "pulse_index": [0, 4], + "time_of_flight": [1, 2, 3, 4, 5, 6, 7, 8, 9], + "detector_id": [10, 20, 30, 40, 50, 60, 70, 80, 90], + } + buf = serialise_ev43(**original_entry) + + # Manually hack the id + buf = bytearray(buf) + buf[4:8] = b"1234" + + with pytest.raises(WrongSchemaException): + deserialise_ev43(buf) + + def test_schema_type_is_in_global_serialisers_list(self): + assert "ev43" in SERIALISERS + assert "ev43" in DESERIALISERS diff --git a/tests/test_f142.py b/tests/test_f142.py index e6221e5..b0d9796 100644 --- a/tests/test_f142.py +++ b/tests/test_f142.py @@ -25,6 +25,19 @@ def test_serialises_and_deserialises_integer_f142_message_correctly(self): == self.original_entry["timestamp_unix_ns"] ) + def test_serialises_and_deserialises_byte_f142_message_correctly(self): + byte_log = { + "source_name": "some_source", + "value": 0x7F, + "timestamp_unix_ns": 1585332414000000000, + } + buf = serialise_f142(**byte_log) + deserialised_tuple = deserialise_f142(buf) + + assert deserialised_tuple.source_name == byte_log["source_name"] + assert deserialised_tuple.value == byte_log["value"] + assert deserialised_tuple.timestamp_unix_ns == byte_log["timestamp_unix_ns"] + def test_serialises_and_deserialises_float_f142_message_correctly(self): float_log = { "source_name": "some_source", @@ -91,6 +104,20 @@ def test_serialises_and_deserialises_numpy_array_integers_correctly(self): assert np.array_equal(deserialised_tuple.value, array_log["value"]) assert deserialised_tuple.timestamp_unix_ns == array_log["timestamp_unix_ns"] + def test_serialises_and_deserialises_numpy_array_preserves_byte_type_correctly( + self, + ): + array_log = { + "source_name": "some_source", + "value": np.array([1, 2, 3], dtype=np.uint8), + "timestamp_unix_ns": 1585332414000000000, + } + buf = serialise_f142(**array_log) + deserialised_tuple = deserialise_f142(buf) + + assert np.array_equal(deserialised_tuple.value, array_log["value"]) + assert deserialised_tuple.value.dtype == array_log["value"].dtype + def test_serialises_and_deserialises_numpy_array_preserves_integer_type_correctly( self, ):