Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update IVF_PQ to use temp directory pattern to support parallel ingestion #554

Merged
merged 38 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
90c0912
Support OOC in IVF_PQ ingestion
jparismorgan Sep 23, 2024
422c27f
consolidate farther along
jparismorgan Sep 24, 2024
35309d6
remove unneeded test and start of temporal_policy fixes
jparismorgan Sep 24, 2024
bd68766
some fixes, working on api for ingest()
jparismorgan Sep 24, 2024
b7d87cf
more tests passing
jparismorgan Sep 26, 2024
e521633
unit_ivf_pq_index test passing
jparismorgan Sep 30, 2024
d754be7
unit_api_ivf_pq_index almost working
jparismorgan Sep 30, 2024
a050c1c
fix tests
jparismorgan Sep 30, 2024
a67f4aa
more fixes
jparismorgan Oct 3, 2024
a649631
python fixes, some passing
jparismorgan Oct 3, 2024
3c77471
more tests fixed
jparismorgan Oct 8, 2024
a52798b
fixes
jparismorgan Oct 9, 2024
0d8523b
move partitions to train() instead of create()
jparismorgan Oct 9, 2024
a83a3b4
more fixes, all but one test passing
jparismorgan Oct 11, 2024
9515a1a
Merge branch 'main' of https://github.com/TileDB-Inc/TileDB-Vector-Search
jparismorgan Oct 14, 2024
0955ee7
test fixes
jparismorgan Oct 14, 2024
346735a
fix python test
jparismorgan Oct 14, 2024
928c379
fix asan and type errors, add train_no_init unit tests
jparismorgan Oct 14, 2024
ff5d32f
cleanup code
jparismorgan Oct 14, 2024
7d5b46f
lint
jparismorgan Oct 15, 2024
0e2bd6f
Merge branch 'main' of https://github.com/TileDB-Inc/TileDB-Vector-Search
jparismorgan Oct 15, 2024
73c9b0c
fix unit_kmeans
jparismorgan Oct 15, 2024
2f0caf1
cleanup code
jparismorgan Oct 15, 2024
0f36bc2
fix bug with incorrect partitions on second train
jparismorgan Oct 15, 2024
023360f
cleanup code
jparismorgan Oct 16, 2024
ded564e
Merge branch 'main' of https://github.com/TileDB-Inc/TileDB-Vector-Se…
jparismorgan Oct 16, 2024
3f26ae4
Merge branch 'main' of https://github.com/TileDB-Inc/TileDB-Vector-Search
jparismorgan Oct 16, 2024
70c0f0f
fix AddressSanitizer: FPE, fix incorrect type in create()
jparismorgan Oct 16, 2024
58eaec4
fix flaky test
jparismorgan Oct 16, 2024
4162605
fix asan error when loading index with no data
jparismorgan Oct 17, 2024
105dee3
cleanup
jparismorgan Oct 17, 2024
decbe16
update to use temp dir pattern from python
jparismorgan Oct 17, 2024
b6af0fa
Merge branch 'main' of https://github.com/TileDB-Inc/TileDB-Vector-Search
jparismorgan Oct 17, 2024
bdaa5a9
cleanup
jparismorgan Oct 17, 2024
d5f676e
fix missing args
jparismorgan Oct 17, 2024
9b3e42d
cleanup
jparismorgan Oct 17, 2024
e46fea2
fix create_temp_data_group()
jparismorgan Oct 17, 2024
38c1e52
cleanup code
jparismorgan Oct 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 40 additions & 30 deletions apis/python/src/tiledb/vector_search/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,11 @@ def ingest(
EXTERNAL_IDS_ARRAY_NAME = storage_formats[storage_version][
"EXTERNAL_IDS_ARRAY_NAME"
]
if index_type == "IVF_PQ":
PARTIAL_WRITE_ARRAY_DIR = storage_formats[storage_version][
"PARTIAL_WRITE_ARRAY_DIR"
]
else:
PARTIAL_WRITE_ARRAY_DIR = (
storage_formats[storage_version]["PARTIAL_WRITE_ARRAY_DIR"]
+ "_"
+ "".join(random.choices(string.ascii_letters, k=10))
)
PARTIAL_WRITE_ARRAY_DIR = (
storage_formats[storage_version]["PARTIAL_WRITE_ARRAY_DIR"]
+ "_"
+ "".join(random.choices(string.ascii_letters, k=10))
)
DEFAULT_ATTR_FILTERS = storage_formats[storage_version]["DEFAULT_ATTR_FILTERS"]

# This is used to auto-configure `input_vectors_per_work_item`
Expand Down Expand Up @@ -603,13 +598,26 @@ def create_temp_data_group(
group: tiledb.Group,
) -> tiledb.Group:
partial_write_array_dir_uri = f"{group.uri}/{PARTIAL_WRITE_ARRAY_DIR}"
try:
tiledb.group_create(partial_write_array_dir_uri)
add_to_group(group, partial_write_array_dir_uri, PARTIAL_WRITE_ARRAY_DIR)
except tiledb.TileDBError as err:
message = str(err)
if "already exists" not in message:
raise err
if index_type == "IVF_PQ":
ctx = vspy.Ctx(config)
index = vspy.IndexIVFPQ(
ctx,
index_group_uri,
vspy.IndexLoadStrategy.PQ_INDEX,
0,
to_temporal_policy(index_timestamp),
)
index.create_temp_data_group(PARTIAL_WRITE_ARRAY_DIR)
else:
try:
tiledb.group_create(partial_write_array_dir_uri)
add_to_group(
group, partial_write_array_dir_uri, PARTIAL_WRITE_ARRAY_DIR
)
except tiledb.TileDBError as err:
message = str(err)
if "already exists" not in message:
raise err
return tiledb.Group(partial_write_array_dir_uri, "w")

def create_partial_write_array_group(
Expand Down Expand Up @@ -779,16 +787,6 @@ def create_arrays(
create_index_array=True,
asset_creation_threads=asset_creation_threads,
)
elif index_type == "IVF_PQ":
ctx = vspy.Ctx(config)
index = vspy.IndexIVFPQ(
ctx,
index_group_uri,
vspy.IndexLoadStrategy.PQ_INDEX,
0,
to_temporal_policy(index_timestamp),
)
index.create_temp_data_group()
# Note that we don't create type-erased indexes (i.e. Vamana) here. Instead we create them
# at very start of ingest() in C++.
elif not is_type_erased_index(index_type):
Expand Down Expand Up @@ -1294,9 +1292,13 @@ def ivf_pq_train_udf(
else np.empty((0, dimensions), dtype=vector_type)
)

# Filter out the updated vectors from the sample vectors.
# NOTE: We add kind='sort' as a workaround to this bug: https://github.com/numpy/numpy/issues/26922
updates_filter = np.in1d(
external_ids, updated_ids, assume_unique=True, invert=True
external_ids,
updated_ids,
assume_unique=True,
invert=True,
kind="sort",
)
sample_vectors = sample_vectors[updates_filter]

Expand Down Expand Up @@ -1922,6 +1924,7 @@ def ingest_vectors_udf(
start=part,
end=part_end,
partition_start=part_id * (partitions + 1),
partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
)
else:
ivf_index_tdb(
Expand Down Expand Up @@ -1979,6 +1982,7 @@ def ingest_vectors_udf(
start=part,
end=part_end,
partition_start=part_id * (partitions + 1),
partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
)
else:
ivf_index(
Expand Down Expand Up @@ -2069,6 +2073,7 @@ def ingest_additions_udf(
start=write_offset,
end=0,
partition_start=partition_start,
partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
)
else:
ivf_index(
Expand Down Expand Up @@ -2161,7 +2166,12 @@ def ivf_pq_consolidate_partition_udf(
to_temporal_policy(index_timestamp),
)
index.consolidate_partitions(
partitions, work_items, partition_id_start, partition_id_end, batch
partitions=partitions,
work_items=work_items,
partition_id_start=partition_id_start,
partition_id_end=partition_id_end,
batch=batch,
partial_write_array_dir=PARTIAL_WRITE_ARRAY_DIR,
)

def consolidate_partition_udf(
Expand Down
23 changes: 16 additions & 7 deletions apis/python/src/tiledb/vector_search/type_erased_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,10 @@ void init_type_erased_module(py::module_& m) {
py::arg("temporal_policy") = std::nullopt)
.def(
"create_temp_data_group",
[](IndexIVFPQ& index) { index.create_temp_data_group(); })
[](IndexIVFPQ& index, const std::string& partial_write_array_dir) {
index.create_temp_data_group(partial_write_array_dir);
},
py::arg("partial_write_array_dir"))
.def(
"train",
[](IndexIVFPQ& index,
Expand Down Expand Up @@ -577,41 +580,47 @@ void init_type_erased_module(py::module_& m) {
const FeatureVector& deleted_ids,
size_t start,
size_t end,
size_t partition_start) {
size_t partition_start,
const std::string& partial_write_array_dir) {
index.ingest_parts(
input_vectors,
external_ids,
deleted_ids,
start,
end,
partition_start);
partition_start,
partial_write_array_dir);
},
py::arg("input_vectors"),
py::arg("external_ids"),
py::arg("deleted_ids"),
py::arg("start"),
py::arg("end"),
py::arg("partition_start"))
py::arg("partition_start"),
py::arg("partial_write_array_dir"))
.def(
"consolidate_partitions",
[](IndexIVFPQ& index,
size_t partitions,
size_t work_items,
size_t partition_id_start,
size_t partition_id_end,
size_t batch) {
size_t batch,
const std::string& partial_write_array_dir) {
index.consolidate_partitions(
partitions,
work_items,
partition_id_start,
partition_id_end,
batch);
batch,
partial_write_array_dir);
},
py::arg("partitions"),
py::arg("work_items"),
py::arg("partition_id_start"),
py::arg("partition_id_end"),
py::arg("batch"))
py::arg("batch"),
py::arg("partial_write_array_dir"))
.def(
"ingest",
[](IndexIVFPQ& index, const FeatureVectorArray& input_vectors) {
Expand Down
63 changes: 47 additions & 16 deletions src/include/api/ivf_pq_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,12 @@ class IndexIVFPQ {
dimensions_ = index_->dimensions();
}

void create_temp_data_group() {
void create_temp_data_group(const std::string& partial_write_array_dir) {
if (!index_) {
throw std::runtime_error(
"Cannot create_temp_data_group() because there is no index.");
}
index_->create_temp_data_group();
index_->create_temp_data_group(partial_write_array_dir);
}

/**
Expand Down Expand Up @@ -231,7 +231,8 @@ class IndexIVFPQ {
const FeatureVector& deleted_ids,
size_t start,
size_t end,
size_t partition_start) {
size_t partition_start,
const std::string& partial_write_array_dir) {
if (feature_datatype_ != input_vectors.feature_type()) {
throw std::runtime_error(
"[ivf_pq_index@ingest_parts] Feature datatype mismatch: " +
Expand All @@ -243,7 +244,13 @@ class IndexIVFPQ {
"Cannot ingest_parts() because there is no index.");
}
index_->ingest_parts(
input_vectors, external_ids, deleted_ids, start, end, partition_start);
input_vectors,
external_ids,
deleted_ids,
start,
end,
partition_start,
partial_write_array_dir);
}

void ingest(
Expand All @@ -266,14 +273,20 @@ class IndexIVFPQ {
size_t work_items,
size_t partition_id_start,
size_t partition_id_end,
size_t batch) {
size_t batch,
const std::string& partial_write_array_dir) {
if (!index_) {
throw std::runtime_error(
"[ivf_pq_index@consolidate_partitions] Cannot "
"consolidate_partitions() because there is no index.");
}
index_->consolidate_partitions(
partitions, work_items, partition_id_start, partition_id_end, batch);
partitions,
work_items,
partition_id_start,
partition_id_end,
batch,
partial_write_array_dir);
}

[[nodiscard]] auto query(
Expand Down Expand Up @@ -413,7 +426,8 @@ class IndexIVFPQ {
struct index_base {
virtual ~index_base() = default;

virtual void create_temp_data_group() = 0;
virtual void create_temp_data_group(
const std::string& partial_write_array_dir) = 0;

virtual void train(
const FeatureVectorArray& training_set,
Expand All @@ -426,7 +440,8 @@ class IndexIVFPQ {
const FeatureVector& deleted_ids,
size_t start,
size_t end,
size_t partition_start) = 0;
size_t partition_start,
const std::string& partial_write_array_dir) = 0;

virtual void ingest(
const FeatureVectorArray& input_vectors,
Expand All @@ -437,7 +452,8 @@ class IndexIVFPQ {
size_t work_items,
size_t partition_id_start,
size_t partition_id_end,
size_t batch) = 0;
size_t batch,
const std::string& partial_write_array_dir) = 0;

[[nodiscard]] virtual std::tuple<FeatureVectorArray, FeatureVectorArray>
query(
Expand Down Expand Up @@ -499,8 +515,9 @@ class IndexIVFPQ {
temporal_policy) {
}

void create_temp_data_group() override {
impl_index_.create_temp_data_group();
void create_temp_data_group(
const std::string& partial_write_array_dir) override {
impl_index_.create_temp_data_group(partial_write_array_dir);
}

void train(
Expand All @@ -521,7 +538,8 @@ class IndexIVFPQ {
const FeatureVector& deleted_ids,
size_t start,
size_t end,
size_t partition_start) override {
size_t partition_start,
const std::string& partial_write_array_dir) override {
using feature_type = typename T::feature_type;
using id_type = typename T::id_type;
auto fspan = MatrixView<feature_type, stdx::layout_left>{
Expand All @@ -534,7 +552,13 @@ class IndexIVFPQ {
auto ids = std::vector<id_type>(::num_vectors(input_vectors));
std::iota(ids.begin(), ids.end(), start);
impl_index_.ingest_parts(
fspan, ids, deleted_ids_span, start, end, partition_start);
fspan,
ids,
deleted_ids_span,
start,
end,
partition_start,
partial_write_array_dir);
} else {
auto external_ids_span = std::span<id_type>(
(id_type*)external_ids.data(), external_ids.dimensions());
Expand All @@ -544,7 +568,8 @@ class IndexIVFPQ {
deleted_ids_span,
start,
end,
partition_start);
partition_start,
partial_write_array_dir);
}
}

Expand Down Expand Up @@ -573,9 +598,15 @@ class IndexIVFPQ {
size_t work_items,
size_t partition_id_start,
size_t partition_id_end,
size_t batch) override {
size_t batch,
const std::string& partial_write_array_dir) override {
impl_index_.consolidate_partitions(
partitions, work_items, partition_id_start, partition_id_end, batch);
partitions,
work_items,
partition_id_start,
partition_id_end,
batch,
partial_write_array_dir);
}

/**
Expand Down
Loading
Loading