Skip to content

Commit

Permalink
feat(functions): Add support for REST based remote functions
Browse files Browse the repository at this point in the history
Co-authored-by: Wills Feng <[email protected]>
  • Loading branch information
Joe-Abraham and wills-feng committed Feb 12, 2025
1 parent cf43fd0 commit 07f290e
Show file tree
Hide file tree
Showing 17 changed files with 1,130 additions and 110 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,9 @@ if(VELOX_ENABLE_REMOTE_FUNCTIONS)
find_package(fizz CONFIG REQUIRED)
find_package(wangle CONFIG REQUIRED)
find_package(FBThrift CONFIG REQUIRED)
set(cpr_SOURCE BUNDLED)
velox_resolve_dependency(cpr)
FetchContent_MakeAvailable(cpr)
endif()

if(VELOX_ENABLE_GCS)
Expand Down
58 changes: 58 additions & 0 deletions velox/docs/develop/rest-based-remote-functions.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
============================================
Using REST-based Remote Functions in Velox
============================================

Overview
--------
This document provides an overview of how to register and use REST-based remote functions
in Velox. REST-based remote functions are external functions that Velox can call via an HTTP
endpoint, where function execution is offloaded to a remote service.

Registration
------------
Before you can call the remote function in a query, you need to register it with Velox.
Populate a ``RemoteVectorFunctionMetadata``, build the function signatures, and pass both
to ``registerRemoteFunction()``, which is declared as follows:

.. code-block:: c++

    void registerRemoteFunction(
        const std::string& name,
        std::vector<exec::FunctionSignaturePtr> signatures,
        const RemoteVectorFunctionMetadata& metadata = {},
        bool overwrite = true);

.. note::

    - ``metadata.serdeFormat`` must be set to the Presto page format, i.e.,
      ``PageFormat::PRESTO_PAGE``.
    - ``metadata.location`` is the REST endpoint to which Velox will send the function
      invocation requests.

Query Execution
---------------
Once the remote function is registered, it can be used in SQL queries or other Velox-based
systems. During query execution:

1. Velox packages the function input arguments into a request payload using the
   PrestoVectorSerde format.
2. This request is sent to the REST endpoint specified in ``metadata.location``.
3. The remote service executes the function and returns a response payload, also serialized
   in the PrestoVectorSerde format.
4. Velox then deserializes the result payload and proceeds with further query processing.

Serialization Details
---------------------
The request and response payloads are transferred using a ``folly::IOBuf`` under the hood.
Because the format is ``PageFormat::PRESTO_PAGE``, the serialization and deserialization
are done by the PrestoVectorSerde implementation. This means that the remote function server
must be able to understand the Presto page format and return the results in the same format.

Summary
-------
Using REST-based remote functions in Velox involves two major steps:

1. **Registration**: Provide the remote server's endpoint and other metadata via
   ``registerRemoteFunction()`` so that Velox knows how to connect and what format to use.
2. **Execution**: During query execution, Velox serializes function inputs, sends them
   to the remote server, and deserializes the results.

6 changes: 6 additions & 0 deletions velox/functions/remote/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ velox_add_library(velox_functions_remote_thrift_client ThriftClient.cpp)
velox_link_libraries(velox_functions_remote_thrift_client
PUBLIC remote_function_thrift FBThrift::thriftcpp2)

# REST client used by remote functions; RestClient.cpp talks HTTP through cpr.
# The link scope is spelled out (PUBLIC) to match the other
# velox_link_libraries() calls in this file, which all use the keyword form.
velox_add_library(velox_functions_remote_rest_client RestClient.cpp)
velox_link_libraries(velox_functions_remote_rest_client
                     PUBLIC Folly::folly cpr::cpr)

velox_add_library(velox_functions_remote Remote.cpp)
velox_link_libraries(
velox_functions_remote
PUBLIC velox_expression
velox_exec
velox_presto_serializer
velox_functions_remote_thrift_client
velox_functions_remote_rest_client
velox_functions_remote_get_serde
velox_type_fbhive
Folly::folly)
Expand Down
78 changes: 67 additions & 11 deletions velox/functions/remote/client/Remote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,26 @@

#include "velox/functions/remote/client/Remote.h"

#include <fmt/format.h>
#include <folly/io/async/EventBase.h>
#include <sstream>
#include <string>

