Skip to content

Commit

Permalink
have wait thread periodically check for cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Oct 26, 2024
1 parent ae36f61 commit d2e88e1
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions src/fd.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#ifdef _WIN32
#ifndef FD_SETSIZE
#define FD_SETSIZE 2048 // Increase max fds - only for select() fallback
#define FD_SETSIZE 2048 // Set max fds - only for select() fallback
#endif
#include <winsock2.h>
#else
Expand Down Expand Up @@ -28,6 +28,8 @@ extern CallbackRegistryTable callbackRegistryTable;
extern SEXP later_fdcancel;
extern SEXP later_invisibleSymbol;

#define LATER_INTERVAL 1024

#ifdef _WIN32
#define POLL_FUNC WSAPoll
#else
Expand Down Expand Up @@ -67,6 +69,10 @@ static int wait_thread(void *arg) {

int result;

const bool infinite = args->timeout == R_PosInf;
int timeoutms = infinite || args->timeout < 0 ? 1000 : (int) (args->timeout * 1000);
int repeats = infinite || args->timeout < 0 ? 0 : timeoutms / LATER_INTERVAL;

#ifndef POLLIN // fall back to select() for R on older Windows

fd_set readfds, writefds, exceptfds;
Expand All @@ -85,13 +91,16 @@ static int wait_thread(void *arg) {
}

struct timeval tv;
const int use_timeout = args->timeout != R_PosInf;
if (use_timeout) {
tv.tv_sec = args->timeout < 0 ? 1 : (int) args->timeout;
tv.tv_usec = args->timeout < 0 ? 0 : ((int) (args->timeout * 1000)) % 1000 * 1000;
}

result = select(max_fd + 1, &readfds, &writefds, &exceptfds, use_timeout ? &tv : NULL);
do {
tv.tv_sec = (repeats ? LATER_INTERVAL : timeoutms) / 1000;
tv.tv_usec = ((repeats ? LATER_INTERVAL : timeoutms) % 1000) * 1000;

result = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv);

if (args->flag->load()) goto callback;
if (result) break;
} while (infinite || (repeats-- && (timeoutms -= LATER_INTERVAL)));

if (result > 0) {
for (int i = 0; i < args->rfds; i++) {
Expand All @@ -113,8 +122,6 @@ static int wait_thread(void *arg) {

#else

int timeout = args->timeout == R_PosInf ? -1 : args->timeout < 0 ? 1000 : (int) (args->timeout * 1000);

std::vector<struct pollfd> pollfds;
pollfds.reserve(args->num_fds);
struct pollfd pfd;
Expand All @@ -137,7 +144,11 @@ static int wait_thread(void *arg) {
pollfds.push_back(pfd);
}

result = POLL_FUNC(pollfds.data(), args->num_fds, timeout);
do {
result = POLL_FUNC(pollfds.data(), args->num_fds, repeats ? LATER_INTERVAL : timeoutms);
if (args->flag->load()) goto callback;
if (result) break;
} while (infinite || (repeats-- && (timeoutms -= LATER_INTERVAL)));

if (result > 0) {
for (int i = 0; i < args->rfds; i++) {
Expand All @@ -159,6 +170,7 @@ static int wait_thread(void *arg) {

#endif // POLLIN

callback:
callbackRegistryTable.scheduleCallback(later_callback, static_cast<void *>(argsptr.release()), 0, args->loop);

return 0;
Expand Down

0 comments on commit d2e88e1

Please sign in to comment.