From 9e6f9bd901995b45e51300a0596a8c26493690ef Mon Sep 17 00:00:00 2001 From: Shawn Wang Date: Thu, 2 Jan 2025 19:27:54 +0800 Subject: [PATCH] sparse: add block-max wand and block-max maxscore algorithm Signed-off-by: Shawn Wang --- include/knowhere/comp/index_param.h | 1 + include/knowhere/sparse_utils.h | 10 + src/index/sparse/sparse_index_node.cc | 22 + src/index/sparse/sparse_inverted_index.h | 465 +++++++++++++++++- .../sparse/sparse_inverted_index_config.h | 10 +- tests/ut/test_sparse.cc | 12 +- 6 files changed, 494 insertions(+), 26 deletions(-) diff --git a/include/knowhere/comp/index_param.h b/include/knowhere/comp/index_param.h index 364b66c11..6110a57bd 100644 --- a/include/knowhere/comp/index_param.h +++ b/include/knowhere/comp/index_param.h @@ -177,6 +177,7 @@ constexpr const char* PRQ_NUM = "nrq"; // for PRQ, number of redisual quant constexpr const char* INVERTED_INDEX_ALGO = "inverted_index_algo"; constexpr const char* DROP_RATIO_BUILD = "drop_ratio_build"; constexpr const char* DROP_RATIO_SEARCH = "drop_ratio_search"; +constexpr const char* BLOCKMAX_BLOCK_SIZE = "blockmax_block_size"; } // namespace indexparam using MetricType = std::string; diff --git a/include/knowhere/sparse_utils.h b/include/knowhere/sparse_utils.h index be069c559..8e88cb790 100644 --- a/include/knowhere/sparse_utils.h +++ b/include/knowhere/sparse_utils.h @@ -386,6 +386,16 @@ class GrowableVectorView { return reinterpret_cast(mmap_data_)[i]; } + T& + back() { + return reinterpret_cast(mmap_data_)[size() - 1]; + } + + const T& + back() const { + return reinterpret_cast(mmap_data_)[size() - 1]; + } + class iterator : public boost::iterator_facade { public: iterator() = default; diff --git a/src/index/sparse/sparse_index_node.cc b/src/index/sparse/sparse_index_node.cc index a3f136751..7e15287ff 100644 --- a/src/index/sparse/sparse_index_node.cc +++ b/src/index/sparse/sparse_index_node.cc @@ -364,6 +364,18 @@ class SparseInvertedIndexNode : public IndexNode { new sparse::InvertedIndex(); index->SetBM25Params(k1, b, avgdl, max_score_ratio); return index; + } else if (cfg.inverted_index_algo.value() == "DAAT_BLOCKMAX_WAND") { + auto index = new sparse::InvertedIndex(); + index->SetBM25Params(k1, b, avgdl, max_score_ratio); + index->SetBlockmaxBlockSize(cfg.blockmax_block_size.value()); + return index; + } else if (cfg.inverted_index_algo.value() == "DAAT_BLOCKMAX_MAXSCORE") { + auto index = new sparse::InvertedIndex(); + index->SetBM25Params(k1, b, avgdl, max_score_ratio); + index->SetBlockmaxBlockSize(cfg.blockmax_block_size.value()); + return index; } else if (cfg.inverted_index_algo.value() == "DAAT_MAXSCORE") { auto index = new sparse::InvertedIndex(); @@ -386,6 +398,16 @@ class SparseInvertedIndexNode : public IndexNode { auto index = new sparse::InvertedIndex(); return index; + } else if (cfg.inverted_index_algo.value() == "DAAT_BLOCKMAX_WAND") { + auto index = + new sparse::InvertedIndex(); + index->SetBlockmaxBlockSize(cfg.blockmax_block_size.value()); + return index; + } else if (cfg.inverted_index_algo.value() == "DAAT_BLOCKMAX_MAXSCORE") { + auto index = new sparse::InvertedIndex(); + index->SetBlockmaxBlockSize(cfg.blockmax_block_size.value()); + return index; } else if (cfg.inverted_index_algo.value() == "TAAT_NAIVE") { auto index = new sparse::InvertedIndex(); return index; diff --git a/src/index/sparse/sparse_inverted_index.h b/src/index/sparse/sparse_inverted_index.h index 4544aeff0..6adc9a5ff 100644 --- a/src/index/sparse/sparse_inverted_index.h +++ b/src/index/sparse/sparse_inverted_index.h @@ -14,9 +14,11 @@ #include #include +#include #include #include +#include #include #include #include @@ -39,6 +41,8 @@ enum class InvertedIndexAlgo { TAAT_NAIVE, DAAT_WAND, DAAT_MAXSCORE, + DAAT_BLOCKMAX_WAND, + DAAT_BLOCKMAX_MAXSCORE, }; template @@ -105,12 +109,30 @@ class InvertedIndex : public BaseInvertedIndex { close(map_fd_); map_fd_ = -1; } + if (blockmax_map_ != nullptr) { + auto res = munmap(blockmax_map_, blockmax_map_byte_size_); + if (res != 0) { + LOG_KNOWHERE_ERROR_ << "Failed to munmap when deleting sparse InvertedIndex: " << strerror(errno); + } + blockmax_map_ = nullptr; + blockmax_map_byte_size_ = 0; + } + if (blockmax_map_fd_ != -1) { + // closing the file descriptor will also cause the file to be deleted. + close(blockmax_map_fd_); + blockmax_map_fd_ = -1; + } } } template using Vector = std::conditional_t, std::vector>; + void + SetBlockmaxBlockSize(size_t block_size) { + blockmax_block_size_ = block_size; + } + void SetBM25Params(float k1, float b, float avgdl, float max_score_ratio) { bm25_params_ = std::make_unique(k1, b, avgdl, max_score_ratio); @@ -139,13 +161,12 @@ class InvertedIndex : public BaseInvertedIndex { "avgdl must be supplied during searching"); } auto avgdl = cfg.bm25_avgdl.value(); - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { - // daat_wand and daat_maxscore: search time k1/b must equal load time config. + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { + // daat related algorithms: search time k1/b must equal load time config. if ((cfg.bm25_k1.has_value() && cfg.bm25_k1.value() != bm25_params_->k1) || ((cfg.bm25_b.has_value() && cfg.bm25_b.value() != bm25_params_->b))) { return expected>::Err( - Status::invalid_args, - "search time k1/b must equal load time config for DAAT_WAND or DAAT_MAXSCORE algorithm."); + Status::invalid_args, "search time k1/b must equal load time config for DAAT_* algorithm."); } return GetDocValueBM25Computer(bm25_params_->k1, bm25_params_->b, avgdl); } else { @@ -181,12 +202,6 @@ class InvertedIndex : public BaseInvertedIndex { writeBinaryPOD(writer, n_rows_internal_); writeBinaryPOD(writer, max_dim_); writeBinaryPOD(writer, deprecated_value_threshold); - BitsetView bitset(nullptr, 0); - - std::vector> cursors; - for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { - cursors.emplace_back(inverted_index_ids_[i], inverted_index_vals_[i], n_rows_internal_, 0, 0, bitset); - } auto dim_map_reverse = std::unordered_map(); for (const auto& [dim, dim_id] : dim_map_) { @@ -224,6 +239,7 @@ class InvertedIndex : public BaseInvertedIndex { return Status::success; } + Status Load(MemoryIOReader& reader, int map_flags, const std::string& supplement_target_filename) override { DType deprecated_value_threshold; @@ -237,7 +253,7 @@ class InvertedIndex : public BaseInvertedIndex { readBinaryPOD(reader, deprecated_value_threshold); if constexpr (mmapped) { - RETURN_IF_ERROR(PrepareMmap(reader, rows, map_flags, supplement_target_filename)); + RETURN_IF_ERROR(prepare_index_mmap(reader, rows, map_flags, supplement_target_filename)); } else { if constexpr (bm25) { bm25_params_->row_sums.reserve(rows); @@ -257,17 +273,52 @@ class InvertedIndex : public BaseInvertedIndex { reader.read(raw_row.data(), count * SparseRow::element_size()); } } - add_row_to_index(raw_row, i); + add_row_to_index(raw_row, i, false); } n_rows_internal_ = rows; + // prepare and generate blockmax information + if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE || + algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND) { + if constexpr (mmapped) { + RETURN_IF_ERROR(prepare_blockmax_mmap(map_flags, supplement_target_filename + ".blockmax")); + } else { + blockmax_last_block_sizes_.resize(inverted_index_ids_.size(), 0); + blockmax_ids_.resize(inverted_index_ids_.size()); + blockmax_scores_.resize(inverted_index_ids_.size()); + } + + for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { + auto& ids = inverted_index_ids_[i]; + auto& vals = inverted_index_vals_[i]; + for (size_t j = 0; j < ids.size(); ++j) { + auto score = static_cast(vals[j]); + if constexpr (bm25) { + score = bm25_params_->max_score_ratio * + bm25_params_->wand_max_score_computer(vals[j], bm25_params_->row_sums.at(ids[j])); + } + if (blockmax_last_block_sizes_[i] == 0) { + blockmax_ids_[i].emplace_back(ids[j]); + blockmax_scores_[i].emplace_back(score); + } else { + blockmax_ids_[i].back() = ids[j]; + blockmax_scores_[i].back() = std::max(blockmax_scores_[i].back(), score); + } + if (++blockmax_last_block_sizes_[i] >= blockmax_block_size_) { + blockmax_last_block_sizes_[i] = 0; + } + } + } + } + return Status::success; } // memory in reader must be guaranteed to be valid during the lifetime of this object. Status - PrepareMmap(MemoryIOReader& reader, size_t rows, int map_flags, const std::string& supplement_target_filename) { + prepare_index_mmap(MemoryIOReader& reader, size_t rows, int map_flags, + const std::string& supplement_target_filename) { const auto initial_reader_location = reader.tellg(); const auto nnz = (reader.remaining() - (rows * sizeof(size_t))) / SparseRow::element_size(); @@ -301,7 +352,7 @@ class InvertedIndex : public BaseInvertedIndex { map_byte_size_ = inverted_index_ids_byte_size + inverted_index_vals_byte_size + plists_ids_byte_size + plists_vals_byte_size; - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { map_byte_size_ += max_score_in_dim_byte_size; } if constexpr (bm25) { @@ -350,7 +401,7 @@ class InvertedIndex : public BaseInvertedIndex { inverted_index_vals_.initialize(ptr, inverted_index_vals_byte_size); ptr += inverted_index_vals_byte_size; - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { max_score_in_dim_.initialize(ptr, max_score_in_dim_byte_size); ptr += max_score_in_dim_byte_size; } @@ -375,17 +426,91 @@ class InvertedIndex : public BaseInvertedIndex { size_t dim_id = 0; for (const auto& [idx, count] : idx_counts) { dim_map_[idx] = dim_id; - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { max_score_in_dim_.emplace_back(0.0f); } ++dim_id; } + // in mmap mode, next_dim_id_ should never be used, but still assigning for consistency. next_dim_id_ = dim_id; return Status::success; } + Status + prepare_blockmax_mmap(int map_flags, const std::string& blockmax_mmap_filename) { + std::ofstream temp_file(blockmax_mmap_filename, std::ios::binary | std::ios::trunc); + if (!temp_file) { + LOG_KNOWHERE_ERROR_ << "Failed to create mmap file when loading sparse InvertedIndex: " << strerror(errno); + return Status::disk_file_error; + } + temp_file.close(); + + size_t blockmax_total_blocks = 0; + for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { + blockmax_total_blocks += (inverted_index_ids_[i].size() + blockmax_block_size_ - 1) / blockmax_block_size_; + } + auto blockmax_last_block_sizes_byte_size = + inverted_index_ids_.size() * sizeof(typename decltype(blockmax_last_block_sizes_)::value_type); + auto blockmax_ids_dim0_byte_size = + inverted_index_ids_.size() * sizeof(typename decltype(blockmax_ids_)::value_type); + auto blockmax_scores_dim0_byte_size = + inverted_index_ids_.size() * sizeof(typename decltype(blockmax_scores_)::value_type); + auto blockmax_total_blocks_byte_size = + blockmax_total_blocks * (sizeof(typename decltype(blockmax_ids_)::value_type::value_type) + + sizeof(typename decltype(blockmax_scores_)::value_type::value_type)); + blockmax_map_byte_size_ = blockmax_last_block_sizes_byte_size + blockmax_ids_dim0_byte_size + + blockmax_scores_dim0_byte_size + blockmax_total_blocks_byte_size; + std::filesystem::resize_file(blockmax_mmap_filename, blockmax_map_byte_size_); + + blockmax_map_fd_ = open(blockmax_mmap_filename.c_str(), O_RDWR); + if (blockmax_map_fd_ == -1) { + LOG_KNOWHERE_ERROR_ << "Failed to open mmap file when loading sparse InvertedIndex: " << strerror(errno); + return Status::disk_file_error; + } + std::filesystem::remove(blockmax_mmap_filename); + // clear MAP_PRIVATE flag: we need to write to this mmapped memory/file, + // MAP_PRIVATE triggers copy-on-write and uses extra anonymous memory. + map_flags &= ~MAP_PRIVATE; + map_flags |= MAP_SHARED; + blockmax_map_ = static_cast( + mmap(nullptr, blockmax_map_byte_size_, PROT_READ | PROT_WRITE, map_flags, blockmax_map_fd_, 0)); + if (blockmax_map_ == MAP_FAILED) { + LOG_KNOWHERE_ERROR_ << "Failed to create blockmax mmap when loading sparse InvertedIndex: " + << strerror(errno) << ", size: " << blockmax_map_byte_size_ + << " on file: " << blockmax_mmap_filename; + return Status::disk_file_error; + } + + char* ptr = blockmax_map_; + + blockmax_last_block_sizes_.initialize(ptr, blockmax_last_block_sizes_byte_size); + ptr += blockmax_last_block_sizes_byte_size; + blockmax_ids_.initialize(ptr, blockmax_ids_dim0_byte_size); + ptr += blockmax_ids_dim0_byte_size; + blockmax_scores_.initialize(ptr, blockmax_scores_dim0_byte_size); + ptr += blockmax_scores_dim0_byte_size; + + for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { + blockmax_last_block_sizes_[i] = 0; + } + for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { + auto bcount = (inverted_index_ids_[i].size() + blockmax_block_size_ - 1) / blockmax_block_size_; + auto& bids = blockmax_ids_.emplace_back(); + bids.initialize(ptr, bcount * sizeof(typename decltype(blockmax_ids_)::value_type::value_type)); + ptr += bcount * sizeof(typename decltype(blockmax_ids_)::value_type::value_type); + } + for (size_t i = 0; i < inverted_index_ids_.size(); ++i) { + auto bcount = (inverted_index_ids_[i].size() + blockmax_block_size_ - 1) / blockmax_block_size_; + auto& bscores = blockmax_scores_.emplace_back(); + bscores.initialize(ptr, bcount * sizeof(typename decltype(blockmax_scores_)::value_type::value_type)); + ptr += bcount * sizeof(typename decltype(blockmax_scores_)::value_type::value_type); + } + + return Status::success; + } + // Non zero drop ratio is only supported for static index, i.e. data should // include all rows that'll be added to the index. Status @@ -411,8 +536,9 @@ class InvertedIndex : public BaseInvertedIndex { bm25_params_->row_sums.reserve(current_rows + rows); } for (size_t i = 0; i < rows; ++i) { - add_row_to_index(data[i], current_rows + i); + add_row_to_index(data[i], current_rows + i, true); } + n_rows_internal_ += rows; return Status::success; @@ -446,11 +572,15 @@ class InvertedIndex : public BaseInvertedIndex { } MaxMinHeap heap(k * refine_factor); - // DAAT_WAND and DAAT_MAXSCORE are based on the implementation in PISA. + // DAAT related algorithms are based on the implementation in PISA. if constexpr (algo == InvertedIndexAlgo::DAAT_WAND) { search_daat_wand(q_vec, heap, bitset, computer); } else if constexpr (algo == InvertedIndexAlgo::DAAT_MAXSCORE) { search_daat_maxscore(q_vec, heap, bitset, computer); + } else if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE) { + search_daat_blockmax_maxscore(q_vec, heap, bitset, computer); + } else if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND) { + search_daat_blockmax_wand(q_vec, heap, bitset, computer); } else { search_taat_naive(q_vec, heap, bitset, computer); } @@ -529,9 +659,22 @@ class InvertedIndex : public BaseInvertedIndex { res += sizeof(typename decltype(inverted_index_vals_)::value_type::value_type) * inverted_index_vals_[i].capacity(); } - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { res += sizeof(typename decltype(max_score_in_dim_)::value_type) * max_score_in_dim_.capacity(); } + if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND || + algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE) { + res += sizeof(typename decltype(blockmax_ids_)::value_type) * blockmax_ids_.capacity(); + res += sizeof(typename decltype(blockmax_scores_)::value_type) * blockmax_scores_.capacity(); + for (size_t i = 0; i < blockmax_ids_.size(); ++i) { + res += + sizeof(typename decltype(blockmax_ids_)::value_type::value_type) * blockmax_ids_[i].capacity(); + } + for (size_t i = 0; i < blockmax_scores_.size(); ++i) { + res += sizeof(typename decltype(blockmax_scores_)::value_type::value_type) * + blockmax_scores_[i].capacity(); + } + } return res; } } @@ -643,6 +786,35 @@ class InvertedIndex : public BaseInvertedIndex { } }; // struct Cursor + template + struct BlockMaxCursor : public Cursor { + public: + BlockMaxCursor(const Vector& plist_ids, const Vector& plist_vals, + const Vector& pblk_max_ids, const Vector& pblk_max_scores, size_t num_vec, + float max_score, float q_value, DocIdFilter filter) + : Cursor(plist_ids, plist_vals, num_vec, max_score, q_value, filter), + pblk_max_ids_(pblk_max_ids), + pblk_max_scores_(pblk_max_scores) { + } + void + seek_block(table_t vec_id) { + while (bloc_ < pblk_max_ids_.size() && pblk_max_ids_[bloc_] < vec_id) { + ++bloc_; + } + cur_block_end_vec_id_ = (bloc_ >= pblk_max_ids_.size()) ? this->total_num_vec_ : pblk_max_ids_[bloc_]; + } + + [[nodiscard]] float + cur_block_max_score() const { + return this->q_value_ * pblk_max_scores_[bloc_]; + } + + const Vector& pblk_max_ids_; + const Vector& pblk_max_scores_; + table_t cur_block_end_vec_id_ = 0; + size_t bloc_ = 0; + }; // struct BlockMaxCursor + std::vector> parse_query(const SparseRow& query, DType q_threshold) const { std::vector> filtered_query; @@ -672,6 +844,23 @@ class InvertedIndex : public BaseInvertedIndex { return cursors; } + template + std::vector> + make_blockmax_cursors(const std::vector>& q_vec, const DocValueComputer& computer, + DocIdFilter& filter) const { + std::vector> cursors; + cursors.reserve(q_vec.size()); + for (auto q : q_vec) { + auto& plist_ids = inverted_index_ids_[q.first]; + auto& plist_vals = inverted_index_vals_[q.first]; + auto& pblk_max_ids = blockmax_ids_[q.first]; + auto& pblk_max_scores = blockmax_scores_[q.first]; + cursors.emplace_back(plist_ids, plist_vals, pblk_max_ids, pblk_max_scores, n_rows_internal_, + max_score_in_dim_[q.first] * q.second, q.second, filter); + } + return cursors; + } + // find the top-k candidates using brute force search, k as specified by the capacity of the heap. // any value in q_vec that is smaller than q_threshold and any value with dimension >= n_cols() will be ignored. // TODO: may switch to row-wise brute force if filter rate is high. Benchmark needed. @@ -840,6 +1029,192 @@ class InvertedIndex : public BaseInvertedIndex { } } + template + void + search_daat_blockmax_wand(std::vector>& q_vec, MaxMinHeap& heap, + DocIdFilter& filter, const DocValueComputer& computer) const { + std::vector> cursors = make_blockmax_cursors(q_vec, computer, filter); + std::vector*> cursor_ptrs(cursors.size()); + for (size_t i = 0; i < cursors.size(); ++i) { + cursor_ptrs[i] = &cursors[i]; + } + + auto sort_cursors = [&cursor_ptrs] { + std::sort(cursor_ptrs.begin(), cursor_ptrs.end(), + [](auto& x, auto& y) { return x->cur_vec_id_ < y->cur_vec_id_; }); + }; + sort_cursors(); + + while (true) { + float threshold = heap.full() ? heap.top().val : 0; + float global_upper_bound = 0; + table_t pivot_id; + size_t pivot; + bool found_pivot = false; + for (pivot = 0; pivot < cursor_ptrs.size(); ++pivot) { + if (cursor_ptrs[pivot]->cur_vec_id_ >= n_rows_internal_) { + break; + } + global_upper_bound += cursor_ptrs[pivot]->max_score_; + if (global_upper_bound > threshold) { + found_pivot = true; + pivot_id = cursor_ptrs[pivot]->cur_vec_id_; + for (; pivot + 1 < cursor_ptrs.size(); ++pivot) { + if (cursor_ptrs[pivot + 1]->cur_vec_id_ != pivot_id) { + break; + } + } + break; + } + } + if (!found_pivot) { + break; + } + float block_upper_bound = 0.0f; + for (size_t i = 0; i <= pivot; ++i) { + if (cursor_ptrs[i]->cur_block_end_vec_id_ < pivot_id) { + cursor_ptrs[i]->seek_block(pivot_id); + } + block_upper_bound += cursor_ptrs[i]->cur_block_max_score(); + } + if (block_upper_bound > threshold) { + if (pivot_id == cursor_ptrs[0]->cur_vec_id_) { + float score = 0.0f; + for (auto& cursor_ptr : cursor_ptrs) { + if (cursor_ptr->cur_vec_id_ != pivot_id) { + break; + } + float cur_vec_sum = bm25 ? bm25_params_->row_sums.at(cursor_ptr->cur_vec_id_) : 0; + score += cursor_ptr->q_value_ * computer(cursor_ptr->cur_vec_val(), cur_vec_sum); + cursor_ptr->next(); + } + heap.push(pivot_id, score); + threshold = heap.full() ? heap.top().val : 0; + sort_cursors(); + } else { + size_t next_list = pivot; + for (; cursor_ptrs[next_list]->cur_vec_id_ == pivot_id; --next_list) { + } + + cursor_ptrs[next_list]->seek(pivot_id); + for (size_t i = next_list + 1; i < cursor_ptrs.size(); ++i) { + if (cursor_ptrs[i]->cur_vec_id_ >= cursor_ptrs[i - 1]->cur_vec_id_) { + break; + } + std::swap(cursor_ptrs[i], cursor_ptrs[i - 1]); + } + } + } else { + table_t cand_id = n_rows_internal_ - 1; + for (size_t i = 0; i <= pivot; ++i) { + if (cursor_ptrs[i]->cur_block_end_vec_id_ < cand_id) { + cand_id = cursor_ptrs[i]->cur_block_end_vec_id_; + } + } + ++cand_id; + // cursor_ptrs[pivot + 1] must have a vec_id greater than pivot_id, + // and if this condition is met, it means pivot can start from pivot + 1 + if (pivot + 1 < cursor_ptrs.size() && cursor_ptrs[pivot + 1]->cur_vec_id_ < cand_id) { + cand_id = cursor_ptrs[pivot + 1]->cur_vec_id_; + } + + size_t next_list = pivot; + while (next_list + 1 > 0) { + cursor_ptrs[next_list]->seek(cand_id); + if (cursor_ptrs[next_list]->cur_vec_id_ != cand_id) { + for (size_t i = next_list + 1; i < cursor_ptrs.size(); ++i) { + if (cursor_ptrs[i]->cur_vec_id_ >= cursor_ptrs[i - 1]->cur_vec_id_) { + break; + } + std::swap(cursor_ptrs[i], cursor_ptrs[i - 1]); + } + break; + } + --next_list; + } + } + } + } + + template + void + search_daat_blockmax_maxscore(std::vector>& q_vec, MaxMinHeap& heap, + DocIdFilter& filter, const DocValueComputer& computer) const { + std::sort(q_vec.begin(), q_vec.end(), [this](auto& a, auto& b) { + return a.second * max_score_in_dim_[a.first] > b.second * max_score_in_dim_[b.first]; + }); + std::vector> cursors = make_blockmax_cursors(q_vec, computer, filter); + + std::vector upper_bounds(cursors.size() + 1, 0.0f); + float bound_sum = 0.0f; + for (size_t i = cursors.size() - 1; i + 1 > 0; --i) { + bound_sum += cursors[i].max_score_; + upper_bounds[i] = bound_sum; + } + + float threshold = heap.full() ? heap.top().val : 0; + + table_t ne_start_cursor_id = cursors.size(); + uint64_t curr_cand_vec_id = (*std::min_element(cursors.begin(), cursors.end(), [](auto&& lhs, auto&& rhs) { + return lhs.cur_vec_id_ < rhs.cur_vec_id_; + })).cur_vec_id_; + + while (ne_start_cursor_id > 0 && curr_cand_vec_id < n_rows_internal_) { + float score = 0; + table_t next_cand_vec_id = n_rows_internal_; + float cur_vec_sum = bm25 ? bm25_params_->row_sums.at(curr_cand_vec_id) : 0; + // score essential list and find next + for (size_t i = 0; i < ne_start_cursor_id; ++i) { + if (cursors[i].cur_vec_id_ == curr_cand_vec_id) { + score += cursors[i].q_value_ * computer(cursors[i].cur_vec_val(), cur_vec_sum); + cursors[i].next(); + } + if (cursors[i].cur_vec_id_ < next_cand_vec_id) { + next_cand_vec_id = cursors[i].cur_vec_id_; + } + } + + auto new_score = score + upper_bounds[ne_start_cursor_id]; + if (new_score > threshold) { + // update block index for non-essential list and check block upper bound + for (size_t i = ne_start_cursor_id; i < cursors.size(); ++i) { + if (cursors[i].cur_block_end_vec_id_ < curr_cand_vec_id) { + cursors[i].seek_block(curr_cand_vec_id); + } + new_score -= cursors[i].max_score_ - cursors[i].cur_block_max_score(); + if (new_score <= threshold) { + break; + } + } + if (new_score > threshold) { + // try to complete evaluation with non-essential lists + for (size_t i = ne_start_cursor_id; i < cursors.size(); ++i) { + cursors[i].seek(curr_cand_vec_id); + if (cursors[i].cur_vec_id_ == curr_cand_vec_id) { + new_score += cursors[i].q_value_ * computer(cursors[i].cur_vec_val(), cur_vec_sum); + } + new_score -= cursors[i].cur_block_max_score(); + + if (new_score <= threshold) { + break; + } + } + score = new_score; + } + if (score > threshold) { + heap.push(curr_cand_vec_id, score); + threshold = heap.full() ? heap.top().val : 0; + // update non-essential lists + while (ne_start_cursor_id != 0 && upper_bounds[ne_start_cursor_id - 1] <= threshold) { + --ne_start_cursor_id; + } + } + } + + curr_cand_vec_id = next_cand_vec_id; + } + } + void refine_and_collect(const SparseRow& query, MaxMinHeap& inacc_heap, size_t k, float* distances, label_t* labels, const DocValueComputer& computer) const { @@ -862,9 +1237,14 @@ class InvertedIndex : public BaseInvertedIndex { search_daat_wand(q_vec, heap, filter, computer); } else if constexpr (algo == InvertedIndexAlgo::DAAT_MAXSCORE) { search_daat_maxscore(q_vec, heap, filter, computer); + } else if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND) { + search_daat_blockmax_wand(q_vec, heap, filter, computer); + } else if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE) { + search_daat_blockmax_maxscore(q_vec, heap, filter, computer); } else { search_taat_naive(q_vec, heap, filter, computer); } + collect_result(heap, distances, labels); } @@ -880,7 +1260,7 @@ class InvertedIndex : public BaseInvertedIndex { } inline void - add_row_to_index(const SparseRow& row, table_t vec_id) { + add_row_to_index(const SparseRow& row, table_t vec_id, bool is_append) { [[maybe_unused]] float row_sum = 0; for (size_t j = 0; j < row.size(); ++j) { auto [dim, val] = row[j]; @@ -900,18 +1280,49 @@ class InvertedIndex : public BaseInvertedIndex { dim_it = dim_map_.insert({dim, next_dim_id_++}).first; inverted_index_ids_.emplace_back(); inverted_index_vals_.emplace_back(); - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { max_score_in_dim_.emplace_back(0.0f); + if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND || + algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE) { + if (is_append) { + blockmax_ids_.emplace_back(); + blockmax_scores_.emplace_back(); + blockmax_last_block_sizes_.emplace_back(0); + } + } } } inverted_index_ids_[dim_it->second].emplace_back(vec_id); inverted_index_vals_[dim_it->second].emplace_back(get_quant_val(val)); - if constexpr (algo == InvertedIndexAlgo::DAAT_WAND || algo == InvertedIndexAlgo::DAAT_MAXSCORE) { + if constexpr (algo != InvertedIndexAlgo::TAAT_NAIVE) { auto score = static_cast(val); if constexpr (bm25) { score = bm25_params_->max_score_ratio * bm25_params_->wand_max_score_computer(val, row_sum); } max_score_in_dim_[dim_it->second] = std::max(max_score_in_dim_[dim_it->second], score); + if constexpr (algo == InvertedIndexAlgo::DAAT_BLOCKMAX_WAND || + algo == InvertedIndexAlgo::DAAT_BLOCKMAX_MAXSCORE) { + if (is_append) { + size_t cur_block_size = blockmax_last_block_sizes_[dim_it->second]; + if (cur_block_size == 0) { + // create a new block and add the first element + blockmax_ids_[dim_it->second].emplace_back(vec_id); + blockmax_scores_[dim_it->second].emplace_back(score); + } else { + // change the element of the last block + blockmax_ids_[dim_it->second].back() = vec_id; + if (score > blockmax_scores_[dim_it->second].back()) { + blockmax_scores_[dim_it->second].back() = score; + } + } + // update the last block size + ++cur_block_size; + if (cur_block_size >= blockmax_block_size_) { + cur_block_size = 0; + } + blockmax_last_block_sizes_[dim_it->second] = cur_block_size; + } + } } } if constexpr (bm25) { @@ -942,6 +1353,10 @@ class InvertedIndex : public BaseInvertedIndex { Vector> inverted_index_ids_; Vector> inverted_index_vals_; Vector max_score_in_dim_; + Vector> blockmax_ids_; + Vector> blockmax_scores_; + Vector blockmax_last_block_sizes_; + size_t blockmax_block_size_ = 0; size_t n_rows_internal_ = 0; size_t max_dim_ = 0; @@ -951,6 +1366,10 @@ class InvertedIndex : public BaseInvertedIndex { size_t map_byte_size_ = 0; int map_fd_ = -1; + char* blockmax_map_ = nullptr; + size_t blockmax_map_byte_size_ = 0; + int blockmax_map_fd_ = -1; + struct BM25Params { float k1; float b; @@ -958,7 +1377,7 @@ class InvertedIndex : public BaseInvertedIndex { // corresponds to the document length of each doc in the BM25 formula. Vector row_sums; - // below are used only for DAAT_WAND and DAAT_MAXSCORE algorithms. + // below are used only for DAAT_* algorithms. float max_score_ratio; DocValueComputer wand_max_score_computer; diff --git a/src/index/sparse/sparse_inverted_index_config.h b/src/index/sparse/sparse_inverted_index_config.h index 9b14e5aa6..acc972e82 100644 --- a/src/index/sparse/sparse_inverted_index_config.h +++ b/src/index/sparse/sparse_inverted_index_config.h @@ -24,6 +24,7 @@ class SparseInvertedIndexConfig : public BaseConfig { CFG_INT refine_factor; CFG_FLOAT wand_bm25_max_score_ratio; CFG_STRING inverted_index_algo; + CFG_INT blockmax_block_size; KNOHWERE_DECLARE_CONFIG(SparseInvertedIndexConfig) { // NOTE: drop_ratio_build has been deprecated, it won't change anything KNOWHERE_CONFIG_DECLARE_FIELD(drop_ratio_build) @@ -68,7 +69,7 @@ class SparseInvertedIndexConfig : public BaseConfig { * the recall rate within a certain range. */ KNOWHERE_CONFIG_DECLARE_FIELD(wand_bm25_max_score_ratio) - .set_range(0.5, 1.3) + .set_range(0.0, 1.3) .set_default(1.05) .description("ratio to upscale/downscale the max score of each dimension") .for_train_and_search() @@ -80,6 +81,13 @@ class SparseInvertedIndexConfig : public BaseConfig { .for_train_and_search() .for_deserialize() .for_deserialize_from_file(); + KNOWHERE_CONFIG_DECLARE_FIELD(blockmax_block_size) + .description("block size for blockmax-based algorithms") + .set_default(64) + .set_range(1, 65535, true, true) + .for_train_and_search() + .for_deserialize() + .for_deserialize_from_file(); } }; // class SparseInvertedIndexConfig diff --git a/tests/ut/test_sparse.cc b/tests/ut/test_sparse.cc index 879dfcb49..c1d7a9ab0 100644 --- a/tests/ut/test_sparse.cc +++ b/tests/ut/test_sparse.cc @@ -47,7 +47,8 @@ TEST_CASE("Test Mem Sparse Index With Float Vector", "[float metrics]") { auto metric = GENERATE(knowhere::metric::IP, knowhere::metric::BM25); - auto inverted_index_algo = GENERATE("TAAT_NAIVE", "DAAT_WAND", "DAAT_MAXSCORE"); + std::string inverted_index_algo = + GENERATE("TAAT_NAIVE", "DAAT_WAND", "DAAT_MAXSCORE", "DAAT_BLOCKMAX_WAND", "DAAT_BLOCKMAX_MAXSCORE"); auto drop_ratio_search = metric == knowhere::metric::BM25 ? GENERATE(0.0, 0.1) : GENERATE(0.0, 0.3); @@ -69,6 +70,9 @@ TEST_CASE("Test Mem Sparse Index With Float Vector", "[float metrics]") { knowhere::Json json = base_gen(); json[knowhere::indexparam::DROP_RATIO_SEARCH] = drop_ratio_search; json[knowhere::indexparam::INVERTED_INDEX_ALGO] = inverted_index_algo; + if (inverted_index_algo == "DAAT_BLOCKMAX_WAND" || inverted_index_algo == "DAAT_BLOCKMAX_MAXSCORE") { + json[knowhere::indexparam::BLOCKMAX_BLOCK_SIZE] = GENERATE(1, 2, 64, 128); + } return json; }; @@ -464,7 +468,8 @@ TEST_CASE("Test Mem Sparse Index CC", "[float metrics]") { auto query_ds = doc_vector_gen(nq, dim); - auto inverted_index_algo = GENERATE("TAAT_NAIVE", "DAAT_WAND", "DAAT_MAXSCORE"); + std::string inverted_index_algo = + GENERATE("TAAT_NAIVE", "DAAT_WAND", "DAAT_MAXSCORE", "DAAT_BLOCKMAX_WAND", "DAAT_BLOCKMAX_MAXSCORE"); auto drop_ratio_search = GENERATE(0.0, 0.3); @@ -487,6 +492,9 @@ TEST_CASE("Test Mem Sparse Index CC", "[float metrics]") { knowhere::Json json = base_gen(); json[knowhere::indexparam::DROP_RATIO_SEARCH] = drop_ratio_search; json[knowhere::indexparam::INVERTED_INDEX_ALGO] = inverted_index_algo; + if (inverted_index_algo == "DAAT_BLOCKMAX_WAND" || inverted_index_algo == "DAAT_BLOCKMAX_MAXSCORE") { + json[knowhere::indexparam::BLOCKMAX_BLOCK_SIZE] = GENERATE(1, 2, 64, 128); + } return json; };