From a7ab8c04288bc7ad0f09583286a23f61aa876c6f Mon Sep 17 00:00:00 2001 From: rim Date: Sun, 6 Oct 2024 10:06:19 +0300 Subject: [PATCH] threadpool: add TP_EV_PROC to track process exit; improve timer handler code. --- CMakeLists.txt | 1 + include/threadpool/threadpool.h | 10 +- src/threadpool/threadpool.c | 179 ++++++++++++++++------- tests/threadpool/main.c | 73 +++++++++ tests/threadpool/test-threadpool.project | 2 +- 5 files changed, 213 insertions(+), 52 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6efbbda..8656943 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -103,6 +103,7 @@ chk_function_exists(kqueuex) chk_function_exists(rtprio) chk_function_exists(pthread_setname_np) chk_function_exists(pthread_set_name_np) +chk_function_exists(posix_spawn_file_actions_addclosefrom_np) # Check macros. chk_symbol_exists(sys/socket.h SOCK_CLOEXEC) diff --git a/include/threadpool/threadpool.h b/include/threadpool/threadpool.h index 4e15a24..e0037ec 100644 --- a/include/threadpool/threadpool.h +++ b/include/threadpool/threadpool.h @@ -56,7 +56,8 @@ typedef struct thread_pool_event_s { /* Thread pool event. */ #define TP_EV_READ 0 /* EVFILT_READ EPOLLIN | EPOLLRDHUP | EPOLLERR */ #define TP_EV_WRITE 1 /* EVFILT_WRITE EPOLLOUT | EPOLLERR */ #define TP_EV_TIMER 2 /* EVFILT_TIMER TP_EV_READ + timerfd_create */ -#define TP_EV_LAST TP_EV_TIMER +#define TP_EV_PROC 3 /* EVFILT_PROC TP_EV_READ + pidfd_open */ +#define TP_EV_LAST TP_EV_PROC #define TP_EV_MASK 0x0003u /* For internal use: event set mask. */ /* Event flags. */ @@ -73,10 +74,12 @@ typedef struct thread_pool_event_s { /* Thread pool event. */ #define TP_F_EOF (((uint16_t)1) << 8) /* Ret: EV_EOF EPOLLRDHUP */ #define TP_F_ERROR (((uint16_t)1) << 9) /* Ret: EV_EOF+fflags EPOLLERR + getsockopt(SO_ERROR) */ /* fflags contain error code. */ + /* Event fflags. */ /* TP_EV_READ/TP_EV_WRITE specific. */ #define TP_FF_RW_LOWAT (((uint32_t)1) << 0) /* For sockets: set SO_RCVLOWAT/SO_SNDLOWAT. */ #define TP_FF_RW_MASK 0x00000001u /* For internal use: fflags set mask. */ + /* TP_EV_TIMER specific: if not set - the default is seconds. */ /* Data units selection ENUM for timer: select only one. */ #define TP_FF_T_SEC 0x00000000u /* data is seconds. */ @@ -90,6 +93,11 @@ typedef struct thread_pool_event_s { /* Thread pool event. */ static const char *tp_ff_time_units[] = { "s", "ms", "us", "ns", NULL }; +/* TP_EV_PROC specific: if not set - the default is seconds. */ +#define TP_FF_P_EXIT (((uint32_t)1) << 0) /* The process has exited. The exit status will be stored in data. */ +#define TP_FF_P_MASK 0x00000001u /* For internal use: fflags set mask. */ + + typedef void (*tpt_hook_cb)(tpt_p tpt); typedef void (*tp_cb)(tp_event_p ev, tp_udata_p tp_udata); diff --git a/src/threadpool/threadpool.c b/src/threadpool/threadpool.c index d910d03..14b5111 100644 --- a/src/threadpool/threadpool.c +++ b/src/threadpool/threadpool.c @@ -44,6 +44,8 @@ # include # include # include +# include +# include #endif /* Linux specific code. */ #include @@ -107,6 +109,7 @@ static const short tp_event_to_kq_map[] = { EVFILT_READ, /* 0: TP_EV_READ */ EVFILT_WRITE, /* 1: TP_EV_WRITE */ EVFILT_TIMER, /* 2: TP_EV_TIMER */ + EVFILT_PROC, /* 3: TP_EV_PROC */ 0 }; @@ -125,6 +128,7 @@ static const uint32_t tp_event_to_ep_map[] = { EPOLL_IN, /* 0: TP_EV_READ */ EPOLL_OUT, /* 1: TP_EV_WRITE */ 0, /* 2: TP_EV_TIMER */ + 0, /* 3: TP_EV_PROC */ 0 }; @@ -261,6 +265,9 @@ tp_fflags_to_kq(uint16_t event, uint32_t fflags) { break; } break; + case TP_EV_PROC: + ret |= NOTE_EXIT; /* Always set. */ + break; } return (ret); @@ -325,6 +332,7 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) { } break; case TP_EV_TIMER: /* Timer: force update. */ + case TP_EV_PROC: /* Process: force update. */ if (0 != ((EV_ADD | EV_ENABLE) & kev.flags)) { kev.flags |= (EV_ADD | EV_ENABLE); } @@ -486,13 +494,26 @@ tpt_loop(tpt_p tpt) { /* Translate kq event to thread poll event. */ switch (kev.filter) { case EVFILT_READ: - ev.event = TP_EV_READ; - break; case EVFILT_WRITE: - ev.event = TP_EV_WRITE; + ev.event = ((EVFILT_READ == kev.filter) ? TP_EV_READ : TP_EV_WRITE); + ev.flags = 0; + if (0 != (EV_EOF & kev.flags)) { + ev.flags |= TP_F_EOF; + if (0 != kev.fflags) { /* For socket: closed, and error present. */ + ev.flags |= TP_F_ERROR; + } + } + ev.fflags = (uint32_t)kev.fflags; break; case EVFILT_TIMER: ev.event = TP_EV_TIMER; + ev.flags = 0; + ev.fflags = 0; + break; + case EVFILT_PROC: + ev.event = TP_EV_PROC; + ev.flags = 0; + ev.fflags = TP_FF_P_EXIT; break; default: syslog(LOG_DEBUG, "kevent with invalid filter = %i, ident = %zu.", @@ -500,14 +521,6 @@ tpt_loop(tpt_p tpt) { debugd_break(); continue; } - ev.flags = 0; - if (0 != (EV_EOF & kev.flags)) { - ev.flags |= TP_F_EOF; - if (0 != kev.fflags) { /* For socket: closed, and error present. */ - ev.flags |= TP_F_ERROR; - } - } - ev.fflags = (uint32_t)kev.fflags; ev.data = (uint64_t)kev.data; tp_udata->cb_func(&ev, tp_udata); @@ -521,6 +534,11 @@ tpt_loop(tpt_p tpt) { #define TP_EV_OTHER(event) \ (TP_EV_READ == (event) ? TP_EV_WRITE : TP_EV_READ) +static int +pidfd_open(pid_t pid, unsigned int flags) { + return syscall(SYS_pidfd_open, pid, flags); +} + /* Translate thread pool flags <-> epoll flags. */ static inline uint32_t tp_flags_to_ep(const int op, const uint16_t flags) { @@ -617,7 +635,7 @@ epoll_ctl_ex(int epfd, int op, int fd, struct epoll_event *event) { static int tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) { - int error = 0, tfd, clockid = CLOCK_MONOTONIC, tmr_flags = 0, op_guess; + int error = 0, tfd, op_guess; uint32_t lowat; struct itimerspec new_tmr; struct epoll_event epev; @@ -630,9 +648,11 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) { epev.events = (EPOLLHUP | EPOLLERR); epev.data.ptr = (void*)tp_udata; - if (TP_EV_TIMER == ev->event) { /* Special handle for timer. */ + switch (ev->event) { + case TP_EV_TIMER: /* Special handle for timer. */ tfd = TPDATA_TFD_GET(tp_udata->tpdata); - if (TP_CTL_DEL == op) { /* Delete timer. */ + switch (op) { + case TP_CTL_DEL: /* Delete timer. */ if (0 == tfd) return (ENOENT); error = 0; @@ -640,8 +660,7 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) { close(tfd); /* No need to epoll_ctl(EPOLL_CTL_DEL). */ tp_udata->tpdata = 0; return (error); - } - if (TP_CTL_DISABLE == op) { + case TP_CTL_DISABLE: if (0 == tfd) return (ENOENT); tp_udata->tpdata |= TPDATA_F_DISABLED; @@ -654,13 +673,9 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) { } /* TP_CTL_ADD, TP_CTL_ENABLE */ - if (0 != (TP_FF_T_ABSTIME & ev->fflags)) { - clockid = CLOCK_REALTIME; - tmr_flags = TFD_TIMER_ABSTIME; - } - if (0 == tfd) { /* Create timer, if needed. */ - tfd = timerfd_create(clockid, + tfd = timerfd_create( + ((0 != (TP_FF_T_ABSTIME & ev->fflags)) ? CLOCK_REALTIME : CLOCK_MONOTONIC), (TFD_NONBLOCK | (0 != (TP_S_F_CLOEXEC & tp_udata->tpt->tp->s.flags) ? TFD_CLOEXEC : 0))); if (-1 == tfd) { @@ -669,8 +684,8 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) { } TPDATA_TFD_SET(tp_udata->tpdata, tfd); TPDATA_EV_FL_SET(tp_udata->tpdata, ev->event, ev->flags); /* Remember original event and flags. */ - /* Adding to epoll. */ - epev.events |= EPOLLIN; /* Not set EPOLLONESHOT, control timer. */ + /* Add to epoll. */ + epev.events |= EPOLLIN; /* Not set EPOLLONESHOT, use timer control. */ if (0 != epoll_ctl((int)tp_udata->tpt->io_fd, EPOLL_CTL_ADD, tfd, &epev)) { error = errno; @@ -702,11 +717,52 @@ tpt_ev_post(int op, tp_event_p ev, tp_udata_p tp_udata) { } else { /* Periodic. */ new_tmr.it_interval = new_tmr.it_value; /* memcpy(). */ } - if (-1 == timerfd_settime(tfd, tmr_flags, &new_tmr, NULL)) { + if (-1 == timerfd_settime(tfd, + ((0 != (TP_FF_T_ABSTIME & ev->fflags)) ? TFD_TIMER_ABSTIME : 0), + &new_tmr, NULL)) { error = errno; goto err_out_timer; } return (0); + case TP_EV_PROC: + tfd = TPDATA_TFD_GET(tp_udata->tpdata); + switch (op) { + case TP_CTL_DEL: /* Delete proc. */ + case TP_CTL_DISABLE: + if (0 == tfd) + return (ENOENT); + error = 0; +err_out_proc: + close(tfd); /* No need to epoll_ctl(EPOLL_CTL_DEL). */ + tp_udata->tpdata = 0; + return (error); + case TP_CTL_ADD: /* Add proc. */ + case TP_CTL_ENABLE: + if (0 != tfd) + return (EEXIST); + /* Create pidfd. */ + tfd = pidfd_open(tp_udata->ident, PIDFD_NONBLOCK); + if (-1 == tfd) + return (errno); + if (0 != (TP_S_F_CLOEXEC & tp_udata->tpt->tp->s.flags)) { + if (-1 == fcntl(tfd, F_SETFD, (FD_CLOEXEC | O_NONBLOCK))) { + error = errno; + goto err_out_proc; + } + } + tp_udata->tpdata = 0; + TPDATA_TFD_SET(tp_udata->tpdata, tfd); + TPDATA_EV_FL_SET(tp_udata->tpdata, ev->event, ev->flags); /* Remember original event and flags. */ + /* Add to epoll. */ + epev.events |= (EPOLLIN | EPOLLONESHOT); /* Always oneshot, by design pidfd(). */ + if (0 != epoll_ctl((int)tp_udata->tpt->io_fd, + EPOLL_CTL_ADD, tfd, &epev)) { + error = errno; + goto err_out_proc; + } + break; + } + return (0); } /* Read/Write events. */ @@ -818,40 +874,55 @@ tpt_loop(tpt_p tpt) { if (0 != (TP_F_DISPATCH & tpev_flags)) { /* Mark as disabled. */ tp_udata->tpdata |= TPDATA_F_DISABLED; } - if (TP_EV_TIMER == ev.event) { /* Timer. */ + + switch (ev.event) { + case TP_EV_READ: + case TP_EV_WRITE: + /* Read/write. */ + if (0 != (EPOLL_HUP & epev.events)) { + ev.flags |= TP_F_EOF; + } + if (0 != (EPOLLERR & epev.events)) { /* Try to get error code. */ + ev.flags |= TP_F_ERROR; + ev.fflags = errno; + optlen = sizeof(int); + if (0 == getsockopt((int)tp_udata->ident, + SOL_SOCKET, SO_ERROR, &itm, &optlen)) { + ev.fflags = itm; + } + if (0 == ev.fflags) { + ev.fflags = EINVAL; + } + } + if (0 != (TP_F_ONESHOT & tpev_flags)) { /* Onetime. */ + epoll_ctl((int)tpt->io_fd, EPOLL_CTL_DEL, + (int)tp_udata->ident, &epev); + tp_udata->tpdata = 0; + } + ev.data = UINT64_MAX; /* Transfer as many as you can. */ + //ioctl((int)tp_udata->ident, FIONREAD, &ev.data); + break; + case TP_EV_TIMER: /* Timer. */ tfd = TPDATA_TFD_GET(tp_udata->tpdata); itm = read(tfd, &ev.data, sizeof(uint64_t)); if (0 != (TP_F_ONESHOT & tpev_flags)) { /* Onetime. */ close(tfd); /* No need to epoll_ctl(EPOLL_CTL_DEL). */ tp_udata->tpdata = 0; } - tp_udata->cb_func(&ev, tp_udata); - continue; - } - /* Read/write. */ - ev.data = UINT64_MAX; /* Transfer as many as you can. */ - //ioctl((int)tp_udata->ident, FIONREAD, &ev.data); - if (0 != (EPOLL_HUP & epev.events)) { - ev.flags |= TP_F_EOF; - } - if (0 != (EPOLLERR & epev.events)) { /* Try to get error code. */ - ev.flags |= TP_F_ERROR; - ev.fflags = errno; - optlen = sizeof(int); - if (0 == getsockopt((int)tp_udata->ident, - SOL_SOCKET, SO_ERROR, &itm, &optlen)) { - ev.fflags = itm; - } - if (0 == ev.fflags) { - ev.fflags = EINVAL; - } - } - if (0 != (TP_F_ONESHOT & tpev_flags)) { /* Onetime. */ - epoll_ctl((int)tpt->io_fd, EPOLL_CTL_DEL, - (int)tp_udata->ident, &epev); + break; + case TP_EV_PROC: /* Process. */ + /* Read exit code. */ + itm = 0; + waitpid((pid_t)tp_udata->ident, &itm, WNOHANG); + ev.fflags = TP_FF_P_EXIT; + ev.data = (uint64_t)itm; + /* Close pidfd. */ + close(TPDATA_TFD_GET(tp_udata->tpdata)); /* No need to epoll_ctl(EPOLL_CTL_DEL). */ tp_udata->tpdata = 0; + break; } + /* Do callback. */ tp_udata->cb_func(&ev, tp_udata); } /* End Main loop. */ return; @@ -1476,6 +1547,14 @@ tpt_ev_validate(int op, tp_event_p ev, tp_udata_p tp_udata) { if (0 != (~(TP_FF_T_MASK) & ev->fflags)) return (EINVAL); /* Invalid fflags: some unknown bits is set. */ break; + case TP_EV_PROC: +#if defined(TP_F_EDGE) + if (0 != (TP_F_EDGE & ev->flags)) + return (EINVAL); /* Invalid flags. */ +#endif + if (0 != (~(TP_FF_P_MASK) & ev->fflags)) + return (EINVAL); /* Invalid fflags: some unknown bits is set. */ + break; default: return (EINVAL); /* Bad event. */ } diff --git a/tests/threadpool/main.c b/tests/threadpool/main.c index a6d0f22..e81bd7f 100644 --- a/tests/threadpool/main.c +++ b/tests/threadpool/main.c @@ -43,6 +43,8 @@ #include #include #include +#include +#include #include #include @@ -63,11 +65,15 @@ #define TEST_EV_CNT_MAX 12 #define TEST_TIMER_ID 36434632 /* Random value. */ #define TEST_TIMER_INTERVAL 30 +#define TEST_PROC_INTERVAL 1 /* sec */ +#define TEST_PROC_INTERVAL_STR "1" #define TEST_SLEEP_TIME_MS 1000 +extern char ** environ; static tp_p tp = NULL; static size_t threads_count; static int pipe_fd[2] = {-1, -1}; +static pid_t pid; static uint8_t thr_arr[(THREADS_COUNT_MAX + 4)]; static uint8_t thr_tls_arr[(THREADS_COUNT_MAX + 4)]; static size_t thr_flood_arr[(THREADS_COUNT_MAX + 4)]; @@ -120,6 +126,7 @@ static void test_tpt_ev_add_ex_tmr_dispatch(void); #ifdef TP_F_EDGE static void test_tpt_ev_add_ex_tmr_edge(void); #endif +static void test_tpt_ev_add_ex_proc_0(void); static void test_tp_pvt_msg_flood(void); @@ -189,6 +196,8 @@ main(int argc __unused, char *argv[] __unused) { #ifdef TP_F_EDGE NULL == CU_add_test(psuite, "test of tpt_ev_add_args(TP_EV_TIMER, TP_F_EDGE)", test_tpt_ev_add_ex_tmr_edge) || #endif + NULL == CU_add_test(psuite, "test of tpt_ev_add_args(TP_EV_PROC, 0)", test_tpt_ev_add_ex_proc_0) || + 0 || NULL == CU_add_test(psuite, "test of test_tp_pvt_msg_flood()", test_tp_pvt_msg_flood) || NULL == CU_add_test(psuite, "test of test_tp_destroy()", test_tp_destroy) || NULL == CU_add_test(psuite, "test of test_tp_tpt_hooks()", test_tp_tpt_hooks) || @@ -231,6 +240,8 @@ main(int argc __unused, char *argv[] __unused) { #ifdef TP_F_EDGE NULL == CU_add_test(psuite, "test of tpt_ev_add_args(TP_EV_TIMER, TP_F_EDGE)", test_tpt_ev_add_ex_tmr_edge) || #endif + NULL == CU_add_test(psuite, "test of tpt_ev_add_args(TP_EV_PROC, 0)", test_tpt_ev_add_ex_proc_0) || + 0 || NULL == CU_add_test(psuite, "test of test_tp_pvt_msg_flood()", test_tp_pvt_msg_flood) || NULL == CU_add_test(psuite, "test of test_tp_destroy()", test_tp_destroy) || NULL == CU_add_test(psuite, "test of test_tp_tpt_hooks()", test_tp_tpt_hooks) || @@ -875,6 +886,68 @@ test_tpt_ev_add_ex_tmr_edge(void) { } #endif + +static void +tpt_ev_add_proc_cb(tp_event_p ev, tp_udata_p tp_udata) { + + CU_ASSERT(TP_EV_PROC == ev->event) + CU_ASSERT(tpt_ev_add_proc_cb == tp_udata->cb_func) + CU_ASSERT((uintptr_t)pid == tp_udata->ident) + + if (TP_EV_PROC == ev->event && + tpt_ev_add_proc_cb == tp_udata->cb_func && + (uintptr_t)pid == tp_udata->ident) { + thr_arr[0] ++; + if (TEST_EV_CNT_MAX <= thr_arr[0]) { + tpt_ev_enable_args1(0, TP_EV_PROC, tp_udata); + } + } +} +static void +test_tpt_ev_add_ex_proc_0(void) { + tp_udata_t tp_udata; + posix_spawn_file_actions_t actions; + char *const argv[] = { "sleep", TEST_PROC_INTERVAL_STR, NULL, NULL }; + + /* Init. */ + thr_arr[0] = 0; + memset(&tp_udata, 0x00, sizeof(tp_udata)); + + /* Start process to track... */ + CU_ASSERT(0 == posix_spawn_file_actions_init(&actions)) + /* Do not pass all fd~s to child. */ +#ifdef HAVE_POSIX_SPAWN_FILE_ACTIONS_ADDCLOSEFROM_NP + CU_ASSERT(0 == posix_spawn_file_actions_addclosefrom_np(&actions, (STDERR_FILENO + 1))) +#elif defined(CLOSE_RANGE_CLOEXEC) + CU_ASSERT(-1 != close_range((STDERR_FILENO + 1), ~0U, CLOSE_RANGE_CLOEXEC)) +#else + for (int tfd = (STDERR_FILENO + 1); tfd < 1024; tfd ++) { + fcntl(tfd, F_SETFD, FD_CLOEXEC); + } +#endif + CU_ASSERT(0 == posix_spawnp(&pid, argv[0], &actions, NULL, argv, environ)) + posix_spawn_file_actions_destroy(&actions); + + + tp_udata.cb_func = tpt_ev_add_proc_cb; + tp_udata.ident = (uintptr_t)pid; + if (0 != tpt_ev_add_args(tp_thread_get(tp, 0), TP_EV_PROC, 0, + TP_FF_P_EXIT, 0, &tp_udata)) { + CU_FAIL("tpt_ev_add_args(TP_EV_PROC)") /* Fail. */ + tpt_ev_del_args1(TP_EV_PROC, &tp_udata); + return; /* Fail. */ + } + /* Wait for all threads process. */ + test_sleep((TEST_SLEEP_TIME_MS + ((TEST_PROC_INTERVAL * 1000) * 2))); + if (1 != thr_arr[0]) { + CU_FAIL("tpt_ev_add_args(TP_EV_PROC) - not work") /* Fail. */ + LOG_CONS_INFO_FMT("%i", (int)thr_arr[0]); + } + /* Clean. */ + CU_ASSERT(0 != tpt_ev_del_args1(TP_EV_PROC, &tp_udata)) +} + + static void msg_send_pvt_msg_flood_cb(tpt_p tpt __unused, void *udata) { size_t tpt_num = tpt_get_num(tpt_get_current()); /* Get real thread. */ diff --git a/tests/threadpool/test-threadpool.project b/tests/threadpool/test-threadpool.project index 8b4e5bd..fc24b02 100644 --- a/tests/threadpool/test-threadpool.project +++ b/tests/threadpool/test-threadpool.project @@ -10,7 +10,7 @@ - +