Skip to content

Commit

Permalink
Using some dynamic sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
lriggs committed Sep 14, 2023
1 parent fcd1aa0 commit 7f2c64c
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 27 deletions.
4 changes: 2 additions & 2 deletions cpp/src/gandiva/array_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ int32_t* array_int32_remove(int64_t context_ptr, const int32_t* entry_buf,
int* integers = new int[5];
int j = 0;
for (int i = 0; i < entry_offsets_len; i++) {
std::cout << "LR going to check " << entry_buf + i << std::endl;
//std::cout << "LR going to check " << entry_buf + i << std::endl;
int32_t entry_len = *(entry_buf + (i * 1));
std::cout << "LR checking value " << entry_len << " against target " << remove_data << std::endl;
//std::cout << "LR checking value " << entry_len << " against target " << remove_data << std::endl;
if (entry_len == remove_data) {
continue;
} else {
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/gandiva/expr_decomposer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Status ExprDecomposer::Visit(const FieldNode& node) {
//std::cout << "LR ExprDecomposer 3" << std::endl;
auto p = std::make_shared<VectorReadFixedLenValueListDex>(desc);
value_dex = p;
int v = p->DataIdx();
//int v = p->DataIdx();
//std::cout << "LR primitive list type " v << " " <<
} else {
//std::cout << "LR ExprDecomposer 4" << std::endl;
Expand Down
8 changes: 0 additions & 8 deletions cpp/src/gandiva/gdv_function_stubs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,24 +164,16 @@ int32_t gdv_fn_populate_varlen_vector(int64_t context_ptr, int8_t* data_ptr,
int32_t gdv_fn_populate_list_##TYPE##_vector(int64_t context_ptr, int8_t* data_ptr, \
int32_t* offsets, int64_t slot, \
TYPE* entry_buf, int32_t entry_len) { \
std::cout << "gdv_fn_populate 1 data_ptr is " << data_ptr << std::endl; \
auto buffer = reinterpret_cast<arrow::ResizableBuffer*>(data_ptr); \
int32_t offset = static_cast<int32_t>(buffer->size()); \
std::cout << "gdv_fn_populate 2 data_ptr" << data_ptr << " buffer " << buffer << \
" offset " << offset << " entry_len " << entry_len << " scale " \
<< SCALE << " slot " << slot<< std::endl; \
auto status = buffer->Resize(offset + entry_len * SCALE, false /*shrink*/); \
if (!status.ok()) { \
gandiva::ExecutionContext* context = \
reinterpret_cast<gandiva::ExecutionContext*>(context_ptr); \
context->set_error_msg(status.message().c_str()); \
return -1; \
} \
std::cout << "gdv_fn_populate resized buffer to =" << offset + entry_len * SCALE << std::endl; \
std::cout << "gdv_fn_populate copying bytes =" << entry_len * SCALE << std::endl; \
std::cout << "gdv_fn_populate buffer =" << buffer->ToString() << " offeset " << offset << std::endl; \
memcpy(buffer->mutable_data() + offset, (char*)entry_buf, entry_len * SCALE); \
std::cout << "gdv_fn_populate buffer after =" << buffer->ToString() << std::endl; \
offsets[slot] = offset / SCALE; \
offsets[slot + 1] = offset / SCALE + entry_len; \
return 0; \
Expand Down
22 changes: 17 additions & 5 deletions java/gandiva/src/main/cpp/jni_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ Status JavaResizableBuffer::Resize(const int64_t new_size, bool shrink_to_fit) {

JNIEXPORT void JNICALL
Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
JNIEnv* env, jobject object, jobject jexpander, jlong module_id, jint num_rows,
JNIEnv* env, jobject object, jobject jexpander, jobject jListExpander, jlong module_id, jint num_rows,
jlongArray buf_addrs, jlongArray buf_sizes, jint sel_vec_type, jint sel_vec_rows,
jlong sel_vec_addr, jlong sel_vec_size, jlongArray out_buf_addrs,
jlongArray out_buf_sizes) {
Expand Down Expand Up @@ -1060,20 +1060,29 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(

std::vector<std::shared_ptr<arrow::Buffer>> child_buffers;

if (jListExpander == nullptr) {
status = Status::Invalid(
"expression has variable len output columns, but the jListExpander object is "
"null");
break;
}


//LR TODO the two buffers...

data_sz = out_sizes[sz_idx++];
//std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 3 adding buffer size=" << data_sz << std::endl;
CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len);
uint8_t* child_offset_buf = reinterpret_cast<uint8_t*>(out_bufs[buf_idx++]);
child_buffers.push_back(std::make_shared<JavaResizableBuffer>(
env, jexpander, output_vector_idx, child_offset_buf, data_sz));
env, jListExpander, output_vector_idx, child_offset_buf, data_sz));

data_sz = out_sizes[sz_idx++];
//std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 4 adding buffer size=" << data_sz << std::endl;
CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len);
uint8_t* child_data_buf = reinterpret_cast<uint8_t*>(out_bufs[buf_idx++]);
child_buffers.push_back(std::make_shared<JavaResizableBuffer>(
env, jexpander, output_vector_idx, child_data_buf, data_sz));
env, jListExpander, output_vector_idx, child_data_buf, data_sz));

std::shared_ptr<arrow::DataType> dt2 = std::make_shared<arrow::Int32Type>();
auto array_data_child = arrow::ArrayData::Make(dt2, output_row_count, child_buffers);
Expand Down Expand Up @@ -1120,9 +1129,12 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector(
// << (array_data->child_data[0])->length << std::endl;

//LRTest1 Start
int numRecords = 5 * 1000000;
int numRecords = (array_data->child_data[0])->length;
//int numRecords = (array_data->child_data[0])->length * array_data->length;
int recordSize = numRecords * 4;
int recordSize = numRecords * 4; //LR TODO HACK

std::cout << "LR jni_common there are records=" << array_data->length << " and the first one is="
<< (array_data->child_data[0])->length << " using numRecords=" << numRecords << std::endl;

memcpy(&out_bufs[3], (array_data->child_data[0])->buffers[1]->data(), recordSize);
out_sizes[3] = recordSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ native long buildProjector(Object cache, byte[] schemaBuf, byte[] exprListBuf,
* and store the output in ValueVectors. Throws an exception in case of errors
*
* @param expander VectorExpander object. Used for callbacks from cpp.
* @param listExpander ListVectorExpander object. Used for callbacks from cpp.
* @param moduleId moduleId representing expressions. Created using a call to
* buildNativeCode
* @param numRows Number of rows in the record batch
Expand All @@ -63,7 +64,7 @@ native long buildProjector(Object cache, byte[] schemaBuf, byte[] exprListBuf,
* @param outSizes The allocated size of the output buffers. On successful evaluation,
* the result is stored in the output buffers
*/
native void evaluateProjector(Object expander, long moduleId, int numRows,
native void evaluateProjector(Object expander, Object listExpander, long moduleId, int numRows,
long[] bufAddrs, long[] bufSizes,
int selectionVectorType, int selectionVectorSize,
long selectionVectorBufferAddr, long selectionVectorBufferSize,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.gandiva.evaluator;

import org.apache.arrow.vector.complex.ListVector;

/**
 * Expands the output {@link ListVector}s of a projection on request from native code.
 *
 * <p>Gandiva calls back into {@link #expandOutputVectorAtIndex(int, long)} when an output
 * buffer is too small; the vector at the requested index is re-allocated until the buffer
 * reaches the requested capacity.
 */
public class ListVectorExpander {
  // Output vectors, indexed by output column. Entries may be null for columns that are
  // not list vectors; such entries are rejected by expandOutputVectorAtIndex.
  private final ListVector[] vectors;

  /**
   * Creates an expander over the given output vectors.
   *
   * @param vectors list vectors that may be expanded, addressed by their array index.
   */
  public ListVectorExpander(ListVector[] vectors) {
    this.vectors = vectors;
  }

  /**
   * Result of vector expansion: the address and capacity of the buffer after expansion.
   */
  public static class ExpandResult {
    /** Memory address of the buffer after expansion. */
    public long address;
    /** Capacity of the buffer after expansion. */
    public long capacity;

    /**
     * Creates a result holding the given address and capacity.
     *
     * @param address memory address of the expanded buffer.
     * @param capacity capacity of the expanded buffer.
     */
    public ExpandResult(long address, long capacity) {
      this.address = address;
      this.capacity = capacity;
    }
  }

  /**
   * Expand vector at specified index. This is used as a callback from jni, and is only
   * relevant for ListVectors.
   *
   * @param index index of buffer in the list passed to jni.
   * @param toCapacity the size to which the buffer should be expanded.
   *
   * @return address and size of the buffer after expansion.
   * @throws IllegalArgumentException if {@code index} is negative, is past the end of the
   *         vector array, or refers to a slot that holds no vector.
   */
  public ExpandResult expandOutputVectorAtIndex(int index, long toCapacity) {
    // Reject negative indices explicitly so every bad index is reported the same way,
    // instead of a negative one surfacing as an ArrayIndexOutOfBoundsException.
    if (index < 0 || index >= vectors.length || vectors[index] == null) {
      throw new IllegalArgumentException("invalid index " + index);
    }

    ListVector vector = vectors[index];
    // The buffer is looked up again on every pass because reAlloc() is expected to
    // replace it with a larger one.
    while (expandableBufferCapacity(vector) < toCapacity) {
      vector.reAlloc();
    }
    return new ExpandResult(expandableBufferAddress(vector), expandableBufferCapacity(vector));
  }

  // Capacity of the first field buffer of the list's data vector.
  // NOTE(review): buffer 0 is what the original code used; confirm it is the intended
  // (value) buffer rather than the validity buffer for the child vector type.
  private static long expandableBufferCapacity(ListVector vector) {
    return vector.getDataVector().getFieldBuffers().get(0).capacity();
  }

  // Memory address of the first field buffer of the list's data vector.
  private static long expandableBufferAddress(ListVector vector) {
    return vector.getDataVector().getFieldBuffers().get(0).memoryAddress();
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,8 @@ private void evaluate(int numRows, List<ArrowBuf> buffers, List<ArrowBuffer> buf

boolean hasVariableWidthColumns = false;
BaseVariableWidthVector[] resizableVectors = new BaseVariableWidthVector[outColumns.size()];

ListVector[] resizableListVectors = new ListVector[outColumns.size()];

long[] outAddrs = new long[3 * outColumns.size()];
long[] outSizes = new long[3 * outColumns.size()];

Expand Down Expand Up @@ -393,6 +394,7 @@ private void evaluate(int numRows, List<ArrowBuf> buffers, List<ArrowBuffer> buf
if (valueVector instanceof ListVector) {

hasVariableWidthColumns = true;
resizableListVectors[outColumnIdx] = (ListVector) valueVector;
//LR TODO figure out what to use here resizableVectors[outColumnIdx] = (BaseVariableWidthVector) valueVector;
//resizableVectors[outColumnIdx] = (BaseVariableWidthVector) valueVector;
//resizeableVectors[outColumnIdx] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(0);
Expand All @@ -413,7 +415,14 @@ private void evaluate(int numRows, List<ArrowBuf> buffers, List<ArrowBuffer> buf
logger.error("LR Projector.java evaluate ListVector passing data buffer as " + idx);
((ListVector) valueVector).reAlloc();
((ListVector) valueVector).reAlloc();
((ListVector) valueVector).reAlloc();
((ListVector) valueVector).reAlloc(); //100 rows

//This doesnt actually allocate any memory.
//((ListVector) valueVector).setInitialCapacity(1000000);
//while (((ListVector) valueVector).getValueCapacity() < 1000000) {
// ((ListVector) valueVector).reAlloc();
//}

//The realloc avoids dynamic resizing, will have to be fixed later.
outAddrs[idx] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(0).memoryAddress();
outSizes[idx++] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(0).capacity();
Expand Down Expand Up @@ -448,6 +457,7 @@ private void evaluate(int numRows, List<ArrowBuf> buffers, List<ArrowBuffer> buf
logger.error("LR Projector.java evaluate calling evaluateProjector with buffers=" + idx);
wrapper.evaluateProjector(
hasVariableWidthColumns ? new VectorExpander(resizableVectors) : null,
hasVariableWidthColumns ? new ListVectorExpander(resizableListVectors) : null,
this.moduleId, numRows, bufAddrs, bufSizes,
selectionVectorType, selectionVectorRecordCount,
selectionVectorAddr, selectionVectorSize,
Expand All @@ -464,9 +474,9 @@ private void evaluate(int numRows, List<ArrowBuf> buffers, List<ArrowBuffer> buf
if (valueVector instanceof ListVector) {
//LR HACK

int numRecordsFound = 5 * 1000000;
//int numRecordsFound = 5 * 100;
//int numRecordsFound = Math.toIntExact(outSizes[3]) / 4;
//logger.error("LR Projector.java using outsizes numRecords=" + numRecordsFound + " outSizes[3]=" + outSizes[3]);
//logger.error("LR Projector.java using numRecords=" + numRecordsFound + " outSizes[3]=" + outSizes[3]);

//LR HACK 9-13 10:34
/*public void startList() {
Expand All @@ -482,8 +492,8 @@ public void endList() {
listStarted = false;
*/

/*ArrowBuf ab2 = new ArrowBuf(ReferenceManager.NO_OP, null, outSizes[3], outAddrs[3]);
for (int i = 0; i < 50; i++) {
ArrowBuf ab2 = new ArrowBuf(ReferenceManager.NO_OP, null, outSizes[3], outAddrs[3]);
/*for (int i = 0; i < 50; i++) {
System.out.println("LR arrowbuf=" + Integer.reverseBytes(ab2.getInt(i)));
System.out.println("LR arrowbuf=" + ab2.getInt(i));
System.out.println("LR arrowbuf=" + ab2.getShort(i));
Expand All @@ -498,18 +508,25 @@ public void endList() {
System.out.println("LR arrowbuf2=" + ab.getShort(i));
}*/


logger.error("LR Projector.java using numRecords=" +
selectionVectorRecordCount + " outSizes[3]=" + outSizes[3]);
UnionListWriter writer = ((ListVector) valueVector).getWriter();
for (int i = 0; i < 100; i++) {
for (int i = 0; i < selectionVectorRecordCount; i++) {
writer.startList();
writer.setPosition(i);
for (int j = 0; j < 5; j++) {
writer.writeInt(ab2.getInt((j + (5 * i)) * 4));
int index = ((j + (5 * i)) * 4);
//Not sure whats going on. Buffer too small?
try {
writer.writeInt(ab2.getInt(index));
} catch (IndexOutOfBoundsException e) {
continue;
}
}
writer.setValueCount(5);
writer.endList();
}
((ListVector) valueVector).setValueCount(100);
((ListVector) valueVector).setValueCount(selectionVectorRecordCount);


//LR HACK 9-13 10:34 All the multiline comment
Expand Down

0 comments on commit 7f2c64c

Please sign in to comment.