#include "velox/common/memory/ByteStream.h"
#include "velox/expression/Expr.h"
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/client/RestClient.h"
#include "velox/functions/remote/client/ThriftClient.h"
#include "velox/functions/remote/if/GetSerde.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunctionServiceAsyncClient.h"
#include "velox/serializers/PrestoSerializer.h"
#include "velox/type/fbhive/HiveTypeSerializer.h"
#include "velox/vector/VectorStream.h"

namespace facebook::velox::functions {
namespace {

// Returns the textual form of `type` as produced by the fbhive
// HiveTypeSerializer.
std::string serializeType(const TypePtr& type) {
  std::string serialized = type::fbhive::HiveTypeSerializer::serialize(type);
  return serialized;
}

Expand All @@ -40,10 +46,11 @@ class RemoteFunction : public exec::VectorFunction {
const std::vector<exec::VectorFunctionArg>& inputArgs,
const RemoteVectorFunctionMetadata& metadata)
: functionName_(functionName),
location_(metadata.location),
thriftClient_(getThriftClient(location_, &eventBase_)),
metadata_(metadata),
serdeFormat_(metadata.serdeFormat),
serde_(getSerde(serdeFormat_)) {
boost::apply_visitor(*this, metadata_.location);

std::vector<TypePtr> types;
types.reserve(inputArgs.size());
serializedInputTypes_.reserve(inputArgs.size());
Expand All @@ -55,14 +62,26 @@ class RemoteFunction : public exec::VectorFunction {
remoteInputType_ = ROW(std::move(types));
}

// Visitor overload for the case where the location variant holds a
// folly::SocketAddress: creates the Thrift client bound to `address` and to
// this object's event base.
void operator()(const folly::SocketAddress& address) {
  auto client = getThriftClient(address, &eventBase_);
  thriftClient_ = std::move(client);
}

// Visitor overload for the case where the location variant holds a URL
// string: creates the REST client. The URL is intentionally unused here; it
// is read back from `metadata_.location` each time a request is issued, so
// the parameter name is commented out to avoid an unused-parameter warning.
void operator()(const std::string& /*url*/) {
  restClient_ = getRestClient();
}

void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx& context,
VectorPtr& result) const override {
try {
applyRemote(rows, args, outputType, context, result);
if ((metadata_.location.type() == typeid(folly::SocketAddress))) {
applyThriftRemote(rows, args, outputType, context, result);
} else if (metadata_.location.type() == typeid(std::string)) {
applyRestRemote(rows, args, outputType, context, result);
}
} catch (const VeloxRuntimeError&) {
throw;
} catch (const std::exception&) {
Expand All @@ -71,7 +90,41 @@ class RemoteFunction : public exec::VectorFunction {
}

private:
void applyRemote(
void applyRestRemote(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx& context,
VectorPtr& result) const {
try {
serializer::presto::PrestoVectorSerde serde;
auto remoteRowVector = std::make_shared<RowVector>(
context.pool(),
remoteInputType_,
BufferPtr{},
rows.end(),
std::move(args));

std::unique_ptr<folly::IOBuf> requestBody =
std::make_unique<folly::IOBuf>(rowVectorToIOBuf(
remoteRowVector, rows.end(), *context.pool(), &serde));

std::unique_ptr<folly::IOBuf> responseBody = restClient_->invokeFunction(
boost::get<std::string>(metadata_.location), std::move(requestBody));

auto outputRowVector = IOBufToRowVector(
*responseBody, ROW({outputType}), *context.pool(), &serde);

result = outputRowVector->childAt(0);
} catch (const std::exception& e) {
VELOX_FAIL(
"Error while executing remote function '{}': {}",
functionName_,
e.what());
}
}

void applyThriftRemote(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
Expand Down Expand Up @@ -109,7 +162,7 @@ class RemoteFunction : public exec::VectorFunction {
VELOX_FAIL(
"Error while executing remote function '{}' at '{}': {}",
functionName_,
location_.describe(),
boost::get<folly::SocketAddress>(metadata_.location).describe(),
e.what());
}

Expand Down Expand Up @@ -142,10 +195,13 @@ class RemoteFunction : public exec::VectorFunction {
}

const std::string functionName_;
folly::SocketAddress location_;

const RemoteVectorFunctionMetadata metadata_;
folly::EventBase eventBase_;
std::unique_ptr<RemoteFunctionClient> thriftClient_;

// Depending on the location, one of these is initialized by the visitor.
std::unique_ptr<RemoteFunctionClient> thriftClient_{nullptr};
std::unique_ptr<HttpClient> restClient_{nullptr};

remote::PageFormat serdeFormat_;
std::unique_ptr<VectorSerde> serde_;

Expand All @@ -159,7 +215,7 @@ std::shared_ptr<exec::VectorFunction> createRemoteFunction(
const std::vector<exec::VectorFunctionArg>& inputArgs,
const core::QueryConfig& /*config*/,
const RemoteVectorFunctionMetadata& metadata) {
return std::make_unique<RemoteFunction>(name, inputArgs, metadata);
return std::make_shared<RemoteFunction>(name, inputArgs, metadata);
}

} // namespace
Expand All @@ -169,7 +225,7 @@ void registerRemoteFunction(
std::vector<exec::FunctionSignaturePtr> signatures,
const RemoteVectorFunctionMetadata& metadata,
bool overwrite) {
exec::registerStatefulVectorFunction(
registerStatefulVectorFunction(
name,
signatures,
std::bind(
Expand Down
25 changes: 21 additions & 4 deletions velox/functions/remote/client/Remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,37 @@

#pragma once

#include <boost/variant.hpp>
#include <folly/SocketAddress.h>
#include "velox/expression/VectorFunction.h"
#include "velox/functions/remote/if/gen-cpp2/RemoteFunction_types.h"

namespace facebook::velox::functions {

struct RemoteVectorFunctionMetadata : public exec::VectorFunctionMetadata {
/// Network address of the servr to communicate with. Note that this can hold
/// a network location (ip/port pair) or a unix domain socket path (see
/// Location of the remote function server. This is either the URL of an
/// HTTP/REST server (std::string), or the network address of the server to
/// communicate with (folly::SocketAddress). Note that the latter can hold a
/// network location (ip/port pair) or a unix domain socket path (see
/// SocketAddress::makeFromPath()).
folly::SocketAddress location;
boost::variant<folly::SocketAddress, std::string> location;

/// The serialization format to be used
/// The serialization format to be used when sending data to the remote.
remote::PageFormat serdeFormat{remote::PageFormat::PRESTO_PAGE};

/// Optional schema defining the structure of the data or input/output types
/// involved in the remote function. This may include details such as column
/// names and data types.
std::optional<std::string> schema;

/// Optional identifier for the specific remote function to be invoked.
/// This can be useful when the same server hosts multiple functions,
/// and the client needs to specify which function to call.
std::optional<std::string> functionId;

/// Optional version information to be used when calling the remote function.
/// This can help in ensuring compatibility with a particular version of the
/// function if multiple versions are available on the server.
std::optional<std::string> version;
};

/// Registers a new remote function. It will use the metadata defined in
Expand Down
61 changes: 61 additions & 0 deletions velox/functions/remote/client/RestClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/functions/remote/client/RestClient.h"

#include <cpr/cpr.h>
#include <folly/io/IOBufQueue.h>

#include "velox/common/base/Exceptions.h"

using namespace folly;
namespace facebook::velox::functions {

std::unique_ptr<IOBuf> RestClient::invokeFunction(
const std::string& fullUrl,
std::unique_ptr<IOBuf> requestPayload) {
IOBufQueue inputBufQueue(IOBufQueue::cacheChainLength());
inputBufQueue.append(std::move(requestPayload));

std::string requestBody;
for (auto range : *inputBufQueue.front()) {
requestBody.append(
reinterpret_cast<const char*>(range.data()), range.size());
}

cpr::Response response = cpr::Post(
cpr::Url{fullUrl},
cpr::Header{
{"Content-Type", "application/X-presto-pages"},
{"Accept", "application/X-presto-pages"}},
cpr::Body{requestBody});

if (response.error) {
VELOX_FAIL(fmt::format(
"Error communicating with server: {} URL: {}",
response.error.message,
fullUrl));
}

auto outputBuf = IOBuf::copyBuffer(response.text);
return outputBuf;
}

/// Factory for the REST client: constructs a RestClient and hands it out
/// through the HttpClient interface.
std::unique_ptr<HttpClient> getRestClient() {
  std::unique_ptr<HttpClient> client = std::make_unique<RestClient>();
  return client;
}

} // namespace facebook::velox::functions
Loading

0 comments on commit 07f290e

Please sign in to comment.