Skip to content

Commit

Permalink
reconsider the lock design
Browse files Browse the repository at this point in the history
  • Loading branch information
ymmt2005 committed Aug 14, 2024
1 parent bedf652 commit d04cba0
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 108 deletions.
9 changes: 5 additions & 4 deletions cybozu/reactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,16 @@ void reactor::remove_resource(int fd) {
dump_stack();
throw std::logic_error("bug in remove_resource");
}
// early close the file descriptor
it->second->close();
auto res = it->second.get();
m_garbage.emplace_back( std::move(it->second) );
m_resources.erase(it);
if( epoll_ctl(m_fd, EPOLL_CTL_DEL, fd, NULL) == -1 )
throw_unix_error(errno, "epoll_ctl(EPOLL_CTL_DEL)");
m_readables.erase(std::remove(m_readables.begin(), m_readables.end(), fd),
m_readables.end());

// early close the file descriptor
res->close();
}

void reactor::poll() {
Expand All @@ -95,8 +97,7 @@ void reactor::poll() {
std::back_inserter(m_readables_copy));
m_readables.clear();
for( int fd: m_readables_copy ) {
auto& res = m_resources[fd];
if( ! res->on_readable(fd) )
if( ! m_resources[fd]->on_readable(fd) )
remove_resource(fd);
}
m_readables_copy.clear();
Expand Down
35 changes: 18 additions & 17 deletions cybozu/reactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,8 @@ class reactor;
// An abstraction of a file descriptor for use with <reactor>.
// The file descriptor should be set non-blocking.
//
// All member functions except for <invalidate_and_close> and <with_fd>
// are for the reactor thread. Sub classes can add methods for non-reactor
// threads.
// All member functions except for <with_fd> are for the reactor thread.
// Sub classes can add methods for non-reactor threads.
class resource {
public:
// Constructor.
Expand Down Expand Up @@ -70,14 +69,6 @@ class resource {
return false;
}

// Invalidate this resource then request the reactor to remove this.
//
// This method invalidates this resource then requests the reactor
// thread to remove this resource from the reactor.
//
// **Do not use this in the reactor thread**.
void invalidate_and_close();

// A template method called from within invalidate.
//
// A template method called from within invalidate.
Expand Down Expand Up @@ -133,13 +124,22 @@ class resource {
// Call `f` with a valid file descriptor.
// This is intended to be called by non-reactor threads.
//
// If the resource has already been invalidated, this returns false.
// Otherwise, this calls `f` and returns true.
bool with_fd(std::function<void(int)> f) const {
// If the resource has already been invalidated, this returns `false`.
// Otherwise, this calls `f` and returns it's return value.
//
// `f` should return `true` if it wants to keep the resource valid.
// Otherwise, it should return `false`, then `with_fd` invalidates
// the resource.
bool with_fd(std::function<bool(int)> f) {
read_lock g(m_lock);
if( ! m_valid ) return false;
f(m_fd);
return true;
if( f(m_fd) ) {
return true;
}

g.unlock();
invalidate_and_close();
return false;
}

friend class reactor;
Expand All @@ -158,6 +158,8 @@ class resource {
::close(m_fd);
m_closed = true;
}

void invalidate_and_close();
};


Expand Down Expand Up @@ -288,7 +290,6 @@ class reactor {
void poll();
};


inline void resource::invalidate_and_close() {
write_lock g(m_lock);
if( ! m_valid ) return;
Expand Down
3 changes: 2 additions & 1 deletion cybozu/signal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ class signal_reader: public resource {
resource( signalfd(-1, mask, SFD_NONBLOCK|SFD_CLOEXEC) ),
m_callback(callback)
{
with_fd([](int fd) {
with_fd([](int fd) -> bool {
if( fd == -1 )
throw_unix_error(errno, "signalfd");
return true;
});
}
// Constructor.
Expand Down
4 changes: 0 additions & 4 deletions cybozu/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@ bool tcp_socket::_send(int fd, const char* p, std::size_t len, lock_guard& g) {
logger::error() << "<tcp_socket::_send>: ("
<< ecnd.value() << ") "
<< ecnd.message();
g.unlock();
invalidate_and_close();
return false;
}
p += n;
Expand Down Expand Up @@ -259,8 +257,6 @@ bool tcp_socket::_sendv(int fd, const iovec* iov, const int iovcnt, lock_guard&
logger::error() << "<tcp_socket::_sendv>: ("
<< ecnd.value() << ") "
<< ecnd.message();
g.unlock();
invalidate_and_close();
return false;
}
while( n > 0 ) {
Expand Down
32 changes: 18 additions & 14 deletions cybozu/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ class tcp_socket: public resource {
//
// @return `true` if this socket is valid, `false` otherwise.
bool send(const char* p, std::size_t len, bool flush=false) {
return with_fd([=](int fd) {
return with_fd([=](int fd) -> bool {
lock_guard g(m_lock);
if( ! _send(fd, p, len, g) )
if( ! _send(fd, p, len, g) ) {
return false;
}
if( flush && empty() )
_flush(fd);
return true;
});
}

Expand All @@ -105,14 +107,16 @@ class tcp_socket: public resource {
//
// @return `true` if this socket is valid, `false` otherwise.
bool sendv(const iovec* iov, int iovcnt, bool flush=false) {
return with_fd([=](int fd) {
return with_fd([=](int fd) -> bool {
if( iovcnt >= MAX_IOVCNT )
throw std::logic_error("<tcp_socket::sendv> too many iovec.");
lock_guard g(m_lock);
if( ! _sendv(fd, iov, iovcnt, g) )
if( ! _sendv(fd, iov, iovcnt, g) ) {
return false;
}
if( flush && empty() )
_flush(fd);
return true;
});
}

Expand All @@ -126,19 +130,19 @@ class tcp_socket: public resource {
//
// @return `true` if this socket is valid, `false` otherwise.
bool send_close(const char* p, std::size_t len) {
return with_fd([=](int fd) {
return with_fd([=](int fd) -> bool {
lock_guard g(m_lock);
if( ! _send(fd, p, len, g) )
if( ! _send(fd, p, len, g) ) {
return false;
}
m_shutdown = true;
if( empty() ) {
_flush(fd);
g.unlock();
invalidate_and_close();
return true;
return false;
}
g.unlock();
m_cond_write.notify_all();
return true;
});
}

Expand All @@ -152,21 +156,21 @@ class tcp_socket: public resource {
//
// @return `true` if this socket is valid, `false` otherwise.
bool sendv_close(const iovec* iov, int iovcnt) {
return with_fd([=](int fd) {
return with_fd([=](int fd) -> bool {
if( iovcnt >= MAX_IOVCNT )
throw std::logic_error("<tcp_socket::sendv> too many iov.");
lock_guard g(m_lock);
if( ! _sendv(fd, iov, iovcnt, g) )
if( ! _sendv(fd, iov, iovcnt, g) ) {
return false;
}
m_shutdown = true;
if( empty() ) {
_flush(fd);
g.unlock();
invalidate_and_close();
return true;
return false;
}
g.unlock();
m_cond_write.notify_all();
return true;
});
}

Expand Down
41 changes: 18 additions & 23 deletions src/counter/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,32 @@ counter_socket::counter_socket(int fd,
g_stats.total_connections.fetch_add(1);

m_recvjob = [this](cybozu::dynbuf& buf) {
with_fd([this, &buf](int fd) {
// load pending data
if( ! m_pending.empty() ) {
buf.append(m_pending.data(), m_pending.size());
m_pending.reset();
}
// load pending data
if( ! m_pending.empty() ) {
buf.append(m_pending.data(), m_pending.size());
m_pending.reset();
}

with_fd([this, &buf](int fd) -> bool {
while( true ) {
char* p = buf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
return true;
if( errno == EINTR )
continue;
if( errno == ECONNRESET ) {
buf.reset();
release_all();
invalidate_and_close();
break;
return false;
}
cybozu::throw_unix_error(errno, "recv");
}
if( n == 0 ) {
buf.reset();
release_all();
invalidate_and_close();
break;
return false;
}
// if (n != -1) && (n != 0)
buf.consume(n);
Expand All @@ -71,24 +69,21 @@ counter_socket::counter_socket(int fd,
<< len << " bytes.";
buf.reset();
release_all();
invalidate_and_close();
break;
return false;
}
buf.erase(head - buf.data());
}

// recv returns EAGAIN, or some error happens.
if( buf.size() > 0 )
m_pending.append(buf.data(), buf.size());

m_busy.store(false, std::memory_order_release);
});

// recv returns EAGAIN, or some error happens.
if( buf.size() > 0 )
m_pending.append(buf.data(), buf.size());
m_busy.store(false, std::memory_order_release);
};

m_sendjob = [this](cybozu::dynbuf& buf) {
with_fd([this, &buf](int fd) {
if( ! write_pending_data(fd) )
invalidate_and_close();
m_sendjob = [this](cybozu::dynbuf&) {
with_fd([this](int fd) -> bool {
return write_pending_data(fd);
});
};
}
Expand Down
Loading

0 comments on commit d04cba0

Please sign in to comment.