Skip to content

Commit

Permalink
Cleanup and test
Browse files Browse the repository at this point in the history
  • Loading branch information
lriggs committed Oct 25, 2023
1 parent a4ee4ae commit 316b822
Show file tree
Hide file tree
Showing 17 changed files with 35 additions and 951 deletions.
58 changes: 1 addition & 57 deletions cpp/src/gandiva/annotator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ FieldDescriptorPtr Annotator::MakeDesc(FieldPtr field, bool is_output) {
}

if (field->type()->id() == arrow::Type::LIST) {
//std::cout << "LR Annotator::MakeDesc 1" << std::endl;
offsets_idx = buffer_count_++;
if (arrow::is_binary_like(field->type()->field(0)->type()->id())) {
child_offsets_idx = buffer_count_++;
Expand All @@ -64,10 +63,7 @@ FieldDescriptorPtr Annotator::MakeDesc(FieldPtr field, bool is_output) {
data_buffer_ptr_idx = buffer_count_++;
}
int child_valid_buffer_ptr_idx = FieldDescriptor::kInvalidIdx;
//if (is_output) {
child_valid_buffer_ptr_idx = buffer_count_++;
//std::cout << "LR Annotator::MakeDesc 2 child_valid_buffer_ptr_idx=" << child_valid_buffer_ptr_idx << std::endl;
//}
child_valid_buffer_ptr_idx = buffer_count_++;
return std::make_shared<FieldDescriptor>(field, data_idx, validity_idx, offsets_idx,
data_buffer_ptr_idx, child_offsets_idx, child_valid_buffer_ptr_idx);
}
Expand All @@ -86,96 +82,61 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc,
// The validity buffer is optional. Use nullptr if it does not have one.
if (array_data.buffers[buffer_idx]) {
uint8_t* validity_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
//std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer -6 " << &validity_buf << std::endl;
eval_batch->SetBuffer(desc.validity_idx(), validity_buf, array_data.offset);
} else {
//std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer -5 null " << std::endl;
eval_batch->SetBuffer(desc.validity_idx(), nullptr, array_data.offset);
}
++buffer_idx;

if (desc.HasOffsetsIdx()) {
uint8_t* offsets_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
//std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer -4 " << &offsets_buf << " using idx=" << buffer_idx << std::endl;
eval_batch->SetBuffer(desc.offsets_idx(), offsets_buf, array_data.offset);

if (desc.HasChildOffsetsIdx()) {
//std::cout << "LR Annotator::PrepareBuffersForField 1 for field " << desc.Name() << " type is " << array_data.type->id() << std::endl;
if (is_output) {
// if list field is output field, we should put buffer pointer into eval batch
// for resizing
uint8_t* child_offsets_buf = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx].get());
//std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer -3a " << &child_offsets_buf << std::endl;
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_offsets_buf,
array_data.child_data.at(0)->offset);

uint8_t* child_valid_buf = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[0].get());
//std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer -3b " << &child_valid_buf << std::endl;
eval_batch->SetBuffer(desc.child_data_validity_idx(), child_valid_buf,
array_data.child_data.at(0)->offset);

} else {
//std::cout << "LR Annotator::PrepareBuffersForField 2" << std::endl;
// if list field is input field, just put buffer data into eval batch
uint8_t* child_offsets_buf = const_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx]->data());
//std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer -2a " << &child_offsets_buf << std::endl;
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_offsets_buf,
array_data.child_data.at(0)->offset);

uint8_t* child_valid_buf = const_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[0]->data());
//std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer -2b " << &child_valid_buf << std::endl;
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_valid_buf,
array_data.child_data.at(0)->offset);
}
}
if (array_data.type->id() != arrow::Type::LIST ||
arrow::is_binary_like(array_data.type->field(0)->type()->id())) {
//std::cout << "LR Annotator::PrepareBuffersForField 3" << std::endl;

// primitive type list data buffer index is 1
// binary like type list data buffer index is 2
++buffer_idx;
}
}

