Skip to content

Commit

Permalink
Fix null pointer exception in deserialize blob (#585)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored Feb 24, 2021
1 parent 14e523a commit 3256a96
Show file tree
Hide file tree
Showing 3 changed files with 159 additions and 14 deletions.
61 changes: 50 additions & 11 deletions src/main/java/com/uber/cadence/internal/common/InternalUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.uber.cadence.internal.common;

import com.google.common.base.Defaults;
import com.google.common.collect.Lists;
import com.uber.cadence.DataBlob;
import com.uber.cadence.History;
import com.uber.cadence.HistoryEvent;
Expand All @@ -31,7 +32,6 @@
import com.uber.cadence.workflow.WorkflowMethod;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -153,23 +153,41 @@ public static SearchAttributes convertMapToSearchAttributes(
return new SearchAttributes().setIndexedFields(mapOfByteBuffer);
}

// This method deserialize the DataBlob data to the HistoriyEvent data
public static History DeserializeFromBlobToHistoryEvents(
// This method serializes history to blob data
public static DataBlob SerializeFromHistoryToBlobData(History history) {

// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
TSerializer serializer = new TSerializer();
DataBlob blob = new DataBlob();
try {
blob.setData(serializer.serialize(history));
} catch (org.apache.thrift.TException err) {
throw new RuntimeException("Serialize history to blob data failed", err);
}

return blob;
}

// This method deserialize the DataBlob data to the History data
public static History DeserializeFromBlobDataToHistory(
List<DataBlob> blobData, HistoryEventFilterType historyEventFilterType) throws TException {

List<HistoryEvent> events = new ArrayList<HistoryEvent>();
// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
TDeserializer deSerializer = new TDeserializer();
List<HistoryEvent> events = Lists.newArrayList();
for (DataBlob data : blobData) {
History history = new History();
try {
byte[] dataByte = data.getData();
dataByte = Arrays.copyOfRange(dataByte, 1, dataByte.length);
// TODO: verify the beginning index
dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length);
deSerializer.deserialize(history, dataByte);

if (history == null || history.getEvents() == null || history.getEvents().size() == 0) {
return null;
}
} catch (org.apache.thrift.TException err) {
throw new TException("Deserialize blob data to history event failed with unknown error");
throw new TException("Deserialize blob data to history failed with unknown error");
}

events.addAll(history.getEvents());
Expand All @@ -184,22 +202,43 @@ public static History DeserializeFromBlobToHistoryEvents(

// This method serializes history event to blob data
public static List<DataBlob> SerializeFromHistoryEventToBlobData(List<HistoryEvent> events) {
List<DataBlob> blobs = new ArrayList<>(events.size());

// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
TSerializer serializer = new TSerializer();
List<DataBlob> blobs = Lists.newArrayListWithCapacity(events.size());
for (HistoryEvent event : events) {
DataBlob blob = new DataBlob();
try {
blob.setData(serializer.serialize(event));
} catch (org.apache.thrift.TException err) {
throw new RuntimeException("Serialize to blob data failed", err);
throw new RuntimeException("Serialize history event to blob data failed", err);
}
blobs.add(blob);
}

return blobs;
}

private static final TDeserializer deSerializer = new TDeserializer();
private static final TSerializer serializer = new TSerializer();
// This method serializes blob data to history event
public static List<HistoryEvent> DeserializeFromBlobDataToHistoryEvents(List<DataBlob> blobData)
throws TException {

// TODO: move to global dependency after https://issues.apache.org/jira/browse/THRIFT-2218
TDeserializer deSerializer = new TDeserializer();
List<HistoryEvent> events = Lists.newArrayList();
for (DataBlob data : blobData) {
try {
HistoryEvent event = new HistoryEvent();
byte[] dataByte = data.getData();
// TODO: verify the beginning index
dataByte = Arrays.copyOfRange(dataByte, 0, dataByte.length);
deSerializer.deserialize(event, dataByte);
events.add(event);
} catch (org.apache.thrift.TException err) {
throw new TException("Deserialize blob data to history event failed with unknown error");
}
}
return events;
}

/** Prohibit instantiation */
private InternalUtils() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ private GetWorkflowExecutionHistoryResponse getWorkflowExecutionHistory(
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
if (res.getRawHistory() != null) {
History history =
InternalUtils.DeserializeFromBlobToHistoryEvents(
InternalUtils.DeserializeFromBlobDataToHistory(
res.getRawHistory(), getRequest.getHistoryEventFilterType());
res.setHistory(history);
}
Expand Down Expand Up @@ -2154,7 +2154,7 @@ private void getWorkflowExecutionHistory(
GetWorkflowExecutionHistoryResponse res = result.getSuccess();
if (res.getRawHistory() != null) {
History history =
InternalUtils.DeserializeFromBlobToHistoryEvents(
InternalUtils.DeserializeFromBlobDataToHistory(
res.getRawHistory(), getRequest.getHistoryEventFilterType());
res.setHistory(history);
}
Expand Down
108 changes: 107 additions & 1 deletion src/test/java/com/uber/cadence/internal/common/InternalUtilsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,23 @@

package com.uber.cadence.internal.common;

import static com.uber.cadence.EventType.WorkflowExecutionStarted;
import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertNotNull;

import com.uber.cadence.SearchAttributes;
import com.google.common.collect.Lists;
import com.googlecode.junittoolbox.MultithreadingTester;
import com.googlecode.junittoolbox.RunnableAssert;
import com.uber.cadence.*;
import com.uber.cadence.converter.DataConverterException;
import com.uber.cadence.workflow.WorkflowUtils;
import java.io.FileOutputStream;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.TestCase;
import org.junit.Test;

public class InternalUtilsTest {
Expand All @@ -47,4 +56,101 @@ public void testConvertMapToSearchAttributesException() throws Throwable {
attr.put("InvalidValue", new FileOutputStream("dummy"));
InternalUtils.convertMapToSearchAttributes(attr);
}

@Test
public void testSerialization_History() {

RunnableAssert r =
new RunnableAssert("history_serialization") {
@Override
public void run() {
HistoryEvent event =
new HistoryEvent()
.setEventId(1)
.setVersion(1)
.setEventType(WorkflowExecutionStarted)
.setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC))
.setWorkflowExecutionStartedEventAttributes(
new WorkflowExecutionStartedEventAttributes()
.setAttempt(1)
.setFirstExecutionRunId("test"));

List<HistoryEvent> historyEvents = Lists.newArrayList(event);
History history = new History().setEvents(historyEvents);
DataBlob blob = InternalUtils.SerializeFromHistoryToBlobData(history);
assertNotNull(blob);

try {
History result =
InternalUtils.DeserializeFromBlobDataToHistory(
Lists.newArrayList(blob), HistoryEventFilterType.ALL_EVENT);
assertNotNull(result);
assertEquals(1, result.events.size());
assertEquals(event.getEventId(), result.events.get(0).getEventId());
assertEquals(event.getVersion(), result.events.get(0).getVersion());
assertEquals(event.getEventType(), result.events.get(0).getEventType());
assertEquals(event.getTimestamp(), result.events.get(0).getTimestamp());
assertEquals(
event.getWorkflowExecutionStartedEventAttributes(),
result.events.get(0).getWorkflowExecutionStartedEventAttributes());
} catch (Exception e) {
TestCase.fail("Received unexpected error during deserialization");
}
}
};

try {
new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run();
} catch (Exception e) {
TestCase.fail("Received unexpected error during concurrent deserialization");
}
}

@Test
public void testSerialization_HistoryEvent() {

RunnableAssert r =
new RunnableAssert("history_event_serialization") {
@Override
public void run() {
HistoryEvent event =
new HistoryEvent()
.setEventId(1)
.setVersion(1)
.setEventType(WorkflowExecutionStarted)
.setTimestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC))
.setWorkflowExecutionStartedEventAttributes(
new WorkflowExecutionStartedEventAttributes()
.setAttempt(1)
.setFirstExecutionRunId("test"));

List<HistoryEvent> historyEvents = Lists.newArrayList(event);
List<DataBlob> blobList =
InternalUtils.SerializeFromHistoryEventToBlobData(historyEvents);
assertEquals(1, blobList.size());

try {
List<HistoryEvent> result =
InternalUtils.DeserializeFromBlobDataToHistoryEvents(blobList);
assertNotNull(result);
assertEquals(1, result.size());
assertEquals(event.getEventId(), result.get(0).getEventId());
assertEquals(event.getVersion(), result.get(0).getVersion());
assertEquals(event.getEventType(), result.get(0).getEventType());
assertEquals(event.getTimestamp(), result.get(0).getTimestamp());
assertEquals(
event.getWorkflowExecutionStartedEventAttributes(),
result.get(0).getWorkflowExecutionStartedEventAttributes());
} catch (Exception e) {
TestCase.fail("Received unexpected error during deserialization");
}
}
};

try {
new MultithreadingTester().add(r).numThreads(50).numRoundsPerThread(10).run();
} catch (Exception e) {
TestCase.fail("Received unexpected error during concurrent deserialization");
}
}
}

0 comments on commit 3256a96

Please sign in to comment.