Skip to content

Commit

Permalink
Expose stream parameter in public reduction APIs (#15737)
Browse files Browse the repository at this point in the history
Add stream parameter to public reduction APIs:

- `reduce()`
- `segmented_reduce()`
- `scan()`
- `minmax()`

Reference #13744

Authors:
  - Srinivas Yadav (https://github.com/srinivasyadav18)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - Yunsong Wang (https://github.com/PointKernel)

URL: #15737
  • Loading branch information
srinivasyadav18 authored May 16, 2024
1 parent 516d0f9 commit ec07927
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 22 deletions.
12 changes: 12 additions & 0 deletions cpp/include/cudf/reduction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ enum class scan_type : bool { INCLUSIVE, EXCLUSIVE };
* @param col Input column view
* @param agg Aggregation operator applied by the reduction
* @param output_dtype The output scalar type
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @returns Output scalar with reduce result
*/
std::unique_ptr<scalar> reduce(
column_view const& col,
reduce_aggregation const& agg,
data_type output_dtype,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -96,6 +98,7 @@ std::unique_ptr<scalar> reduce(
* @param agg Aggregation operator applied by the reduction
* @param output_dtype The output scalar type
* @param init The initial value of the reduction
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @returns Output scalar with reduce result
*/
Expand All @@ -104,6 +107,7 @@ std::unique_ptr<scalar> reduce(
reduce_aggregation const& agg,
data_type output_dtype,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
Expand Down Expand Up @@ -145,6 +149,7 @@ std::unique_ptr<scalar> reduce(
* @param null_handling If `INCLUDE`, the reduction is valid if all elements in a segment are valid,
* otherwise null. If `EXCLUDE`, the reduction is valid if any element in the segment is valid,
* otherwise null.
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @returns Output column with results of segmented reduction
*/
Expand All @@ -154,6 +159,7 @@ std::unique_ptr<column> segmented_reduce(
segmented_reduce_aggregation const& agg,
data_type output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -169,6 +175,7 @@ std::unique_ptr<column> segmented_reduce(
* otherwise null. If `EXCLUDE`, the reduction is valid if any element in the segment is valid,
* otherwise null.
* @param init The initial value of the reduction
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned scalar's device memory
* @returns Output column with results of segmented reduction.
*/
Expand All @@ -179,6 +186,7 @@ std::unique_ptr<column> segmented_reduce(
data_type output_dtype,
null_policy null_handling,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
Expand All @@ -195,6 +203,7 @@ std::unique_ptr<column> segmented_reduce(
* exclusive scan if scan_type::EXCLUSIVE.
* @param[in] null_handling Exclude null values when computing the result if null_policy::EXCLUDE.
* Include nulls if null_policy::INCLUDE. Any operation with a null results in a null.
* @param[in] stream CUDA stream used for device memory operations and kernel launches
* @param[in] mr Device memory resource used to allocate the returned scalar's device memory
* @returns Scanned output column
*/
Expand All @@ -203,19 +212,22 @@ std::unique_ptr<column> scan(
scan_aggregation const& agg,
scan_type inclusive,
null_policy null_handling = null_policy::EXCLUDE,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/**
* @brief Determines the minimum and maximum values of a column.
*
*
* @param col column to compute minmax
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return A std::pair of scalars with the first scalar being the minimum value and the second
* scalar being the maximum value of the input column.
*/
std::pair<std::unique_ptr<scalar>, std::unique_ptr<scalar>> minmax(
column_view const& col,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/reductions/minmax.cu
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,10 @@ std::pair<std::unique_ptr<scalar>, std::unique_ptr<scalar>> minmax(
} // namespace detail

std::pair<std::unique_ptr<scalar>, std::unique_ptr<scalar>> minmax(
column_view const& col, rmm::device_async_resource_ref mr)
column_view const& col, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::minmax(col, cudf::get_default_stream(), mr);
return detail::minmax(col, stream, mr);
}

} // namespace cudf
7 changes: 4 additions & 3 deletions cpp/src/reductions/reductions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,21 @@ std::unique_ptr<scalar> reduce(column_view const& col,
std::unique_ptr<scalar> reduce(column_view const& col,
reduce_aggregation const& agg,
data_type output_dtype,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return reduction::detail::reduce(
col, agg, output_dtype, std::nullopt, cudf::get_default_stream(), mr);
return reduction::detail::reduce(col, agg, output_dtype, std::nullopt, stream, mr);
}

std::unique_ptr<scalar> reduce(column_view const& col,
reduce_aggregation const& agg,
data_type output_dtype,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return reduction::detail::reduce(col, agg, output_dtype, init, cudf::get_default_stream(), mr);
return reduction::detail::reduce(col, agg, output_dtype, init, stream, mr);
}
} // namespace cudf
3 changes: 2 additions & 1 deletion cpp/src/reductions/scan/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ std::unique_ptr<column> scan(column_view const& input,
scan_aggregation const& agg,
scan_type inclusive,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return detail::scan(input, agg, inclusive, null_handling, cudf::get_default_stream(), mr);
return detail::scan(input, agg, inclusive, null_handling, stream, mr);
}

} // namespace cudf
22 changes: 6 additions & 16 deletions cpp/src/reductions/segmented/reductions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,12 @@ std::unique_ptr<column> segmented_reduce(column_view const& segmented_values,
segmented_reduce_aggregation const& agg,
data_type output_dtype,
null_policy null_handling,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return reduction::detail::segmented_reduce(segmented_values,
offsets,
agg,
output_dtype,
null_handling,
std::nullopt,
cudf::get_default_stream(),
mr);
return reduction::detail::segmented_reduce(
segmented_values, offsets, agg, output_dtype, null_handling, std::nullopt, stream, mr);
}

std::unique_ptr<column> segmented_reduce(column_view const& segmented_values,
Expand All @@ -157,17 +152,12 @@ std::unique_ptr<column> segmented_reduce(column_view const& segmented_values,
data_type output_dtype,
null_policy null_handling,
std::optional<std::reference_wrapper<scalar const>> init,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
CUDF_FUNC_RANGE();
return reduction::detail::segmented_reduce(segmented_values,
offsets,
agg,
output_dtype,
null_handling,
init,
cudf::get_default_stream(),
mr);
return reduction::detail::segmented_reduce(
segmented_values, offsets, agg, output_dtype, null_handling, init, stream, mr);
}

} // namespace cudf
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,7 @@ ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing)
ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_REDUCTION_TEST streams/reduction_test.cpp STREAM_MODE testing)
ConfigureTest(
STREAM_STRINGS_TEST
streams/strings/case_test.cpp
Expand Down
102 changes: 102 additions & 0 deletions cpp/tests/streams/reduction_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/type_lists.hpp>

#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/reduction.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/scalar/scalar_factories.hpp>

class ReductionTest : public cudf::test::BaseFixture {};

TEST_F(ReductionTest, ReductionSum)
{
cudf::test::fixed_width_column_wrapper<int> input({1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
cudf::reduce(input,
*cudf::make_sum_aggregation<cudf::reduce_aggregation>(),
cudf::data_type(cudf::type_id::INT32),
cudf::test::get_default_stream());
}

TEST_F(ReductionTest, ReductionSumScalarInit)
{
cudf::test::fixed_width_column_wrapper<int> input({1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
auto const init_scalar = cudf::make_fixed_width_scalar<int>(3, cudf::test::get_default_stream());
cudf::reduce(input,
*cudf::make_sum_aggregation<cudf::reduce_aggregation>(),
cudf::data_type(cudf::type_id::INT32),
*init_scalar,
cudf::test::get_default_stream());
}

TEST_F(ReductionTest, SegmentedReductionSum)
{
auto const input = cudf::test::fixed_width_column_wrapper<int>{{1, 2, 3, 1, 0, 3, 1, 0, 0, 0},
{1, 1, 1, 1, 0, 1, 1, 0, 0, 0}};
auto const offsets = std::vector<cudf::size_type>{0, 3, 6, 7, 8, 10, 10};
auto const d_offsets = cudf::detail::make_device_uvector_async(
offsets, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource());

auto res =
cudf::segmented_reduce(input,
d_offsets,
*cudf::make_sum_aggregation<cudf::segmented_reduce_aggregation>(),
cudf::data_type(cudf::type_id::INT32),
cudf::null_policy::EXCLUDE,
cudf::test::get_default_stream());
}

TEST_F(ReductionTest, SegmentedReductionSumScalarInit)
{
auto const input = cudf::test::fixed_width_column_wrapper<int>{{1, 2, 3, 1, 0, 3, 1, 0, 0, 0},
{1, 1, 1, 1, 0, 1, 1, 0, 0, 0}};
auto const offsets = std::vector<cudf::size_type>{0, 3, 6, 7, 8, 10, 10};
auto const d_offsets = cudf::detail::make_device_uvector_async(
offsets, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource());
auto const init_scalar = cudf::make_fixed_width_scalar<int>(3, cudf::test::get_default_stream());
auto res =
cudf::segmented_reduce(input,
d_offsets,
*cudf::make_sum_aggregation<cudf::segmented_reduce_aggregation>(),
cudf::data_type(cudf::type_id::INT32),
cudf::null_policy::EXCLUDE,
*init_scalar,
cudf::test::get_default_stream());
}

TEST_F(ReductionTest, ScanMin)
{
auto const input = cudf::test::fixed_width_column_wrapper<int>{
{123, 64, 63, 99, -5, 123, -16, -120, -111}, {1, 0, 1, 1, 1, 1, 0, 0, 1}};

cudf::scan(input,
*cudf::make_min_aggregation<cudf::scan_aggregation>(),
cudf::scan_type::INCLUSIVE,
cudf::null_policy::EXCLUDE,
cudf::test::get_default_stream());
}

TEST_F(ReductionTest, MinMax)
{
auto const input = cudf::test::fixed_width_column_wrapper<int>{
{123, 64, 63, 99, -5, 123, -16, -120, -111}, {1, 0, 1, 1, 1, 1, 0, 0, 1}};

cudf::minmax(input, cudf::test::get_default_stream());
}

0 comments on commit ec07927

Please sign in to comment.