From 7f2c64ccf1aeee3e60fc13dbc233462df8eab391 Mon Sep 17 00:00:00 2001 From: Logan Riggs Date: Thu, 14 Sep 2023 16:57:22 -0700 Subject: [PATCH] Using some dynamic sizes --- cpp/src/gandiva/array_ops.cc | 4 +- cpp/src/gandiva/expr_decomposer.cc | 2 +- cpp/src/gandiva/gdv_function_stubs.cc | 8 --- java/gandiva/src/main/cpp/jni_common.cc | 22 ++++-- .../arrow/gandiva/evaluator/JniWrapper.java | 3 +- .../gandiva/evaluator/ListVectorExpander.java | 69 +++++++++++++++++++ .../arrow/gandiva/evaluator/Projector.java | 37 +++++++--- 7 files changed, 118 insertions(+), 27 deletions(-) create mode 100644 java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/ListVectorExpander.java diff --git a/cpp/src/gandiva/array_ops.cc b/cpp/src/gandiva/array_ops.cc index 7e5931c18cd42..fd04a84986974 100644 --- a/cpp/src/gandiva/array_ops.cc +++ b/cpp/src/gandiva/array_ops.cc @@ -105,9 +105,9 @@ int32_t* array_int32_remove(int64_t context_ptr, const int32_t* entry_buf, int* integers = new int[5]; int j = 0; for (int i = 0; i < entry_offsets_len; i++) { - std::cout << "LR going to check " << entry_buf + i << std::endl; + //std::cout << "LR going to check " << entry_buf + i << std::endl; int32_t entry_len = *(entry_buf + (i * 1)); - std::cout << "LR checking value " << entry_len << " against target " << remove_data << std::endl; + //std::cout << "LR checking value " << entry_len << " against target " << remove_data << std::endl; if (entry_len == remove_data) { continue; } else { diff --git a/cpp/src/gandiva/expr_decomposer.cc b/cpp/src/gandiva/expr_decomposer.cc index ec56d30c51589..72c992df11c7e 100644 --- a/cpp/src/gandiva/expr_decomposer.cc +++ b/cpp/src/gandiva/expr_decomposer.cc @@ -52,7 +52,7 @@ Status ExprDecomposer::Visit(const FieldNode& node) { //std::cout << "LR ExprDecomposer 3" << std::endl; auto p = std::make_shared(desc); value_dex = p; - int v = p->DataIdx(); + //int v = p->DataIdx(); //std::cout << "LR primitive list type " v << " " << } else { //std::cout << "LR ExprDecomposer 4" << std::endl; diff --git a/cpp/src/gandiva/gdv_function_stubs.cc b/cpp/src/gandiva/gdv_function_stubs.cc index c739861fcf492..045b97c698086 100644 --- a/cpp/src/gandiva/gdv_function_stubs.cc +++ b/cpp/src/gandiva/gdv_function_stubs.cc @@ -164,12 +164,8 @@ int32_t gdv_fn_populate_varlen_vector(int64_t context_ptr, int8_t* data_ptr, int32_t gdv_fn_populate_list_##TYPE##_vector(int64_t context_ptr, int8_t* data_ptr, \ int32_t* offsets, int64_t slot, \ TYPE* entry_buf, int32_t entry_len) { \ - std::cout << "gdv_fn_populate 1 data_ptr is " << data_ptr << std::endl; \ auto buffer = reinterpret_cast(data_ptr); \ int32_t offset = static_cast(buffer->size()); \ - std::cout << "gdv_fn_populate 2 data_ptr" << data_ptr << " buffer " << buffer << \ - " offset " << offset << " entry_len " << entry_len << " scale " \ - << SCALE << " slot " << slot<< std::endl; \ auto status = buffer->Resize(offset + entry_len * SCALE, false /*shrink*/); \ if (!status.ok()) { \ gandiva::ExecutionContext* context = \ @@ -177,11 +173,7 @@ int32_t gdv_fn_populate_varlen_vector(int64_t context_ptr, int8_t* data_ptr, context->set_error_msg(status.message().c_str()); \ return -1; \ } \ - std::cout << "gdv_fn_populate resized buffer to =" << offset + entry_len * SCALE << std::endl; \ - std::cout << "gdv_fn_populate copying bytes =" << entry_len * SCALE << std::endl; \ - std::cout << "gdv_fn_populate buffer =" << buffer->ToString() << " offeset " << offset << std::endl; \ memcpy(buffer->mutable_data() + offset, (char*)entry_buf, entry_len * SCALE); \ - std::cout << "gdv_fn_populate buffer after =" << buffer->ToString() << std::endl; \ offsets[slot] = offset / SCALE; \ offsets[slot + 1] = offset / SCALE + entry_len; \ return 0; \ diff --git a/java/gandiva/src/main/cpp/jni_common.cc b/java/gandiva/src/main/cpp/jni_common.cc index f26763fc0c03f..1b96109f256cc 100644 --- a/java/gandiva/src/main/cpp/jni_common.cc +++ b/java/gandiva/src/main/cpp/jni_common.cc @@ -939,7 +939,7 @@ Status JavaResizableBuffer::Resize(const int64_t new_size, bool shrink_to_fit) { JNIEXPORT void JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( - JNIEnv* env, jobject object, jobject jexpander, jlong module_id, jint num_rows, + JNIEnv* env, jobject object, jobject jexpander, jobject jListExpander, jlong module_id, jint num_rows, jlongArray buf_addrs, jlongArray buf_sizes, jint sel_vec_type, jint sel_vec_rows, jlong sel_vec_addr, jlong sel_vec_size, jlongArray out_buf_addrs, jlongArray out_buf_sizes) { @@ -1060,20 +1060,29 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( std::vector> child_buffers; + if (jListExpander == nullptr) { + status = Status::Invalid( + "expression has variable len output columns, but the jListExpander object is " + "null"); + break; + } + + + //LR TODO the two buffers... data_sz = out_sizes[sz_idx++]; //std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 3 adding buffer size=" << data_sz << std::endl; CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len); uint8_t* child_offset_buf = reinterpret_cast(out_bufs[buf_idx++]); child_buffers.push_back(std::make_shared( - env, jexpander, output_vector_idx, child_offset_buf, data_sz)); + env, jListExpander, output_vector_idx, child_offset_buf, data_sz)); data_sz = out_sizes[sz_idx++]; //std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 4 adding buffer size=" << data_sz << std::endl; CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len); uint8_t* child_data_buf = reinterpret_cast(out_bufs[buf_idx++]); child_buffers.push_back(std::make_shared( - env, jexpander, output_vector_idx, child_data_buf, data_sz)); + env, jListExpander, output_vector_idx, child_data_buf, data_sz)); std::shared_ptr dt2 = std::make_shared(); auto array_data_child = arrow::ArrayData::Make(dt2, output_row_count, child_buffers); @@ -1120,9 +1129,12 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( // << (array_data->child_data[0])->length << std::endl; //LRTest1 Start - int numRecords = 5 * 1000000; + int numRecords = (array_data->child_data[0])->length; //int numRecords = (array_data->child_data[0])->length * array_data->length; - int recordSize = numRecords * 4; + int recordSize = numRecords * 4; //LR TODO HACK + + std::cout << "LR jni_common there are records=" << array_data->length << " and the first one is=" + << (array_data->child_data[0])->length << " using numRecords=" << numRecords << std::endl; memcpy(&out_bufs[3], (array_data->child_data[0])->buffers[1]->data(), recordSize); out_sizes[3] = recordSize; diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java index 293d51a87a5fd..f883ed7081547 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java @@ -50,6 +50,7 @@ native long buildProjector(Object cache, byte[] schemaBuf, byte[] exprListBuf, * and store the output in ValueVectors. Throws an exception in case of errors * * @param expander VectorExpander object. Used for callbacks from cpp. + * @param listExpander ListVectorExpander object. Used for callbacks from cpp. * @param moduleId moduleId representing expressions. Created using a call to * buildNativeCode * @param numRows Number of rows in the record batch @@ -63,7 +64,7 @@ native long buildProjector(Object cache, byte[] schemaBuf, byte[] exprListBuf, * @param outSizes The allocated size of the output buffers. On successful evaluation, * the result is stored in the output buffers */ - native void evaluateProjector(Object expander, long moduleId, int numRows, + native void evaluateProjector(Object expander, Object listExpander, long moduleId, int numRows, long[] bufAddrs, long[] bufSizes, int selectionVectorType, int selectionVectorSize, long selectionVectorBufferAddr, long selectionVectorBufferSize, diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/ListVectorExpander.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/ListVectorExpander.java new file mode 100644 index 0000000000000..7019f396b9677 --- /dev/null +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/ListVectorExpander.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.gandiva.evaluator; + +import org.apache.arrow.vector.complex.ListVector; + +/** + * This class provides the functionality to expand output vectors using a callback mechanism from + * gandiva. + */ +public class ListVectorExpander { + private final ListVector[] vectors; + + public ListVectorExpander(ListVector[] vectors) { + this.vectors = vectors; + } + + /** + * Result of vector expansion. + */ + public static class ExpandResult { + public long address; + public long capacity; + + public ExpandResult(long address, long capacity) { + this.address = address; + this.capacity = capacity; + } + } + + /** + * Expand vector at specified index. This is used as a back call from jni, and is only + * relevant for ListVectors. + * + * @param index index of buffer in the list passed to jni. + * @param toCapacity the size to which the buffer should be expanded to. + * + * @return address and size of the buffer after expansion. + */ + public ExpandResult expandOutputVectorAtIndex(int index, long toCapacity) { + if (index >= vectors.length || vectors[index] == null) { + throw new IllegalArgumentException("invalid index " + index); + } + + ListVector vector = vectors[index]; + while (vector.getDataVector().getFieldBuffers().get(0).capacity() < toCapacity) { + vector.reAlloc(); + } + return new ExpandResult( + vector.getDataVector().getFieldBuffers().get(0).memoryAddress(), + vector.getDataVector().getFieldBuffers().get(0).capacity()); + } + +} diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java index 91729b7e519f0..ae904bb64a9e1 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java @@ -355,7 +355,8 @@ private void evaluate(int numRows, List buffers, List buf boolean hasVariableWidthColumns = false; BaseVariableWidthVector[] resizableVectors = new BaseVariableWidthVector[outColumns.size()]; - + ListVector[] resizableListVectors = new ListVector[outColumns.size()]; + long[] outAddrs = new long[3 * outColumns.size()]; long[] outSizes = new long[3 * outColumns.size()]; @@ -393,6 +394,7 @@ private void evaluate(int numRows, List buffers, List buf if (valueVector instanceof ListVector) { hasVariableWidthColumns = true; + resizableListVectors[outColumnIdx] = (ListVector) valueVector; //LR TODO figure out what to use here resizableVectors[outColumnIdx] = (BaseVariableWidthVector) valueVector; //resizableVectors[outColumnIdx] = (BaseVariableWidthVector) valueVector; //resizeableVectors[outColumnIdx] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(0); @@ -413,7 +415,14 @@ private void evaluate(int numRows, List buffers, List buf logger.error("LR Projector.java evaluate ListVector passing data buffer as " + idx); ((ListVector) valueVector).reAlloc(); ((ListVector) valueVector).reAlloc(); - ((ListVector) valueVector).reAlloc(); + ((ListVector) valueVector).reAlloc(); //100 rows + + //This doesnt actually allocate any memory. + //((ListVector) valueVector).setInitialCapacity(1000000); + //while (((ListVector) valueVector).getValueCapacity() < 1000000) { + // ((ListVector) valueVector).reAlloc(); + //} + //The realloc avoids dynamic resizing, will have to be fixed later. outAddrs[idx] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(0).memoryAddress(); outSizes[idx++] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(0).capacity(); @@ -448,6 +457,7 @@ private void evaluate(int numRows, List buffers, List buf logger.error("LR Projector.java evaluate calling evaluateProjector with buffers=" + idx); wrapper.evaluateProjector( hasVariableWidthColumns ? new VectorExpander(resizableVectors) : null, + hasVariableWidthColumns ? new ListVectorExpander(resizableListVectors) : null, this.moduleId, numRows, bufAddrs, bufSizes, selectionVectorType, selectionVectorRecordCount, selectionVectorAddr, selectionVectorSize, @@ -464,9 +474,9 @@ private void evaluate(int numRows, List buffers, List buf if (valueVector instanceof ListVector) { //LR HACK - int numRecordsFound = 5 * 1000000; + //int numRecordsFound = 5 * 100; //int numRecordsFound = Math.toIntExact(outSizes[3]) / 4; - //logger.error("LR Projector.java using outsizes numRecords=" + numRecordsFound + " outSizes[3]=" + outSizes[3]); + //logger.error("LR Projector.java using numRecords=" + numRecordsFound + " outSizes[3]=" + outSizes[3]); //LR HACK 9-13 10:34 /*public void startList() { @@ -482,8 +492,8 @@ public void endList() { listStarted = false; */ - /*ArrowBuf ab2 = new ArrowBuf(ReferenceManager.NO_OP, null, outSizes[3], outAddrs[3]); - for (int i = 0; i < 50; i++) { + ArrowBuf ab2 = new ArrowBuf(ReferenceManager.NO_OP, null, outSizes[3], outAddrs[3]); + /*for (int i = 0; i < 50; i++) { System.out.println("LR arrowbuf=" + Integer.reverseBytes(ab2.getInt(i))); System.out.println("LR arrowbuf=" + ab2.getInt(i)); System.out.println("LR arrowbuf=" + ab2.getShort(i)); @@ -498,18 +508,25 @@ public void endList() { System.out.println("LR arrowbuf2=" + ab.getShort(i)); }*/ - + logger.error("LR Projector.java using numRecords=" + + selectionVectorRecordCount + " outSizes[3]=" + outSizes[3]); UnionListWriter writer = ((ListVector) valueVector).getWriter(); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < selectionVectorRecordCount; i++) { writer.startList(); writer.setPosition(i); for (int j = 0; j < 5; j++) { - writer.writeInt(ab2.getInt((j + (5 * i)) * 4)); + int index = ((j + (5 * i)) * 4); + //Not sure whats going on. Buffer too small? + try { + writer.writeInt(ab2.getInt(index)); + } catch (IndexOutOfBoundsException e) { + continue; + } } writer.setValueCount(5); writer.endList(); } - ((ListVector) valueVector).setValueCount(100); + ((ListVector) valueVector).setValueCount(selectionVectorRecordCount); //LR HACK 9-13 10:34 All the multiline comment