Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37093: [Python] Add async Flight client with GetFlightInfo #36986

Merged
merged 7 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 69 additions & 0 deletions python/pyarrow/_flight.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,12 @@ cdef class FlightInfo(_Weakrefable):
cdef:
unique_ptr[CFlightInfo] info

@staticmethod
cdef wrap(CFlightInfo c_info):
cdef FlightInfo obj = FlightInfo.__new__(FlightInfo)
obj.info.reset(new CFlightInfo(move(c_info)))
return obj

def __init__(self, Schema schema, FlightDescriptor descriptor, endpoints,
total_records, total_bytes):
"""Create a FlightInfo object from a schema, descriptor, and endpoints.
Expand Down Expand Up @@ -1219,6 +1225,66 @@ cdef class FlightMetadataWriter(_Weakrefable):
check_flight_status(self.writer.get().WriteMetadata(deref(buf)))


class AsyncioCall:
"""State for an async RPC using asyncio."""

def __init__(self) -> None:
import asyncio
self._future = asyncio.get_running_loop().create_future()

def as_awaitable(self) -> object:
return self._future

def wakeup(self, result_or_exception) -> None:
# Mark the Future done from within its loop (asyncio
# objects are generally not thread-safe)
loop = self._future.get_loop()
if isinstance(result_or_exception, BaseException):
loop.call_soon_threadsafe(
self._future.set_exception, result_or_exception)
else:
loop.call_soon_threadsafe(
self._future.set_result, result_or_exception)


cdef class AsyncioFlightClient:
"""
A FlightClient with an asyncio-based async interface.

This interface is EXPERIMENTAL.
"""

cdef:
FlightClient _client

def __init__(self, FlightClient client) -> None:
self._client = client

async def get_flight_info(
self,
descriptor: FlightDescriptor,
*,
options: FlightCallOptions = None,
):
call = AsyncioCall()
self._get_flight_info(call, descriptor, options)
return await call.as_awaitable()
Comment on lines +1263 to +1271
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could just be:

Suggested change
async def get_flight_info(
self,
descriptor: FlightDescriptor,
*,
options: FlightCallOptions = None,
):
call = AsyncioCall()
self._get_flight_info(call, descriptor, options)
return await call.as_awaitable()
def get_flight_info(
self,
descriptor: FlightDescriptor,
*,
options: FlightCallOptions = None,
):
call = AsyncioCall()
self._get_flight_info(call, descriptor, options)
return call.as_awaitable()

but I've kept the explicit async def for introspection and self-documentation... what do you think @jorisvandenbossche ?


cdef _get_flight_info(self, call, descriptor, options):
cdef:
CFlightCallOptions* c_options = \
FlightCallOptions.unwrap(options)
CFlightDescriptor c_descriptor = \
FlightDescriptor.unwrap(descriptor)
CFuture[CFlightInfo] c_future

with nogil:
c_future = self._client.client.get().GetFlightInfoAsync(
deref(c_options), c_descriptor)

BindFuture(move(c_future), call.wakeup, FlightInfo.wrap)


cdef class FlightClient(_Weakrefable):
"""A client to a Flight service.

Expand Down Expand Up @@ -1320,6 +1386,9 @@ cdef class FlightClient(_Weakrefable):
check_flight_status(CFlightClient.Connect(c_location, c_options
).Value(&self.client))

def as_async(self) -> None:
return AsyncioFlightClient(self)