if (array_data.type->id() != arrow::Type::LIST) {
//std::cout << "LR Annotator::PrepareBuffersForField 4" << std::endl;

//std::cout << "LR Annotator::PrepareBuffersForField 4 buffer_idx " << buffer_idx << std::endl;
uint8_t* data_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
//std::cout << "LR Annotator::PrepareBuffersForField 4a" << std::endl;
//std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer -1 " << &data_buf << std::endl;
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.offset);
//std::cout << "LR Annotator::PrepareBuffersForField 4b" << std::endl;
} else {
//std::cout << "LR Annotator::PrepareBuffersForField 5 " << desc.Name() << " buffer_idx " << buffer_idx << std::endl;
//std::cout << "LR Annotator::PrepareBuffersForField 5 array_data child size " << array_data.child_data.size() << std::endl;

//std::cout << "LR array_data.child_data.at(0)->buffers[0]=" << array_data.child_data.at(0)->buffers[0] << std::endl;
//uint8_t* data_valid_buf =
// const_cast<uint8_t*>(array_data.child_data.at(0)->buffers[0]->data());
//std::cout << "LR Annotator::PrepareBuffersForField setting offset eval data_valid_buf idx=" << 0 << " data_valid_buf=" << &data_valid_buf << std::endl;
//eval_batch->SetBuffer(desc.child_data_validity_idx(), data_valid_buf, array_data.child_data.at(0)->offset);


uint8_t* data_buf =
const_cast<uint8_t*>(array_data.child_data.at(0)->buffers[buffer_idx]->data());
//std::cout << "LR Annotator::PrepareBuffersForField setting data buffer desc.data_idx()=" << desc.data_idx() << " idx=" << buffer_idx << " data=" << data_buf << std::endl;
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.child_data.at(0)->offset);
//std::cout << "LR Annotator::PrepareBuffersForField 5a" << std::endl;


//std::cout << "LR array_data.child_data.at(0)->buffers[0]->data() is " << array_data.child_data.at(0)->buffers[0] << std::endl;
if (array_data.child_data.at(0)->buffers[0] ) {
uint8_t* child_valid_buf = const_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[0]->data());
//desc.set_child_data_validity_idx(4);
// std::cout << "LR Annotator::PrepareBuffersForField setting child valid buffer -5b " <<
//" name=" << desc.Name() << " idx=" << desc.child_data_validity_idx() << " child_data_buf=" << *child_valid_buf << std::endl;
eval_batch->SetBuffer(desc.child_data_validity_idx(), child_valid_buf, 0);
}

Expand All @@ -187,16 +148,11 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc,
if (array_data.type->id() != arrow::Type::LIST) {
uint8_t* data_buf_ptr =
reinterpret_cast<uint8_t*>(array_data.buffers[buffer_idx].get());
//std::cout << "LR Annotator::PrepareBuffersForField setting eval buffer 1 " << &data_buf_ptr << std::endl;
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr, array_data.offset);
} else {
//std::cout << "LR Annotator::PrepareBuffersForField is_output index " << desc.data_buffer_ptr_idx() << std::endl;

// list data buffer is in child data buffer
uint8_t* data_buf_ptr = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx].get());
//std::cout << "LR Annotator::PrepareBuffersForField setting eval data buffer " << buffer_idx << " data=" << &data_buf_ptr << std::endl;

eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr,
array_data.child_data.at(0)->offset);
}
Expand All @@ -209,7 +165,6 @@ EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
EvalBatchPtr eval_batch = std::make_shared<EvalBatch>(
record_batch.num_rows(), buffer_count_, local_bitmap_count_);

//std::cout << "LR PrepareEvalBatch 1" << std::endl;
// Fill in the entries for the input fields.
for (int i = 0; i < record_batch.num_columns(); ++i) {
const std::string& name = record_batch.column_name(i);
Expand All @@ -218,28 +173,17 @@ EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
// skip columns not involved in the expression.
continue;
}

/*std::cout << "LR PrepareEvalBatch 1a i=" << i << " record batch schema " << record_batch.schema()->ToString()
<< " num rows " << record_batch.num_rows()
<< " num columns " << record_batch.num_columns()
<< " data size " << record_batch.column_data().size()
<< " col 1 " << record_batch.column(0)->ToString()
<< std::endl;*/

