Skip to content

Commit

Permalink
Don't use static for async event loop
Browse files Browse the repository at this point in the history
Summary: We're seeing occasional timouts and the warning "DeprecationWarning: There is no current event loop" in the asyncio tests. We're also seeing "asyncio.run() creates a new event loop, runs the given coroutine on it, and closes the loop when the coroutine completes. This is the recommended way to run asynchronous code in Python)", which exposes the issue that we should not use a static for the loop, as we're then getting "RuntimeError: Event loop is closed" when the test is run the second time.

Reviewed By: jtbraun

Differential Revision: D67836792

fbshipit-source-id: be3fdfd3208cdd3f0be8762a1d3316d9641c5d13
  • Loading branch information
Georges Berenger authored and facebook-github-bot committed Jan 6, 2025
1 parent 4218510 commit 71865ff
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 21 deletions.
13 changes: 3 additions & 10 deletions csrc/reader/AsyncVRSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,11 @@
namespace pyvrs {

AwaitableRecord::AwaitableRecord(uint32_t index, AsyncJobQueue& queue)
: index_{index}, queue_{queue} {
AsyncJob::getMainLoop(); // make sure the main loop is initialized
}
: index_{index}, queue_{queue} {}

AwaitableRecord::AwaitableRecord(const AwaitableRecord& other)
: index_{other.index_}, queue_{other.queue_} {}

py::object& AsyncJob::getMainLoop() {
static py::object mainLoop = py::module_::import("asyncio").attr("get_running_loop")();
return mainLoop;
}

template <class Reader>
void AsyncThreadHandler<Reader>::cleanup() {
shouldEndAsyncThread_ = true;
Expand Down Expand Up @@ -72,12 +65,12 @@ template class AsyncThreadHandler<OssAsyncMultiVRSReader>;

void AsyncReadJob::performJob(OssAsyncVRSReader& reader) {
py::object record = reader.readRecord(index_);
getMainLoop().attr("call_soon_threadsafe")(future_.attr("set_result"), record);
loop_.attr("call_soon_threadsafe")(future_.attr("set_result"), record);
}

void AsyncReadJob::performJob(OssAsyncMultiVRSReader& reader) {
py::object record = reader.readRecord(index_);
getMainLoop().attr("call_soon_threadsafe")(future_.attr("set_result"), record);
loop_.attr("call_soon_threadsafe")(future_.attr("set_result"), record);
}

AwaitableRecord
Expand Down
20 changes: 11 additions & 9 deletions csrc/reader/AsyncVRSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ class OssAsyncVRSReader;
class OssAsyncMultiVRSReader;

/// \brief The base class for asynchronous job
/// This class takes asyncio's event loop & future in constructor so that we can later call
/// loop.call_soon_threadsafe(future.set_result(<result>)) to set the result into future.
/// This class captures the asyncio's event loop, and creates a future for that loop so that we can
/// later call loop.call_soon_threadsafe(future.set_result(<result>)) to set that future's result.
class AsyncJob {
public:
AsyncJob() : future_{AsyncJob::getMainLoop().attr("create_future")()} {}
AsyncJob()
: loop_{py::module_::import("asyncio").attr("get_running_loop")()},
future_{loop_.attr("create_future")()} {}
AsyncJob(const AsyncJob& other) = delete;
virtual ~AsyncJob() = default;

Expand All @@ -56,9 +58,8 @@ class AsyncJob {
return future_.attr("__await__")();
}

static py::object& getMainLoop();

protected:
py::object loop_;
py::object future_;
};

Expand All @@ -77,10 +78,11 @@ class AsyncReadJob : public AsyncJob {
using AsyncJobQueue = JobQueue<std::unique_ptr<AsyncJob>>;

/// \brief Python awaitable record
/// This class only exposes __await__ method to Python which does
/// - Obtain Python event loop via asyncio.events.get_event_loop
/// - Create future to return to Python side via loop.create_future
/// - Create AsyncReadJob object with loop, future and record index
/// This class only exposes __await__ method to Python which does the following:
/// - Creates an AsyncReadJob object that:
/// 1 - captures asyncio.events.get_running_loop,
/// 2 - creates a future for that loop via loop.create_future(),
/// 3 - captures the record index
/// - Send a job to AsyncReader's background thread
/// - Call future.__await__() and Python side waits until set_result will be called by AsyncReader
class AwaitableRecord {
Expand Down
3 changes: 1 addition & 2 deletions test/pyvrs_async_reader_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ def async_test(func):

@functools.wraps(func)
def wrapper(*args: Any, **kws: Any) -> None:
loop = asyncio.get_event_loop()
loop.run_until_complete(func(*args, **kws))
asyncio.run(func(*args, **kws))

return wrapper

Expand Down

0 comments on commit 71865ff

Please sign in to comment.