Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[C++] Adding recipe for custom compute functions #227

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
237 changes: 237 additions & 0 deletions cpp/code/compute_fn.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
// ------------------------------
// Dependencies

// standard dependencies
#include <stdint.h>
#include <string>
#include <iostream>

// arrow dependencies
#include <arrow/api.h>
#include <arrow/compute/api.h>
#include <arrow/util/hashing.h>

#include "common.h"


// >> aliases for types in standard library
using std::shared_ptr;
using std::vector;

// >> commonly used arrow types
// |> general programming support
using arrow::Result;
using arrow::Status;
using arrow::Datum;

// |> arrow data types and helpers
using arrow::Int64Builder;
using arrow::Array;
using arrow::ArraySpan;


// >> aliases for types used to define a custom function (e.g. `NamedScalarFn`)
// |> kernel parameters
using arrow::compute::KernelContext;
using arrow::compute::ExecSpan;
using arrow::compute::ExecResult;

// |> for defining compute functions and their compute kernels
using arrow::compute::FunctionDoc;
using arrow::compute::InputType;
using arrow::compute::OutputType;
using arrow::compute::Arity;
using arrow::compute::ScalarFunction;

// |> for adding to a function registry or using `CallFunction`
using arrow::compute::FunctionRegistry;
using arrow::compute::ExecContext;


// ------------------------------
// Structs and Classes

// >> Documentation for a compute function
/**
* Create a const instance of `FunctionDoc` that contains 3 attributes:
* 1. Short description
* 2. Long description (can be multiple lines, each limited to 78 characters in width)
* 3. Name of input arguments
*/
const FunctionDoc named_scalar_fn_doc {
"Unary function that calculates a hash for each element of the input"
,("This function uses the xxHash algorithm.\n"
"The result contains a 64-bit hash value for each input element.")
,{ "input_array" }
};


// >> Kernel implementations for a compute function
// StartRecipe("DefineAComputeKernel");
/**
* Create implementations that will be associated with our compute function. When a
* compute function is invoked, the compute API framework will delegate execution to an
* associated kernel that matches: (1) input argument types/shapes and (2) output argument
* types/shapes.
*
* Kernel implementations may be functions or may be methods (functions within a class or
* struct).
*/
struct NamedScalarFn {

/**
* A kernel implementation that expects a single array as input, and outputs an array of
* int64 values. We write this implementation knowing what function we want to
* associate it with ("NamedScalarFn"), but that association is made later (see
* `RegisterScalarFnKernels()` below).
*/
static Status
Exec(KernelContext *ctx, const ExecSpan &input_arg, ExecResult *out) {

// Validate inputs
if (input_arg.num_values() != 1 or !input_arg[0].is_array()) {
return Status::Invalid("Unsupported argument types or shape");
}

// The input ArraySpan manages data as 3 buffers; the data buffer has index `1`
constexpr int bufndx_data = 1;
const int64_t *hash_inputs = input_arg[0].array.GetValues<int64_t>(bufndx_data);
const auto input_len = input_arg[0].array.length;

// Allocate an Arrow buffer for output
ARROW_ASSIGN_OR_RAISE(std::unique_ptr<Buffer> hash_buffer,
AllocateBuffer(input_len * sizeof(int64_t)));

// Call hashing function, using both prime multipliers from xxHash
int64_t *hash_results = reinterpret_cast<int64_t*>(hash_buffer->mutable_data());
for (int val_ndx = 0; val_ndx < input_len; ++val_ndx) {
hash_results[val_ndx] = (
ScalarHelper<int64_t, 0>::ComputeHash(hash_inputs[val_ndx])
+ ScalarHelper<int64_t, 1>::ComputeHash(hash_inputs[val_ndx])
);
}

// Use ArrayData (not ArraySpan) for ownership of result buffer
out->value = ArrayData{int64(), input_len, {nullptr, std::move(hash_buffer)}};
return Status::OK();
}
};
// EndRecipe("DefineAComputeKernel");


// ------------------------------
// Functions


