Skip to content

Commit

Permalink
apacheGH-37056: [Java] Fix importing an empty data array from c-data (a…
Browse files Browse the repository at this point in the history
…pache#37531)

### Rationale for this change

Fix java lib bug that prevents from importing specific data via c-data interface.
Currently an attempt to load a vector with empty data buffer results in an IllegalStateException error.

### What changes are included in this PR?
Updated BufferImportTypeVisitor to correctly handle a situation when underlying c-data array pointer is NULL (0) and the expected length of data is zero (0). 

### Are these changes tested?
Yes, updated the existing unit tests

### Are there any user-facing changes?
No

* Closes: apache#37056

Lead-authored-by: Jacek Stania <[email protected]>
Co-authored-by: Jacek Stania <[email protected]>
Co-authored-by: David Li <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
janosik47 and lidavidm authored Sep 7, 2023
1 parent 65e2f22 commit 5a78169
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,45 +80,44 @@ public void close() throws Exception {
}

@VisibleForTesting
long getBufferPtr(ArrowType type, int index) {
ArrowBuf importBuffer(ArrowType type, int index, long capacity) {
checkState(
buffers.length > index,
"Expected at least %s buffers for type %s, but found %s", index + 1, type, buffers.length);
if (buffers[index] == NULL) {
throw new IllegalStateException(String.format("Buffer %s for type %s cannot be null", index, type));
buffers.length > index,
"Expected at least %s buffers for type %s, but found %s", index + 1, type, buffers.length);
long bufferPtr = buffers[index];

if (bufferPtr == NULL) {
// C array may be NULL but only accept that if expected capacity is zero too
if (capacity != 0) {
throw new IllegalStateException(String.format("Buffer %s for type %s cannot be null", index, type));
} else {
// no data in the C array, return an empty buffer
return allocator.getEmpty();
}
}
return buffers[index];

ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
imported.add(buf);
return buf;
}

private ArrowBuf importFixedBits(ArrowType type, int index, long bitsPerSlot) {
final long bufferPtr = getBufferPtr(type, index);
final long capacity = DataSizeRoundingUtil.divideBy8Ceil(bitsPerSlot * fieldNode.getLength());
ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
this.imported.add(buf);
return buf;
return importBuffer(type, index, capacity);
}

private ArrowBuf importFixedBytes(ArrowType type, int index, long bytesPerSlot) {
final long bufferPtr = getBufferPtr(type, index);
final long capacity = bytesPerSlot * fieldNode.getLength();
ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
this.imported.add(buf);
return buf;
return importBuffer(type, index, capacity);
}

private ArrowBuf importOffsets(ArrowType type, long bytesPerSlot) {
final long bufferPtr = getBufferPtr(type, 1);
final long capacity = bytesPerSlot * (fieldNode.getLength() + 1);
ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
this.imported.add(buf);
return buf;
return importBuffer(type, 1, capacity);
}

private ArrowBuf importData(ArrowType type, long capacity) {
final long bufferPtr = getBufferPtr(type, 2);
ArrowBuf buf = underlyingAllocation.unsafeAssociateAllocation(allocator, capacity, bufferPtr);
this.imported.add(buf);
return buf;
return importBuffer(type, 2, capacity);
}

private ArrowBuf maybeImportBitmap(ArrowType type) {
Expand Down
30 changes: 25 additions & 5 deletions java/c/src/test/java/org/apache/arrow/c/ArrowArrayUtilityTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,39 @@ void afterEach() {
// BufferImportTypeVisitor

@Test
void getBufferPtr() throws Exception {
void importBuffer() throws Exception {
// Note values are all dummy values here
try (BufferImportTypeVisitor visitor =
new BufferImportTypeVisitor(allocator, dummyHandle, new ArrowFieldNode(0, 0), new long[]{0})) {
try (BufferImportTypeVisitor notEmptyDataVisitor =
new BufferImportTypeVisitor(allocator, dummyHandle, new ArrowFieldNode(/* length= */ 1, 0), new long[]{0})) {

// Too few buffers
assertThrows(IllegalStateException.class, () -> visitor.getBufferPtr(new ArrowType.Bool(), 1));
assertThrows(IllegalStateException.class, () -> notEmptyDataVisitor.importBuffer(new ArrowType.Bool(), 1, 1));

// Null where one isn't expected
assertThrows(IllegalStateException.class, () -> visitor.getBufferPtr(new ArrowType.Bool(), 0));
assertThrows(IllegalStateException.class, () -> notEmptyDataVisitor.importBuffer(new ArrowType.Bool(), 0, 1));

// Expected capacity not zero but c array ptr is NULL (zero)
assertThrows(IllegalStateException.class, () -> notEmptyDataVisitor.importBuffer(new ArrowType.Bool(), 0, 1));

// Expected capacity is zero and c array ptr is NULL (zero)
assertThat(notEmptyDataVisitor.importBuffer(new ArrowType.Bool(), 0, 0)).isEqualTo(allocator.getEmpty());
}

try (BufferImportTypeVisitor emptyDataVisitor =
new BufferImportTypeVisitor(allocator, dummyHandle, new ArrowFieldNode(/* length= */ 0, 0), new long[]{0})) {

// Too few buffers
assertThrows(IllegalStateException.class, () -> emptyDataVisitor.importBuffer(new ArrowType.Bool(), 1, 1));

// Expected capacity not zero but c array ptr is NULL (zero)
assertThrows(IllegalStateException.class, () -> emptyDataVisitor.importBuffer(new ArrowType.Bool(), 0, 1));

// Expected capacity is zero and c array ptr is NULL (zero)
assertThat(emptyDataVisitor.importBuffer(new ArrowType.Bool(), 0, 0)).isEqualTo(allocator.getEmpty());
}
}


@Test
void cleanupAfterFailure() throws Exception {
// Note values are all dummy values here
Expand Down
8 changes: 8 additions & 0 deletions java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,14 @@ public void testListVector() {
}
}

@Test
public void testEmptyListVector() {
try (final ListVector vector = ListVector.empty("v", allocator)) {
setVector(vector, new ArrayList<Integer>());
assertTrue(roundtrip(vector, ListVector.class));
}
}

@Test
public void testLargeListVector() {
try (final LargeListVector vector = LargeListVector.empty("v", allocator)) {
Expand Down
27 changes: 25 additions & 2 deletions java/c/src/test/python/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import jpype
import pyarrow as pa
import pyarrow.ipc as ipc
from pyarrow.cffi import ffi


Expand Down Expand Up @@ -119,6 +120,10 @@ def close(self):


class TestPythonIntegration(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
setup_jvm()

def setUp(self):
gc.collect()
self.old_allocated_python = pa.total_allocated_bytes()
Expand Down Expand Up @@ -195,7 +200,26 @@ def test_list_array(self):
# disabled check_metadata since the list internal field name ("item")
# is not preserved during round trips (it becomes "$data$").
), check_metadata=False)


def test_empty_list_array(self):
"""Validates GH-37056 fix.
Empty list of int32 produces a vector with empty child data buffer, however with non-zero capacity.
Using streaming forces the c-data array which represent the child data buffer to be NULL (pointer is 0).
On Java side, an attempt to import such array triggered an exception described in GH-37056.
"""
with pa.BufferOutputStream() as bos:
schema = pa.schema([pa.field("f0", pa.list_(pa.int32()), True)])
with ipc.new_stream(bos, schema) as writer:
src = pa.RecordBatch.from_arrays([pa.array([[]])], schema=schema)
writer.write(src)
data_bytes = bos.getvalue()

def recreate_batch():
with pa.input_stream(data_bytes) as ios:
with ipc.open_stream(ios) as reader:
return reader.read_next_batch()

self.round_trip_record_batch(recreate_batch)

def test_struct_array(self):
fields = [
Expand Down Expand Up @@ -274,5 +298,4 @@ def test_reader_complex_roundtrip(self):


if __name__ == '__main__':
setup_jvm()
unittest.main(verbosity=2)

0 comments on commit 5a78169

Please sign in to comment.