Fix for concurrency bug #70

Merged (4 commits) on Jul 16, 2024
10 changes: 4 additions & 6 deletions cpp/TypedIndex.h
@@ -99,7 +99,7 @@ class TypedIndex : public Index {
bool normalize = false;
bool useOrderPreservingTransform = false;
int numThreadsDefault;
- hnswlib::labeltype currentLabel;
+ std::atomic<hnswlib::labeltype> currentLabel;
std::unique_ptr<hnswlib::HierarchicalNSW<dist_t, data_t>> algorithmImpl;
std::unique_ptr<hnswlib::Space<dist_t, data_t>> spaceImpl;
std::unique_ptr<voyager::Metadata::V1> metadata;
@@ -360,7 +360,7 @@ class TypedIndex : public Index {

int start = 0;
if (!ep_added) {
- size_t id = ids.size() ? ids.at(0) : (currentLabel);
+ size_t id = ids.size() ? ids.at(0) : (currentLabel.fetch_add(1));
Contributor

Is .fetch_add(1) actually what we want throughout this change? Wouldn't that result in us incrementing currentLabel multiple times where we actually only wanted to increment it once?

Contributor Author

Correct, I'm reverting this

Contributor Author (@miclegr, Jul 12, 2024)

Actually, no: my original implementation is correct. Here we assign an id to the first row (the entry point). Then this line:

start = 1;

sets start to 1, so the ParallelFor loop begins at row 1:

ParallelFor(start, rows, numThreads, [&](size_t row, size_t threadId) {

Each row in that loop also calls fetch_add(1), so currentLabel must already be incremented by the time the loop starts; otherwise row 1 would get the same id as row 0.
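
The point made in this reply can be illustrated with a minimal, serial sketch. It is not the code from cpp/TypedIndex.h: `assignLabels`, `nextLabel`, and `entryPointAdded` are hypothetical names standing in for the label-assignment logic, `currentLabel`, and `ep_added`, and a plain `for` loop stands in for `ParallelFor`. The relevant fact is that `fetch_add(1)` returns the value held before the increment, so the first row takes the current label and leaves the counter one ahead for the rows that follow.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for the auto-label path (no user-supplied ids).
// `nextLabel` plays the role of currentLabel in the diff.
std::vector<std::size_t> assignLabels(std::atomic<std::size_t> &nextLabel,
                                      std::size_t rows, bool entryPointAdded) {
  std::vector<std::size_t> labels(rows);
  std::size_t start = 0;
  if (!entryPointAdded && rows > 0) {
    // First row: fetch_add returns the old value, then bumps the counter.
    labels[0] = nextLabel.fetch_add(1);
    start = 1;
  }
  // Serial stand-in for ParallelFor(start, rows, ...).
  for (std::size_t row = start; row < rows; ++row) {
    labels[row] = nextLabel.fetch_add(1);
  }
  return labels;
}

// Usage: three rows starting from label 5 receive 5, 6, 7.
void exampleAssignLabels() {
  std::atomic<std::size_t> nextLabel{5};
  std::vector<std::size_t> labels = assignLabels(nextLabel, 3, false);
  assert(labels[0] == 5 && labels[1] == 6 && labels[2] == 7);
  assert(nextLabel.load() == 8);
}
```

If the first row read the counter without incrementing it, the first `fetch_add(1)` inside the loop would return the same value again, and rows 0 and 1 would share a label.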

// TODO(psobot): Should inputVector be on the stack instead?
std::vector<float> inputVector(actualDimensions);
std::vector<data_t> convertedVector(actualDimensions);
@@ -402,7 +402,7 @@ class TypedIndex : public Index {
floatToDataType<data_t, scalefactor>(&inputArray[startIndex],
&convertedArray[startIndex],
actualDimensions);
- size_t id = ids.size() ? ids.at(row) : (currentLabel + row);
+ size_t id = ids.size() ? ids.at(row) : (currentLabel.fetch_add(1));
try {
algorithmImpl->addPoint(convertedArray.data() + startIndex, id);
} catch (IndexFullError &e) {
@@ -438,7 +438,7 @@ class TypedIndex : public Index {
normalizeVector<dist_t, data_t, scalefactor>(
&inputArray[startIndex], &normalizedArray[startIndex],
actualDimensions);
- size_t id = ids.size() ? ids.at(row) : (currentLabel + row);
+ size_t id = ids.size() ? ids.at(row) : (currentLabel.fetch_add(1));

try {
algorithmImpl->addPoint(normalizedArray.data() + startIndex, id);
@@ -460,8 +460,6 @@ class TypedIndex : public Index {
});
};

- currentLabel += rows;
-
return idsToReturn;
}
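
To make the effect of the change concrete, here is a hedged, self-contained sketch that contrasts the two labeling schemes in the diff. All function names are hypothetical and none of this is the project's code. `oldSchemeTwoCallers` deterministically simulates one interleaving that the old `currentLabel + row` scheme appears to allow (two callers both read the base label before either runs `currentLabel += rows`); that interleaving is an assumption about how the bug could surface, not something the PR page states. `claimLabelsConcurrently` has several threads draw labels from one `std::atomic` counter via `fetch_add(1)`.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <set>
#include <thread>
#include <vector>

// True if no label appears twice.
bool allUnique(const std::vector<std::size_t> &labels) {
  std::set<std::size_t> seen(labels.begin(), labels.end());
  return seen.size() == labels.size();
}

// Simulated interleaving of the old scheme: both callers observe the same
// base label because neither has executed `currentLabel += rows` yet.
std::vector<std::size_t> oldSchemeTwoCallers(std::size_t currentLabel,
                                             std::size_t rows) {
  std::size_t baseA = currentLabel;
  std::size_t baseB = currentLabel;
  std::vector<std::size_t> labels;
  for (std::size_t row = 0; row < rows; ++row) labels.push_back(baseA + row);
  for (std::size_t row = 0; row < rows; ++row) labels.push_back(baseB + row);
  return labels;
}

// New scheme: every label comes from an atomic fetch_add(1).
std::vector<std::size_t> claimLabelsConcurrently(std::size_t numThreads,
                                                 std::size_t perThread) {
  std::atomic<std::size_t> nextLabel{0};
  std::vector<std::vector<std::size_t>> claimed(numThreads);
  std::vector<std::thread> workers;
  for (std::size_t t = 0; t < numThreads; ++t) {
    // Each thread writes only to its own slot, so no extra locking is needed.
    workers.emplace_back([&claimed, &nextLabel, perThread, t] {
      claimed[t].reserve(perThread);
      for (std::size_t i = 0; i < perThread; ++i) {
        claimed[t].push_back(nextLabel.fetch_add(1));
      }
    });
  }
  for (std::thread &worker : workers) worker.join();

  std::vector<std::size_t> all;
  for (const auto &part : claimed) all.insert(all.end(), part.begin(), part.end());
  return all;
}

// Usage: the simulated old scheme collides, the atomic scheme does not.
void exampleLabelSchemes() {
  assert(!allUnique(oldSchemeTwoCallers(0, 3)));
  std::vector<std::size_t> labels = claimLabelsConcurrently(4, 1000);
  assert(labels.size() == 4000);
  assert(allUnique(labels));
}
```

One consequence worth noting: with a per-row `fetch_add(1)` inside a parallel loop, labels are unique but are handed out in whatever order the threads reach that line, so row `r` is no longer guaranteed to receive `base + r`. If contiguous, row-ordered labels matter, reserving the whole range once with `fetch_add(rows)` and then using `base + row` would be one possible alternative; this PR does not take that approach.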
