Skip to content

Commit

Permalink
Expose stream-ordering in partitioning API (#17213)
Browse files Browse the repository at this point in the history
Add stream parameter to public APIs:
```
cudf::partition
cudf::round_robin_partition
```
Added stream gtest for above two functions and for `hash_partition`.

Reference: #13744

Authors:
  - Shruti Shivakumar (https://github.com/shrshi)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #17213
  • Loading branch information
shrshi authored Oct 31, 2024
1 parent f99ef41 commit f7020f1
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 3 deletions.
4 changes: 4 additions & 0 deletions cpp/include/cudf/partitioning.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ enum class hash_id {
* @param partition_map Non-nullable column of integer values that map each row
* in `t` to its partition.
* @param num_partitions The total number of partitions
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned table's device memory
* @return Pair containing the reordered table and vector of `num_partitions +
* 1` offsets to each partition such that the size of partition `i` is
Expand All @@ -79,6 +80,7 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> partition(
table_view const& t,
column_view const& partition_map,
size_type num_partitions,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/**
Expand Down Expand Up @@ -242,6 +244,7 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> hash_partition(
* @param[in] input The input table to be round-robin partitioned
* @param[in] num_partitions Number of partitions for the table
* @param[in] start_partition Index of the 1st partition
* @param[in] stream CUDA stream used for device memory operations and kernel launches
* @param[in] mr Device memory resource used to allocate the returned table's device memory
*
* @return A std::pair consisting of a unique_ptr to the partitioned table
Expand All @@ -251,6 +254,7 @@ std::pair<std::unique_ptr<cudf::table>, std::vector<cudf::size_type>> round_robi
table_view const& input,
cudf::size_type num_partitions,
cudf::size_type start_partition = 0,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref());

/** @} */ // end of group
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/partitioning/partitioning.cu
Original file line number Diff line number Diff line change
Expand Up @@ -834,10 +834,11 @@ std::pair<std::unique_ptr<table>, std::vector<size_type>> partition(
table_view const& t,
column_view const& partition_map,
size_type num_partitions,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::partition(t, partition_map, num_partitions, cudf::get_default_stream(), mr);
return detail::partition(t, partition_map, num_partitions, stream, mr);
}

} // namespace cudf
4 changes: 2 additions & 2 deletions cpp/src/partitioning/round_robin.cu
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ std::pair<std::unique_ptr<cudf::table>, std::vector<cudf::size_type>> round_robi
table_view const& input,
cudf::size_type num_partitions,
cudf::size_type start_partition,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::round_robin_partition(
input, num_partitions, start_partition, cudf::get_default_stream(), mr);
return detail::round_robin_partition(input, num_partitions, start_partition, stream, mr);
}

} // namespace cudf
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ ConfigureTest(STREAM_MULTIBYTE_SPLIT_TEST streams/io/multibyte_split_test.cpp ST
ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_ORCIO_TEST streams/io/orc_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_PARQUETIO_TEST streams/io/parquet_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_PARTITIONING_TEST streams/partitioning_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing)
ConfigureTest(STREAM_REDUCTION_TEST streams/reduction_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)
Expand Down
73 changes: 73 additions & 0 deletions cpp/tests/streams/partitioning_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/copying.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/partitioning.hpp>
#include <cudf/sorting.hpp>
#include <cudf/table/table.hpp>

using cudf::test::fixed_width_column_wrapper;
using cudf::test::strings_column_wrapper;

// Fixture shared by the partitioning stream tests in this file; it adds no
// state or behavior of its own on top of cudf::test::BaseFixture.
struct PartitionTest : public cudf::test::BaseFixture {};

// Calls cudf::partition on a single-column table holding a struct column,
// passing the test stream explicitly. The result is not inspected.
// NOTE(review): presumably the STREAM_MODE testing build is what detects use
// of an unexpected stream -- confirm against the stream-test tooling.
TEST_F(PartitionTest, Struct)
{
  // Two-row decimal32 child; the second argument is the per-row validity.
  fixed_width_column_wrapper<numeric::decimal32, int32_t> child({1, 2}, {0, 1});
  // Wrap the child in a struct column with its own per-row validity.
  auto structs = cudf::test::structs_column_wrapper({child}, {0, 1}).release();
  cudf::table_view const input{{*structs}};

  // Row 0 is mapped to partition 9 and row 1 to partition 2.
  fixed_width_column_wrapper<int32_t> partition_map{9, 2};
  cudf::size_type const num_partitions = 12;

  auto const stream = cudf::test::get_default_stream();
  auto partitioned  = cudf::partition(input, partition_map, num_partitions, stream);
}

// Calls cudf::round_robin_partition on a table whose only column has no rows,
// passing the test stream explicitly. The outputs are not inspected.
TEST_F(PartitionTest, EmptyInput)
{
  fixed_width_column_wrapper<int32_t> const no_rows{};
  cudf::table_view const input{{no_rows}};

  cudf::size_type const num_partitions  = 5;
  cudf::size_type const start_partition = 0;

  auto const stream = cudf::test::get_default_stream();
  auto const [partitioned, offsets] =
    cudf::round_robin_partition(input, num_partitions, start_partition, stream);
}

// Calls cudf::hash_partition with num_partitions == 0 on a three-column table
// (float, int16, string), hashing only the string column and passing the test
// stream explicitly. The outputs are not inspected.
TEST_F(PartitionTest, ZeroPartitions)
{
  fixed_width_column_wrapper<float> float_col({1.f, 2.f, 3.f, 4.f, 5.f, 6.f, 7.f, 8.f});
  fixed_width_column_wrapper<int16_t> int_col({1, 2, 3, 4, 5, 6, 7, 8});
  strings_column_wrapper string_col({"a", "bb", "ccc", "d", "ee", "fff", "gg", "h"});
  cudf::table_view const input({float_col, int_col, string_col});

  // Column index 2 is the strings column.
  std::vector<cudf::size_type> const hash_columns{2};
  cudf::size_type const num_partitions = 0;

  auto const stream = cudf::test::get_default_stream();
  auto [partitioned, offsets] = cudf::hash_partition(input,
                                                     hash_columns,
                                                     num_partitions,
                                                     cudf::hash_id::HASH_MURMUR3,
                                                     cudf::DEFAULT_HASH_SEED,
                                                     stream);
}

0 comments on commit f7020f1

Please sign in to comment.