diff --git a/src/main/java/com/uber/cadence/internal/common/InternalUtils.java b/src/main/java/com/uber/cadence/internal/common/InternalUtils.java index 73498885a..cc59d52ee 100644 --- a/src/main/java/com/uber/cadence/internal/common/InternalUtils.java +++ b/src/main/java/com/uber/cadence/internal/common/InternalUtils.java @@ -18,6 +18,7 @@ package com.uber.cadence.internal.common; import com.google.common.base.Defaults; +import com.google.common.collect.Lists; import com.uber.cadence.DataBlob; import com.uber.cadence.History; import com.uber.cadence.HistoryEvent; @@ -31,7 +32,6 @@ import com.uber.cadence.workflow.WorkflowMethod; import java.lang.reflect.Method; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -153,23 +153,41 @@ public static SearchAttributes convertMapToSearchAttributes( return new SearchAttributes().setIndexedFields(mapOfByteBuffer); } - // This method deserialize the DataBlob data to the HistoriyEvent data - public static History DeserializeFromBlobToHistoryEvents( + // This method serializes history to blob data + public static DataBlob SerializeFromHistoryToBlobData(History history) { + + // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218 + TSerializer serializer = new TSerializer(); + DataBlob blob = new DataBlob(); + try { + blob.setData(serializer.serialize(history)); + } catch (org.apache.thrift.TException err) { + throw new RuntimeException("Serialize history to blob data failed", err); + } + + return blob; + } + + // This method deserialize the DataBlob data to the History data + public static History DeserializeFromBlobDataToHistory( List blobData, HistoryEventFilterType historyEventFilterType) throws TException { - List events = new ArrayList(); + // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218 + TDeserializer deSerializer = new TDeserializer(); + List events = Lists.newArrayList(); for (DataBlob data : blobData) { History history = new History(); try { byte[] dataByte = data.getData(); - dataByte = Arrays.copyOfRange(dataByte, 1, dataByte.length); + // TODO: verify the beginning index + dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length); deSerializer.deserialize(history, dataByte); if (history == null || history.getEvents() == null || history.getEvents().size() == 0) { return null; } } catch (org.apache.thrift.TException err) { - throw new TException("Deserialize blob data to history event failed with unknown error"); + throw new TException("Deserialize blob data to history failed with unknown error"); } events.addAll(history.getEvents()); @@ -184,22 +202,43 @@ public static History DeserializeFromBlobToHistoryEvents( // This method serializes history event to blob data public static List SerializeFromHistoryEventToBlobData(List events) { - List blobs = new ArrayList<>(events.size()); + + // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218 + TSerializer serializer = new TSerializer(); + List blobs = Lists.newArrayListWithCapacity(events.size()); for (HistoryEvent event : events) { DataBlob blob = new DataBlob(); try { blob.setData(serializer.serialize(event)); } catch (org.apache.thrift.TException err) { - throw new RuntimeException("Serialize to blob data failed", err); + throw new RuntimeException("Serialize history event to blob data failed", err); } blobs.add(blob); } - return blobs; } - private static final TDeserializer deSerializer = new TDeserializer(); - private static final TSerializer serializer = new TSerializer(); + // This method serializes blob data to history event + public static List DeserializeFromBlobDataToHistoryEvents(List blobData) + throws TException { + + // TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218 + TDeserializer deSerializer = new TDeserializer(); + List events = Lists.newArrayList(); + for (DataBlob data : blobData) { + try { + HistoryEvent event = new HistoryEvent(); + byte[] dataByte = data.getData(); + // TODO: verify the beginning index + dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length); + deSerializer.deserialize(event, dataByte); + events.add(event); + } catch (org.apache.thrift.TException err) { + throw new TException("Deserialize blob data to history event failed with unknown error"); + } + } + return events; + } /** Prohibit instantiation */ private InternalUtils() {} diff --git a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java index 2044dff5a..8efa3bbd4 100644 --- a/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java +++ b/src/main/java/com/uber/cadence/serviceclient/WorkflowServiceTChannel.java @@ -574,7 +574,7 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory( GetWorkflowExecutionHistoryResponse res = result.getSuccess(); if (res.getRawHistory() != null) { History history = - InternalUtils.DeserializeFromBlobToHistoryEvents( + InternalUtils.DeserializeFromBlobDataToHistory( res.getRawHistory(), getRequest.getHistoryEventFilterType()); res.setHistory(history); } @@ -2154,7 +2154,7 @@ private void getWorkflowExecutionHistory( GetWorkflowExecutionHistoryResponse res = result.getSuccess(); if (res.getRawHistory() != null) { History history = - InternalUtils.DeserializeFromBlobToHistoryEvents( + InternalUtils.DeserializeFromBlobDataToHistory( res.getRawHistory(), getRequest.getHistoryEventFilterType()); res.setHistory(history); } diff --git a/src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java b/src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java index 6fe9ffb72..23ecb652c 100644 --- a/src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java +++ b/src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java @@ -17,14 +17,23 @@ package com.uber.cadence.internal.common; +import static com.uber.cadence.EventType.WorkflowExecutionStarted; import static junit.framework.TestCase.assertEquals; +import static org.junit.Assert.assertNotNull; -import com.uber.cadence.SearchAttributes; +import com.google.common.collect.Lists; +import com.googlecode.junittoolbox.MultithreadingTester; +import com.googlecode.junittoolbox.RunnableAssert; +import com.uber.cadence.*; import com.uber.cadence.converter.DataConverterException; import com.uber.cadence.workflow.WorkflowUtils; import java.io.FileOutputStream; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.HashMap; +import java.util.List; import java.util.Map; +import junit.framework.TestCase; import org.junit.Test; public class InternalUtilsTest { @@ -47,4 +56,101 @@ public void testConvertMapToSearchAttributesException() throws Throwable { attr.put("InvalidValue", new FileOutputStream("dummy")); InternalUtils.convertMapToSearchAttributes(attr); } + + @Test + public void testSerialization_History() { + + RunnableAssert r = + new RunnableAssert("history_serialization") { + @Override + public void run() { + HistoryEvent event = + new HistoryEvent() + .setEventId(1) + .setVersion(1) + .setEventType(WorkflowExecutionStarted) + .setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)) + .setWorkflowExecutionStartedEventAttributes( + new WorkflowExecutionStartedEventAttributes() + .setAttempt(1) + .setFirstExecutionRunId("test")); + + List historyEvents = Lists.newArrayList(event); + History history = new History().setEvents(historyEvents); + DataBlob blob = InternalUtils.SerializeFromHistoryToBlobData(history); + assertNotNull(blob); + + try { + History result = + InternalUtils.DeserializeFromBlobDataToHistory( + Lists.newArrayList(blob), HistoryEventFilterType.ALL_EVENT); + assertNotNull(result); + assertEquals(1, result.events.size()); + assertEquals(event.getEventId(), result.events.get(0).getEventId()); + assertEquals(event.getVersion(), result.events.get(0).getVersion()); + assertEquals(event.getEventType(), result.events.get(0).getEventType()); + assertEquals(event.getTimestamp(), result.events.get(0).getTimestamp()); + assertEquals( + event.getWorkflowExecutionStartedEventAttributes(), + result.events.get(0).getWorkflowExecutionStartedEventAttributes()); + } catch (Exception e) { + TestCase.fail("Received unexpected error during deserialization"); + } + } + }; + + try { + new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run(); + } catch (Exception e) { + TestCase.fail("Received unexpected error during concurrent deserialization"); + } + } + + @Test + public void testSerialization_HistoryEvent() { + + RunnableAssert r = + new RunnableAssert("history_event_serialization") { + @Override + public void run() { + HistoryEvent event = + new HistoryEvent() + .setEventId(1) + .setVersion(1) + .setEventType(WorkflowExecutionStarted) + .setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)) + .setWorkflowExecutionStartedEventAttributes( + new WorkflowExecutionStartedEventAttributes() + .setAttempt(1) + .setFirstExecutionRunId("test")); + + List historyEvents = Lists.newArrayList(event); + List blobList = + InternalUtils.SerializeFromHistoryEventToBlobData(historyEvents); + assertEquals(1, blobList.size()); + + try { + List result = + InternalUtils.DeserializeFromBlobDataToHistoryEvents(blobList); + assertNotNull(result); + assertEquals(1, result.size()); + assertEquals(event.getEventId(), result.get(0).getEventId()); + assertEquals(event.getVersion(), result.get(0).getVersion()); + assertEquals(event.getEventType(), result.get(0).getEventType()); + assertEquals(event.getTimestamp(), result.get(0).getTimestamp()); + assertEquals( + event.getWorkflowExecutionStartedEventAttributes(), + result.get(0).getWorkflowExecutionStartedEventAttributes()); + } catch (Exception e) { + TestCase.fail("Received unexpected error during deserialization"); + } + } + }; + + try { + new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run(); + } catch (Exception e) { + TestCase.fail("Received unexpected error during concurrent deserialization"); + } + } }