Skip to content

Commit

Permalink
PROTON-2832: proactor raw connection shutdown race with simultaneous …
Browse files Browse the repository at this point in the history
…epoll event
  • Loading branch information
Cliff Jansen committed Aug 1, 2024
1 parent da3096b commit bbe9a4b
Showing 1 changed file with 32 additions and 10 deletions.
42 changes: 32 additions & 10 deletions c/src/proactor/epoll_raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ struct praw_connection_t {
pn_event_batch_t batch;
struct addrinfo *addrinfo; /* Resolved address list */
struct addrinfo *ai; /* Current connect address */
int current_arm; /* Active epoll io events */
bool armed;
bool connected;
bool disconnected;
bool hup_detected;
Expand Down Expand Up @@ -101,7 +103,8 @@ static void praw_connection_start(praw_connection_t *prc, int fd) {
pclosefd(prc->task.proactor, fd);
}
ee->fd = fd;
ee->wanted = EPOLLIN | EPOLLOUT;
prc->current_arm = ee->wanted = EPOLLIN | EPOLLOUT;
prc->armed = true;
start_polling(ee, efd); // TODO: check for error
}

Expand Down Expand Up @@ -148,6 +151,8 @@ static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_ra
prc->connected = false;
prc->disconnected = false;
prc->first_schedule = false;
prc->armed = false;
prc->current_arm = 0;
prc->taddr = NULL;
prc->batch.next_event = pni_raw_batch_next;

Expand All @@ -173,6 +178,17 @@ static void praw_connection_cleanup(praw_connection_t *prc) {
// else proactor_disconnect logic owns prc and its final free
}

static void praw_initiate_cleanup(praw_connection_t *prc) {
if (prc->armed) {
// Possible race with epoll event. Wait for it to clear.
// Force EPOLLHUP callback if not already pending.
shutdown(prc->psocket.epoll_io.fd, SHUT_RDWR);
return;
}
pni_raw_finalize(&prc->raw_connection);
praw_connection_cleanup(prc);
}

pn_raw_connection_t *pn_raw_connection(void) {
praw_connection_t *conn = (praw_connection_t*) calloc(1, sizeof(praw_connection_t));
if (!conn) return NULL;
Expand Down Expand Up @@ -400,10 +416,13 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
else
pni_task_wake_done(&rc->task); // Complete task wake without event.
}
if (io_events) {
rc->armed = false;
rc->current_arm = 0;
}
if (pni_raw_finished(&rc->raw_connection)) {
unlock(&rc->task.mutex);
pni_raw_finalize(&rc->raw_connection);
praw_connection_cleanup(rc);
praw_initiate_cleanup(rc);
return NULL;
}
int events = io_events;
Expand Down Expand Up @@ -492,8 +511,7 @@ void pni_raw_connection_done(praw_connection_t *rc) {

if (pni_raw_finished(raw) && !ready) {
// If raw connection has no more work to do and safe to free resources, do so.
pni_raw_finalize(raw);
praw_connection_cleanup(rc);
praw_initiate_cleanup(rc);
} else if (ready) {
// Already scheduled to run. Skip poll. Remember if we want a read.
rc->read_check = pni_raw_can_read(raw);
Expand All @@ -509,12 +527,16 @@ void pni_raw_connection_done(praw_connection_t *rc) {
// If wanted == 0 and hup_detected, blocking not possible, so skip arming until
// application provides read buffers.
if (wanted || !rc->hup_detected) {
rc->psocket.epoll_io.wanted = wanted;
rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error
// Arm only if there is a change.
if (!rc->armed || (wanted != rc->current_arm)) {
rc->psocket.epoll_io.wanted = rc->current_arm = wanted;
rc->armed = true;
rearm_polling(&rc->psocket.epoll_io, p->epollfd); // TODO: check for error
}
}
}

// praw_connection_cleanup() may have been called above. Can no longer touch rc or raw.
// praw_initiate_cleanup() may have been called above. Can no longer touch rc or raw.

lock(&p->sched_mutex);
tslot_t *resume_thread;
Expand All @@ -525,6 +547,6 @@ void pni_raw_connection_done(praw_connection_t *rc) {
}

void pni_raw_connection_forced_shutdown(praw_connection_t *rc) {
pni_raw_finalize(&rc->raw_connection);
praw_connection_cleanup(rc);
rc->armed = false; // Tear down. No epoll event callbacks.
praw_initiate_cleanup(rc);
}

0 comments on commit bbe9a4b

Please sign in to comment.