def wait_for_available(self, timeout=5):
"""Block until the server can be contacted.

Expand Down
113 changes: 63 additions & 50 deletions python/pyarrow/error.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class ArrowCancelled(ArrowException):
ArrowIOError = IOError


# This function could be written directly in C++ if we didn't
# define Arrow-specific subclasses (ArrowInvalid etc.)
# check_status() and convert_status() could be written directly in C++
# if we didn't define Arrow-specific subclasses (ArrowInvalid etc.)
cdef int check_status(const CStatus& status) except -1 nogil:
if status.ok():
return 0
Expand All @@ -88,61 +88,74 @@ cdef int check_status(const CStatus& status) except -1 nogil:
RestorePyError(status)
return -1

# We don't use Status::ToString() as it would redundantly include
# the C++ class name.
message = frombytes(status.message(), safe=True)
detail = status.detail()
if detail != nullptr:
message += ". Detail: " + frombytes(detail.get().ToString(),
safe=True)

if status.IsInvalid():
raise ArrowInvalid(message)
elif status.IsIOError():
# Note: OSError constructor is
# OSError(message)
# or
# OSError(errno, message, filename=None)
# or (on Windows)
# OSError(errno, message, filename, winerror)
errno = ErrnoFromStatus(status)
winerror = WinErrorFromStatus(status)
if winerror != 0:
raise IOError(errno, message, None, winerror)
elif errno != 0:
raise IOError(errno, message)
else:
raise IOError(message)
elif status.IsOutOfMemory():
raise ArrowMemoryError(message)
elif status.IsKeyError():
raise ArrowKeyError(message)
elif status.IsNotImplemented():
raise ArrowNotImplementedError(message)
elif status.IsTypeError():
raise ArrowTypeError(message)
elif status.IsCapacityError():
raise ArrowCapacityError(message)
elif status.IsIndexError():
raise ArrowIndexError(message)
elif status.IsSerializationError():
raise ArrowSerializationError(message)
elif status.IsCancelled():
signum = SignalFromStatus(status)
if signum > 0:
raise ArrowCancelled(message, signum)
else:
raise ArrowCancelled(message)
raise convert_status(status)


cdef object convert_status(const CStatus& status):
if IsPyError(status):
try:
RestorePyError(status)
except BaseException as e:
return e

# We don't use Status::ToString() as it would redundantly include
# the C++ class name.
message = frombytes(status.message(), safe=True)
detail = status.detail()
if detail != nullptr:
message += ". Detail: " + frombytes(detail.get().ToString(),
safe=True)

if status.IsInvalid():
return ArrowInvalid(message)
elif status.IsIOError():
# Note: OSError constructor is
# OSError(message)
# or
# OSError(errno, message, filename=None)
# or (on Windows)
# OSError(errno, message, filename, winerror)
errno = ErrnoFromStatus(status)
winerror = WinErrorFromStatus(status)
if winerror != 0:
return IOError(errno, message, None, winerror)
elif errno != 0:
return IOError(errno, message)
else:
return IOError(message)
elif status.IsOutOfMemory():
return ArrowMemoryError(message)
elif status.IsKeyError():
return ArrowKeyError(message)
elif status.IsNotImplemented():
return ArrowNotImplementedError(message)
elif status.IsTypeError():
return ArrowTypeError(message)
elif status.IsCapacityError():
return ArrowCapacityError(message)
elif status.IsIndexError():
return ArrowIndexError(message)
elif status.IsSerializationError():
return ArrowSerializationError(message)
elif status.IsCancelled():
signum = SignalFromStatus(status)
if signum > 0:
return ArrowCancelled(message, signum)
else:
message = frombytes(status.ToString(), safe=True)
raise ArrowException(message)
return ArrowCancelled(message)
else:
message = frombytes(status.ToString(), safe=True)
return ArrowException(message)


# This is an API function for C++ PyArrow
# These are API functions for C++ PyArrow
cdef api int pyarrow_internal_check_status(const CStatus& status) \
except -1 nogil:
return check_status(status)

cdef api object pyarrow_internal_convert_status(const CStatus& status):
return convert_status(status)


cdef class StopToken:
cdef void init(self, CStopToken stop_token):
Expand Down
14 changes: 14 additions & 0 deletions python/pyarrow/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,20 @@ cdef extern from "arrow/result.h" namespace "arrow" nogil:
T operator*()


cdef extern from "arrow/util/future.h" namespace "arrow" nogil:
cdef cppclass CFuture "arrow::Future"[T]:
CFuture()


cdef extern from "arrow/python/async.h" namespace "arrow::py" nogil:
# BindFuture's third argument is really a C++ callable with
# the signature `object(T*)`, but Cython does not allow declaring that.
# We use an ellipsis as a workaround.
# Another possibility is to type-erase the argument by making it
# `object(void*)`, but it would lose compile-time C++ type safety.
void BindFuture[T](CFuture[T], object cb, ...)


cdef extern from "arrow/python/common.h" namespace "arrow::py" nogil:
T GetResultValue[T](CResult[T]) except *
cdef function[F] BindFunction[F](void* unbound, object bound, ...)
Expand Down
2 changes: 2 additions & 0 deletions python/pyarrow/includes/libarrow_flight.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,8 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
CResult[unique_ptr[CFlightListing]] ListFlights(CFlightCallOptions& options, CCriteria criteria)
CResult[unique_ptr[CFlightInfo]] GetFlightInfo(CFlightCallOptions& options,
CFlightDescriptor& descriptor)
CFuture[CFlightInfo] GetFlightInfoAsync(CFlightCallOptions& options,
CFlightDescriptor& descriptor)
CResult[unique_ptr[CSchemaResult]] GetSchema(CFlightCallOptions& options,
CFlightDescriptor& descriptor)
CResult[unique_ptr[CFlightStreamReader]] DoGet(CFlightCallOptions& options, CTicket& ticket)
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow_python.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ cdef extern from "arrow/python/pyarrow.h" namespace "arrow::py":

cdef extern from "arrow/python/common.h" namespace "arrow::py":
c_bool IsPyError(const CStatus& status)
void RestorePyError(const CStatus& status)
void RestorePyError(const CStatus& status) except *


cdef extern from "arrow/python/inference.h" namespace "arrow::py":
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ cdef extern from "Python.h":


cdef int check_status(const CStatus& status) except -1 nogil
cdef object convert_status(const CStatus& status)


cdef class _Weakrefable:
Expand Down
60 changes: 60 additions & 0 deletions python/pyarrow/src/arrow/python/async.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <utility>

#include "arrow/python/common.h"
#include "arrow/status.h"
#include "arrow/util/future.h"

namespace arrow::py {

/// \brief Bind a Python callback to an arrow::Future.
///
/// If the Future finishes successfully, py_wrapper is called with its
/// result value and should return a PyObject*. If py_wrapper is successful,
/// py_cb is called with its return value.
///
/// If either the Future or py_wrapper fails, py_cb is called with the
/// associated Python exception.
///
/// \param future The future to bind to.
/// \param py_cb The Python callback function. Will be passed the result of
/// py_wrapper, or a Python exception if the future failed or one was
/// raised by py_wrapper.
/// \param py_wrapper A function (likely defined in Cython) to convert the C++
/// result of the future to a Python object.
template <typename T, typename PyWrapper = PyObject* (*)(T)>
void BindFuture(Future<T> future, PyObject* py_cb, PyWrapper py_wrapper) {
Py_INCREF(py_cb);
OwnedRefNoGIL cb_ref(py_cb);

auto future_cb = [cb_ref = std::move(cb_ref),
py_wrapper = std::move(py_wrapper)](Result<T> result) {
SafeCallIntoPythonVoid([&]() {
OwnedRef py_value_or_exc{WrapResult(std::move(result), std::move(py_wrapper))};
Py_XDECREF(
PyObject_CallFunctionObjArgs(cb_ref.obj(), py_value_or_exc.obj(), NULLPTR));
ARROW_WARN_NOT_OK(CheckPyError(), "Internal error in async call");
});
};
future.AddCallback(std::move(future_cb));
}

} // namespace arrow::py
44 changes: 44 additions & 0 deletions python/pyarrow/src/arrow/python/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,37 @@ T GetResultValue(Result<T> result) {
}
}

/// \brief Wrap a Result and return the corresponding Python object.
///
/// If the Result is successful, py_wrapper is called with its result value
/// and should return a PyObject*. If py_wrapper is successful (returns
/// a non-NULL value), its return value is returned.
///
/// If either the Result or py_wrapper fails, the associated Python exception
/// is raised and NULL is returned.
//
/// \param result The Result whose value to wrap in a Python object.
/// \param py_wrapper A function (likely defined in Cython) to convert the C++
/// value of the Result to a Python object.
/// \return A new Python reference, or NULL if an exception occurred
template <typename T, typename PyWrapper = PyObject* (*)(T)>
PyObject* WrapResult(Result<T> result, PyWrapper&& py_wrapper) {
static_assert(std::is_same_v<PyObject*, decltype(py_wrapper(std::declval<T>()))>,
"PyWrapper argument to WrapResult should return a PyObject* "
"when called with a T*");
Status st = result.status();
if (st.ok()) {
PyObject* py_value = py_wrapper(result.MoveValueUnsafe());
st = CheckPyError();
if (st.ok()) {
return py_value;
}
Py_XDECREF(py_value); // should be null, but who knows
}
// Status is an error, convert it to an exception.
return internal::convert_status(st);
}

// A RAII-style helper that ensures the GIL is acquired inside a lexical block.
class ARROW_PYTHON_EXPORT PyAcquireGIL {
public:
Expand Down Expand Up @@ -131,6 +162,19 @@ auto SafeCallIntoPython(Function&& func) -> decltype(func()) {
return maybe_status;
}

template <typename Function>
auto SafeCallIntoPythonVoid(Function&& func) -> decltype(func()) {
PyAcquireGIL lock;
PyObject* exc_type;
PyObject* exc_value;
PyObject* exc_traceback;
PyErr_Fetch(&exc_type, &exc_value, &exc_traceback);
func();
if (exc_type != NULLPTR) {
PyErr_Restore(exc_type, exc_value, exc_traceback);
}
}

// A RAII primitive that DECREFs the underlying PyObject* when it
// goes out of scope.
class ARROW_PYTHON_EXPORT OwnedRef {
Expand Down
Loading