Functor APIs #210

Open · wants to merge 4 commits into master
152 changes: 151 additions & 1 deletion include/merlin/core_kernels.cuh
@@ -16,6 +16,7 @@

#pragma once

#include <cstdint>
#include "allocator.cuh"
#include "core_kernels/accum_or_assign.cuh"
#include "core_kernels/contains.cuh"
@@ -774,6 +775,57 @@ __global__ void remove_kernel(const Table<K, V, S>* __restrict table,
}
}

template <typename K, typename V, typename S, typename PredFunctor,
uint32_t GroupSize = 32>
__global__ void remove_kernel_v2(const uint64_t search_length,
const uint64_t offset, PredFunctor pred,
Bucket<K, V, S>* buckets,
int* __restrict buckets_size,
const uint64_t bucket_capacity,
const uint64_t dim, uint64_t* remove_counter) {
cg::thread_block_tile<GroupSize> g =
cg::tiled_partition<GroupSize>(cg::this_thread_block());

uint64_t tid = static_cast<uint64_t>(blockIdx.x) * blockDim.x + threadIdx.x;

for (uint64_t i = tid; i < search_length; i += gridDim.x * blockDim.x) {
uint64_t bkt_idx = (i + offset) / bucket_capacity;
uint64_t key_idx = (i + offset) % bucket_capacity;

// May be different for threads within the same group.
Bucket<K, V, S>* bucket = buckets + bkt_idx;

const K key = bucket->keys(key_idx)->load(cuda::std::memory_order_relaxed);
const S score =
bucket->scores(key_idx)->load(cuda::std::memory_order_relaxed);
const V* value = bucket->vectors + key_idx * dim;

bool match = pred.template operator()<GroupSize>(key, value, score, g);
if (IS_RESERVED_KEY<K>(key)) {
match = false;
}
uint32_t vote = g.ballot(match);
int group_cnt = __popc(vote);
if (g.thread_rank() == 0) {
atomicAdd(remove_counter, static_cast<uint64_t>(group_cnt));
if (bucket_capacity >= GroupSize) {
atomicSub(&buckets_size[bkt_idx], group_cnt);
}
}
// Only matched threads need to erase.
if (match) {
bucket->digests(key_idx)[0] = empty_digest<K>();
bucket->keys(key_idx)->store(static_cast<K>(RECLAIM_KEY),
cuda::std::memory_order_relaxed);
bucket->scores(key_idx)->store(static_cast<S>(EMPTY_SCORE),
cuda::std::memory_order_relaxed);
if (bucket_capacity < GroupSize) {
atomicSub(&buckets_size[bkt_idx], 1);
}
}
}
}
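
remove_kernel_v2 invokes the predicate as `pred.template operator()<GroupSize>(key, value, score, g)`, so any functor passed to it needs an `operator()` templated on the group size. A minimal sketch of such a predicate follows; the struct name and its `threshold` field are illustrative rather than part of this PR, and the `cg` alias is the one already defined by this header.

// Illustrative predicate for remove_kernel_v2 / erase_if_v2: erase every
// entry whose score falls below a threshold. Only the templated operator()
// shape is dictated by the kernel; the rest is an assumption.
template <typename K, typename V, typename S>
struct EraseIfScoreBelow {
  S threshold;

  template <uint32_t GroupSize>
  __device__ bool operator()(const K& key, const V* value, const S& score,
                             const cg::thread_block_tile<GroupSize>& g) {
    // Each thread decides for its own slot; the kernel ballots the results.
    return score < threshold;
  }
};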

/* Dump with score. */
template <class K, class V, class S>
inline std::tuple<size_t, size_t> dump_kernel_shared_memory_size(
@@ -923,7 +975,7 @@ __global__ void dump_kernel_v2(const Table<K, V, S>* __restrict table,
auto g = cg::tiled_partition<TILE_SIZE>(cg::this_thread_block());

PredFunctor<K, S> pred;
size_t tid = static_cast<size_t>(blockIdx.x * blockDim.x + threadIdx.x);
size_t tid = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x;

for (size_t i = tid; i < search_length; i += gridDim.x * blockDim.x) {
size_t bkt_idx = (i + offset) / bucket_max_size;
@@ -972,6 +1024,77 @@ __global__ void dump_kernel_v2(const Table<K, V, S>* __restrict table,
}
}

template <typename K, typename V, typename S, typename PredFunctor,
uint32_t GroupSize = 32>
__global__ void dump_kernel(const uint64_t search_length, const uint64_t offset,
PredFunctor pred, Bucket<K, V, S>* buckets,
const uint64_t bucket_capacity, const uint64_t dim,
K* __restrict__ out_keys, V* __restrict__ out_vals,
S* __restrict__ out_scores,
uint64_t* dump_counter) {
cg::thread_block_tile<GroupSize> g =
cg::tiled_partition<GroupSize>(cg::this_thread_block());

uint64_t tid = static_cast<uint64_t>(blockIdx.x) * blockDim.x + threadIdx.x;

for (uint64_t i = tid; i < search_length; i += gridDim.x * blockDim.x) {
uint64_t bkt_idx = (i + offset) / bucket_capacity;
uint64_t key_idx = (i + offset) % bucket_capacity;

// May be different for threads within the same group.
Bucket<K, V, S>* bucket = buckets + bkt_idx;

const K key = bucket->keys(key_idx)->load(cuda::std::memory_order_relaxed);
const S score =
bucket->scores(key_idx)->load(cuda::std::memory_order_relaxed);
const V* value = bucket->vectors + key_idx * dim;

bool match = pred.template operator()<GroupSize>(key, value, score, g);
uint32_t vote = g.ballot(match);
int group_cnt = __popc(vote);
uint64_t group_offset = 0;
if (g.thread_rank() == 0) {
group_offset = atomicAdd(dump_counter, static_cast<uint64_t>(group_cnt));
}
group_offset = g.shfl(group_offset, 0);
// Each thread computes the number of matched lanes with a lower rank.
// Use `g.thread_rank()` rather than `key_idx % GroupSize` to handle the
// case bucket_capacity < GroupSize.
int previous_cnt = group_cnt - __popc(vote >> g.thread_rank());
// Only matched threads need to output.
if (match) {
out_keys[group_offset + previous_cnt] = key;
if (out_scores) {
out_scores[group_offset + previous_cnt] = score;
}
}

for (int r = 0; r < GroupSize; r++) {
uint32_t biased_vote = vote >> r;
bool cur_match = biased_vote & 1;
if (cur_match) {
int bias = group_cnt - __popc(biased_vote);

/// TODO: benchmark both solutions.
//----------------------- Solution 1
// uint64_t cur_bkt_idx = g.shfl(bkt_idx, r);
// uint64_t cur_key_idx = g.shfl(key_idx, r);
// auto cur_bucket = buckets + cur_bkt_idx;
//----------------------- Solution 2
uint64_t cur_idx = (i / GroupSize) * GroupSize + r + offset;
uint64_t cur_bkt_idx = cur_idx / bucket_capacity;
uint64_t cur_key_idx = cur_idx % bucket_capacity;
Bucket<K, V, S>* cur_bucket = buckets + cur_bkt_idx;

for (int j = g.thread_rank(); j < dim; j += GroupSize) {
out_vals[(group_offset + bias) * dim + j] =
cur_bucket->vectors[cur_key_idx * dim + j];
}
}
}
}
}
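
The output slot for each matched thread is derived purely from the ballot mask: a lane's offset within the group equals the number of matched lanes with a lower rank. A host-side C++20 sketch of that arithmetic (helper name and example mask are illustrative):

#include <bit>
#include <cstdint>

// previous_cnt(vote, lane) == group_cnt - popcount(vote >> lane), i.e. the
// number of matched lanes with rank < lane, which is the slot the lane
// writes to relative to group_offset.
inline int previous_cnt(uint32_t vote, int lane) {
  const int group_cnt = std::popcount(vote);
  return group_cnt - std::popcount(vote >> lane);
}

// Example: vote = 0b00101101 (lanes 0, 2, 3, 5 matched, group_cnt = 4):
//   previous_cnt(vote, 0) == 0, previous_cnt(vote, 2) == 1,
//   previous_cnt(vote, 3) == 2, previous_cnt(vote, 5) == 3,
// so the matched lanes write to consecutive slots group_offset + 0..3.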

template <class K, class V, class S,
template <typename, typename> class PredFunctor>
__global__ void size_if_kernel(const Table<K, V, S>* __restrict table,
@@ -1012,5 +1135,32 @@ __global__ void size_if_kernel(const Table<K, V, S>* __restrict table,
}
}

template <typename K, typename V, typename S, typename ExecutionFunc,
uint32_t GroupSize = 32>
__global__ void traverse_kernel(const uint64_t search_length,
const uint64_t offset, ExecutionFunc f,
Bucket<K, V, S>* buckets,
const uint64_t bucket_capacity,
const uint64_t dim) {
cg::thread_block_tile<GroupSize> g =
cg::tiled_partition<GroupSize>(cg::this_thread_block());

uint64_t tid = static_cast<uint64_t>(blockIdx.x) * blockDim.x + threadIdx.x;

for (uint64_t i = tid; i < search_length; i += gridDim.x * blockDim.x) {
uint64_t bkt_idx = (i + offset) / bucket_capacity;
uint64_t key_idx = (i + offset) % bucket_capacity;

// May be different for threads within the same group.
Bucket<K, V, S>* bucket = buckets + bkt_idx;

const K key = bucket->keys(key_idx)->load(cuda::std::memory_order_relaxed);
S* score = reinterpret_cast<S*>(bucket->scores(key_idx));
V* value = bucket->vectors + key_idx * dim;

f.template operator()<GroupSize>(key, value, score, g);
}
}

} // namespace merlin
} // namespace nv
182 changes: 181 additions & 1 deletion include/merlin_hashtable.cuh
@@ -2466,6 +2466,61 @@ class HashTable : public HashTableBase<K, V, S> {
return count;
}

/**
* @brief Erases the key-value-score tuples that match the predicate @p pred.
* @param pred A functor whose `operator()` is templated on the group size and
* has the signature: `__device__ bool operator()(const K&, const V*, const
* S&, const cg::thread_block_tile<GroupSize>&)`.
* @param stream The CUDA stream that is used to execute the operation.
*
* @return The number of elements removed.
*/

template <typename PredFunctor>
size_type erase_if_v2(PredFunctor& pred, cudaStream_t stream = 0) {
update_read_lock lock(mutex_, stream);

auto dev_ws{dev_mem_pool_->get_workspace<1>(sizeof(size_type), stream)};
auto d_count{dev_ws.get<size_type*>(0)};

CUDA_CHECK(cudaMemsetAsync(d_count, 0, sizeof(size_type), stream));

{
/// The search length should be a multiple of GroupSize for group communication.
uint64_t dim = table_->dim;
uint64_t n = options_.max_capacity;
auto kernel = [&] {
if (dim >= 32 && n % 32 == 0) {
return remove_kernel_v2<key_type, value_type, score_type, PredFunctor,
32>;
} else if (dim >= 16 && n % 16 == 0) {
return remove_kernel_v2<key_type, value_type, score_type, PredFunctor,
16>;
} else if (dim >= 8 && n % 8 == 0) {
return remove_kernel_v2<key_type, value_type, score_type, PredFunctor,
8>;
}
return remove_kernel_v2<key_type, value_type, score_type, PredFunctor,
1>;
}();
uint64_t block_size = 128UL;
uint64_t grid_size =
std::min(sm_cnt_ * max_threads_per_block_ / block_size,
SAFE_GET_GRID_SIZE(n, block_size));
kernel<<<grid_size, block_size, 0, stream>>>(
n, 0, pred, table_->buckets, table_->buckets_size,
table_->bucket_max_size, table_->dim, d_count);
}

size_type count = 0;
CUDA_CHECK(cudaMemcpyAsync(&count, d_count, sizeof(size_type),
cudaMemcpyDeviceToHost, stream));
CUDA_CHECK(cudaStreamSynchronize(stream));

CudaCheckError();
return count;
}
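
A minimal usage sketch for erase_if_v2, assuming a table instantiated with uint64_t keys, float values, and uint64_t scores, an existing CUDA stream, and the cooperative_groups alias; the functor and its `expire_before` field are illustrative:

namespace cg = cooperative_groups;  // alias assumed, as in the library headers

// Illustrative predicate: erase every entry whose score is older than a
// cutoff. Only the templated operator() shape is dictated by erase_if_v2.
struct EraseExpired {
  uint64_t expire_before;

  template <uint32_t GroupSize>
  __device__ bool operator()(const uint64_t& key, const float* value,
                             const uint64_t& score,
                             const cg::thread_block_tile<GroupSize>& g) {
    return score < expire_before;
  }
};

// `Table` stands for the concrete HashTable instantiation (K=uint64_t,
// V=float, S=uint64_t assumed above).
template <class Table>
size_t erase_expired(Table& table, cudaStream_t stream) {
  EraseExpired pred{/*expire_before=*/1000};
  return table.erase_if_v2(pred, stream);  // returns the number of removals
}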

/**
* @brief Removes all of the elements in the hash table with no release
* object.
@@ -2581,7 +2636,7 @@
* type.
* @param threshold The fourth user-defined argument to @p pred with
* score_type type.
* @param offset The position of the key to remove.
* @param offset The position of the key to search.
* @param keys The keys to dump from GPU-accessible memory with shape (n).
* @param values The values to dump from GPU-accessible memory with shape
* (n, DIM).
@@ -2680,6 +2735,131 @@
CudaCheckError();
}

/**
* @brief Exports a certain number of key-value-score tuples that match a
* given predicate.
*
* @tparam PredFunctor A functor type with a template signature `<K, V, S>`.
* It should define an operator with the signature:
* `__device__ bool operator()(const K&, const V*, const S&,
* cg::thread_block_tile<GroupSize>&)`.
*
* @param pred A functor of type `PredFunctor` that defines the predicate for
* filtering tuples.
* @param n The maximum number of exported pairs.
* @param offset The position of the key to search.
* @param d_counter Device pointer to which the number of dumped elements is written.
* @param keys The keys to dump from GPU-accessible memory with shape (n).
* @param values The values to dump from GPU-accessible memory with shape (n,
* DIM).
* @param scores The scores to dump from GPU-accessible memory with shape (n).
* @parblock
* If @p scores is `nullptr`, the score for each key will not be returned.
* @endparblock
*
* @param stream The CUDA stream that is used to execute the operation.
*
* @return void
*
*/

template <typename PredFunctor>
void export_batch_if_v2(PredFunctor& pred, size_type n,
const size_type offset, size_type* d_counter,
key_type* keys, // (n)
value_type* values, // (n, DIM)
score_type* scores = nullptr, // (n)
cudaStream_t stream = 0) const {
read_shared_lock lock(mutex_, stream);
CUDA_CHECK(cudaMemsetAsync(d_counter, 0, sizeof(size_type), stream));

if (offset >= table_->capacity) {
return;
}
n = std::min(table_->capacity - offset, n);
if (n == 0) {
return;
}

/// The search length should be a multiple of GroupSize for group communication.
uint64_t dim = table_->dim;
auto kernel = [&] {
if (dim >= 32 && n % 32 == 0) {
return dump_kernel<key_type, value_type, score_type, PredFunctor, 32>;
} else if (dim >= 16 && n % 16 == 0) {
return dump_kernel<key_type, value_type, score_type, PredFunctor, 16>;
} else if (dim >= 8 && n % 8 == 0) {
return dump_kernel<key_type, value_type, score_type, PredFunctor, 8>;
}
return dump_kernel<key_type, value_type, score_type, PredFunctor, 1>;
}();
uint64_t block_size = 128UL;
uint64_t grid_size = std::min(sm_cnt_ * max_threads_per_block_ / block_size,
SAFE_GET_GRID_SIZE(n, block_size));
kernel<<<grid_size, block_size, 0, stream>>>(
n, offset, pred, table_->buckets, table_->bucket_max_size, dim, keys,
values, scores, d_counter);

CudaCheckError();
}
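
A minimal call sketch for export_batch_if_v2, reusing the table/stream setup and `cg` alias from the erase_if_v2 sketch above, and assuming pre-allocated device buffers `d_keys`, `d_vals`, `d_scores`, and `d_counter` sized for the scanned range plus the table's capacity() accessor (all names illustrative):

// Illustrative predicate: export entries whose score exceeds a threshold.
struct ExportIfScoreAbove {
  uint64_t threshold;

  template <uint32_t GroupSize>
  __device__ bool operator()(const uint64_t& key, const float* value,
                             const uint64_t& score,
                             const cg::thread_block_tile<GroupSize>& g) {
    return score > threshold;
  }
};

template <class Table>
size_t export_hot_entries(Table& table, uint64_t* d_keys, float* d_vals,
                          uint64_t* d_scores, size_t* d_counter,
                          cudaStream_t stream) {
  ExportIfScoreAbove pred{/*threshold=*/100};
  // Scan the whole table in one call; offset/n can also be used for chunks.
  table.export_batch_if_v2(pred, table.capacity(), /*offset=*/0, d_counter,
                           d_keys, d_vals, d_scores, stream);
  // The dump count is written to device memory; copy it back to size results.
  size_t h_counter = 0;
  cudaMemcpyAsync(&h_counter, d_counter, sizeof(size_t),
                  cudaMemcpyDeviceToHost, stream);
  cudaStreamSynchronize(stream);
  return h_counter;
}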

/**
* @brief Applies the given function to items in the range [first, last) in
* the table.
*
* @tparam ExecutionFunc A functor type with a template signature `<K, V, S>`.
* It should define an operator with the signature:
* `__device__ void operator()(const K&, V*, S*,
* cg::thread_block_tile<GroupSize>&)`.
*
* @param first The first element to which the function object will be
* applied.
* @param last The last element (exclusive) to which the function object will
* be applied.
* @param f A functor of type `ExecutionFunc` that is applied to each
* key-value-score tuple, with the signature: `__device__ void
* operator()(const K&, V*, S*, const cg::thread_block_tile<GroupSize>&)`.
* @param stream The CUDA stream that is used to execute the operation.
*
* @return void
*
*/

template <typename ExecutionFunc>
void for_each(const size_type first, const size_type last, ExecutionFunc& f,
cudaStream_t stream = 0) {
update_read_lock lock(mutex_, stream);

if (first >= table_->capacity or last > table_->capacity or first >= last) {
return;
}
uint64_t n = last - first;

/// The search length should be a multiple of GroupSize for group communication.
uint64_t dim = table_->dim;
auto kernel = [&] {
if (dim >= 32 && n % 32 == 0) {
return traverse_kernel<key_type, value_type, score_type, ExecutionFunc,
32>;
} else if (dim >= 16 && n % 16 == 0) {
return traverse_kernel<key_type, value_type, score_type, ExecutionFunc,
16>;
} else if (dim >= 8 && n % 8 == 0) {
return traverse_kernel<key_type, value_type, score_type, ExecutionFunc,
8>;
}
return traverse_kernel<key_type, value_type, score_type, ExecutionFunc,
1>;
}();
uint64_t block_size = 128UL;
uint64_t grid_size = std::min(sm_cnt_ * max_threads_per_block_ / block_size,
SAFE_GET_GRID_SIZE(n, block_size));
kernel<<<grid_size, block_size, 0, stream>>>(n, first, f, table_->buckets,
table_->bucket_max_size, dim);

CudaCheckError();
}
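
A minimal sketch of an `ExecutionFunc` for for_each, again reusing the setup from the sketches above. Note that traverse_kernel visits every slot position in [first, last), including empty and reserved ones, so a real functor would normally check the key before mutating anything:

// Illustrative ExecutionFunc: halve every score in place. Values and scores
// arrive as mutable pointers, so the functor may update them directly.
struct HalveScores {
  template <uint32_t GroupSize>
  __device__ void operator()(const uint64_t& key, float* value,
                             uint64_t* score,
                             const cg::thread_block_tile<GroupSize>& g) {
    // A production functor would typically skip reserved/empty slots here,
    // e.g. via the IS_RESERVED_KEY helper used by remove_kernel_v2.
    *score = *score / 2;
  }
};

template <class Table>
void halve_all_scores(Table& table, cudaStream_t stream) {
  HalveScores f;
  table.for_each(/*first=*/0, /*last=*/table.capacity(), f, stream);
}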

public:
/**
* @brief Indicates if the hash table has no elements.