Skip to content

Commit

Permalink
DX-61115 cherry-picked commits from arrow (#37)
Browse files Browse the repository at this point in the history
* apacheGH-36375: [Java] Added creating MapWriter in ComplexWriter. (apache#36351) (#32)

Added a new method rootAsMap() to ComplexWriter and implemented it in ComplexWriterImpl to support the map type.
Previously, on the Dremio side:

When I tried to return a map as ComplexWriter output
with this code:

org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter mapWriter = out.rootAsList().map(false);

mapWriter.startMap();
for (java.util.Map.Entry<java.lang.Integer, java.lang.Integer> element : map.entrySet()) {
    mapWriter.startEntry();
    mapWriter.key().integer().writeInt((Integer) element.getKey());
    mapWriter.value().integer().writeInt((Integer) element.getValue());
    mapWriter.endEntry();
}
mapWriter.endMap();
It uses UnionMapWriter and generates a schema like:
EXPR$0: Map(false)<$data$: Union(Sparse, [1, 39])<struct: Struct<key: Int(32, true) not null, value: Int(32, true) not null> not null, map: Map(false)<entries: Struct<key: Int(32, true) not null, value: Int(32, true)> not null>>>
But in the OutputDerivation implementation class, where I should create the output CompleteType:

List<Field> children = Arrays.asList( CompleteType.INT.toField("key", false), CompleteType.INT.toField("value", false));
return new CompleteType(CompleteType.MAP.getType(), CompleteType.struct(children).toField(MapVector.DATA_VECTOR_NAME, false));
(This is the only valid case, because of MapVector.initializeChildrenFromFields().)
This returns
EXPR$0::map<key::int32, value::int32>. I found the place where it starts using a union: PromotableWriter.promoteToUnion.
In the end I get:

SCHEMA_CHANGE ERROR: Schema changed during projection. Schema was 
schema(EXPR$0::map<key::int32, value::int32>)
 but then changed to 
schema(EXPR$0::map<struct::struct<key::int32, value::int32>, map::map<key::int32, value::int32>>)
* Closes: apache#36375

Authored-by: Ivan Chesnov <[email protected]>

Signed-off-by: David Li <[email protected]>

* DX-67936 Upgrade to Netty 4.1.96 for CVE-2023-34462 io.netty:netty-handler 4.1.93.Final (#36)

* Update README_DREMIO for new commit.

---------

Signed-off-by: David Li <[email protected]>
Co-authored-by: lriggs <[email protected]>
Co-authored-by: Logan Riggs <[email protected]>
  • Loading branch information
3 people authored Jul 30, 2023
1 parent 7f71473 commit a2e8d49
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 7 deletions.
7 changes: 2 additions & 5 deletions README_DREMIO.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
git checkout -b dremio_24.3_12.0 apache-arrow-12.0.1

Apply starting from the bottom and working upwards.

git cherry-pick d4dc6d525cfd89aa7ca7a23e0d755f5f2e3b2c48
git cherry-pick cf53c1e023605fd506a3617bbd9fe1788b33c5a3
git cherry-pick 0230230a4a28d0e1e2aa23570804a14444c58e4a
git cherry-pick 0002859096a93cd9b0630db757a277b75a483263
git cherry-pick 5ed2f61a9fa72d011248bf9cfec7a9734d5b2aa0
git cherry-pick c601e318a897b3a6fd5f4fd3068beda1460173a2
Expand Down
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<dep.junit.jupiter.version>5.9.0</dep.junit.jupiter.version>
<dep.slf4j.version>1.7.25</dep.slf4j.version>
<dep.guava-bom.version>31.1-jre</dep.guava-bom.version>
<dep.netty-bom.version>4.1.93.Final</dep.netty-bom.version>
<dep.netty-bom.version>4.1.96.Final</dep.netty-bom.version>
<dep.grpc-bom.version>1.56.0</dep.grpc-bom.version>
<dep.protobuf-bom.version>3.21.9</dep.protobuf-bom.version>
<dep.jackson-bom.version>2.13.4</dep.jackson-bom.version>
Expand Down
1 change: 1 addition & 0 deletions java/vector/src/main/codegen/templates/BaseWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public interface ComplexWriter {
void copyReader(FieldReader reader);
StructWriter rootAsStruct();
ListWriter rootAsList();
MapWriter rootAsMap(boolean keysSorted);

void setPosition(int index);
void setValueCount(int count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.complex.NonNullableStructVector;
import org.apache.arrow.vector.complex.StateTool;
import org.apache.arrow.vector.complex.StructVector;
Expand All @@ -32,14 +33,15 @@ public class ComplexWriterImpl extends AbstractFieldWriter implements ComplexWri

private NullableStructWriter structRoot;
private UnionListWriter listRoot;
private UnionMapWriter mapRoot;
private final NonNullableStructVector container;

Mode mode = Mode.INIT;
private final String name;
private final boolean unionEnabled;
private final NullableStructWriterFactory nullableStructWriterFactory;

private enum Mode { INIT, STRUCT, LIST }
private enum Mode { INIT, STRUCT, LIST, MAP }

/**
* Constructs a new instance.
Expand Down Expand Up @@ -107,6 +109,9 @@ public void clear() {
case LIST:
listRoot.clear();
break;
case MAP:
mapRoot.clear();
break;
default:
break;
}
Expand All @@ -121,6 +126,9 @@ public void setValueCount(int count) {
case LIST:
listRoot.setValueCount(count);
break;
case MAP:
mapRoot.setValueCount(count);
break;
default:
break;
}
Expand All @@ -136,6 +144,9 @@ public void setPosition(int index) {
case LIST:
listRoot.setPosition(index);
break;
case MAP:
mapRoot.setPosition(index);
break;
default:
break;
}
Expand Down Expand Up @@ -223,5 +234,29 @@ public ListWriter rootAsList() {
return listRoot;
}

/**
 * Returns the root of this writer as a map writer.
 *
 * <p>On the first call (mode {@code INIT}) this adds or fetches a {@link MapVector} named after
 * this writer in the container, allocates it only if the container actually grew, wraps it in a
 * {@link UnionMapWriter} positioned at the current index, and switches the writer to {@code MAP}
 * mode. Calls made while already in {@code MAP} mode return the existing map writer.
 *
 * @param keysSorted forwarded to {@code container.addOrGetMap}; only consulted on the first call
 * @return the root map writer
 */
@Override
public MapWriter rootAsMap(boolean keysSorted) {
  switch (mode) {

    case INIT:
      int vectorCount = container.size();
      // TODO allow dictionaries in complex types
      MapVector mapVector = container.addOrGetMap(name, keysSorted);
      // Only allocate when addOrGetMap created a new child vector rather than returning an existing one.
      if (container.size() > vectorCount) {
        mapVector.allocateNew();
      }
      mapRoot = new UnionMapWriter(mapVector);
      mapRoot.setPosition(idx());
      mode = Mode.MAP;
      break;

    case MAP:
      // Root map writer already created; reuse it.
      break;

    default:
      // The root has already been claimed as a non-map type. The only modes in which this method
      // can succeed are INIT and MAP, so validate against those. Validating against
      // (INIT, STRUCT) would let STRUCT mode through and return a null mapRoot.
      // NOTE(review): `check` is defined outside this hunk; presumably it throws when the current
      // mode is not one of the listed modes - confirm.
      check(Mode.INIT, Mode.MAP);
  }

  return mapRoot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1401,4 +1401,108 @@ public void testStructOfList() {
Assert.assertEquals(0, size);
}
}

/**
 * Writes COUNT maps through a root map writer, where map {@code i} holds {@code i % 7} entries of
 * the form {@code j -> j + 1}, then reads every entry back and checks key and value.
 * Even-indexed entries are written with {@code writeInt}; odd-indexed ones go through holders.
 */
@Test
public void testMap() {
  try (NonNullableStructVector parent = NonNullableStructVector.empty("parent", allocator)) {
    ComplexWriter rootWriter = new ComplexWriterImpl("root", parent);
    MapWriter entryWriter = rootWriter.rootAsMap(false);

    for (int row = 0; row < COUNT; row++) {
      final int entryCount = row % 7;
      entryWriter.startMap();
      for (int entry = 0; entry < entryCount; entry++) {
        entryWriter.startEntry();
        if (entry % 2 != 0) {
          // Odd entries exercise the holder-based write path.
          IntHolder keyHolder = new IntHolder();
          keyHolder.value = entry;
          IntHolder valueHolder = new IntHolder();
          valueHolder.value = entry + 1;
          entryWriter.key().integer().write(keyHolder);
          entryWriter.value().integer().write(valueHolder);
        } else {
          // Even entries exercise the primitive write path.
          entryWriter.key().integer().writeInt(entry);
          entryWriter.value().integer().writeInt(entry + 1);
        }
        entryWriter.endEntry();
      }
      entryWriter.endMap();
    }
    rootWriter.setValueCount(COUNT);

    SingleStructReaderImpl parentReader = new SingleStructReaderImpl(parent);
    UnionMapReader mapReader = (UnionMapReader) parentReader.reader("root");
    for (int row = 0; row < COUNT; row++) {
      mapReader.setPosition(row);
      final int entryCount = row % 7;
      int entry = 0;
      while (entry < entryCount) {
        mapReader.next();
        assertEquals(entry, mapReader.key().readInteger().intValue());
        assertEquals(entry + 1, mapReader.value().readInteger().intValue());
        entry++;
      }
    }
  }
}

/**
 * Writes a single map entry whose key is written as null and whose value is 1, then checks that
 * the key reads back as null and the value as 1.
 */
@Test
public void testMapWithNulls() {
  try (NonNullableStructVector parent = NonNullableStructVector.empty("parent", allocator)) {
    ComplexWriter rootWriter = new ComplexWriterImpl("root", parent);
    MapWriter entryWriter = rootWriter.rootAsMap(false);

    // One map containing exactly one entry: null -> 1.
    entryWriter.startMap();
    entryWriter.startEntry();
    entryWriter.key().integer().writeNull();
    entryWriter.value().integer().writeInt(1);
    entryWriter.endEntry();
    entryWriter.endMap();
    rootWriter.setValueCount(1);

    SingleStructReaderImpl parentReader = new SingleStructReaderImpl(parent);
    UnionMapReader mapReader = (UnionMapReader) parentReader.reader("root");
    Assert.assertNull(mapReader.key().readInteger());
    assertEquals(1, mapReader.value().readInteger().intValue());
  }
}

/**
 * Writes a single map entry whose key is a list of the integers 0, 1, 2 and whose value is 1,
 * then reads the key list element by element and checks the value.
 */
@Test
public void testMapWithListKey() {
  try (NonNullableStructVector parent = NonNullableStructVector.empty("parent", allocator)) {
    ComplexWriter rootWriter = new ComplexWriterImpl("root", parent);
    MapWriter entryWriter = rootWriter.rootAsMap(false);

    entryWriter.startMap();
    entryWriter.startEntry();
    // Key: the list [0, 1, 2].
    entryWriter.key().list().startList();
    int element = 0;
    while (element < 3) {
      entryWriter.key().list().integer().writeInt(element);
      element++;
    }
    entryWriter.key().list().endList();
    // Value: the integer 1.
    entryWriter.value().integer().writeInt(1);
    entryWriter.endEntry();
    entryWriter.endMap();
    rootWriter.setValueCount(1);

    SingleStructReaderImpl parentReader = new SingleStructReaderImpl(parent);
    UnionMapReader mapReader = (UnionMapReader) parentReader.reader("root");
    // Advance through the key list, expecting 0, 1, 2 in order.
    for (int expected = 0; expected <= 2; expected++) {
      mapReader.key().next();
      assertEquals(expected, mapReader.key().reader().readInteger().intValue());
    }
    assertEquals(1, mapReader.value().readInteger().intValue());
  }
}

/**
 * Writes a single map entry whose key is a struct with integer fields {@code value1 = 1} and
 * {@code value2 = 2} and whose value is 1, then checks both key fields and the value on read.
 */
@Test
public void testMapWithStructKey() {
  try (NonNullableStructVector parent = NonNullableStructVector.empty("parent", allocator)) {
    ComplexWriter rootWriter = new ComplexWriterImpl("root", parent);
    MapWriter entryWriter = rootWriter.rootAsMap(false);

    entryWriter.startMap();
    entryWriter.startEntry();
    // Key: struct {value1: 1, value2: 2}.
    entryWriter.key().struct().start();
    entryWriter.key().struct().integer("value1").writeInt(1);
    entryWriter.key().struct().integer("value2").writeInt(2);
    entryWriter.key().struct().end();
    // Value: the integer 1.
    entryWriter.value().integer().writeInt(1);
    entryWriter.endEntry();
    entryWriter.endMap();
    rootWriter.setValueCount(1);

    SingleStructReaderImpl parentReader = new SingleStructReaderImpl(parent);
    UnionMapReader mapReader = (UnionMapReader) parentReader.reader("root");
    assertEquals(1, mapReader.key().reader("value1").readInteger().intValue());
    assertEquals(2, mapReader.key().reader("value2").readInteger().intValue());
    assertEquals(1, mapReader.value().readInteger().intValue());
  }
}
}

0 comments on commit a2e8d49

Please sign in to comment.