Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added key expiration support #2556

Open
wants to merge 7 commits into
base: unstable
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
added expiration
jonathanc-n committed Sep 25, 2024
commit 0964db6bf768524275090baf7d457686641fcea3
172 changes: 171 additions & 1 deletion src/search/indexer.cc
Original file line number Diff line number Diff line change
@@ -354,6 +354,163 @@ Status IndexUpdater::Update(engine::Context &ctx, const FieldValues &original, s
return Status::OK();
}

Status IndexUpdater::DeleteTagKey(engine::Context &ctx, std::string_view key, const kqir::Value &original,
const SearchKey &search_key, const TagFieldMetadata *tag) const {
CHECK(original.IsNull() || original.Is<kqir::StringArray>());

if (original.IsNull()) {
return Status::OK();
}

auto original_tags = original.IsNull() ? std::vector<std::string>() : original.Get<kqir::StringArray>();

auto to_tag_set = [&](const std::vector<std::string> &tags) -> std::set<std::string> {
if (tag->case_sensitive) {
return {tags.begin(), tags.end()};
} else {
std::set<std::string> res;
std::transform(tags.begin(), tags.end(), std::inserter(res, res.begin()), util::ToLower);
return res;
}
};

std::set<std::string> tags_to_delete = to_tag_set(original_tags);

if (tags_to_delete.empty()) {
return Status::OK();
}

auto *storage = indexer->storage;
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);

for (const auto &tag_str : tags_to_delete) {
auto index_key = search_key.ConstructTagFieldData(tag_str, key);

auto delete_status = batch->Delete(cf_handle, index_key);
if (!delete_status.ok()) {
return {Status::NotOK, delete_status.ToString()};
}
}

auto write_status = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!write_status.ok()) {
return {Status::NotOK, write_status.ToString()};
}

return Status::OK();
}

Status IndexUpdater::DeleteNumericKey(engine::Context &ctx, std::string_view key, const kqir::Value &original,
const SearchKey &search_key) const {
CHECK(original.IsNull() || original.Is<kqir::Numeric>());

auto *storage = indexer->storage;
auto batch = storage->GetWriteBatchBase();
auto cf_handle = storage->GetCFHandle(ColumnFamilyID::Search);

auto index_key = search_key.ConstructNumericFieldData(original.Get<kqir::Numeric>(), key);

auto s = batch->Delete(cf_handle, index_key);
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

s = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!s.ok()) {
return {Status::NotOK, s.ToString()};
}

return Status::OK();
}

Status IndexUpdater::DeleteHnswVectorKey(engine::Context &ctx, std::string_view key, const kqir::Value &original,
const SearchKey &search_key, HnswVectorFieldMetadata *vector) const {
CHECK(original.IsNull() || original.Is<kqir::NumericArray>());

if (original.IsNull()) {
return Status::OK();
}

auto *storage = indexer->storage;
auto batch = storage->GetWriteBatchBase();
auto hnsw = HnswIndex(search_key, vector, storage);

auto delete_status = hnsw.DeleteVectorEntry(ctx, key, batch);
if (!delete_status.IsOK()) {
return {Status::NotOK};
}

auto write_status = storage->Write(ctx, storage->DefaultWriteOptions(), batch->GetWriteBatch());
if (!write_status.ok()) {
return {Status::NotOK, write_status.ToString()};
}

return Status::OK();
}

Status IndexUpdater::Delete(engine::Context &ctx, std::string_view key) const {
const auto &ns = info->ns;
Database db(indexer->storage, ns);

RedisType type = kRedisNone;
auto s = db.Type(ctx, key, &type);
if (!s.ok()) return {Status::NotOK, s.ToString()};

if (type == kRedisNone) return Status::OK();

if (type != static_cast<RedisType>(info->metadata.on_data_type)) {
return {Status::TypeMismatched};
}

auto original = GET_OR_RET(Record(ctx, key));

for (const auto &[field, i] : info->fields) {
if (i.metadata->noindex) {
continue;
}

const kqir::Value &original_val = original.find(field) != original.end() ? original.at(field) : kqir::Value();
SearchKey search_key(info->ns, info->name, field);

// Invoke the appropriate delete function based on the field type
if (auto tag = dynamic_cast<TagFieldMetadata *>(i.metadata.get())) {
GET_OR_RET(DeleteTagKey(ctx, key, original_val, search_key, tag));
} else if (auto numeric = dynamic_cast<NumericFieldMetadata *>(i.metadata.get())) {
GET_OR_RET(DeleteNumericKey(ctx, key, original_val, search_key));
} else if (auto vector = dynamic_cast<HnswVectorFieldMetadata *>(i.metadata.get())) {
GET_OR_RET(DeleteHnswVectorKey(ctx, key, original_val, search_key, vector));
} else {
return {Status::NotOK, "Unexpected field type during deletion"};
}
}

return Status::OK();
}

