Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drain io_context after shutdown of plugins #34

Merged
merged 12 commits into from
Oct 21, 2024
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,20 @@ Plugins can be registered by calling `appbase::application::register_plugin()`.
### Boost ASIO

AppBase maintains a singleton `application` instance which can be accessed via `appbase::app()`. This
application owns a `boost::asio::io_service` which starts running when `appbase::exec()` is called. If
application owns a `boost::asio::io_context` which starts running when `appbase::exec()` is called. If
a plugin needs to perform IO or other asynchronous operations then it should dispatch it via `application`
`io_service` which is setup to use an execution priority queue.
`io_context` which is setup to use an execution priority queue.
```
app().post( appbase::priority::low, lambda )
```
OR
```
delay_timer->async_wait( app().get_priority_queue().wrap( priority::low, lambda ) );
```
Use of `get_io_service()` directly is not recommended as the priority queue will not be respected.
Use of `get_io_context()` directly is not recommended as the priority queue will not be respected.

Because the app calls `io_service::run()` from within `application::exec()` and does not spawn any threads
all asynchronous operations posted to the io_service should be run in the same thread.
Because the app calls `io_context::run()` from within `application::exec()` and does not spawn any threads
all asynchronous operations posted to the io_context should be run in the same thread.

## Graceful Exit

Expand Down
42 changes: 25 additions & 17 deletions application_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ void application_base::wait_for_signal(std::shared_ptr<boost::asio::signal_set>
});
}