// >> Function registration and kernel association
// StartRecipe("AddKernelToFunction");
/**
* A convenience function that shows how we construct an instance of `ScalarFunction` that
* will be registered in a function registry. The instance is constructed with: (1) a
* unique name ("named_scalar_fn"), (2) an "arity" (`Arity::Unary()`), and (3) an instance
* of `FunctionDoc`.
*
* The function name is used to invoke it from a function registry after it has been
* registered. The "arity" is the cardinality of the function's parameters--1 parameter is
* a unary function, 2 parameters is a binary function, etc. Finally, it is helpful to
* associate the function with documentation, which uses the `FunctionDoc` struct.
*/
shared_ptr<ScalarFunction>
RegisterScalarFnKernels() {
// Instantiate a function to be registered
auto fn_named_scalar = std::make_shared<ScalarFunction>(
"named_scalar_fn"
,Arity::Unary()
,std::move(named_scalar_fn_doc)
);

// Associate a function and kernel using `ScalarFunction::AddKernel()`
DCHECK_OK(
fn_named_scalar->AddKernel(
{ InputType(arrow::int64()) }
,OutputType(arrow::int64())
,NamedScalarFn::Exec
)
);

return fn_named_scalar;
}
// EndRecipe("AddKernelToFunction");


// StartRecipe("AddFunctionToRegistry");
/**
* A convenience function that shows how we register a custom function with a
* `FunctionRegistry`. To keep this simple and general, this function takes a pointer to a
* FunctionRegistry as an input argument, then invokes `FunctionRegistry::AddFunction()`.
*/
void
RegisterNamedScalarFn(FunctionRegistry *registry) {
// scalar_fn has type: shared_ptr<ScalarFunction>
auto scalar_fn = RegisterScalarFnKernels();
DCHECK_OK(registry->AddFunction(std::move(scalar_fn)));
}
// EndRecipe("AddFunctionToRegistry");


// >> Convenience functions
// StartRecipe("InvokeByCallFunction");
/**
* An optional, convenient invocation function to easily call our compute function. This
* executes our compute function by invoking `CallFunction` with the name that we used to
* register the function ("named_scalar_fn" in this case).
*/
ARROW_EXPORT
Result<Datum>
NamedScalarFn(const Datum &input_arg, ExecContext *ctx) {
auto func_name = "named_scalar_fn";
auto result_datum = CallFunction(func_name, { input_arg }, ctx);

return result_datum;
}
// EndRecipe("InvokeByCallFunction");


Result<shared_ptr<Array>>
BuildIntArray() {
vector<int64_t> col_vals { 0, 1, 1, 2, 3, 5, 8, 13, 21, 34 };

Int64Builder builder;
ARROW_RETURN_NOT_OK(builder.Reserve(col_vals.size()));
ARROW_RETURN_NOT_OK(builder.AppendValues(col_vals));
return builder.Finish();
}


class ComputeFunctionTest : public ::testing::Test {};

TEST(ComputeFunctionTest, TestRegisterAndCallFunction) {
// >> Register the function first
auto fn_registry = arrow::compute::GetFunctionRegistry();
RegisterNamedScalarFn(fn_registry);

// >> Then we can call the function
StartRecipe("InvokeByConvenienceFunction");
auto build_result = BuildIntArray();
if (not build_result.ok()) {
std::cerr << build_result.status().message() << std::endl;
return 1;
}

Datum col_data { *build_result };
auto fn_result = NamedScalarFn(col_data);
if (not fn_result.ok()) {
std::cerr << fn_result.status().message() << std::endl;
return 2;
}

auto result_data = fn_result->make_array();
std::cout << "Success:" << std::endl;
std::cout << "\t" << result_data->ToString() << std::endl;
EndRecipe("InvokeByConvenienceFunction");

// If we want to peek at the input data
std::cout << col_data.make_array()->ToString() << std::endl;

return 0;
}
153 changes: 153 additions & 0 deletions cpp/source/compute.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

====================================
Defining and Using Compute Functions
====================================

Arrow contains a "Compute API," which primarily consists of a "registry" of functions that
can be invoked. Currently, Arrow populates a default registry with a variety of
functions, which we call "compute functions". This section contains (or will contain) a
number of recipes illustrating how to define compute functions or how to use existing
ones.


.. contents::

Invoke a Compute Function
=========================

When invoking a compute function, the function must exist in a function registry. Here, we
use :func:`arrow::compute::CallFunction` to invoke the function with name
"named_scalar_fn". :func:`arrow::compute::CallFunction` uses the function registry
referenced from the :class:`ExecContext` argument. If an :class:`ExecContext` is not
specified, the default :class:`ExecContext` is used (which references a default
:class:`FunctionRegistry`).

