From 0964db6bf768524275090baf7d457686641fcea3 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Tue, 24 Sep 2024 23:57:18 -0400 Subject: [PATCH 1/5] added expiration --- src/search/indexer.cc | 172 +++++++++++++++++++++++++++++++++++++++++- src/search/indexer.h | 12 +++ 2 files changed, 183 insertions(+), 1 deletion(-) diff --git a/src/search/indexer.cc b/src/search/indexer.cc index 0f771026216..f102f83e0e9 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -354,6 +354,163 @@ Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, s return Status::OK(); } +Status IndexUpdater::DeleteTagKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, + const SearchKey &search_key, const TagFieldMetadata *tag) const { + CHECK(original.IsNull() || original.Is()); + + if (original.IsNull()) { + return Status::OK(); + } + + auto original_tags = original.IsNull() ? std::vector() : original.Get(); + + auto to_tag_set = [&](const std::vector &tags) -> std::set { + if (tag->case_sensitive) { + return {tags.begin(), tags.end()}; + } else { + std::set res; + std::transform(tags.begin(), tags.end(), std::inserter(res, res.begin()), util::ToLower); + return res; + } + }; + + std::set tags_to_delete = to_tag_set(original_tags); + + if (tags_to_delete.empty()) { + return Status::OK(); + } + + auto *storage = indexer->storage; + auto batch = storage->GetWriteBatchBase(); + auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search); + + for (const auto &tag_str : tags_to_delete) { + auto index_key = search_key.ConstructTagFieldData(tag_str, key); + + auto delete_status = batch->Delete(cf_handle, index_key); + if (!delete_status.ok()) { + return {Status::NotOK, delete_status.ToString()}; + } + } + + auto write_status = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); + if (!write_status.ok()) { + return {Status::NotOK, write_status.ToString()}; + } + + return Status::OK(); +} + +Status IndexUpdater::DeleteNumericKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, + const SearchKey &search_key) const { + CHECK(original.IsNull() || original.Is()); + + auto *storage = indexer->storage; + auto batch = storage->GetWriteBatchBase(); + auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search); + + auto index_key = search_key.ConstructNumericFieldData(original.Get(), key); + + auto s = batch->Delete(cf_handle, index_key); + if (!s.ok()) { + return {Status::NotOK, s.ToString()}; + } + + s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); + if (!s.ok()) { + return {Status::NotOK, s.ToString()}; + } + + return Status::OK(); +} + +Status IndexUpdater::DeleteHnswVectorKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, + const SearchKey &search_key, HnswVectorFieldMetadata *vector) const { + CHECK(original.IsNull() || original.Is()); + + if (original.IsNull()) { + return Status::OK(); + } + + auto *storage = indexer->storage; + auto batch = storage->GetWriteBatchBase(); + auto hnsw = HnswIndex(search_key, vector, storage); + + auto delete_status = hnsw.DeleteVectorEntry(ctx, key, batch); + if (!delete_status.IsOK()) { + return {Status::NotOK}; + } + + auto write_status = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); + if (!write_status.ok()) { + return {Status::NotOK, write_status.ToString()}; + } + + return Status::OK(); +} + +Status IndexUpdater::Delete(engine::Context &ctx, std::string_view key) const { + const auto &ns = info->ns; + Database db(indexer->storage, ns); + + RedisType type = kRedisNone; + auto s = db.Type(ctx, key, &type); + if (!s.ok()) return {Status::NotOK, s.ToString()}; + + if (type == kRedisNone) return Status::OK(); + + if (type != static_cast(info->metadata.on_data_type)) { + return {Status::TypeMismatched}; + } + + auto original = GET_OR_RET(Record(ctx, key)); + + for (const auto &[field, i] : info->fields) { + if (i.metadata->noindex) { + continue; + } + + const kqir::Value &original_val = original.find(field) != original.end() ? original.at(field) : kqir::Value(); + SearchKey search_key(info->ns, info->name, field); + + // Invoke the appropriate delete function based on the field type + if (auto tag = dynamic_cast(i.metadata.get())) { + GET_OR_RET(DeleteTagKey(ctx, key, original_val, search_key, tag)); + } else if (auto numeric = dynamic_cast(i.metadata.get())) { + GET_OR_RET(DeleteNumericKey(ctx, key, original_val, search_key)); + } else if (auto vector = dynamic_cast(i.metadata.get())) { + GET_OR_RET(DeleteHnswVectorKey(ctx, key, original_val, search_key, vector)); + } else { + return {Status::NotOK, "Unexpected field type during deletion"}; + } + } + + return Status::OK(); +} + +Status IndexUpdater::IsKeyExpired(engine::Context &ctx, std::string_view key, const std::string &ns, bool *expired) { + Database db(ctx.storage, ns); + + std::string ns_key = db.AppendNamespacePrefix(key); + std::string metadata_value; + auto s = db.GetRawMetadata(ctx, ns_key, &metadata_value); + if (!s.ok()) { + if (s.IsNotFound()) { + *expired = true; + } + return {Status::NotOK}; + } + + Metadata metadata(kRedisNone); + rocksdb::Slice meta_slice(metadata_value); + s = metadata.Decode(&meta_slice); + if (!s.ok()) { + return {Status::NotOK}; + } + *expired = metadata.Expired(); + return Status::OK(); +} + Status IndexUpdater::Build(engine::Context &ctx) const { auto storage = indexer->storage; util::UniqueIterator iter(ctx, ctx.DefaultScanOptions(), ColumnFamilyID::Metadata); @@ -367,7 +524,20 @@ Status IndexUpdater::Build(engine::Context &ctx) const { auto [_, key] = ExtractNamespaceKey(iter->key(), storage->IsSlotIdEncoded()); - auto s = Update(ctx, {}, key.ToStringView()); + bool is_expired = false; + Status s = IsKeyExpired(ctx, key.ToStringView(), info->ns, &is_expired); + if (!s.IsOK()) { + return s; + } + if (is_expired) { + s = Delete(ctx, key.ToStringView()); + if (!s.IsOK() && !s.Is()) { + return s; + } + continue; + } + + s = Update(ctx, {}, key.ToStringView()); if (s.Is()) continue; if (!s.OK()) return s; } diff --git a/src/search/indexer.h b/src/search/indexer.h index 20819a2f279..954a5d7eba7 100644 --- a/src/search/indexer.h +++ b/src/search/indexer.h @@ -83,6 +83,16 @@ struct IndexUpdater { const kqir::Value ¤t) const; Status Update(engine::Context &ctx, const FieldValues &original, std::string_view key) const; + Status Delete(engine::Context &ctx, std::string_view key) const; + Status DeleteTagKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, + const SearchKey &search_key, const TagFieldMetadata *tag) const; + + Status DeleteNumericKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, + const SearchKey &search_key) const; + + Status DeleteHnswVectorKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, + const SearchKey &search_key, HnswVectorFieldMetadata *vector) const; + Status Build(engine::Context &ctx) const; Status UpdateTagIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original, @@ -93,6 +103,7 @@ struct IndexUpdater { Status UpdateHnswVectorIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original, const kqir::Value ¤t, const SearchKey &search_key, HnswVectorFieldMetadata *vector) const; + static Status IsKeyExpired(engine::Context &ctx, std::string_view key, const std::string &ns, bool *expired); }; struct GlobalIndexer { @@ -112,6 +123,7 @@ struct GlobalIndexer { void Add(IndexUpdater updater); void Remove(const kqir::IndexInfo *index); + void RemoveKeyFromIndex(engine::Storage *storage, std::string_view key, const std::string &ns); StatusOr Record(engine::Context &ctx, std::string_view key, const std::string &ns); static Status Update(engine::Context &ctx, const RecordResult &original); From 74447f85b128a5289587fd89f80283b092a21bcb Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Fri, 27 Sep 2024 04:49:32 -0400 Subject: [PATCH 2/5] pre pointer --- src/commands/cmd_search.cc | 6 +- src/search/index_manager.h | 15 ++- src/search/indexer.cc | 218 +++++++++++++++++++------------------ src/search/indexer.h | 43 ++++---- src/types/redis_json.cc | 2 +- 5 files changed, 152 insertions(+), 132 deletions(-) diff --git a/src/commands/cmd_search.cc b/src/commands/cmd_search.cc index 7dbaa4af02b..a2c7388a968 100644 --- a/src/commands/cmd_search.cc +++ b/src/commands/cmd_search.cc @@ -307,7 +307,8 @@ class CommandFTSearchSQL : public Commander { return Status::OK(); } Status Execute(Server *srv, Connection *conn, std::string *output) override { - auto results = GET_OR_RET(srv->index_mgr.Search(std::move(ir_), conn->GetNamespace())); + engine::Context ctx(srv->storage); + auto results = GET_OR_RET(srv->index_mgr.Search(ctx, std::move(ir_), conn->GetNamespace())); DumpQueryResult(results, output); @@ -405,7 +406,8 @@ class CommandFTSearch : public Commander { Status Execute(Server *srv, Connection *conn, std::string *output) override { CHECK(ir_); - auto results = GET_OR_RET(srv->index_mgr.Search(std::move(ir_), conn->GetNamespace())); + engine::Context ctx(srv->storage); + auto results = GET_OR_RET(srv->index_mgr.Search(ctx, std::move(ir_), conn->GetNamespace())); DumpQueryResult(results, output); diff --git a/src/search/index_manager.h b/src/search/index_manager.h index 6ab88397ef8..ec4f0554054 100644 --- a/src/search/index_manager.h +++ b/src/search/index_manager.h @@ -152,6 +152,7 @@ struct IndexManager { auto batch = storage->GetWriteBatchBase(); std::string meta_val; + LOG(INFO) << meta_val; info->metadata.Encode(&meta_val); auto s = batch->Put(cf, index_key.ConstructIndexMeta(), meta_val); if (!s.ok()) { @@ -169,6 +170,7 @@ struct IndexManager { SearchKey field_key(info->ns, info->name, field_info.name); std::string field_val; + LOG(INFO) << field_val; field_info.metadata->Encode(&field_val); s = batch->Put(cf, field_key.ConstructFieldMeta(), field_val); @@ -185,7 +187,7 @@ struct IndexManager { indexer->Add(updater); index_map.Insert(std::move(info)); - for (auto updater : indexer->updater_list) { + for (auto &updater : indexer->updater_list) { GET_OR_RET(updater.Build(ctx)); } @@ -208,10 +210,15 @@ struct IndexManager { return plan_op; } - StatusOr> Search(std::unique_ptr ir, + + StatusOr> Search(engine::Context &ctx, std::unique_ptr ir, const std::string &ns) const { - auto plan_op = GET_OR_RET(GeneratePlan(std::move(ir), ns)); + for (auto updater : indexer->updater_list) { + GET_OR_RET(updater.ScanKeys(ctx)); + } + LOG(INFO) << "searched"; + auto plan_op = GET_OR_RET(GeneratePlan(std::move(ir), ns)); kqir::ExecutorContext executor_ctx(plan_op.get(), storage); std::vector results; @@ -219,10 +226,8 @@ struct IndexManager { auto iter_res = GET_OR_RET(executor_ctx.Next()); while (!std::holds_alternative(iter_res)) { results.push_back(std::get(iter_res)); - iter_res = GET_OR_RET(executor_ctx.Next()); } - return results; } diff --git a/src/search/indexer.cc b/src/search/indexer.cc index f102f83e0e9..b415f477598 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -28,6 +28,7 @@ #include "search/hnsw_indexer.h" #include "search/search_encoding.h" #include "search/value.h" +#include "status.h" #include "storage/redis_metadata.h" #include "storage/storage.h" #include "string_util.h" @@ -42,6 +43,7 @@ StatusOr FieldValueRetriever::Create(IndexOnDataType type, Hash db(storage, ns); std::string ns_key = db.AppendNamespacePrefix(key); HashMetadata metadata(false); + LOG(INFO) << "creating hash metadata" << ns_key; auto s = db.GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return {Status::NotOK, s.ToString()}; @@ -49,10 +51,12 @@ StatusOr FieldValueRetriever::Create(IndexOnDataType type, } else if (type == IndexOnDataType::JSON) { Json db(storage, ns); std::string ns_key = db.AppendNamespacePrefix(key); + LOG(INFO) << "creating json metadata" << ns_key; JsonMetadata metadata(false); JsonValue value; auto s = db.read(ctx, ns_key, &metadata, &value); if (!s.ok()) return {Status::NotOK, s.ToString()}; + LOG(INFO) << s.ToString(); return FieldValueRetriever(value); } else { assert(false && "unreachable code: unexpected IndexOnDataType"); @@ -358,14 +362,10 @@ Status IndexUpdater::DeleteTagKey(engine::Context &ctx, std::string_view key, co const SearchKey &search_key, const TagFieldMetadata *tag) const { CHECK(original.IsNull() || original.Is()); - if (original.IsNull()) { - return Status::OK(); - } - auto original_tags = original.IsNull() ? std::vector() : original.Get(); - auto to_tag_set = [&](const std::vector &tags) -> std::set { - if (tag->case_sensitive) { + auto to_tag_set = [](const std::vector &tags, bool case_sensitive) -> std::set { + if (case_sensitive) { return {tags.begin(), tags.end()}; } else { std::set res; @@ -374,7 +374,7 @@ Status IndexUpdater::DeleteTagKey(engine::Context &ctx, std::string_view key, co } }; - std::set tags_to_delete = to_tag_set(original_tags); + std::set tags_to_delete = to_tag_set(original_tags, tag->case_sensitive); if (tags_to_delete.empty()) { return Status::OK(); @@ -384,85 +384,49 @@ Status IndexUpdater::DeleteTagKey(engine::Context &ctx, std::string_view key, co auto batch = storage->GetWriteBatchBase(); auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search); - for (const auto &tag_str : tags_to_delete) { - auto index_key = search_key.ConstructTagFieldData(tag_str, key); + for (const auto &tag : tags_to_delete) { + auto index_key = search_key.ConstructTagFieldData(tag, key); - auto delete_status = batch->Delete(cf_handle, index_key); - if (!delete_status.ok()) { - return {Status::NotOK, delete_status.ToString()}; + auto s = batch->Delete(cf_handle, index_key); + if (!s.ok()) { + return {Status::NotOK, s.ToString()}; } } - auto write_status = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); - if (!write_status.ok()) { - return {Status::NotOK, write_status.ToString()}; - } - + auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); + if (!s.ok()) return {Status::NotOK, s.ToString()}; + LOG(INFO) << "tag deleted successfully"; return Status::OK(); } Status IndexUpdater::DeleteNumericKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, - const SearchKey &search_key) const { + const SearchKey &search_key, + [[maybe_unused]] const NumericFieldMetadata *num) const { CHECK(original.IsNull() || original.Is()); - - auto *storage = indexer->storage; - auto batch = storage->GetWriteBatchBase(); - auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search); - - auto index_key = search_key.ConstructNumericFieldData(original.Get(), key); - - auto s = batch->Delete(cf_handle, index_key); - if (!s.ok()) { - return {Status::NotOK, s.ToString()}; - } - - s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); - if (!s.ok()) { - return {Status::NotOK, s.ToString()}; - } - - return Status::OK(); -} - -Status IndexUpdater::DeleteHnswVectorKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, - const SearchKey &search_key, HnswVectorFieldMetadata *vector) const { - CHECK(original.IsNull() || original.Is()); - - if (original.IsNull()) { + if (!original.IsNull()) { + LOG(INFO) << "null sadge"; return Status::OK(); } + LOG(INFO) << "DELETING NUMERIC KEY VALUE for key: " << key; auto *storage = indexer->storage; auto batch = storage->GetWriteBatchBase(); - auto hnsw = HnswIndex(search_key, vector, storage); - - auto delete_status = hnsw.DeleteVectorEntry(ctx, key, batch); - if (!delete_status.IsOK()) { - return {Status::NotOK}; - } + auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search); - auto write_status = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); - if (!write_status.ok()) { - return {Status::NotOK, write_status.ToString()}; + if (!original.IsNull()) { + auto index_key = search_key.ConstructNumericFieldData(original.Get(), key); + auto s = batch->Delete(cf_handle, index_key); + if (!s.ok()) { + return {Status::NotOK, s.ToString()}; + } } - + auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); + if (!s.ok()) return {Status::NotOK, s.ToString()}; + LOG(INFO) << "deleted successfully"; return Status::OK(); } Status IndexUpdater::Delete(engine::Context &ctx, std::string_view key) const { - const auto &ns = info->ns; - Database db(indexer->storage, ns); - - RedisType type = kRedisNone; - auto s = db.Type(ctx, key, &type); - if (!s.ok()) return {Status::NotOK, s.ToString()}; - - if (type == kRedisNone) return Status::OK(); - - if (type != static_cast(info->metadata.on_data_type)) { - return {Status::TypeMismatched}; - } - auto original = GET_OR_RET(Record(ctx, key)); for (const auto &[field, i] : info->fields) { @@ -470,19 +434,33 @@ Status IndexUpdater::Delete(engine::Context &ctx, std::string_view key) const { continue; } - const kqir::Value &original_val = original.find(field) != original.end() ? original.at(field) : kqir::Value(); - SearchKey search_key(info->ns, info->name, field); + kqir::Value original_val; - // Invoke the appropriate delete function based on the field type - if (auto tag = dynamic_cast(i.metadata.get())) { - GET_OR_RET(DeleteTagKey(ctx, key, original_val, search_key, tag)); - } else if (auto numeric = dynamic_cast(i.metadata.get())) { - GET_OR_RET(DeleteNumericKey(ctx, key, original_val, search_key)); - } else if (auto vector = dynamic_cast(i.metadata.get())) { - GET_OR_RET(DeleteHnswVectorKey(ctx, key, original_val, search_key, vector)); - } else { - return {Status::NotOK, "Unexpected field type during deletion"}; + if (auto it = original.find(field); it != original.end()) { + original_val = it->second; } + GET_OR_RET(DeleteKey(ctx, field, key, original_val)); + } + + LOG(INFO) << "Deletion completed successfully for key: " << key; + return Status::OK(); +} + +Status IndexUpdater::DeleteKey(engine::Context &ctx, const std::string &field, std::string_view key, + const kqir::Value &original_val) const { + auto iter = info->fields.find(field); + if (iter == info->fields.end()) { + return {Status::NotOK, "No such field to do index updating"}; + } + + auto *metadata = iter->second.metadata.get(); + SearchKey search_key(info->ns, info->name, field); + if (auto tag = dynamic_cast(metadata)) { + GET_OR_RET(DeleteTagKey(ctx, key, original_val, search_key, tag)); + } else if (auto numeric [[maybe_unused]] = dynamic_cast(metadata)) { + GET_OR_RET(DeleteNumericKey(ctx, key, original_val, search_key, numeric)); + } else { + return {Status::NotOK, "Unexpected field type"}; } return Status::OK(); @@ -490,33 +468,53 @@ Status IndexUpdater::Delete(engine::Context &ctx, std::string_view key) const { Status IndexUpdater::IsKeyExpired(engine::Context &ctx, std::string_view key, const std::string &ns, bool *expired) { Database db(ctx.storage, ns); + LOG(INFO) << "checking if " << key << " exists"; + int exists_result = 0; + std::vector keys = {Slice(key)}; + auto s = db.Exists(ctx, keys, &exists_result); + + if (!s.ok() || exists_result == 0) { + LOG(INFO) << "expired!"; + *expired = true; + return Status::OK(); + } + return Status::OK(); +} + +Status IndexUpdater::Build(engine::Context &ctx) const { + auto storage = indexer->storage; + util::UniqueIterator iter(ctx, ctx.DefaultScanOptions(), ColumnFamilyID::Metadata); - std::string ns_key = db.AppendNamespacePrefix(key); - std::string metadata_value; - auto s = db.GetRawMetadata(ctx, ns_key, &metadata_value); - if (!s.ok()) { - if (s.IsNotFound()) { - *expired = true; + for (const auto &prefix : info->prefixes) { + auto ns_key = ComposeNamespaceKey(info->ns, prefix, storage->IsSlotIdEncoded()); + for (iter->Seek(ns_key); iter->Valid(); iter->Next()) { + if (!iter->key().starts_with(ns_key)) { + break; + } + + auto [_, key] = ExtractNamespaceKey(iter->key(), storage->IsSlotIdEncoded()); + + auto s = Update(ctx, {}, key.ToStringView()); + if (s.Is()) continue; + if (!s.OK()) return s; } - return {Status::NotOK}; - } - Metadata metadata(kRedisNone); - rocksdb::Slice meta_slice(metadata_value); - s = metadata.Decode(&meta_slice); - if (!s.ok()) { - return {Status::NotOK}; + if (auto s = iter->status(); !s.ok()) { + return {Status::NotOK, s.ToString()}; + } } - *expired = metadata.Expired(); + return Status::OK(); } -Status IndexUpdater::Build(engine::Context &ctx) const { +Status IndexUpdater::ScanKeys(engine::Context &ctx) const { + LOG(INFO) << "scanning"; auto storage = indexer->storage; util::UniqueIterator iter(ctx, ctx.DefaultScanOptions(), ColumnFamilyID::Metadata); for (const auto &prefix : info->prefixes) { auto ns_key = ComposeNamespaceKey(info->ns, prefix, storage->IsSlotIdEncoded()); + for (iter->Seek(ns_key); iter->Valid(); iter->Next()) { if (!iter->key().starts_with(ns_key)) { break; @@ -525,23 +523,35 @@ Status IndexUpdater::Build(engine::Context &ctx) const { auto [_, key] = ExtractNamespaceKey(iter->key(), storage->IsSlotIdEncoded()); bool is_expired = false; + LOG(INFO) << "about to check if key is expired: " << key; Status s = IsKeyExpired(ctx, key.ToStringView(), info->ns, &is_expired); if (!s.IsOK()) { return s; } if (is_expired) { - s = Delete(ctx, key.ToStringView()); - if (!s.IsOK() && !s.Is()) { - return s; + Status delete_status = Delete(ctx, key.ToStringView()); + if (!delete_status.IsOK() && !delete_status.Is()) { + return delete_status; } + + auto batch = storage->GetWriteBatchBase(); + auto cf_handle_kv = + storage->GetCFHandle(ColumnFamilyID::Metadata); + + batch->Delete(cf_handle_kv, iter->key()); + + auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); + if (!s.ok()) { + return {Status::NotOK, s.ToString()}; + } + + LOG(INFO) << "Deleted expired key: " << key; + continue; } - - s = Update(ctx, {}, key.ToStringView()); - if (s.Is()) continue; - if (!s.OK()) return s; } + // Check if there were any errors during iteration if (auto s = iter->status(); !s.ok()) { return {Status::NotOK, s.ToString()}; } @@ -550,10 +560,10 @@ Status IndexUpdater::Build(engine::Context &ctx) const { return Status::OK(); } -void GlobalIndexer::Add(IndexUpdater updater) { - updater.indexer = this; - for (const auto &prefix : updater.info->prefixes) { - prefix_map.insert(ComposeNamespaceKey(updater.info->ns, prefix, false), updater); +void GlobalIndexer::Add(const std::shared_ptr &updater) { + updater->indexer = this; + for (const auto &prefix : updater->info->prefixes) { + prefix_map.insert(ComposeNamespaceKey(updater->info->ns, prefix, false), updater); } updater_list.push_back(updater); } @@ -588,7 +598,7 @@ StatusOr GlobalIndexer::Record(engine::Context &ctx } Status GlobalIndexer::Update(engine::Context &ctx, const RecordResult &original) { - return original.updater.Update(ctx, original.fields, original.key); + return original.updater->Update(ctx, original.fields, original.key); } } // namespace redis diff --git a/src/search/indexer.h b/src/search/indexer.h index 954a5d7eba7..b4ee7269722 100644 --- a/src/search/indexer.h +++ b/src/search/indexer.h @@ -83,15 +83,15 @@ struct IndexUpdater { const kqir::Value ¤t) const; Status Update(engine::Context &ctx, const FieldValues &original, std::string_view key) const; + Status Delete(engine::Context &ctx, std::string_view key) const; + + Status DeleteKey(engine::Context &ctx, const std::string &field, std::string_view key, const kqir::Value &original_val) const; + Status DeleteTagKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, const SearchKey &search_key, const TagFieldMetadata *tag) const; - Status DeleteNumericKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, - const SearchKey &search_key) const; - - Status DeleteHnswVectorKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, - const SearchKey &search_key, HnswVectorFieldMetadata *vector) const; + Status DeleteNumericKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, const SearchKey &search_key, const NumericFieldMetadata *num) const; Status Build(engine::Context &ctx) const; @@ -103,30 +103,33 @@ struct IndexUpdater { Status UpdateHnswVectorIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original, const kqir::Value ¤t, const SearchKey &search_key, HnswVectorFieldMetadata *vector) const; + Status ScanKeys(engine::Context &ctx) const; static Status IsKeyExpired(engine::Context &ctx, std::string_view key, const std::string &ns, bool *expired); }; struct GlobalIndexer { - using FieldValues = IndexUpdater::FieldValues; - struct RecordResult { - IndexUpdater updater; - std::string key; - FieldValues fields; - }; + using FieldValues = IndexUpdater::FieldValues; + + struct RecordResult { + std::shared_ptr updater; + std::string key; + FieldValues fields; + }; - tsl::htrie_map prefix_map; - std::vector updater_list; + tsl::htrie_map> prefix_map; + std::vector> updater_list; // Use shared_ptr - engine::Storage *storage = nullptr; + engine::Storage* storage = nullptr; - explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {} + explicit GlobalIndexer(engine::Storage* storage) : storage(storage) {} - void Add(IndexUpdater updater); - void Remove(const kqir::IndexInfo *index); - void RemoveKeyFromIndex(engine::Storage *storage, std::string_view key, const std::string &ns); + // Update function signatures to accept shared_ptr + void Add(const std::shared_ptr &updater); + void Remove(const kqir::IndexInfo* index); + void RemoveKeyFromIndex(engine::Storage* storage, std::string_view key, const std::string& ns); - StatusOr Record(engine::Context &ctx, std::string_view key, const std::string &ns); - static Status Update(engine::Context &ctx, const RecordResult &original); + StatusOr Record(engine::Context& ctx, std::string_view key, const std::string& ns); + static Status Update(engine::Context& ctx, const RecordResult& original); }; } // namespace redis diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc index b058a2dfd6a..b956b9f5d23 100644 --- a/src/types/redis_json.cc +++ b/src/types/redis_json.cc @@ -426,7 +426,7 @@ rocksdb::Status Json::Del(engine::Context &ctx, const std::string &user_key, con JsonValue json_val; JsonMetadata metadata; auto s = read(ctx, ns_key, &metadata, &json_val); - + LOG(INFO) << "IN THE DELETE FUNCTION" << s.ToString(); if (!s.ok() && !s.IsNotFound()) return s; if (s.IsNotFound()) { return rocksdb::Status::OK(); From f1238f59e61fa0a4264f4d758cd59dd8c8c1aec9 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Fri, 27 Sep 2024 16:08:41 -0400 Subject: [PATCH 3/5] Index Updates --- src/search/index_manager.h | 5 +---- src/search/indexer.cc | 37 +++++++------------------------------ src/search/indexer.h | 32 +++++++++++++++----------------- src/types/redis_json.cc | 1 - 4 files changed, 23 insertions(+), 52 deletions(-) diff --git a/src/search/index_manager.h b/src/search/index_manager.h index ec4f0554054..0306cbaa474 100644 --- a/src/search/index_manager.h +++ b/src/search/index_manager.h @@ -152,7 +152,6 @@ struct IndexManager { auto batch = storage->GetWriteBatchBase(); std::string meta_val; - LOG(INFO) << meta_val; info->metadata.Encode(&meta_val); auto s = batch->Put(cf, index_key.ConstructIndexMeta(), meta_val); if (!s.ok()) { @@ -170,7 +169,6 @@ struct IndexManager { SearchKey field_key(info->ns, info->name, field_info.name); std::string field_val; - LOG(INFO) << field_val; field_info.metadata->Encode(&field_val); s = batch->Put(cf, field_key.ConstructFieldMeta(), field_val); @@ -213,10 +211,9 @@ struct IndexManager { StatusOr> Search(engine::Context &ctx, std::unique_ptr ir, const std::string &ns) const { - for (auto updater : indexer->updater_list) { + for (auto &updater : indexer->updater_list) { GET_OR_RET(updater.ScanKeys(ctx)); } - LOG(INFO) << "searched"; auto plan_op = GET_OR_RET(GeneratePlan(std::move(ir), ns)); kqir::ExecutorContext executor_ctx(plan_op.get(), storage); diff --git a/src/search/indexer.cc b/src/search/indexer.cc index b415f477598..1a1c2faf936 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -43,20 +43,16 @@ StatusOr FieldValueRetriever::Create(IndexOnDataType type, Hash db(storage, ns); std::string ns_key = db.AppendNamespacePrefix(key); HashMetadata metadata(false); - LOG(INFO) << "creating hash metadata" << ns_key; - auto s = db.GetMetadata(ctx, ns_key, &metadata); if (!s.ok()) return {Status::NotOK, s.ToString()}; return FieldValueRetriever(db, metadata, key); } else if (type == IndexOnDataType::JSON) { Json db(storage, ns); std::string ns_key = db.AppendNamespacePrefix(key); - LOG(INFO) << "creating json metadata" << ns_key; JsonMetadata metadata(false); JsonValue value; auto s = db.read(ctx, ns_key, &metadata, &value); if (!s.ok()) return {Status::NotOK, s.ToString()}; - LOG(INFO) << s.ToString(); return FieldValueRetriever(value); } else { assert(false && "unreachable code: unexpected IndexOnDataType"); @@ -171,7 +167,6 @@ StatusOr IndexUpdater::Record(engine::Context &ctx, s } auto retriever = GET_OR_RET(FieldValueRetriever::Create(info->metadata.on_data_type, key, indexer->storage, ns)); - FieldValues values; for (const auto &[field, i] : info->fields) { if (i.metadata->noindex) { @@ -395,7 +390,6 @@ Status IndexUpdater::DeleteTagKey(engine::Context &ctx, std::string_view key, co auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); if (!s.ok()) return {Status::NotOK, s.ToString()}; - LOG(INFO) << "tag deleted successfully"; return Status::OK(); } @@ -403,11 +397,6 @@ Status IndexUpdater::DeleteNumericKey(engine::Context &ctx, std::string_view key const SearchKey &search_key, [[maybe_unused]] const NumericFieldMetadata *num) const { CHECK(original.IsNull() || original.Is()); - if (!original.IsNull()) { - LOG(INFO) << "null sadge"; - return Status::OK(); - } - LOG(INFO) << "DELETING NUMERIC KEY VALUE for key: " << key; auto *storage = indexer->storage; auto batch = storage->GetWriteBatchBase(); @@ -422,7 +411,6 @@ Status IndexUpdater::DeleteNumericKey(engine::Context &ctx, std::string_view key } auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); if (!s.ok()) return {Status::NotOK, s.ToString()}; - LOG(INFO) << "deleted successfully"; return Status::OK(); } @@ -441,8 +429,6 @@ Status IndexUpdater::Delete(engine::Context &ctx, std::string_view key) const { } GET_OR_RET(DeleteKey(ctx, field, key, original_val)); } - - LOG(INFO) << "Deletion completed successfully for key: " << key; return Status::OK(); } @@ -468,13 +454,11 @@ Status IndexUpdater::DeleteKey(engine::Context &ctx, const std::string &field, s Status IndexUpdater::IsKeyExpired(engine::Context &ctx, std::string_view key, const std::string &ns, bool *expired) { Database db(ctx.storage, ns); - LOG(INFO) << "checking if " << key << " exists"; int exists_result = 0; std::vector keys = {Slice(key)}; auto s = db.Exists(ctx, keys, &exists_result); if (!s.ok() || exists_result == 0) { - LOG(INFO) << "expired!"; *expired = true; return Status::OK(); } @@ -508,7 +492,6 @@ Status IndexUpdater::Build(engine::Context &ctx) const { } Status IndexUpdater::ScanKeys(engine::Context &ctx) const { - LOG(INFO) << "scanning"; auto storage = indexer->storage; util::UniqueIterator iter(ctx, ctx.DefaultScanOptions(), ColumnFamilyID::Metadata); @@ -523,7 +506,6 @@ Status IndexUpdater::ScanKeys(engine::Context &ctx) const { auto [_, key] = ExtractNamespaceKey(iter->key(), storage->IsSlotIdEncoded()); bool is_expired = false; - LOG(INFO) << "about to check if key is expired: " << key; Status s = IsKeyExpired(ctx, key.ToStringView(), info->ns, &is_expired); if (!s.IsOK()) { return s; @@ -535,23 +517,18 @@ Status IndexUpdater::ScanKeys(engine::Context &ctx) const { } auto batch = storage->GetWriteBatchBase(); - auto cf_handle_kv = - storage->GetCFHandle(ColumnFamilyID::Metadata); + auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Metadata); - batch->Delete(cf_handle_kv, iter->key()); + batch->Delete(cf_handle, iter->key()); auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); if (!s.ok()) { return {Status::NotOK, s.ToString()}; } - - LOG(INFO) << "Deleted expired key: " << key; - continue; } } - // Check if there were any errors during iteration if (auto s = iter->status(); !s.ok()) { return {Status::NotOK, s.ToString()}; } @@ -560,10 +537,10 @@ Status IndexUpdater::ScanKeys(engine::Context &ctx) const { return Status::OK(); } -void GlobalIndexer::Add(const std::shared_ptr &updater) { - updater->indexer = this; - for (const auto &prefix : updater->info->prefixes) { - prefix_map.insert(ComposeNamespaceKey(updater->info->ns, prefix, false), updater); +void GlobalIndexer::Add(IndexUpdater updater) { + updater.indexer = this; + for (const auto &prefix : updater.info->prefixes) { + prefix_map.insert(ComposeNamespaceKey(updater.info->ns, prefix, false), updater); } updater_list.push_back(updater); } @@ -598,7 +575,7 @@ StatusOr GlobalIndexer::Record(engine::Context &ctx } Status GlobalIndexer::Update(engine::Context &ctx, const RecordResult &original) { - return original.updater->Update(ctx, original.fields, original.key); + return original.updater.Update(ctx, original.fields, original.key); } } // namespace redis diff --git a/src/search/indexer.h b/src/search/indexer.h index b4ee7269722..2a28add5efa 100644 --- a/src/search/indexer.h +++ b/src/search/indexer.h @@ -108,28 +108,26 @@ struct IndexUpdater { }; struct GlobalIndexer { - using FieldValues = IndexUpdater::FieldValues; - - struct RecordResult { - std::shared_ptr updater; - std::string key; - FieldValues fields; - }; + using FieldValues = IndexUpdater::FieldValues; + struct RecordResult { + IndexUpdater updater; + std::string key; + FieldValues fields; + }; - tsl::htrie_map> prefix_map; - std::vector> updater_list; // Use shared_ptr + tsl::htrie_map prefix_map; + std::vector updater_list; - engine::Storage* storage = nullptr; + engine::Storage *storage = nullptr; - explicit GlobalIndexer(engine::Storage* storage) : storage(storage) {} + explicit GlobalIndexer(engine::Storage *storage) : storage(storage) {} - // Update function signatures to accept shared_ptr - void Add(const std::shared_ptr &updater); - void Remove(const kqir::IndexInfo* index); - void RemoveKeyFromIndex(engine::Storage* storage, std::string_view key, const std::string& ns); + void Add(IndexUpdater updater); + void Remove(const kqir::IndexInfo *index); + void RemoveKeyFromIndex(engine::Storage *storage, std::string_view key, const std::string &ns); - StatusOr Record(engine::Context& ctx, std::string_view key, const std::string& ns); - static Status Update(engine::Context& ctx, const RecordResult& original); + StatusOr Record(engine::Context &ctx, std::string_view key, const std::string &ns); + static Status Update(engine::Context &ctx, const RecordResult &original); }; } // namespace redis diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc index b956b9f5d23..ccc4c68ce42 100644 --- a/src/types/redis_json.cc +++ b/src/types/redis_json.cc @@ -426,7 +426,6 @@ rocksdb::Status Json::Del(engine::Context &ctx, const std::string &user_key, con JsonValue json_val; JsonMetadata metadata; auto s = read(ctx, ns_key, &metadata, &json_val); - LOG(INFO) << "IN THE DELETE FUNCTION" << s.ToString(); if (!s.ok() && !s.IsNotFound()) return s; if (s.IsNotFound()) { return rocksdb::Status::OK(); From 35b170f4250caf48e275a3bc13989afdbb92f23f Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Fri, 27 Sep 2024 17:06:44 -0400 Subject: [PATCH 4/5] added test + lint --- src/search/index_manager.h | 1 - src/search/indexer.cc | 2 +- src/search/indexer.h | 8 ++--- src/types/redis_json.cc | 1 + tests/gocase/unit/search/search_test.go | 43 +++++++++++++++++++++++++ 5 files changed, 49 insertions(+), 6 deletions(-) diff --git a/src/search/index_manager.h b/src/search/index_manager.h index 0306cbaa474..4e67e3e42d3 100644 --- a/src/search/index_manager.h +++ b/src/search/index_manager.h @@ -208,7 +208,6 @@ struct IndexManager { return plan_op; } - StatusOr> Search(engine::Context &ctx, std::unique_ptr ir, const std::string &ns) const { for (auto &updater : indexer->updater_list) { diff --git a/src/search/indexer.cc b/src/search/indexer.cc index 1a1c2faf936..4caecff50c0 100644 --- a/src/search/indexer.cc +++ b/src/search/indexer.cc @@ -519,7 +519,7 @@ Status IndexUpdater::ScanKeys(engine::Context &ctx) const { auto batch = storage->GetWriteBatchBase(); auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Metadata); - batch->Delete(cf_handle, iter->key()); + batch->Delete(cf_handle, iter->key()); auto s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch()); if (!s.ok()) { diff --git a/src/search/indexer.h b/src/search/indexer.h index 2a28add5efa..785515fdea6 100644 --- a/src/search/indexer.h +++ b/src/search/indexer.h @@ -83,15 +83,16 @@ struct IndexUpdater { const kqir::Value ¤t) const; Status Update(engine::Context &ctx, const FieldValues &original, std::string_view key) const; - Status Delete(engine::Context &ctx, std::string_view key) const; - Status DeleteKey(engine::Context &ctx, const std::string &field, std::string_view key, const kqir::Value &original_val) const; + Status DeleteKey(engine::Context &ctx, const std::string &field, std::string_view key, + const kqir::Value &original_val) const; Status DeleteTagKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, const SearchKey &search_key, const TagFieldMetadata *tag) const; - Status DeleteNumericKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, const SearchKey &search_key, const NumericFieldMetadata *num) const; + Status DeleteNumericKey(engine::Context &ctx, std::string_view key, const kqir::Value &original, + const SearchKey &search_key, const NumericFieldMetadata *num) const; Status Build(engine::Context &ctx) const; @@ -124,7 +125,6 @@ struct GlobalIndexer { void Add(IndexUpdater updater); void Remove(const kqir::IndexInfo *index); - void RemoveKeyFromIndex(engine::Storage *storage, std::string_view key, const std::string &ns); StatusOr Record(engine::Context &ctx, std::string_view key, const std::string &ns); static Status Update(engine::Context &ctx, const RecordResult &original); diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc index ccc4c68ce42..56b68670c7b 100644 --- a/src/types/redis_json.cc +++ b/src/types/redis_json.cc @@ -426,6 +426,7 @@ rocksdb::Status Json::Del(engine::Context &ctx, const std::string &user_key, con JsonValue json_val; JsonMetadata metadata; auto s = read(ctx, ns_key, &metadata, &json_val); + if (!s.ok() && !s.IsNotFound()) return s; if (s.IsNotFound()) { return rocksdb::Status::OK(); diff --git a/tests/gocase/unit/search/search_test.go b/tests/gocase/unit/search/search_test.go index 0144599439f..76cc4e6a863 100644 --- a/tests/gocase/unit/search/search_test.go +++ b/tests/gocase/unit/search/search_test.go @@ -24,6 +24,7 @@ import ( "context" "encoding/binary" "testing" + "time" "github.com/apache/kvrocks/tests/gocase/util" "github.com/redis/go-redis/v9" @@ -169,4 +170,46 @@ func TestSearch(t *testing.T) { srv.Restart() verify(t) }) + + t.Run("JsonTagWithExpiry", func(t *testing.T) { + require.NoError(t, rdb.Do(ctx, "FT.CREATE", "testidx", "ON", "JSON", "PREFIX", "1", "test:", "SCHEMA", "a", "TAG", "b", "NUMERIC").Err()) + + require.NoError(t, rdb.Do(ctx, "JSON.SET", "test:k1", "$", `{"a": "x,y", "b": 11}`).Err()) + require.NoError(t, rdb.Do(ctx, "JSON.SET", "test:k2", "$", `{"a": "y,z", "b": 22}`).Err()) + require.NoError(t, rdb.Do(ctx, "JSON.SET", "test:k3", "$", `{"a": "x,z", "b": 33}`).Err()) + + searchQuery := `@a:{z} @b:[-inf (30]` + initialSearch := rdb.Do(ctx, "FT.SEARCH", "testidx", searchQuery) + require.NoError(t, initialSearch.Err()) + + initialResults := initialSearch.Val().([]interface{}) + require.Equal(t, 3, len(initialResults)) + require.Equal(t, int64(1), initialResults[0]) + require.Equal(t, "test:k2", initialResults[1]) + + fields := initialResults[2].([]interface{}) + fieldMap := make(map[string]string) + for i := 0; i < len(fields); i += 2 { + fieldMap[fields[i].(string)] = fields[i+1].(string) + } + require.Equal(t, "y,z", fieldMap["a"]) + require.Equal(t, "22", fieldMap["b"]) + + require.NoError(t, rdb.Expire(ctx, "test:k2", 5*time.Second).Err()) + + checkSearch := rdb.Do(ctx, "FT.SEARCH", "testidx", searchQuery) + require.NoError(t, checkSearch.Err()) + + time.Sleep(6 * time.Second) + + finalSearch := rdb.Do(ctx, "FT.SEARCH", "testidx", searchQuery) + require.Error(t, finalSearch.Err()) + + // verify that the index entry for test:k2 is deleted + indexKey := "testidx:a:z:test:k2" + + val, err := rdb.Get(ctx, indexKey).Result() + require.ErrorIs(t, err, redis.Nil, "Index entry for expired key2 should be deleted") + require.Equal(t, "", val, "Value for expired index key should be empty") + }) } From fe6dd3765968363ddb0fe68b3980e7177284bd56 Mon Sep 17 00:00:00 2001 From: Jonathan Chen Date: Fri, 27 Sep 2024 17:07:34 -0400 Subject: [PATCH 5/5] lint --- src/types/redis_json.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc index 56b68670c7b..b058a2dfd6a 100644 --- a/src/types/redis_json.cc +++ b/src/types/redis_json.cc @@ -426,7 +426,7 @@ rocksdb::Status Json::Del(engine::Context &ctx, const std::string &user_key, con JsonValue json_val; JsonMetadata metadata; auto s = read(ctx, ns_key, &metadata, &json_val); - + if (!s.ok() && !s.IsNotFound()) return s; if (s.IsNotFound()) { return rocksdb::Status::OK();