//std::cout << "LR PrepareEvalBatch 1a i=" << i << " record batch data " << record_batch.ToString() << std::endl;
PrepareBuffersForField(*(found->second), *(record_batch.column_data(i)),
eval_batch.get(), false /*is_output*/);
}

// Fill in the entries for the output fields.
//std::cout << "LR PrepareEvalBatch preparing output fields" << std::endl;
int idx = 0;
for (auto& arraydata : out_vector) {
const FieldDescriptorPtr& desc = out_descs_.at(idx);
PrepareBuffersForField(*desc, *arraydata, eval_batch.get(), true /*is_output*/);
++idx;
}
//std::cout << "LR PrepareEvalBatch 2" << std::endl;
return eval_batch;
}

Expand Down
89 changes: 5 additions & 84 deletions cpp/src/gandiva/array_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,18 +74,15 @@ bool array_int32_contains_int32(int64_t context_ptr, const int32_t* entry_buf,
return false;
}

//LR TODO
int32_t* array_int32_make_array(int64_t context_ptr, int32_t contains_data, int32_t* out_len) {
//std::cout << "LR array_int32_make_array offset data=" << contains_data << std::endl;

int integers[] = { contains_data, 21, 3, contains_data, 5 };
*out_len = 5;// * 4;
//length is number of items, but buffers must account for byte size.
uint8_t* ret = gdv_fn_context_arena_malloc(context_ptr, *out_len * 4);
memcpy(ret, integers, *out_len * 4);
//std::cout << "LR made a buffer length" << *out_len * 4 << " item 3 is = " << int32_t(ret[3*4]) << std::endl;


//return reinterpret_cast<int32_t*>(ret);
return reinterpret_cast<int32_t*>(ret);
}