Status IndexUpdater::IsKeyExpired(engine::Context &ctx, std::string_view key, const std::string &ns, bool *expired) {
Database db(ctx.storage, ns);

std::string ns_key = db.AppendNamespacePrefix(key);
std::string metadata_value;
auto s = db.GetRawMetadata(ctx, ns_key, &metadata_value);
if (!s.ok()) {
if (s.IsNotFound()) {
*expired = true;
}
return {Status::NotOK};
}

Metadata metadata(kRedisNone);
rocksdb::Slice meta_slice(metadata_value);
s = metadata.Decode(&meta_slice);
if (!s.ok()) {
return {Status::NotOK};
}
*expired = metadata.Expired();
return Status::OK();
}

Status IndexUpdater::Build(engine::Context &ctx) const {
auto storage = indexer->storage;
util::UniqueIterator iter(ctx, ctx.DefaultScanOptions(), ColumnFamilyID::Metadata);
@@ -367,7 +524,20 @@ Status IndexUpdater::Build(engine::Context &ctx) const {

auto [_, key] = ExtractNamespaceKey(iter->key(), storage->IsSlotIdEncoded());

auto s = Update(ctx, {}, key.ToStringView());
bool is_expired = false;
Status s = IsKeyExpired(ctx, key.ToStringView(), info->ns, &is_expired);
if (!s.IsOK()) {
return s;
}
if (is_expired) {
s = Delete(ctx, key.ToStringView());
if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
return s;
}
continue;
}

s = Update(ctx, {}, key.ToStringView());
if (s.Is<Status::TypeMismatched>()) continue;
if (!s.OK()) return s;
}
12 changes: 12 additions & 0 deletions src/search/indexer.h
Original file line number Diff line number Diff line change
@@ -83,6 +83,16 @@ struct IndexUpdater {
const kqir::Value &current) const;
Status Update(engine::Context &ctx, const FieldValues &original, std::string_view key) const;

Status Delete(engine::Context &ctx, std::string_view key) const;
Status DeleteTagKey(engine::Context &ctx, std::string_view key, const kqir::Value &original,
const SearchKey &search_key, const TagFieldMetadata *tag) const;

Status DeleteNumericKey(engine::Context &ctx, std::string_view key, const kqir::Value &original,
const SearchKey &search_key) const;

Status DeleteHnswVectorKey(engine::Context &ctx, std::string_view key, const kqir::Value &original,
const SearchKey &search_key, HnswVectorFieldMetadata *vector) const;

Status Build(engine::Context &ctx) const;

Status UpdateTagIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original,
@@ -93,6 +103,7 @@ struct IndexUpdater {
Status UpdateHnswVectorIndex(engine::Context &ctx, std::string_view key, const kqir::Value &original,
const kqir::Value &current, const SearchKey &search_key,
HnswVectorFieldMetadata *vector) const;
static Status IsKeyExpired(engine::Context &ctx, std::string_view key, const std::string &ns, bool *expired);
};

struct GlobalIndexer {
@@ -112,6 +123,7 @@ struct GlobalIndexer {

void Add(IndexUpdater updater);
void Remove(const kqir::IndexInfo *index);
void RemoveKeyFromIndex(engine::Storage *storage, std::string_view key, const std::string &ns);

StatusOr<RecordResult> Record(engine::Context &ctx, std::string_view key, const std::string &ns);
static Status Update(engine::Context &ctx, const RecordResult &original);