diff --git a/cpp/src/arrow/buffer.h b/cpp/src/arrow/buffer.h index 9270c4dea3fb6..66da004c2beb4 100644 --- a/cpp/src/arrow/buffer.h +++ b/cpp/src/arrow/buffer.h @@ -444,10 +444,21 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer { return Reserve(sizeof(T) * new_nb_elements); } + public: + uint8_t* offsetBuffer; + int64_t offsetCapacity; + protected: - ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {} + ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) { + offsetBuffer = nullptr; + offsetCapacity = 0; + + } ResizableBuffer(uint8_t* data, int64_t size, std::shared_ptr mm) - : MutableBuffer(data, size, std::move(mm)) {} + : MutableBuffer(data, size, std::move(mm)) { + offsetBuffer = nullptr; + offsetCapacity = 0; + } }; /// \defgroup buffer-allocation-functions Functions for allocating buffers diff --git a/cpp/src/gandiva/annotator.cc b/cpp/src/gandiva/annotator.cc index 2d91ba43ab435..4cc0e1dc29bb8 100644 --- a/cpp/src/gandiva/annotator.cc +++ b/cpp/src/gandiva/annotator.cc @@ -53,7 +53,7 @@ FieldDescriptorPtr Annotator::MakeDesc(FieldPtr field, bool is_output) { } if (field->type()->id() == arrow::Type::LIST) { - //std::cout << "LR Annotator::MakeDesc 1" << std::endl; + std::cout << "LR Annotator::MakeDesc 1" << std::endl; offsets_idx = buffer_count_++; if (arrow::is_binary_like(field->type()->field(0)->type()->id())) { child_offsets_idx = buffer_count_++; @@ -91,7 +91,7 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc, if (desc.HasOffsetsIdx()) { uint8_t* offsets_buf = const_cast(array_data.buffers[buffer_idx]->data()); - std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer -4 " << &offsets_buf << std::endl; + std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer -4 " << &offsets_buf << " using idx=" << buffer_idx << std::endl; eval_batch->SetBuffer(desc.offsets_idx(), offsets_buf, array_data.offset); if (desc.HasChildOffsetsIdx()) { @@ -139,7 +139,7 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc, uint8_t* data_buf = const_cast(array_data.child_data.at(0)->buffers[buffer_idx]->data()); - std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer 0 " << &data_buf << std::endl; + std::cout << "LR Annotator::PrepareBuffersForField setting offset eval buffer idx=" << buffer_idx << " data=" << &data_buf << std::endl; eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.child_data.at(0)->offset); //std::cout << "LR Annotator::PrepareBuffersForField 5a" << std::endl; } @@ -158,7 +158,7 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc, // list data buffer is in child data buffer uint8_t* data_buf_ptr = reinterpret_cast( array_data.child_data.at(0)->buffers[buffer_idx].get()); - std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer 2 " << &data_buf_ptr << std::endl; + std::cout << "LR Annotator::PrepareBuffersForField setting eval data buffer " << buffer_idx << " data=" << &data_buf_ptr << std::endl; eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr, array_data.child_data.at(0)->offset); diff --git a/cpp/src/gandiva/array_ops.cc b/cpp/src/gandiva/array_ops.cc index fd04a84986974..0cbac7942bb06 100644 --- a/cpp/src/gandiva/array_ops.cc +++ b/cpp/src/gandiva/array_ops.cc @@ -95,7 +95,7 @@ int32_t* array_int32_make_array(int64_t context_ptr, int32_t contains_data, int3 //return reinterpret_cast(ret); return reinterpret_cast(ret); } - +/* int32_t* array_int32_remove(int64_t context_ptr, const int32_t* entry_buf, int32_t entry_offsets_len, int32_t remove_data, int32_t* out_len) { //std::cout << "LR array_int32_remove data=" << remove_data @@ -125,6 +125,36 @@ int32_t* array_int32_remove(int64_t context_ptr, const int32_t* entry_buf, //return reinterpret_cast(ret); return reinterpret_cast(ret); } +*/ +int32_t* array_int32_remove(int64_t context_ptr, const int32_t* entry_buf, + int32_t entry_offsets_len, int32_t remove_data, int32_t* out_len) { + //std::cout << "LR array_int32_remove data=" << remove_data + // << " entry_offsets_len " << entry_offsets_len << std::endl; + + std::vector newInts; + + for (int i = 0; i < entry_offsets_len; i++) { + //std::cout << "LR going to check " << entry_buf + i << std::endl; + int32_t entry_item = *(entry_buf + (i * 1)); + //std::cout << "LR checking value " << entry_len << " against target " << remove_data << std::endl; + if (entry_item == remove_data) { + continue; + } else { + newInts.push_back(entry_item); + } + } + + *out_len = newInts.size(); + int32_t outBufferLength = *out_len * sizeof(int); + //length is number of items, but buffers must account for byte size. + uint8_t* ret = gdv_fn_context_arena_malloc(context_ptr, outBufferLength); + memcpy(ret, newInts.data(), outBufferLength); + //std::cout << "LR made a buffer length" << *out_len * 4 << " item 3 is = " << int32_t(ret[3*4]) << std::endl; + + + //return reinterpret_cast(ret); + return reinterpret_cast(ret); +} int64_t array_utf8_length(int64_t context_ptr, const char* entry_buf, int32_t* entry_child_offsets, int32_t entry_offsets_len) { diff --git a/cpp/src/gandiva/decimal_ir.h b/cpp/src/gandiva/decimal_ir.h index 1a7cad7107036..b11730f1e231e 100644 --- a/cpp/src/gandiva/decimal_ir.h +++ b/cpp/src/gandiva/decimal_ir.h @@ -29,7 +29,7 @@ namespace gandiva { class DecimalIR : public FunctionIRBuilder { public: explicit DecimalIR(Engine* engine) - : FunctionIRBuilder(engine), enable_ir_traces_(true) {} + : FunctionIRBuilder(engine), enable_ir_traces_(false) {} /// Build decimal IR functions and add them to the engine. static Status AddFunctions(Engine* engine); diff --git a/cpp/src/gandiva/gdv_function_stubs.cc b/cpp/src/gandiva/gdv_function_stubs.cc index a82ba23974c09..3e506f83a33c3 100644 --- a/cpp/src/gandiva/gdv_function_stubs.cc +++ b/cpp/src/gandiva/gdv_function_stubs.cc @@ -173,14 +173,20 @@ int32_t gdv_fn_populate_varlen_vector(int64_t context_ptr, int8_t* data_ptr, context->set_error_msg(status.message().c_str()); \ return -1; \ } \ - std::cout << "LR populate_list slot " << slot << " offset = " << offset << " buffer = " << \ - (int64_t)(buffer->mutable_data() + offset) << std::endl; \ memcpy(buffer->mutable_data() + offset, (char*)entry_buf, entry_len * SCALE); \ + std::cout << "LR gdv_fn_populate buffer=" << buffer->data() << std::endl; \ + std::cout << " and offset=" << offsets << " * =" << *offsets << std::endl; \ + std::cout << "Setting offset slot=" << slot << "=" << offset / SCALE << std::endl; \ + std::cout << "Setting offset slot+1=" << slot + 1 << "=" << offset / SCALE + entry_len << std::endl; \ + offsets = reinterpret_cast(buffer->offsetBuffer); \ offsets[slot] = offset / SCALE; \ offsets[slot + 1] = offset / SCALE + entry_len; \ return 0; \ } + //buffer->offsetBuffer[slot] = offset / SCALE; + //buffer->offsetBuffer[slot + 1] = offset / SCALE + entry_len; + POPULATE_NUMERIC_LIST_TYPE_VECTOR(int32_t, 4) POPULATE_NUMERIC_LIST_TYPE_VECTOR(int64_t, 8) POPULATE_NUMERIC_LIST_TYPE_VECTOR(float, 4) diff --git a/cpp/src/gandiva/llvm_generator.cc b/cpp/src/gandiva/llvm_generator.cc index b3de4ac524387..a97c8b02b07ac 100644 --- a/cpp/src/gandiva/llvm_generator.cc +++ b/cpp/src/gandiva/llvm_generator.cc @@ -57,7 +57,7 @@ namespace gandiva { } }*/ -LLVMGenerator::LLVMGenerator(bool cached) : cached_(cached), enable_ir_traces_(true) {} +LLVMGenerator::LLVMGenerator(bool cached) : cached_(cached), enable_ir_traces_(false) {} Status LLVMGenerator::Make(std::shared_ptr config, bool cached, std::unique_ptr* llvm_generator) { @@ -472,7 +472,8 @@ Status LLVMGenerator::CodeGenExprValue(DexPtr value_expr, int buffer_count, output_value->length()->print(output2); - //std::cout << "LR gdv_fn_populate_list_int32_t_vector params are " << arg_context_ptr << "," << output_buffer_ptr_ref << "," + std::cout << "LR gdv_fn_populate_list_int32_t_vector params are " << arg_context_ptr << "," << output_buffer_ptr_ref << "," + << output_offset_ref << "," << loop_var << std::endl; // << output_offset_ref << "," << loop_var << "[[" << str1 << "]] [[" << str2 << "]]" << std::endl; AddFunctionCall("gdv_fn_populate_list_int32_t_vector", types()->i32_type(), {arg_context_ptr, output_buffer_ptr_ref, output_offset_ref, diff --git a/java/gandiva/src/main/cpp/jni_common.cc b/java/gandiva/src/main/cpp/jni_common.cc index 32ecf6beee1f8..1f647e0e2797b 100644 --- a/java/gandiva/src/main/cpp/jni_common.cc +++ b/java/gandiva/src/main/cpp/jni_common.cc @@ -84,10 +84,15 @@ static jclass gandiva_exception_; static jclass vector_expander_class_; static jclass listvector_expander_class_; static jclass vector_expander_ret_class_; +static jclass list_expander_ret_class_; static jmethodID vector_expander_method_; static jmethodID listvector_expander_method_; static jfieldID vector_expander_ret_address_; static jfieldID vector_expander_ret_capacity_; +static jfieldID list_expander_ret_address_; +static jfieldID list_expander_ret_capacity_; +static jfieldID list_expander_offset_ret_address_; +static jfieldID list_expander_offset_ret_capacity_; static jclass secondary_cache_class_; static jmethodID cache_get_method_; @@ -141,11 +146,25 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { vector_expander_ret_class_ = (jclass)env->NewGlobalRef(local_expander_ret_class); env->DeleteLocalRef(local_expander_ret_class); + jclass local_list_expander_ret_class = + env->FindClass("org/apache/arrow/gandiva/evaluator/ListVectorExpander$ExpandResult"); + list_expander_ret_class_ = (jclass)env->NewGlobalRef(local_list_expander_ret_class); + env->DeleteLocalRef(local_list_expander_ret_class); + vector_expander_ret_address_ = env->GetFieldID(vector_expander_ret_class_, "address", "J"); vector_expander_ret_capacity_ = env->GetFieldID(vector_expander_ret_class_, "capacity", "J"); + list_expander_ret_address_ = + env->GetFieldID(list_expander_ret_class_, "address", "J"); + list_expander_ret_capacity_ = + env->GetFieldID(list_expander_ret_class_, "capacity", "J"); + list_expander_offset_ret_address_ = + env->GetFieldID(list_expander_ret_class_, "offsetaddress", "J"); + list_expander_offset_ret_capacity_ = + env->GetFieldID(list_expander_ret_class_, "offsetcapacity", "J"); + jclass local_cache_class = env->FindClass("org/apache/arrow/gandiva/evaluator/JavaSecondaryCacheInterface"); secondary_cache_class_ = (jclass)env->NewGlobalRef(local_cache_class); @@ -175,8 +194,9 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { env->DeleteGlobalRef(configuration_builder_class_); env->DeleteGlobalRef(gandiva_exception_); env->DeleteGlobalRef(vector_expander_class_); - env->DeleteGlobalRef(vector_expander_class_); + env->DeleteGlobalRef(listvector_expander_class_); env->DeleteGlobalRef(vector_expander_ret_class_); + env->DeleteGlobalRef(list_expander_ret_class_); env->DeleteGlobalRef(secondary_cache_class_); env->DeleteGlobalRef(cache_buf_ret_class_); } @@ -884,12 +904,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_build class JavaResizableBuffer : public arrow::ResizableBuffer { public: JavaResizableBuffer(JNIEnv* env, jobject jexpander, jmethodID jmethod, int32_t vector_idx, uint8_t* buffer, - int32_t len) + int32_t len, bool isListVec = false) : ResizableBuffer(buffer, len), env_(env), jexpander_(jexpander), vector_idx_(vector_idx), - method_(jmethod) { + method_(jmethod), + isList(isListVec) + { size_ = 0; } @@ -897,11 +919,12 @@ class JavaResizableBuffer : public arrow::ResizableBuffer { Status Reserve(const int64_t new_capacity) override; - private: + public: JNIEnv* env_; jobject jexpander_; jmethodID method_; int32_t vector_idx_; + bool isList; }; Status JavaResizableBuffer::Reserve(const int64_t new_capacity) { @@ -919,16 +942,36 @@ Status JavaResizableBuffer::Reserve(const int64_t new_capacity) { return Status::OutOfMemory("buffer expand failed in java."); } - jlong ret_address = env_->GetLongField(ret, vector_expander_ret_address_); - jlong ret_capacity = env_->GetLongField(ret, vector_expander_ret_capacity_); - std::cout << "Buffer expand: New capacity is " << new_capacity << + if (isList) { + jlong ret_address = env_->GetLongField(ret, list_expander_ret_address_); + jlong ret_capacity = env_->GetLongField(ret, list_expander_ret_capacity_); + jlong offset_ret_address = env_->GetLongField(ret, list_expander_offset_ret_address_); + jlong offset_ret_capacity = env_->GetLongField(ret, list_expander_offset_ret_capacity_); + + std::cout << "Buffer expand: New capacity is " << new_capacity << " vector id " << vector_idx_ << " expander method " << method_ << " jexpander_ " << jexpander_ << " returned size is " << ret_capacity << - " and the original buffer ptr=" << data_ << " and the new ptr=" << ret_address << std::endl; + " and the original buffer ptr=" << reinterpret_cast(data_) << " and the new ptr=" << ret_address << + " and the original offset ptr=" << reinterpret_cast(offsetBuffer) << " and the new ptr=" << offset_ret_address << std::endl; - data_ = reinterpret_cast(ret_address); - capacity_ = ret_capacity; + data_ = reinterpret_cast(ret_address); + capacity_ = ret_capacity; + + offsetBuffer = reinterpret_cast(offset_ret_address); + offsetCapacity = offset_ret_capacity; + } else { + jlong ret_address = env_->GetLongField(ret, vector_expander_ret_address_); + jlong ret_capacity = env_->GetLongField(ret, vector_expander_ret_capacity_); + + std::cout << "Buffer expand: New capacity is " << new_capacity << + " vector id " << vector_idx_ << " expander method " << method_ << + " jexpander_ " << jexpander_ << " returned size is " << ret_capacity << + " and the original buffer ptr=" << reinterpret_cast(data_) << " and the new ptr=" << ret_address << std::endl; + + data_ = reinterpret_cast(ret_address); + capacity_ = ret_capacity; + } return Status::OK(); } @@ -1039,6 +1082,7 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( break; } + std::shared_ptr outBufJava = nullptr; auto ret_types = holder->rettypes(); ArrayDataVector output; int buf_idx = 0; @@ -1047,14 +1091,14 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( for (FieldPtr field : ret_types) { std::vector> buffers; - //std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector -2 adding buffer" << std::endl; + std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector -2 adding buffer idx=" << buf_idx << std::endl; CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len); uint8_t* validity_buf = reinterpret_cast(out_bufs[buf_idx++]); jlong bitmap_sz = out_sizes[sz_idx++]; buffers.push_back(std::make_shared(validity_buf, bitmap_sz)); if (arrow::is_binary_like(field->type()->id())) { - //std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector -1 adding buffer" << std::endl; + std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector -1 adding bufferbuffer idx=" << buf_idx << std::endl; CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len); uint8_t* offsets_buf = reinterpret_cast(out_bufs[buf_idx++]); jlong offsets_sz = out_sizes[sz_idx++]; @@ -1064,7 +1108,7 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len); uint8_t* value_buf = reinterpret_cast(out_bufs[buf_idx++]); - jlong data_sz = out_sizes[sz_idx++] * 1000; + jlong data_sz = out_sizes[sz_idx++]; if (arrow::is_binary_like(field->type()->id())) { if (jexpander == nullptr) { status = Status::Invalid( @@ -1073,17 +1117,20 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( break; } - //std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 1 adding buffer size=" << data_sz << std::endl; + std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 1 adding buffer buffer idx=" << buf_idx - 1 << " size=" << data_sz << std::endl; buffers.push_back(std::make_shared( env, jexpander, vector_expander_method_, output_vector_idx, value_buf, data_sz)); } else if (field->type()->id() == arrow::Type::LIST) { - //std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 2 adding buffer size=" << data_sz << std::endl; - buffers.push_back(std::make_shared( + std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 2 adding list offset buffer idx=" << buf_idx - 1 << " size=" << data_sz << std::endl; + std::cout << " size=" << out_sizes[sz_idx - 1] << " outsize index=" << sz_idx - 1 << " address " << out_bufs[buf_idx - 1] + << " output_vector_idx=" << output_vector_idx << std::endl; + buffers.push_back(std::make_shared( env, jexpander, vector_expander_method_, output_vector_idx, value_buf, data_sz)); } else { buffers.push_back(std::make_shared(value_buf, data_sz)); } + if (field->type()->id() == arrow::Type::LIST) { std::vector> child_buffers; @@ -1099,7 +1146,7 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( //LR TODO the two buffers... data_sz = out_sizes[sz_idx++]; - std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 3 adding buffer " << buf_idx + std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 3 adding child nbuffer " << buf_idx << " size=" << data_sz << std::endl; CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len); uint8_t* child_offset_buf = reinterpret_cast(out_bufs[buf_idx++]); @@ -1107,14 +1154,18 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( env, jListExpander, listvector_expander_method_, output_vector_idx, child_offset_buf, data_sz)); - std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 4 adding buffer " << buf_idx + std::cout << "LR Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector 4 adding child buffer " << buf_idx << " size=" << out_sizes[sz_idx] << " outsize index=" << sz_idx << " address " << out_bufs[buf_idx] << " output_vector_idx=" << output_vector_idx << std::endl; data_sz = out_sizes[sz_idx++]; CHECK_OUT_BUFFER_IDX_AND_BREAK(buf_idx, out_bufs_len); uint8_t* child_data_buf = reinterpret_cast(out_bufs[buf_idx++]); - child_buffers.push_back(std::make_shared( - env, jListExpander, listvector_expander_method_, output_vector_idx, child_data_buf, data_sz)); + + outBufJava = std::make_shared( + env, jListExpander, listvector_expander_method_, output_vector_idx, child_data_buf, data_sz, true); + outBufJava->offsetBuffer = reinterpret_cast(out_bufs[1]); + outBufJava->offsetCapacity = out_sizes[1]; + child_buffers.push_back(outBufJava); std::shared_ptr dt2 = std::make_shared(); auto array_data_child = arrow::ArrayData::Make(dt2, output_row_count, child_buffers); @@ -1163,29 +1214,51 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( //LRTest1 Start int numRecords = (array_data->child_data[0])->length; //int numRecords = (array_data->child_data[0])->length * array_data->length; - int recordSize = numRecords * 4; //LR TODO HACK - std::cout << "LR jni_common there are records=" << array_data->length << " and the first one is=" - << (array_data->child_data[0])->length << " using numRecords=" << numRecords << std::endl; - std::cout << "LR jni_common out_bufs[3]=" << out_bufs[3] << " after eval=" - << (jlong)(array_data->child_data[0])->buffers[1]->data() << std::endl; + //std::cout << "LR jni_common there are records=" << array_data->length << " and the first one is=" + // << (array_data->child_data[0])->length << " using numRecords=" << numRecords << std::endl; + //std::cout << "LR jni_common out_bufs[3]=" << out_bufs[3] << " after eval=" + // << (jlong)(array_data->child_data[0])->buffers[1]->data() << std::endl; //LR test1 out_bufs[3] = (jlong)(array_data->child_data[0])->buffers[1]->data(); out_sizes[3] = (jlong)(array_data->child_data[0])->buffers[1]->capacity(); //Copy the new buffer ptr back to Java. The above two lines don't copy it to java, just to the local array. - env->SetLongArrayRegion(out_buf_addrs, 0, out_bufs_len, out_bufs); - env->SetLongArrayRegion(out_buf_sizes, 0, out_bufs_len, out_sizes); + //env->SetLongArrayRegion(out_buf_addrs, 0, out_bufs_len, out_bufs); + //env->SetLongArrayRegion(out_buf_sizes, 0, out_bufs_len, out_sizes); + + //array_data.child_data.at(0)->offset) + //env->ReleaseLongArrayElements(out_buf_addrs, out_bufs, JNI_ABORT); //memcpy((void*)out_bufs[3], (array_data->child_data[0])->buffers[1]->data(), recordSize); //out_sizes[3] = recordSize; //int test[] = {42,21,42,21,42}; //memcpy((void *)out_bufs[3], test, 20); + /*out_sizes[2] = numRecords * 20; + int test[numRecords * 20]; + for (int i = 0; i < numRecords; i++) { + test[i] = 0; + } + memcpy((void *)out_bufs[2], test, numRecords*4); + */ + + //LR test1 Havent tried yet. + //out_bufs[2] = (jlong)(array_data->child_data[0])->buffers[0]->data(); + //out_sizes[2] = (jlong)(array_data->child_data[0])->buffers[0]->capacity(); + + //out_bufs[1] = (jlong)(array_data->child_data[0])->buffers[0]->data(); + //out_sizes[1] = (jlong)(array_data->child_data[0])->buffers[0]->capacity(); + + //out_bufs[1] = (jlong)(array_data)->buffers[0]->data(); + //out_sizes[1] = (jlong)(array_data)->buffers[0]->capacity(); + out_bufs[1] = (jlong) outBufJava->offsetBuffer; + out_sizes[1] = (jlong) outBufJava->offsetCapacity; + + env->SetLongArrayRegion(out_buf_addrs, 0, out_bufs_len, out_bufs); + env->SetLongArrayRegion(out_buf_sizes, 0, out_bufs_len, out_sizes); - //std::cout << "LR jni_common the (validity)? buffer has size=" << out_sizes[2] << " and the first thing is " - //<< out_bufs[2] << " and the second is " << (out_bufs[2])[1] std::endl; //validity buffer? diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/ListVectorExpander.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/ListVectorExpander.java index 85ff261e3d85e..c14d2e810e83b 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/ListVectorExpander.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/ListVectorExpander.java @@ -17,6 +17,7 @@ package org.apache.arrow.gandiva.evaluator; +import org.apache.arrow.memory.ArrowBuf; import org.apache.arrow.vector.complex.ListVector; /** @@ -36,10 +37,22 @@ public ListVectorExpander(ListVector[] vectors) { public static class ExpandResult { public long address; public long capacity; + public long offsetaddress; + public long offsetcapacity; - public ExpandResult(long address, long capacity) { + /** + * fdsfsdfds. + * @param address dsfds + * @param capacity dfsdf + * @param offsetad dsfdsfsd + * @param offsetcap dfsfs + * + */ + public ExpandResult(long address, long capacity, long offsetad, long offsetcap) { this.address = address; this.capacity = capacity; + this.offsetaddress = offsetad; + this.offsetcapacity = offsetcap; } } @@ -57,15 +70,25 @@ public ExpandResult expandOutputVectorAtIndex(int index, long toCapacity) { throw new IllegalArgumentException("invalid index " + index); } + int valueBufferIndex = 1; ListVector vector = vectors[index]; - while (vector.getDataVector().getFieldBuffers().get(0).capacity() < toCapacity) { + while (vector.getDataVector().getFieldBuffers().get(valueBufferIndex).capacity() < toCapacity) { vector.reAlloc(); } System.out.println("LR Expanding ListVector. New capacity=" + - vector.getDataVector().getFieldBuffers().get(0).capacity()); + vector.getDataVector().getFieldBuffers().get(valueBufferIndex).capacity()); + System.out.println("LR Expanding ListVector. Offset data is "); + ArrowBuf ab = vector.getOffsetBuffer(); + String s = "offsetBuffer = ["; + for (int i = 0; i < 20; i++) { + s += ab.getInt(i) + ","; + } + System.out.println(s); return new ExpandResult( - vector.getDataVector().getFieldBuffers().get(0).memoryAddress(), - vector.getDataVector().getFieldBuffers().get(0).capacity()); + vector.getDataVector().getFieldBuffers().get(valueBufferIndex).memoryAddress(), + vector.getDataVector().getFieldBuffers().get(valueBufferIndex).capacity(), + vector.getOffsetBuffer().memoryAddress(), + vector.getOffsetBuffer().capacity()); } } diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java index c412b5f1baae1..89321f5911ad6 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java @@ -27,13 +27,12 @@ import org.apache.arrow.gandiva.ipc.GandivaTypes; import org.apache.arrow.gandiva.ipc.GandivaTypes.SelectionVectorType; import org.apache.arrow.memory.ArrowBuf; -import org.apache.arrow.memory.ReferenceManager; import org.apache.arrow.vector.BaseVariableWidthVector; +import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.VariableWidthVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.StructVector; -import org.apache.arrow.vector.complex.impl.UnionListWriter; import org.apache.arrow.vector.ipc.message.ArrowBuffer; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.types.pojo.Schema; @@ -380,6 +379,7 @@ private void evaluate(int numRows, List buffers, List buf outAddrs[idx] = valueVector.getValidityBuffer().memoryAddress(); outSizes[idx++] = valueVector.getValidityBuffer().capacity(); if (isVarWidth) { + logger.error("LR Projector.java evaluate isVarWidth setting buffer=" + idx); outAddrs[idx] = valueVector.getOffsetBuffer().memoryAddress(); outSizes[idx++] = valueVector.getOffsetBuffer().capacity(); hasVariableWidthColumns = true; @@ -393,6 +393,12 @@ private void evaluate(int numRows, List buffers, List buf } if (valueVector instanceof ListVector) { + /*((ListVector) valueVector).reAlloc(); + ((ListVector) valueVector).reAlloc(); + ((ListVector) valueVector).reAlloc(); //100 rows + ((ListVector) valueVector).reAlloc(); + ((ListVector) valueVector).reAlloc();*/ + hasVariableWidthColumns = true; resizableListVectors[outColumnIdx] = (ListVector) valueVector; //LR TODO figure out what to use here resizableVectors[outColumnIdx] = (BaseVariableWidthVector) valueVector; @@ -403,22 +409,20 @@ private void evaluate(int numRows, List buffers, List buf logger.error("LR Projector.java evaluate ListVector has buffers=" + fieldBufs.size()); + logger.error("LR Projector.java evaluate isVarlistvector Width setting buffer=" + idx); outAddrs[idx] = valueVector.getOffsetBuffer().memoryAddress(); outSizes[idx++] = valueVector.getOffsetBuffer().capacity(); //vector valid + logger.error("LR Projector.java evaluate isVarlistvector Width setting buffer=" + idx); outAddrs[idx] = ((ListVector) valueVector).getDataVector().getValidityBufferAddress(); outSizes[idx++] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(0).capacity(); //vector offset logger.error("LR Projector.java evaluate ListVector passing data buffer as " + idx); - /*((ListVector) valueVector).reAlloc(); - ((ListVector) valueVector).reAlloc(); - ((ListVector) valueVector).reAlloc(); //100 rows - ((ListVector) valueVector).reAlloc(); - ((ListVector) valueVector).reAlloc(); - */ + + //This doesnt actually allocate any memory. //((ListVector) valueVector).setInitialCapacity(1000000); @@ -426,11 +430,12 @@ private void evaluate(int numRows, List buffers, List buf // ((ListVector) valueVector).reAlloc(); //} + logger.error("LR Projector.java evaluate isVarlistvector Width setting buffer=" + idx); //The realloc avoids dynamic resizing, will have to be fixed later. - outAddrs[idx] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(0).memoryAddress(); - outSizes[idx++] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(0).capacity(); - logger.error("LR Projector.java evaluate ListVector set buffer " + idx + - " as ptr=" + outAddrs[idx - 1] + " size " + outSizes[idx - 1]); + outAddrs[idx] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(1).memoryAddress(); + outSizes[idx++] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(1).capacity(); + //logger.error("LR Projector.java evaluate ListVector set buffer " + idx + + // " as ptr=" + outAddrs[idx - 1] + " size " + outSizes[idx - 1]); //vector data //outAddrs[idx] = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(2).memoryAddress(); @@ -459,6 +464,7 @@ private void evaluate(int numRows, List buffers, List buf logger.error("LR Projector.java evaluate calling evaluateProjector with buffers=" + idx); logger.error("LR Projector.java before evaluateProjector buffer[3]=" + outAddrs[3]); + logger.error("LR Projector.java before evaluateProjector buffer[1]=" + outAddrs[1]); wrapper.evaluateProjector( hasVariableWidthColumns ? new VectorExpander(resizableVectors) : null, hasVariableWidthColumns ? new ListVectorExpander(resizableListVectors) : null, @@ -474,6 +480,7 @@ private void evaluate(int numRows, List buffers, List buf logger.error("LR Projector.java after evaluateProjector buffer[3]=" + outAddrs[3]); + logger.error("LR Projector.java after evaluateProjector buffer[1]=" + outAddrs[1]); for (ValueVector valueVector : outColumns) { if (valueVector instanceof ListVector) { //LR HACK @@ -498,25 +505,14 @@ public void endList() { //ArrowBuf ab = new ArrowBuf(ReferenceManager.NO_OP, null, outSizes[2], outAddrs[2]); - ArrowBuf ab2 = new ArrowBuf(ReferenceManager.NO_OP, null, outSizes[3], outAddrs[3]); - /*for (int i = 0; i < 50; i++) { - System.out.println("LR arrowbuf=" + Integer.reverseBytes(ab2.getInt(i))); - System.out.println("LR arrowbuf=" + ab2.getInt(i)); - System.out.println("LR arrowbuf=" + ab2.getShort(i)); - System.out.println("LR arrowbuf=" + ab2.getInt(i * 4)); - System.out.println("LR arrowbuf======"); - }*/ - /*ArrowBuf ab = ((ListVector) valueVector).getDataVector().getFieldBuffers().get(0); - for (int i = 0; i < 50; i++) { - System.out.println("LR arrowbuf2=" + Integer.reverseBytes(ab.getInt(i))); - System.out.println("LR arrowbuf2=" + ab.getInt(i)); - System.out.println("LR arrowbuf2=" + ab.getShort(i)); - }*/ + //ArrowBuf ab2 = new ArrowBuf(ReferenceManager.NO_OP, null, outSizes[3], outAddrs[3]); logger.error("LR Projector.java using numRecords=" + selectionVectorRecordCount + " outSizes[3]=" + outSizes[3]); - UnionListWriter writer = ((ListVector) valueVector).getWriter(); + + //import org.apache.arrow.vector.complex.impl.UnionListWriter; + /*UnionListWriter writer = ((ListVector) valueVector).getWriter(); for (int i = 0; i < selectionVectorRecordCount; i++) { writer.startList(); writer.setPosition(i); @@ -533,7 +529,147 @@ public void endList() { writer.setValueCount(5); writer.endList(); } - ((ListVector) valueVector).setValueCount(selectionVectorRecordCount); + ((ListVector) valueVector).setValueCount(selectionVectorRecordCount);*/ + + + //offsetBuffer = [0,83886080,327680,1280,5,167772160,655360,2560,10,251658240,983040,3840,15, + //335544320,1310720,5120,20, + //419430400,1638400,6400,25,503316480,1966080,7680,30,587202560,2293760,8960,35,671088640,2621440,10240,40, + //754974720,2949120,11520, + + + + + + + + + + + + String s = ""; + List fv = ((ListVector) valueVector).getDataVector().getFieldBuffers(); + for (ArrowBuf ab : fv) { + s = ""; + for (int i = 0; i < 20; i++) { + s += ab.getInt(i) + ","; + } + logger.error("LR Projector.java before updating listvector. size=" + + ab.capacity() + " buffer=" + s); + } + + ArrowBuf fvv = ((ListVector) valueVector).getValidityBuffer(); + s = ""; + for (int i = 0; i < 20; i++) { + s += fvv.getInt(i) + ","; + } + logger.error("LR Projector.java before updating listvector. getValidityBuffer=" + + fvv.capacity() + " buffer=" + s); + + ArrowBuf fvvv = ((ListVector) valueVector).getOffsetBuffer(); + s = ""; + for (int i = 0; i < 20; i++) { + s += fvvv.getInt(i) + ","; + } + logger.error("LR Projector.java before updating listvector. getOffsetBuffer=" + + fvvv.capacity() + " buffer=" + s); + + + ((ListVector) valueVector).getDataVector().setValueCount(selectionVectorRecordCount * 5); + + ((ListVector) valueVector).setLastSet(selectionVectorRecordCount - 1); + /* + //Validity then data. + ArrowBuf abb = new ArrowBuf(ReferenceManager.NO_OP, null, outSizes[2], outAddrs[2]); + ArrowBuf abb2 = new ArrowBuf(ReferenceManager.NO_OP, null, outSizes[3], outAddrs[3]); + List outBufsNew = new ArrayList(); + + //outBufsNew.add(ab0); + outBufsNew.add(abb); + outBufsNew.add(abb2); + ArrowFieldNode afn = new ArrowFieldNode(selectionVectorRecordCount * 5, 0); + ((ListVector) valueVector).getDataVector().clear(); + ((ListVector) valueVector).getDataVector().loadFieldBuffers(afn, outBufsNew); + + //TODO Need to get validity [0] and offset [1] buffer for the listvector. + //((ListVector) valueVector).getDataVector().loadFieldBuffers(afn, outBufsNew); + + List outBufsNew2 = new ArrayList(); + + + + ArrowBuf mabb22 = new ArrowBuf(ReferenceManager.NO_OP, null, selectionVectorRecordCount, outAddrs[0]); + for (int i = 0; i < selectionVectorRecordCount; i++) { + BitVectorHelper.setBit(mabb22, i); + } + + ArrowBuf mabb2 = new ArrowBuf(ReferenceManager.NO_OP, null, outSizes[1], outAddrs[1]); + //for (int i = 0; i < selectionVectorRecordCount; i++) { + // mabb2.setInt(i * 4, 5 * i); + //} + s = "offset? buffer mabb2, outAddrs[0]="; + for (int i = 0; i < 20; i++) { + s += mabb2.getInt(i) + ","; + } + System.out.println(s); + + outBufsNew2.add(mabb22); + outBufsNew2.add(mabb2); + ArrowFieldNode afn2 = new ArrowFieldNode(selectionVectorRecordCount, 0); + ((ListVector) valueVector).loadFieldBuffers(afn2, outBufsNew2); + + + */ + + //((ListVector) valueVector).setValueCount(selectionVectorRecordCount); + //((ListVector) valueVector).getDataVector().setValueCount(selectionVectorRecordCount); + + int simple = 0; + try { + for (int i = 0; i < selectionVectorRecordCount * 5; i++) { + BitVectorHelper.setBit(((ListVector) valueVector).getDataVector().getValidityBuffer(), i); + simple++; + } + } catch (IndexOutOfBoundsException e) { + simple = 0; + } + try { + for (int i = 0; i < selectionVectorRecordCount; i++) { + BitVectorHelper.setBit(((ListVector) valueVector).getValidityBuffer(), i); + simple++; + } + } catch (IndexOutOfBoundsException e) { + simple = 0; + } + + + + + + + /* + + + + try { + for (int i = 0; i < selectionVectorRecordCount; i++) { + BitVectorHelper.setBit(((ListVector) valueVector).getValidityBuffer(), i); + simple++; + } + } catch (IndexOutOfBoundsException e) { + simple = 0; + } + + + for (int i = 0; i < selectionVectorRecordCount; i++) { + ((ListVector) valueVector).getOffsetBuffer().setInt(i * 4, 5 * i); + } + */ + + + + + //LR HACK 9-13 10:34 All the multiline comment