Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

net/tls: Wait for data_{source,sink}::close() #2547

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions src/net/tls.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ module;
#endif

#include <any>
#include <coroutine>
#include <filesystem>
#include <stdexcept>
#include <system_error>
Expand Down Expand Up @@ -1594,26 +1595,32 @@ class session : public enable_lw_shared_from_this<session> {
auto me = shared_from_this();
// running in background. try to bye-handshake us nicely, but after 10s we forcefully close.
engine().run_in_background(with_timeout(timer<>::clock::now() + std::chrono::seconds(10), shutdown()).finally([this] {
_eof = true;
try {
(void)_in.close().handle_exception([](std::exception_ptr) {}); // should wake any waiters
} catch (...) {
}
try {
(void)_out.close().handle_exception([](std::exception_ptr) {});
} catch (...) {
}
// make sure to wait for handshake attempt to leave semaphores. Must be in same order as
// handshake aqcuire, because in worst case, we get here while a reader is attempting
// re-handshake.
return with_semaphore(_in_sem, 1, [this] {
return with_semaphore(_out_sem, 1, [] {});
});
return close_after_shutdown();
}).then_wrapped([me = std::move(me)](future<> f) { // must keep object alive until here.
f.ignore_ready_future();
}));
}
}

future<> close_after_shutdown() {
_eof = true;
try {
co_await _in.close(); // should wake any waiters
} catch (...) {
}
try {
co_await _out.close();
} catch (...) {
}

// make sure to wait for handshake attempt to leave semaphores. Must be in same order as
// handshake aqcuire, because in worst case, we get here while a reader is attempting
// re-handshake.
co_await with_semaphore(_in_sem, 1, [this] {
return with_semaphore(_out_sem, 1, [] {});
});
}

// helper for sink
future<> flush() noexcept {
return with_semaphore(_out_sem, 1, [this] {
Expand Down