.. literalinclude:: ../code/compute_fn.cc
:language: cpp
:lines: 179-191
:caption: Use CallFunction() to invoke a compute function by name
:dedent: 2

Sometimes, a convenience function (such as :func:`arrow::compute::Add` or
:func:`arrow::compute::Subtract`) is defined. These functions are usually implemented as
wrappers around :func:`arrow::compute::CallFunction`.

.. recipe:: ../code/compute_fn.cc InvokeByConvenienceFunction
:caption: Use a convenience invocation function to call a compute function
:dedent: 2


Adding a Custom Compute Function
================================

To make a custom compute function available, there are 3 primary steps:

1. Define kernels for the function (these implement the actual logic)

2. Associate the kernels with a function object

3. Add the function object to a function registry


Define Function Kernels
-----------------------

The signature of an execution kernel is relatively standardized: it returns a
:class:`arrow::Status` and takes a context, some arguments, and a pointer to an output
result. The context wraps an :class:`arrow::compute::ExecContext` and other metadata about
the environment in which the kernel function should be executed. The input arguments are
contained within an :class:`arrow::compute::ExecSpan` (newly added in place of
:class:`arrow::compute::ExecBatch`), which holds non-owning references to argument data.
Finally, the :class:`arrow::compute::ExecResult` pointed to should be set to an
appropriate :class:`arrow::ArraySpan` or :class:`arrow::ArrayData` instance, depending on
ownership semantics of the kernel's output.

.. literalinclude:: ../code/compute_fn.cc
:language: cpp
:lines: 71-118
:caption: Define an example compute kernel that uses ScalarHelper from hashing.h to
hash input values
:dedent: 2

This recipe shows basic validation of `input_arg` which contains a vector of input
arguments. Then, the input :class:`arrow::Array` is accessed from `input_arg` and a
:class:`arrow::Buffer` is allocated to hold output results. After the main loop is
completed, the allocated :class:`arrow::Buffer` is wrapped in an :class:`arrow::ArrayData`
instance and referenced by `out`.


Associate Kernels with a Function
---------------------------------

Kernels are added to a compute function in 2 steps: (1) create an appropriate function
object--:class:`arrow::compute::ScalarFunction` in this case--and (2) call the
:func:`arrow::compute::ScalarFunction::AddKernel` function. The AddKernel function is
repeated for each desired input data type.

.. literalinclude:: ../code/compute_fn.cc
:language: cpp
:lines: 128-158
:caption: Instantiate a ScalarFunction and add our execution kernel to it
:dedent: 2

A :class:`arrow::compute::ScalarFunction` represents a "scalar" or "element-wise" compute
function (see documentation on the Compute API). The signature used in this recipe passes:

1. A function name (to be used when calling it)

2. An "Arity" meaning how many input arguments it takes (like cardinality)

3. A :class:`arrow::compute::FunctionDoc` instance (to associate some documentation
programmatically)

Then, :func:`arrow::compute::ScalarFunction::AddKernel` expects:

1. A vector of data types for each input argument

2. An output data type for the result

3. The function to be used as the execution kernel

4. The function to be used as the initialization kernel (optional)

Note that the constructor for :class:`arrow::compute::ScalarFunction` is more interested
in how many arguments to expect, and some information about the compute function itself;
whereas, the function to add a kernel specifies data types and the functions to call at
runtime.


Add Function to Registry
------------------------

Finally, adding the function to a registry is wonderfully straightforward.

.. literalinclude:: ../code/compute_fn.cc
:language: cpp
:lines: 163-173
:caption: Use convenience function to get a ScalarFunction with associated kernels,
then add it to the given FunctionRegistry
:dedent: 2

In this recipe, we simply wrap the logic in a convenience function that: (1) creates a
:class:`arrow::compute::ScalarFunction`, (2) adds our execution kernel to the compute
function, and (3) returns the compute function. Then, we add the compute function to some
registry. This recipe takes the :class:`arrow::compute::FunctionRegistry` as an argument
so that it is easy to call from the same place that the Arrow codebase registers other
provided functions. Otherwise, we can add our compute function to the default registry,
or a custom registry.