Expand All @@ -94,7 +91,6 @@ bool array_int64_contains_int64(int64_t context_ptr, const int64_t* entry_buf,
int64_t contains_data, bool entry_validWhat,
int64_t loop_var, int64_t validity_index_var,
bool* valid_row) {
//std::cout << "LR array_int64_contains_int64 offset length=" << entry_offsets_len << std::endl;
if (!combined_row_validity) {
*valid_row = false;
return false;
Expand All @@ -108,8 +104,7 @@ bool array_int64_contains_int64(int64_t context_ptr, const int64_t* entry_buf,
if (!arrow::bit_util::GetBit(reinterpret_cast<const uint8_t*>(entry_validityAdjusted), validityBitIndex + i)) {
continue;
}
int64_t entry_len = *(entry_buf + (i*2)); //LR TODO sizeof int64?
//std::cout << "LR checking value " << entry_len << " against target " << contains_data << std::endl;
int64_t entry_len = *(entry_buf + (i*2));
if (entry_len == contains_data) {
return true;
}
Expand All @@ -120,85 +115,29 @@ bool array_int64_contains_int64(int64_t context_ptr, const int64_t* entry_buf,
int32_t* array_int32_remove(int64_t context_ptr, const int32_t* entry_buf,
int32_t entry_len, const int32_t* entry_validity, bool combined_row_validity,
int32_t remove_data, bool entry_validWhat,
/*const int32_t* array_valid_bits,*/ int64_t loop_var, int64_t validity_index_var,
int64_t loop_var, int64_t validity_index_var,
bool* valid_row, int32_t* out_len, int32_t** valid_ptr) {
//std::cout << "LR array_int32_remove data=" << remove_data
// << " entry_offsets_len " << entry_offsets_len << std::endl;

//std::cout << "LR array_int32_remove " << loop_var << std::endl;
std::vector<int> newInts;


/*std::bitset<8> validBits(*entry_valid); //LR TODO handle size.
std::bitset<8> outputValidBits;
std::cout << "LR Entry bitset is " << validBits << std::endl;
for (int i = 0; i < entry_offsets_len; i++) {
//std::cout << "LR going to check " << entry_buf + i << std::endl;
int32_t entry_item = *(entry_buf + (i * 1));
//std::cout << "LR checking value " << entry_len << " against target " << remove_data << std::endl;
if (entry_item == remove_data) {
continue;
} else if (!validBits[i]) {
outputValidBits[i] = 0;
newInts.push_back(0); //This will be marked invalid, so data doesn't matter.
} else {
outputValidBits[i] = 1;
//Note the vector can have n elements, while validbits might have n+1.
newInts.push_back(entry_item);
}
}*/

//std::cout << "LR entry_buf=" << entry_buf << " *entry_buf=" << entry_buf << std::endl;
//std::cout << "LR notSureWhatThisIs=" << notSureWhatThisIs << " *notSureWhatThisIs=" << *notSureWhatThisIs << std::endl;
std::cout << "LR combined_row_validity=" << combined_row_validity << " entry_validWhat=" << entry_validWhat << " validity_index_var=" << validity_index_var <<
" entry_validity=" << entry_validity << std::endl;
//<< " *notSureWhatThisIs=" << *notSureWhatThisIs << std::endl;

//LR TODO not sure what entry_validWhat is.
//LR TODO I'm not sure why entry_validty increases for each loop. It starts as the pointer to the validity buffer, so adjust here.
const int32_t* entry_validityAdjusted = entry_validity - (loop_var );
//std::bitset<15> maybeInputBits (*notSureWhatThisIsAdjusted);
//std::cout << "LR maybeInputBits=" << maybeInputBits << std::endl;


int64_t validityBitIndex = 0;
//for (int i = 0; i < loop_var; i++) {
// validityBitIndex += *(offsets + i);
// std::cout << "LR i=" << i << " adding offset " << *(offsets + i) << " offset is " << offsets << std::endl;
//}

//The validity index already has the current row length added to it, so decrement.
validityBitIndex = validity_index_var - entry_len;
//TODO temp until the buffer is worked out.
//validityBitIndex -= (loop_var);


//std::cout << "Using validityBitIndex=" << validityBitIndex << std::endl;



validityBitIndex = validity_index_var - entry_len;
entry_validWhat = true;
//std::bitset<10> outputValidBits;

std::vector<bool> outValid;
for (int i = 0; i < entry_len; i++) {
//std::cout << "LR going to check " << entry_buf + i << std::endl;
int32_t entry_item = *(entry_buf + (i * 1));
//std::cout << "LR checking value " << entry_len << " against target " << remove_data << std::endl;
if (entry_item == remove_data) {
//outValid.push_back(false);
//newInts.push_back(42);
//entry_validWhat = false;
//TODO temp until buffer is worked out } else if (!arrow::bit_util::GetBit(reinterpret_cast<const uint8_t*>(array_valid_bits), validityBitIndex + i)) {
//Do not add the item to remove.
} else if (!arrow::bit_util::GetBit(reinterpret_cast<const uint8_t*>(entry_validityAdjusted), validityBitIndex + i)) {
outValid.push_back(false);
newInts.push_back(0);
//outputValidBits[i] = 0;
} else {
outValid.push_back(true);
//Note the vector can have n elements, while validbits might have n+1.
newInts.push_back(entry_item);
//outputValidBits[i] = 1;
}
}

Expand All @@ -215,31 +154,13 @@ validityBitIndex = validity_index_var - entry_len;
//length is number of items, but buffers must account for byte size.
uint8_t* ret = gdv_fn_context_arena_malloc(context_ptr, outBufferLength);
memcpy(ret, newInts.data(), outBufferLength);
//std::cout << "LR made a buffer length" << *out_len * 4 << " item 3 is = " << int32_t(ret[3*4]) << std::endl;


*valid_row = true;


//unsigned long ll = outputValidBits.to_ulong();
if (!combined_row_validity) {
//ll = 0;
*out_len = 0;
*valid_row = false; //this one is what works for the top level validity.
entry_validWhat = false;
}
//LR no need, set along the way. memcpy(validRet, &ll, 1);
//*valid_len = 1;
//std::cout << "LR valid_buf is " << valid_buf << std::endl;
//std::cout << "LR outputValidBits is " << outputValidBits << std::endl;
//valid_buf = reinterpret_cast<bool*>(validRet);

*valid_ptr = reinterpret_cast<int32_t*>(validRet);
//std::cout << "LR setting valid_ptr=" << valid_ptr << " *valid_ptr=" << *valid_ptr << " **valid_ptr=" << **valid_ptr << " valid_ptr bitset data is " << std::bitset<8>(**valid_ptr)
// << " return value is " << reinterpret_cast<int32_t*>(ret) << std::endl;


//return reinterpret_cast<int32_t*>(ret);
return reinterpret_cast<int32_t*>(ret);
}

Expand Down
2 changes: 0 additions & 2 deletions cpp/src/gandiva/bitmap_accumulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,9 @@ class GANDIVA_EXPORT BitMapAccumulator : public DexDefaultVisitor {

void Visit(const VectorReadValidityDex& dex) {
int idx = dex.ValidityIdx();
//std::cout << "LR BitMapAccumulator visiting " << idx << std::endl;
auto bitmap = eval_batch_.GetBuffer(idx);
// The bitmap could be null. Ignore it in this case.
if (bitmap != NULLPTR) {
//std::cout << "LR BitMapAccumulator is not null " << bitmap << std::endl;
src_maps_.push_back(bitmap);
src_map_offsets_.push_back(eval_batch_.GetBufferOffset(idx));
}
Expand Down
10 changes: 0 additions & 10 deletions cpp/src/gandiva/expr_decomposer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,28 +38,20 @@ namespace gandiva {
Status ExprDecomposer::Visit(const FieldNode& node) {
auto desc = annotator_.CheckAndAddInputFieldDescriptor(node.field());

//std::cout << "LR ExprDecomposer" << std::endl;
DexPtr validity_dex = std::make_shared<VectorReadValidityDex>(desc);
DexPtr value_dex;
if (desc->HasChildOffsetsIdx()) {
//std::cout << "LR ExprDecomposer 1" << std::endl;
// handle list<binary> type
value_dex = std::make_shared<VectorReadVarLenValueListDex>(desc);
} else if (desc->HasOffsetsIdx()) {
//std::cout << "LR ExprDecomposer 2" << std::endl;
if (desc->field()->type()->id() == arrow::Type::LIST) {
// handle list<primitive> type
//std::cout << "LR ExprDecomposer 3" << std::endl;
auto p = std::make_shared<VectorReadFixedLenValueListDex>(desc);
value_dex = p;
//int v = p->DataIdx();
//std::cout << "LR primitive list type " v << " " <<
} else {
//std::cout << "LR ExprDecomposer 4" << std::endl;
value_dex = std::make_shared<VectorReadVarLenValueDex>(desc);
}
} else {
//std::cout << "LR ExprDecomposer 5" << std::endl;
value_dex = std::make_shared<VectorReadFixedLenValueDex>(desc);
}
result_ = std::make_shared<ValueValidityPair>(validity_dex, value_dex);
Expand Down Expand Up @@ -126,9 +118,7 @@ Status ExprDecomposer::Visit(const FunctionNode& in_node) {
} else {
DCHECK(native_function->result_nullable_type() == kResultNullInternal);

//LR TODO Need validity?
// Add a local bitmap to track the output validity.
std::cout << "LR Making a nullable function holder with validity." << std::endl;
int local_bitmap_idx = annotator_.AddLocalBitMap();
auto validity_dex = std::make_shared<LocalBitMapValidityDex>(local_bitmap_idx);

Expand Down
1 change: 0 additions & 1 deletion cpp/src/gandiva/field_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class FieldDescriptor {
data_buffer_ptr_idx_(data_buffer_ptr_idx),
child_offsets_idx_(child_offsets_idx),
child_validity_idx_(child_validity_idx) {
//std::cout << "LR FieldDescriptor=" << Name() << " " << data_idx_ << "," << data_buffer_ptr_idx_ << "," << child_validity_idx_ << std::endl;
}

/// Index of validity array in the array-of-buffers
Expand Down
Loading

0 comments on commit 316b822

Please sign in to comment.