void application_base::setup_signal_handling_on_ios(boost::asio::io_service& ios, bool startup) {
std::shared_ptr<boost::asio::signal_set> ss = std::make_shared<boost::asio::signal_set>(ios, SIGINT, SIGTERM);
void application_base::setup_signal_handling_on_ioc(boost::asio::io_context& io_ctx, bool startup) {
std::shared_ptr<boost::asio::signal_set> ss = std::make_shared<boost::asio::signal_set>(io_ctx, SIGINT, SIGTERM);
#ifdef SIGPIPE
ss->add(SIGPIPE);
#endif
Expand All @@ -165,15 +165,15 @@ void application_base::setup_signal_handling_on_ios(boost::asio::io_service& ios
wait_for_signal(ss);
}

void application_base::startup(boost::asio::io_service& io_serv) {
void application_base::startup(boost::asio::io_context& io_ctx) {
//during startup, run a second thread to catch SIGINT/SIGTERM/SIGPIPE/SIGHUP
boost::asio::io_service startup_thread_ios;
setup_signal_handling_on_ios(startup_thread_ios, true);
std::thread startup_thread([&startup_thread_ios]() {
startup_thread_ios.run();
boost::asio::io_context startup_thread_io_ctx;
setup_signal_handling_on_ioc(startup_thread_io_ctx, true);
std::thread startup_thread([&startup_thread_io_ctx]() {
startup_thread_io_ctx.run();
});
auto clean_up_signal_thread = [&startup_thread_ios, &startup_thread]() {
startup_thread_ios.stop();
auto clean_up_signal_thread = [&startup_thread_io_ctx, &startup_thread]() {
startup_thread_io_ctx.stop();
startup_thread.join();
};

Expand All @@ -185,16 +185,16 @@ void application_base::startup(boost::asio::io_service& io_serv) {

} catch( ... ) {
clean_up_signal_thread();
shutdown();
shutdown_plugins();
throw;
}

//after startup, shut down the signal handling thread and catch the signals back on main io_service
//after startup, shut down the signal handling thread and catch the signals back on main io_context
clean_up_signal_thread();
setup_signal_handling_on_ios(io_serv, false);
setup_signal_handling_on_ioc(io_ctx, false);

#ifdef SIGHUP
std::shared_ptr<boost::asio::signal_set> sighup_set(new boost::asio::signal_set(io_serv, SIGHUP));
std::shared_ptr<boost::asio::signal_set> sighup_set(new boost::asio::signal_set(io_ctx, SIGHUP));
start_sighup_handler( sighup_set );
#endif
}
Expand Down Expand Up @@ -441,7 +441,7 @@ void application_base::handle_exception(std::exception_ptr eptr, std::string_vie
}
}

void application_base::shutdown() {
void application_base::shutdown_plugins() {
std::exception_ptr eptr = nullptr;

for(auto ritr = running_plugins.rbegin();
Expand All @@ -454,8 +454,17 @@ void application_base::shutdown() {
handle_exception(std::current_exception(), (*ritr)->name());
}
}
for(auto ritr = running_plugins.rbegin();
ritr != running_plugins.rend(); ++ritr) {

// if we caught an exception while shutting down a plugin, rethrow it so that main()
// can catch it and report the error
if (eptr)
std::rethrow_exception(eptr);
}

void application_base::destroy_plugins() {
std::exception_ptr eptr = nullptr;

for(auto ritr = running_plugins.rbegin(); ritr != running_plugins.rend(); ++ritr) {
try {
plugins.erase((*ritr)->name());
} catch(...) {
Expand All @@ -474,7 +483,6 @@ void application_base::shutdown() {
eptr = std::current_exception();
handle_exception(std::current_exception(), "plugin cleanup");
}
quit();

// if we caught an exception while shutting down a plugin, rethrow it so that main()
// can catch it and report the error
Expand Down
8 changes: 5 additions & 3 deletions examples/executor_example/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
#include <appbase/application_base.hpp>
#include <appbase/execution_priority_queue.hpp>

#include <limits>

class my_executor {
public:
template <typename Func>
auto post( int priority, Func&& func ) {
return boost::asio::post(io_serv, pri_queue.wrap(priority, --order, std::forward<Func>(func)));
return boost::asio::post(io_ctx, pri_queue.wrap(priority, --order, std::forward<Func>(func)));
}

auto& get_priority_queue() { return pri_queue; }
Expand All @@ -31,11 +33,11 @@ class my_executor {

void clear() { pri_queue.clear(); }

boost::asio::io_service& get_io_service() { return io_serv; }
boost::asio::io_context& get_io_context() { return io_ctx; }

private:
// members are ordered taking into account that the last one is destructed first
boost::asio::io_service io_serv;
boost::asio::io_context io_ctx;
appbase::execution_priority_queue pri_queue;
std::size_t order = std::numeric_limits<size_t>::max(); // to maintain FIFO ordering in queue within priority
};
Expand Down
78 changes: 59 additions & 19 deletions include/appbase/application_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ class application_base {
return initialize_impl(argc, argv, {find_plugin<Plugin>()...}, initialize_logging);
}

void startup(boost::asio::io_service& io_serv);
void shutdown();
void startup(boost::asio::io_context& io_ctx);

/**
* Wait until quit(), SIGINT or SIGTERM and then shutdown.
Expand All @@ -125,29 +124,51 @@ class application_base {
void exec(Executor& exec) {
std::exception_ptr eptr = nullptr;
{
auto& io_serv{exec.get_io_service()};
boost::asio::io_service::work work(io_serv);
auto& io_ctx{exec.get_io_context()};
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work = boost::asio::make_work_guard(io_ctx);
(void)work;
bool more = true;

while (more || io_serv.run_one()) {
if (is_quiting())
break;
while (more || io_ctx.run_one()) {
try {
io_serv.poll(); // queue up any ready; allowing high priority item to get into the queue
io_ctx.poll(); // queue up any ready; allowing high priority item to get into the queue
if (io_ctx.stopped())
break;
// execute the highest priority item
more = exec.execute_highest();
} catch (...) {
more = true; // so we exit the while loop without calling io_serv.run_one()
more = true; // so we exit the while loop without calling io_ctx.run_one()
quit();
eptr = std::current_exception();
handle_exception(eptr, "application loop");
}
}

quit();

try {
shutdown_plugins(); // may rethrow exceptions
} catch (...) {
if (!eptr)
eptr = std::current_exception();
}

work.reset();

try {
exec.clear(); // make sure the queue is empty
shutdown(); // may rethrow exceptions
// plugins shutdown down at this point,

// Drain the io_context of anything that could be referencing plugins.
// Note this does not call exec.execute_highest(), so only drains into the priority queue assuming nothing
// has hijacked the io_context for other purposes.
io_ctx.restart();
while (io_ctx.poll())
;
// clear priority queue of anything pushed by poll()
exec.clear();

// destroy the plugins now that all lambda that reference them have been destroyed
greg7mdp marked this conversation as resolved.
Show resolved Hide resolved
destroy_plugins();
} catch (...) {
if (!eptr)
eptr = std::current_exception();
Expand Down Expand Up @@ -290,6 +311,9 @@ class application_base {
}
///@}

void shutdown_plugins();
void destroy_plugins();

application_base(std::shared_ptr<void>&& e); ///< protected because application is a singleton that should be accessed via instance()

/// !!! must be dtor'ed after plugins
Expand Down Expand Up @@ -318,7 +342,7 @@ class application_base {
void print_default_config(std::ostream& os);

void wait_for_signal(std::shared_ptr<boost::asio::signal_set> ss);
void setup_signal_handling_on_ios(boost::asio::io_service& ios, bool startup);
void setup_signal_handling_on_ioc(boost::asio::io_context& io_ctx, bool startup);

void handle_exception(std::exception_ptr eptr, std::string_view origin);
};
Expand All @@ -343,12 +367,12 @@ class application_t : public application_base {
}

/**
* Post func to run on io_service with given priority.
* Post func to run on io_context with given priority.
*
* -- deprecated: use app().executor().post()
*
* @param priority can be appbase::priority::* constants or any int, larger ints run first
* @param func function to run on io_service
* @param func function to run on io_context
* @return result of boost::asio::post
*/
template <typename Func>
Expand All @@ -365,20 +389,36 @@ class application_t : public application_base {
}

/**
* Anything posted directly on this io_service is run at the highest of priority as it by-passes the
* Anything posted directly on this io_context is run at the highest of priority as it by-passes the
* priority queue and is run immediately in exec(). Use with care and consider using app().executor().post() instead.
* @return
*/
boost::asio::io_service& get_io_service() {
return executor().get_io_service();
boost::asio::io_context& get_io_context() {
return executor().get_io_context();
}

/**
* Create a timer with the main application io_context. Timer async_wait will execute on the main thread.
*
* Use with app().executor().wrap(priority::x, exec_queue::x, [](const boost::system::error_code& ec).
* For example:
* _timer.async_wait(app().executor().wrap(priority::high, exec_queue::read_write,
* [](const boost::system::error_code& ec) {
* if (!ec)
* // do something
* }));
*/
template<typename Timer>
auto make_timer() {
return Timer{get_io_context()};
}

void startup() {
application_base::startup(get_io_service());
application_base::startup(get_io_context());
}

application_t() : application_base(std::make_shared<executor_t>()) {
set_stop_executor_cb([&]() { get_io_service().stop(); });
set_stop_executor_cb([&]() { get_io_context().stop(); });
set_post_cb([&](int prio, std::function<void()> cb) { executor().post(prio, std::move(cb)); });
}

Expand Down
16 changes: 9 additions & 7 deletions include/appbase/default_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@
#include <appbase/application_base.hpp>
#include <appbase/execution_priority_queue.hpp>

#include <limits>

namespace appbase {

class default_executor {
public:
template <typename Func>
auto post(int priority, Func&& func) {
return boost::asio::post(io_serv, pri_queue.wrap(priority, --order, std::forward<Func>(func)));
return boost::asio::post(io_ctx, pri_queue.wrap(priority, --order, std::forward<Func>(func)));
}

/**
* Provide access to execution priority queue so it can be used to wrap functions for
* prioritized execution.
*
* Example:
* boost::asio::steady_timer timer( app().get_io_service() );
* boost::asio::steady_timer timer( app().get_io_context() );
* timer.async_wait( app().get_priority_queue().wrap(priority::low, [](){ do_something(); }) );
*/
auto& get_priority_queue() {
Expand All @@ -33,16 +35,16 @@ class default_executor {
}

/**
* Do not run io_service in any other threads, as application assumes single-threaded execution in exec().
* @return io_serivice of application
* Do not run io_context in any other threads, as application assumes single-threaded execution in exec().
* @return io_context of application
*/
boost::asio::io_service& get_io_service() {
return io_serv;
boost::asio::io_context& get_io_context() {
return io_ctx;
}

private:
// members are ordered taking into account that the last one is destructed first
boost::asio::io_service io_serv;
boost::asio::io_context io_ctx;
execution_priority_queue pri_queue;
std::size_t order = std::numeric_limits<size_t>::max(); // to maintain FIFO ordering in queue within priority
};
Expand Down
Loading
Loading