Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37093: [Python] Add async Flight client with GetFlightInfo #36986

Merged
merged 7 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,9 @@ Status FlightClient::Close() {
return Status::OK();
}

bool FlightClient::supports_async() const { return transport_->supports_async(); }
bool FlightClient::supports_async() const { return transport_->CheckAsyncSupport().ok(); }

Status FlightClient::CheckAsyncSupport() const {
  // Async capability is decided by the underlying transport; hand back its
  // Status unchanged so the caller sees the transport's own error message.
  return transport_->CheckAsyncSupport();
}

Status FlightClient::CheckOpen() const {
if (closed_) {
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,13 @@ class ARROW_FLIGHT_EXPORT FlightClient {
/// \brief Whether this client supports asynchronous methods.
bool supports_async() const;

/// \brief Check whether this client supports asynchronous methods.
///
/// This is like supports_async(), except that a detailed error message
/// is returned if async support is not available. If async support is
/// available, this function returns successfully.
Status CheckAsyncSupport() const;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lidavidm Does this API look ok?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, though I would be happy to replace supports_async entirely with this.


private:
FlightClient();
Status CheckOpen() const;
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/flight/test_definitions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1832,7 +1832,9 @@ void AsyncClientTest::SetUpTest() {
ASSERT_OK_AND_ASSIGN(client_, FlightClient::Connect(real_location, client_options));

ASSERT_TRUE(client_->supports_async());
ASSERT_OK(client_->CheckAsyncSupport());
}

void AsyncClientTest::TearDownTest() {
if (supports_async()) {
ASSERT_OK(client_->Close());
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/flight/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,11 @@ class ARROW_FLIGHT_EXPORT ClientTransport {
virtual Status DoExchange(const FlightCallOptions& options,
std::unique_ptr<ClientDataStream>* stream);

virtual bool supports_async() const { return false; }
bool supports_async() const { return CheckAsyncSupport().ok(); }
/// \brief Check whether this transport supports asynchronous methods.
///
/// The default implementation returns a NotImplemented status carrying an
/// explanatory message. A transport that implements the async methods
/// overrides this to return Status::OK().
virtual Status CheckAsyncSupport() const {
return Status::NotImplemented(
"this Flight transport does not support async operations");
}

static void SetAsyncRpc(AsyncListenerBase* listener, std::unique_ptr<AsyncRpc>&& rpc);
static AsyncRpc* GetAsyncRpc(AsyncListenerBase* listener);
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/arrow/flight/transport/grpc/grpc_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1034,16 +1034,17 @@ class GrpcClientImpl : public internal::ClientTransport {
->StartCall();
}

bool supports_async() const override { return true; }
Status CheckAsyncSupport() const override { return Status::OK(); }
#else
void GetFlightInfoAsync(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
std::shared_ptr<AsyncListener<FlightInfo>> listener) override {
listener->OnFinish(
Status::NotImplemented("gRPC 1.40 or newer is required to use async"));
listener->OnFinish(CheckAsyncSupport());
}

bool supports_async() const override { return false; }
Status CheckAsyncSupport() const override {
return Status::NotImplemented("gRPC 1.40 or newer is required to use async");
}
#endif

private:
Expand Down
74 changes: 74 additions & 0 deletions python/pyarrow/_flight.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,12 @@ cdef class FlightInfo(_Weakrefable):
cdef:
unique_ptr[CFlightInfo] info

# Build a Python FlightInfo around a C++ CFlightInfo value.
@staticmethod
cdef wrap(CFlightInfo c_info):
# Allocate the object with __new__ so that __init__ (which expects a
# schema, descriptor, endpoints, ...) is not run.
cdef FlightInfo obj = FlightInfo.__new__(FlightInfo)
# Move c_info into a heap-allocated CFlightInfo held by the object's
# unique_ptr member.
obj.info.reset(new CFlightInfo(move(c_info)))
return obj

def __init__(self, Schema schema, FlightDescriptor descriptor, endpoints,
total_records, total_bytes):
"""Create a FlightInfo object from a schema, descriptor, and endpoints.
Expand Down Expand Up @@ -1219,6 +1225,66 @@ cdef class FlightMetadataWriter(_Weakrefable):
check_flight_status(self.writer.get().WriteMetadata(deref(buf)))


class AsyncioCall:
    """State for an async RPC using asyncio.

    Holds a Future created on the event loop that is running when the
    object is constructed. ``wakeup`` completes that Future and is safe to
    call from a thread other than the loop's.
    """

    def __init__(self) -> None:
        import asyncio
        # Must be constructed while an event loop is running:
        # get_running_loop() raises RuntimeError otherwise.
        self._future = asyncio.get_running_loop().create_future()

    def as_awaitable(self) -> object:
        """Return the Future that is completed by ``wakeup``."""
        return self._future

    def wakeup(self, result_or_exception) -> None:
        """Complete the Future with a result or an exception.

        Parameters
        ----------
        result_or_exception : object
            If this is a BaseException instance, it is set as the Future's
            exception; any other value is set as the Future's result.
        """
        future = self._future

        def _complete() -> None:
            # The awaiting task may have been cancelled (which cancels the
            # Future) before the RPC finished. Setting a result on an
            # already-done Future raises InvalidStateError inside the loop,
            # so drop the late completion instead.
            if future.done():
                return
            if isinstance(result_or_exception, BaseException):
                future.set_exception(result_or_exception)
            else:
                future.set_result(result_or_exception)

        # Mark the Future done from within its loop (asyncio
        # objects are generally not thread-safe)
        future.get_loop().call_soon_threadsafe(_complete)


cdef class AsyncioFlightClient:
"""
A FlightClient with an asyncio-based async interface.

This interface is EXPERIMENTAL.
"""

cdef:
FlightClient _client

def __init__(self, FlightClient client) -> None:
self._client = client

# Asynchronously request the FlightInfo for `descriptor`.
# `options` is keyword-only and defaults to None.
async def get_flight_info(
self,
descriptor: FlightDescriptor,
*,
options: FlightCallOptions = None,
):
# AsyncioCall creates its Future on the currently running event loop,
# so it is constructed here, inside the coroutine.
call = AsyncioCall()
# Start the RPC; its completion is routed to call.wakeup.
self._get_flight_info(call, descriptor, options)
# Suspend until the Future is completed with a result or an exception.
return await call.as_awaitable()
Comment on lines +1263 to +1271
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could just be:

Suggested change
async def get_flight_info(
self,
descriptor: FlightDescriptor,
*,
options: FlightCallOptions = None,
):
call = AsyncioCall()
self._get_flight_info(call, descriptor, options)
return await call.as_awaitable()
def get_flight_info(
self,
descriptor: FlightDescriptor,
*,
options: FlightCallOptions = None,
):
call = AsyncioCall()
self._get_flight_info(call, descriptor, options)
return call.as_awaitable()

but I've kept the explicit async def for introspection and self-documentation... what do you think @jorisvandenbossche ?


# Start a GetFlightInfoAsync call on the wrapped C++ client and bind the
# resulting C++ future to `call` (an AsyncioCall).
cdef _get_flight_info(self, call, descriptor, options):
cdef:
# Convert the Python-level arguments to their C++ counterparts.
CFlightCallOptions* c_options = \
FlightCallOptions.unwrap(options)
CFlightDescriptor c_descriptor = \
FlightDescriptor.unwrap(descriptor)
CFuture[CFlightInfo] c_future

# The C++ call needs no Python objects, so the GIL is released around it.
with nogil:
c_future = self._client.client.get().GetFlightInfoAsync(
deref(c_options), c_descriptor)

# NOTE(review): presumably BindFuture invokes call.wakeup once the future
# completes, passing either FlightInfo.wrap(result) or an exception --
# confirm against arrow/python/async.h.
BindFuture(move(c_future), call.wakeup, FlightInfo.wrap)


cdef class FlightClient(_Weakrefable):
"""A client to a Flight service.

Expand Down Expand Up @@ -1320,6 +1386,14 @@ cdef class FlightClient(_Weakrefable):
check_flight_status(CFlightClient.Connect(c_location, c_options
).Value(&self.client))

# Whether this client supports asynchronous methods (boolean form of the
# C++ client's async-support check; carries no error message).
@property
def supports_async(self):
return self.client.get().supports_async()

def as_async(self) -> AsyncioFlightClient:
    """Make an asyncio-based wrapper around this client.

    This interface is EXPERIMENTAL.

    Returns
    -------
    AsyncioFlightClient
        A wrapper exposing async methods backed by this client.

    Raises
    ------
    ArrowNotImplementedError
        If the underlying client reports (with a NotImplemented status)
        that async operations are not supported.
    """
    # Fail early with the transport's detailed error message rather than
    # handing out a wrapper whose calls would all fail.
    check_status(self.client.get().CheckAsyncSupport())
    return AsyncioFlightClient(self)

def wait_for_available(self, timeout=5):
"""Block until the server can be contacted.

Expand Down
113 changes: 63 additions & 50 deletions python/pyarrow/error.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ class ArrowCancelled(ArrowException):
ArrowIOError = IOError


# This function could be written directly in C++ if we didn't
# define Arrow-specific subclasses (ArrowInvalid etc.)
# check_status() and convert_status() could be written directly in C++
# if we didn't define Arrow-specific subclasses (ArrowInvalid etc.)
cdef int check_status(const CStatus& status) except -1 nogil:
if status.ok():
return 0
Expand All @@ -88,61 +88,74 @@ cdef int check_status(const CStatus& status) except -1 nogil:
RestorePyError(status)
return -1

# We don't use Status::ToString() as it would redundantly include
# the C++ class name.
message = frombytes(status.message(), safe=True)
detail = status.detail()
if detail != nullptr:
message += ". Detail: " + frombytes(detail.get().ToString(),
safe=True)

if status.IsInvalid():
raise ArrowInvalid(message)
elif status.IsIOError():
# Note: OSError constructor is
# OSError(message)
# or
# OSError(errno, message, filename=None)
# or (on Windows)
# OSError(errno, message, filename, winerror)
errno = ErrnoFromStatus(status)
winerror = WinErrorFromStatus(status)
if winerror != 0:
raise IOError(errno, message, None, winerror)
elif errno != 0:
raise IOError(errno, message)
else:
raise IOError(message)
elif status.IsOutOfMemory():
raise ArrowMemoryError(message)
elif status.IsKeyError():
raise ArrowKeyError(message)
elif status.IsNotImplemented():
raise ArrowNotImplementedError(message)
elif status.IsTypeError():
raise ArrowTypeError(message)
elif status.IsCapacityError():
raise ArrowCapacityError(message)
elif status.IsIndexError():
raise ArrowIndexError(message)
elif status.IsSerializationError():
raise ArrowSerializationError(message)
elif status.IsCancelled():
signum = SignalFromStatus(status)
if signum > 0:
raise ArrowCancelled(message, signum)
else:
raise ArrowCancelled(message)
raise convert_status(status)


cdef object convert_status(const CStatus& status):
    # Translate a C++ Status into the corresponding Python exception
    # object. The exception is *returned*, not raised, so that callers can
    # either raise it or hand it to something else.
    if IsPyError(status):
        # The Status carries a Python exception: restore it, catch it
        # right away and return it as an object.
        try:
            RestorePyError(status)
        except BaseException as py_exc:
            return py_exc

    # Status::ToString() is deliberately not used for the message, as it
    # would redundantly include the C++ class name.
    text = frombytes(status.message(), safe=True)
    extra = status.detail()
    if extra != nullptr:
        text += ". Detail: " + frombytes(extra.get().ToString(),
                                         safe=True)

    if status.IsInvalid():
        return ArrowInvalid(text)

    if status.IsIOError():
        # Note: OSError constructor is
        #   OSError(message)
        # or
        #   OSError(errno, message, filename=None)
        # or (on Windows)
        #   OSError(errno, message, filename, winerror)
        err_no = ErrnoFromStatus(status)
        win_err = WinErrorFromStatus(status)
        if win_err != 0:
            return IOError(err_no, text, None, win_err)
        if err_no != 0:
            return IOError(err_no, text)
        return IOError(text)

    if status.IsOutOfMemory():
        return ArrowMemoryError(text)
    if status.IsKeyError():
        return ArrowKeyError(text)
    if status.IsNotImplemented():
        return ArrowNotImplementedError(text)
    if status.IsTypeError():
        return ArrowTypeError(text)
    if status.IsCapacityError():
        return ArrowCapacityError(text)
    if status.IsIndexError():
        return ArrowIndexError(text)
    if status.IsSerializationError():
        return ArrowSerializationError(text)

    if status.IsCancelled():
        # Pass the signal number along only when there is one.
        sig = SignalFromStatus(status)
        if sig > 0:
            return ArrowCancelled(text, sig)
        return ArrowCancelled(text)

    # No specific mapping: fall back to the generic exception, this time
    # using the full string form of the Status.
    return ArrowException(frombytes(status.ToString(), safe=True))


# This is an API function for C++ PyArrow
# These are API functions for C++ PyArrow
cdef api int pyarrow_internal_check_status(const CStatus& status) \
except -1 nogil:
return check_status(status)

cdef api object pyarrow_internal_convert_status(const CStatus& status):
return convert_status(status)


cdef class StopToken:
cdef void init(self, CStopToken stop_token):
Expand Down
14 changes: 14 additions & 0 deletions python/pyarrow/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,20 @@ cdef extern from "arrow/result.h" namespace "arrow" nogil:
T operator*()


cdef extern from "arrow/util/future.h" namespace "arrow" nogil:
cdef cppclass CFuture "arrow::Future"[T]:
CFuture()


cdef extern from "arrow/python/async.h" namespace "arrow::py" nogil:
# BindFuture's third argument is really a C++ callable with
# the signature `object(T*)`, but Cython does not allow declaring that.
# We use an ellipsis as a workaround.
# Another possibility is to type-erase the argument by making it
# `object(void*)`, but it would lose compile-time C++ type safety.
void BindFuture[T](CFuture[T], object cb, ...)


cdef extern from "arrow/python/common.h" namespace "arrow::py" nogil:
T GetResultValue[T](CResult[T]) except *
cdef function[F] BindFunction[F](void* unbound, object bound, ...)
Expand Down
5 changes: 5 additions & 0 deletions python/pyarrow/includes/libarrow_flight.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,9 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
CResult[unique_ptr[CFlightClient]] Connect(const CLocation& location,
const CFlightClientOptions& options)

c_bool supports_async()
CStatus CheckAsyncSupport()

CStatus Authenticate(CFlightCallOptions& options,
unique_ptr[CClientAuthHandler] auth_handler)

Expand All @@ -396,6 +399,8 @@ cdef extern from "arrow/flight/api.h" namespace "arrow" nogil:
CResult[unique_ptr[CFlightListing]] ListFlights(CFlightCallOptions& options, CCriteria criteria)
CResult[unique_ptr[CFlightInfo]] GetFlightInfo(CFlightCallOptions& options,
CFlightDescriptor& descriptor)
CFuture[CFlightInfo] GetFlightInfoAsync(CFlightCallOptions& options,
CFlightDescriptor& descriptor)
CResult[unique_ptr[CSchemaResult]] GetSchema(CFlightCallOptions& options,
CFlightDescriptor& descriptor)
CResult[unique_ptr[CFlightStreamReader]] DoGet(CFlightCallOptions& options, CTicket& ticket)
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow_python.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ cdef extern from "arrow/python/pyarrow.h" namespace "arrow::py":

cdef extern from "arrow/python/common.h" namespace "arrow::py":
c_bool IsPyError(const CStatus& status)
void RestorePyError(const CStatus& status)
void RestorePyError(const CStatus& status) except *


cdef extern from "arrow/python/inference.h" namespace "arrow::py":
Expand Down
1 change: 1 addition & 0 deletions python/pyarrow/lib.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ cdef extern from "Python.h":


cdef int check_status(const CStatus& status) except -1 nogil
cdef object convert_status(const CStatus& status)


cdef class _Weakrefable:
Expand Down
Loading
Loading