diff --git a/cpp/code/compute_fn.cc b/cpp/code/compute_fn.cc new file mode 100644 index 00000000..cd5c32a3 --- /dev/null +++ b/cpp/code/compute_fn.cc @@ -0,0 +1,237 @@ +// ------------------------------ +// Dependencies + +// standard dependencies +#include +#include +#include + +// arrow dependencies +#include +#include +#include + +#include "common.h" + + +// >> aliases for types in standard library +using std::shared_ptr; +using std::vector; + +// >> commonly used arrow types +// |> general programming support +using arrow::Result; +using arrow::Status; +using arrow::Datum; + +// |> arrow data types and helpers +using arrow::Int64Builder; +using arrow::Array; +using arrow::ArraySpan; + + +// >> aliases for types used to define a custom function (e.g. `NamedScalarFn`) +// |> kernel parameters +using arrow::compute::KernelContext; +using arrow::compute::ExecSpan; +using arrow::compute::ExecResult; + +// |> for defining compute functions and their compute kernels +using arrow::compute::FunctionDoc; +using arrow::compute::InputType; +using arrow::compute::OutputType; +using arrow::compute::Arity; +using arrow::compute::ScalarFunction; + +// |> for adding to a function registry or using `CallFunction` +using arrow::compute::FunctionRegistry; +using arrow::compute::ExecContext; + + +// ------------------------------ +// Structs and Classes + +// >> Documentation for a compute function +/** + * Create a const instance of `FunctionDoc` that contains 3 attributes: + * 1. Short description + * 2. Long description (can be multiple lines, each limited to 78 characters in width) + * 3. Name of input arguments + */ +const FunctionDoc named_scalar_fn_doc { + "Unary function that calculates a hash for each element of the input" + ,("This function uses the xxHash algorithm.\n" + "The result contains a 64-bit hash value for each input element.") + ,{ "input_array" } +}; + + +// >> Kernel implementations for a compute function +// StartRecipe("DefineAComputeKernel"); +/** + * Create implementations that will be associated with our compute function. When a + * compute function is invoked, the compute API framework will delegate execution to an + * associated kernel that matches: (1) input argument types/shapes and (2) output argument + * types/shapes. + * + * Kernel implementations may be functions or may be methods (functions within a class or + * struct). + */ +struct NamedScalarFn { + + /** + * A kernel implementation that expects a single array as input, and outputs an array of + * int64 values. We write this implementation knowing what function we want to + * associate it with ("NamedScalarFn"), but that association is made later (see + * `RegisterScalarFnKernels()` below). + */ + static Status + Exec(KernelContext *ctx, const ExecSpan &input_arg, ExecResult *out) { + + // Validate inputs + if (input_arg.num_values() != 1 or !input_arg[0].is_array()) { + return Status::Invalid("Unsupported argument types or shape"); + } + + // The input ArraySpan manages data as 3 buffers; the data buffer has index `1` + constexpr int bufndx_data = 1; + const int64_t *hash_inputs = input_arg[0].array.GetValues(bufndx_data); + const auto input_len = input_arg[0].array.length; + + // Allocate an Arrow buffer for output + ARROW_ASSIGN_OR_RAISE(std::unique_ptr hash_buffer, + AllocateBuffer(input_len * sizeof(int64_t))); + + // Call hashing function, using both prime multipliers from xxHash + int64_t *hash_results = reinterpret_cast(hash_buffer->mutable_data()); + for (int val_ndx = 0; val_ndx < input_len; ++val_ndx) { + hash_results[val_ndx] = ( + ScalarHelper::ComputeHash(hash_inputs[val_ndx]) + + ScalarHelper::ComputeHash(hash_inputs[val_ndx]) + ); + } + + // Use ArrayData (not ArraySpan) for ownership of result buffer + out->value = ArrayData{int64(), input_len, {nullptr, std::move(hash_buffer)}}; + return Status::OK(); + } +}; +// EndRecipe("DefineAComputeKernel"); + + +// ------------------------------ +// Functions + + +// >> Function registration and kernel association +// StartRecipe("AddKernelToFunction"); +/** + * A convenience function that shows how we construct an instance of `ScalarFunction` that + * will be registered in a function registry. The instance is constructed with: (1) a + * unique name ("named_scalar_fn"), (2) an "arity" (`Arity::Unary()`), and (3) an instance + * of `FunctionDoc`. + * + * The function name is used to invoke it from a function registry after it has been + * registered. The "arity" is the cardinality of the function's parameters--1 parameter is + * a unary function, 2 parameters is a binary function, etc. Finally, it is helpful to + * associate the function with documentation, which uses the `FunctionDoc` struct. + */ +shared_ptr +RegisterScalarFnKernels() { + // Instantiate a function to be registered + auto fn_named_scalar = std::make_shared( + "named_scalar_fn" + ,Arity::Unary() + ,std::move(named_scalar_fn_doc) + ); + + // Associate a function and kernel using `ScalarFunction::AddKernel()` + DCHECK_OK( + fn_named_scalar->AddKernel( + { InputType(arrow::int64()) } + ,OutputType(arrow::int64()) + ,NamedScalarFn::Exec + ) + ); + + return fn_named_scalar; +} +// EndRecipe("AddKernelToFunction"); + + +// StartRecipe("AddFunctionToRegistry"); +/** + * A convenience function that shows how we register a custom function with a + * `FunctionRegistry`. To keep this simple and general, this function takes a pointer to a + * FunctionRegistry as an input argument, then invokes `FunctionRegistry::AddFunction()`. + */ +void +RegisterNamedScalarFn(FunctionRegistry *registry) { + // scalar_fn has type: shared_ptr + auto scalar_fn = RegisterScalarFnKernels(); + DCHECK_OK(registry->AddFunction(std::move(scalar_fn))); +} +// EndRecipe("AddFunctionToRegistry"); + + +// >> Convenience functions +// StartRecipe("InvokeByCallFunction"); +/** + * An optional, convenient invocation function to easily call our compute function. This + * executes our compute function by invoking `CallFunction` with the name that we used to + * register the function ("named_scalar_fn" in this case). + */ +ARROW_EXPORT +Result +NamedScalarFn(const Datum &input_arg, ExecContext *ctx) { + auto func_name = "named_scalar_fn"; + auto result_datum = CallFunction(func_name, { input_arg }, ctx); + + return result_datum; +} +// EndRecipe("InvokeByCallFunction"); + + +Result> +BuildIntArray() { + vector col_vals { 0, 1, 1, 2, 3, 5, 8, 13, 21, 34 }; + + Int64Builder builder; + ARROW_RETURN_NOT_OK(builder.Reserve(col_vals.size())); + ARROW_RETURN_NOT_OK(builder.AppendValues(col_vals)); + return builder.Finish(); +} + + +class ComputeFunctionTest : public ::testing::Test {}; + +TEST(ComputeFunctionTest, TestRegisterAndCallFunction) { + // >> Register the function first + auto fn_registry = arrow::compute::GetFunctionRegistry(); + RegisterNamedScalarFn(fn_registry); + + // >> Then we can call the function + StartRecipe("InvokeByConvenienceFunction"); + auto build_result = BuildIntArray(); + if (not build_result.ok()) { + std::cerr << build_result.status().message() << std::endl; + return 1; + } + + Datum col_data { *build_result }; + auto fn_result = NamedScalarFn(col_data); + if (not fn_result.ok()) { + std::cerr << fn_result.status().message() << std::endl; + return 2; + } + + auto result_data = fn_result->make_array(); + std::cout << "Success:" << std::endl; + std::cout << "\t" << result_data->ToString() << std::endl; + EndRecipe("InvokeByConvenienceFunction"); + + // If we want to peek at the input data + std::cout << col_data.make_array()->ToString() << std::endl; + + return 0; +} diff --git a/cpp/source/compute.rst b/cpp/source/compute.rst new file mode 100644 index 00000000..6822d060 --- /dev/null +++ b/cpp/source/compute.rst @@ -0,0 +1,153 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +==================================== +Defining and Using Compute Functions +==================================== + +Arrow contains a "Compute API," which primarily consists of a "registry" of functions that +can be invoked. Currently, Arrow populates a default registry with a variety of +functions, which we call "compute functions". This section contains (or will contain) a +number of recipes illustrating how to define compute functions or how to use existing +ones. + + +.. contents:: + +Invoke a Compute Function +========================= + +When invoking a compute function, the function must exist in a function registry. Here, we +use :func:`arrow::compute::CallFunction` to invoke the function with name +"named_scalar_fn". :func:`arrow::compute::CallFunction` uses the function registry +referenced from the :class:`ExecContext` argument. If an :class:`ExecContext` is not +specified, the default :class:`ExecContext` is used (which references a default +:class:`FunctionRegistry`). + +.. literalinclude:: ../code/compute_fn.cc + :language: cpp + :lines: 179-191 + :caption: Use CallFunction() to invoke a compute function by name + :dedent: 2 + +Sometimes, a convenience function (such as :func:`arrow::compute::Add` or +:func:`arrow::compute::Subtract`) is defined. These functions are usually implemented as +wrappers around :func:`arrow::compute::CallFunction`. + +.. recipe:: ../code/compute_fn.cc InvokeByConvenienceFunction + :caption: Use a convenience invocation function to call a compute function + :dedent: 2 + + +Adding a Custom Compute Function +================================ + +To make a custom compute function available, there are 3 primary steps: + +1. Define kernels for the function (these implement the actual logic) + +2. Associate the kernels with a function object + +3. Add the function object to a function registry + + +Define Function Kernels +----------------------- + +The signature of an execution kernel is relatively standardized: it returns a +:class:`arrow::Status` and takes a context, some arguments, and a pointer to an output +result. The context wraps an :class:`arrow::compute::ExecContext` and other metadata about +the environment in which the kernel function should be executed. The input arguments are +contained within an :class:`arrow::compute::ExecSpan` (newly added in place of +:class:`arrow::compute::ExecBatch`), which holds non-owning references to argument data. +Finally, the :class:`arrow::compute::ExecResult` pointed to should be set to an +appropriate :class:`arrow::ArraySpan` or :class:`arrow::ArrayData` instance, depending on +ownership semantics of the kernel's output. + +.. literalinclude:: ../code/compute_fn.cc + :language: cpp + :lines: 71-118 + :caption: Define an example compute kernel that uses ScalarHelper from hashing.h to + hash input values + :dedent: 2 + +This recipe shows basic validation of `input_arg` which contains a vector of input +arguments. Then, the input :class:`arrow::Array` is accessed from `input_arg` and a +:class:`arrow::Buffer` is allocated to hold output results. After the main loop is +completed, the allocated :class:`arrow::Buffer` is wrapped in an :class:`arrow::ArrayData` +instance and referenced by `out`. + + +Associate Kernels with a Function +--------------------------------- + +Kernels are added to a compute function in 2 steps: (1) create an appropriate function +object--:class:`arrow::compute::ScalarFunction` in this case--and (2) call the +:func:`arrow::compute::ScalarFunction::AddKernel` function. The AddKernel function is +repeated for each desired input data type. + +.. literalinclude:: ../code/compute_fn.cc + :language: cpp + :lines: 128-158 + :caption: Instantiate a ScalarFunction and add our execution kernel to it + :dedent: 2 + +A :class:`arrow::compute::ScalarFunction` represents a "scalar" or "element-wise" compute +function (see documentation on the Compute API). The signature used in this recipe passes: + +1. A function name (to be used when calling it) + +2. An "Arity" meaning how many input arguments it takes (like cardinality) + +3. A :class:`arrow::compute::FunctionDoc` instance (to associate some documentation + programmatically) + +Then, :func:`arrow::compute::ScalarFunction::AddKernel` expects: + +1. A vector of data types for each input argument + +2. An output data type for the result + +3. The function to be used as the execution kernel + +4. The function to be used as the initialization kernel (optional) + +Note that the constructor for :class:`arrow::compute::ScalarFunction` is more interested +in how many arguments to expect, and some information about the compute function itself; +whereas, the function to add a kernel specifies data types and the functions to call at +runtime. + + +Add Function to Registry +------------------------ + +Finally, adding the function to a registry is wonderfully straightforward. + +.. literalinclude:: ../code/compute_fn.cc + :language: cpp + :lines: 163-173 + :caption: Use convenience function to get a ScalarFunction with associated kernels, + then add it to the given FunctionRegistry + :dedent: 2 + +In this recipe, we simply wrap the logic in a convenience function that: (1) creates a +:class:`arrow::compute::ScalarFunction`, (2) adds our execution kernel to the compute +function, and (3) returns the compute function. Then, we add the compute function to some +registry. This recipe takes the :class:`arrow::compute::FunctionRegistry` as an argument +so that it is easy to call from the same place that the Arrow codebase registers other +provided functions. Otherwise, we can add our compute function to the default registry, +or a custom registry.