diff --git a/lib_fiber/c/src/hook/fiber_read.c b/lib_fiber/c/src/hook/fiber_read.c index 5ebec9c17..eb7470fb4 100644 --- a/lib_fiber/c/src/hook/fiber_read.c +++ b/lib_fiber/c/src/hook/fiber_read.c @@ -6,6 +6,7 @@ #include "io.h" #if defined(HAS_IO_URING) + static int uring_wait_read(FILE_EVENT *fe) { while (1) { @@ -30,13 +31,16 @@ static int uring_wait_read(FILE_EVENT *fe) return fe->reader_ctx.res; } - err = acl_fiber_last_error(); + // Use the negative of res as the error. + err = -fe->reader_ctx.res; + acl_fiber_set_error(err); fiber_save_errno(err); if (!error_again(err)) { - if (!(fe->type & TYPE_EVENTABLE)) { - fiber_file_free(fe); - } + // Don't free fe here, which will be freed in close. + //if (!(fe->type & TYPE_EVENTABLE)) { + // fiber_file_free(fe); + //} return -1; } } @@ -60,9 +64,9 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len) return iocp_wait_read(fe); } -#endif // HAS_IO_URING -#if defined(HAS_IOCP) +#elif defined(HAS_IOCP) + static int iocp_wait_read(FILE_EVENT *fe) { while (1) { @@ -122,7 +126,8 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len) fe->res = 0; return iocp_wait_read(fe); } -#endif // HAS_IOCP + +#endif // HAS_IOCP || HAS_IO_URING // After calling fiber_wait_read(): // The fiber_wait_read will return three status: @@ -148,7 +153,8 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len) // not monitor the file fd in fiber_wait_read. #if defined(_WIN32) || defined(_WIN64) -#define FIBER_READ(_fn, _fe, ...) do { \ + +# define FIBER_READ(_fn, _fe, ...) do { \ ssize_t ret; \ int err; \ if (IS_READABLE((_fe))) { \ @@ -172,8 +178,10 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len) return -1; \ } \ } while (1) + #else -#define FIBER_READ(_fn, _fe, _args...) do { \ + +# define FIBER_READ(_fn, _fe, _args...) do { \ ssize_t ret; \ int err; \ if (IS_READABLE((_fe))) { \ @@ -197,6 +205,7 @@ int fiber_iocp_read(FILE_EVENT *fe, char *buf, int len) return -1; \ } \ } while (1) + #endif #define FILE_ALLOC(f, t, fd) do { \ @@ -212,18 +221,18 @@ ssize_t fiber_read(FILE_EVENT *fe, void *buf, size_t count) CLR_POLLING(fe); #ifdef HAS_IO_URING - // One FILE_EVENT can be used by multiple fibers with the same - // EVENT_BUSY_READ or EVENT_BUSY_WRITE in the same time. But can be - // used by two fibers that one is a reader and the other is a writer, - // because there're two different objects for reader and writer. - if (EVENT_IS_IO_URING(fiber_io_event())) { -#define SET_READ(f) do { \ +# define SET_READ(f) do { \ (f)->in.read_ctx.buf = buf; \ (f)->in.read_ctx.len = (int) count; \ (f)->mask |= EVENT_READ; \ } while (0) + // One FILE_EVENT can be used by multiple fibers with the same + // EVENT_BUSY_READ or EVENT_BUSY_WRITE in the same time. But can be + // used by two fibers that one is a reader and the other is a writer, + // because there're two different objects for reader and writer. + if (EVENT_IS_IO_URING(fiber_io_event())) { int ret; if (!(fe->busy & EVENT_BUSY_READ)) { @@ -251,15 +260,15 @@ ssize_t fiber_readv(FILE_EVENT *fe, const struct iovec *iov, int iovcnt) CLR_POLLING(fe); #ifdef HAS_IO_URING - if (EVENT_IS_IO_URING(fiber_io_event())) { -#define SET_READV(f) do { \ +# define SET_READV(f) do { \ (f)->in.readv_ctx.iov = iov; \ (f)->in.readv_ctx.cnt = iovcnt; \ (f)->in.readv_ctx.off = 0; \ (f)->mask |= EVENT_READV; \ } while (0) + if (EVENT_IS_IO_URING(fiber_io_event())) { int ret; if (!(fe->busy & EVENT_BUSY_READ)) { @@ -287,14 +296,14 @@ ssize_t fiber_recvmsg(FILE_EVENT *fe, struct msghdr *msg, int flags) CLR_POLLING(fe); #ifdef HAS_IO_URING - if (EVENT_IS_IO_URING(fiber_io_event())) { -#define SET_RECVMSG(f) do { \ +# define SET_RECVMSG(f) do { \ (f)->in.recvmsg_ctx.msg = msg; \ (f)->in.recvmsg_ctx.flags = flags; \ (f)->mask |= EVENT_RECVMSG; \ } while (0) + if (EVENT_IS_IO_URING(fiber_io_event())) { int ret; if (!(fe->busy & EVENT_BUSY_READ)) { @@ -318,6 +327,7 @@ ssize_t fiber_recvmsg(FILE_EVENT *fe, struct msghdr *msg, int flags) } # ifdef HAS_MMSG + ssize_t fiber_recvmmsg(FILE_EVENT *fe, struct mmsghdr *msgvec, unsigned int vlen, int flags, const struct timespec *timeout) { @@ -336,6 +346,7 @@ ssize_t fiber_recvmmsg(FILE_EVENT *fe, struct mmsghdr *msgvec, FIBER_READ(sys_recvmmsg, fe, msgvec, vlen, flags, NULL); } + # endif // HAS_MMSG #endif // SYS_UNIX @@ -353,15 +364,15 @@ ssize_t fiber_recv(FILE_EVENT *fe, void *buf, size_t len, int flags) # endif } #elif defined(HAS_IO_URING) - if (EVENT_IS_IO_URING(fiber_io_event())) { -#define SET_RECV(f) do { \ +# define SET_RECV(f) do { \ (f)->in.recv_ctx.buf = buf; \ (f)->in.recv_ctx.len = (unsigned) len; \ (f)->in.recv_ctx.flags = flags; \ (f)->mask |= EVENT_RECV; \ } while (0) + if (EVENT_IS_IO_URING(fiber_io_event())) { int ret; if (!(fe->busy & EVENT_BUSY_READ)) { @@ -398,9 +409,8 @@ ssize_t fiber_recvfrom(FILE_EVENT *fe, void *buf, size_t len, return fiber_iocp_read(fe, buf, (int) len); } #elif defined(HAS_IO_URING) && defined(IO_URING_HAS_RECVFROM) - if (EVENT_IS_IO_URING(fiber_io_event())) { -#define SET_RECVFROM(f) do { \ +# define SET_RECVFROM(f) do { \ (f)->in.recvfrom_ctx.buf = buf; \ (f)->in.recvfrom_ctx.len = (unsigned) len; \ (f)->in.recvfrom_ctx.flags = flags; \ @@ -409,6 +419,7 @@ ssize_t fiber_recvfrom(FILE_EVENT *fe, void *buf, size_t len, (f)->mask |= EVENT_RECVFROM; \ } while (0) + if (EVENT_IS_IO_URING(fiber_io_event())) { int ret; if (!(fe->busy & EVENT_BUSY_READ)) { diff --git a/lib_fiber/c/src/hook/fiber_write.c b/lib_fiber/c/src/hook/fiber_write.c index 268ced1a8..6bec73855 100644 --- a/lib_fiber/c/src/hook/fiber_write.c +++ b/lib_fiber/c/src/hook/fiber_write.c @@ -10,6 +10,7 @@ // from connecting process. #if defined(HAS_IO_URING) + # define CHECK_SET_NBLOCK(_fd) do { \ if (var_hook_sys_api && !EVENT_IS_IO_URING(fiber_io_event())) { \ FILE_EVENT *_fe = fiber_file_get(_fd); \ @@ -19,7 +20,9 @@ } \ } \ } while (0) + #else + # define CHECK_SET_NBLOCK(_fd) do { \ if (var_hook_sys_api) { \ FILE_EVENT *_fe = fiber_file_get(_fd); \ @@ -29,7 +32,8 @@ } \ } \ } while (0) -#endif + +#endif // HAS_IO_URING static int wait_write(FILE_EVENT *fe) { @@ -52,7 +56,8 @@ static int wait_write(FILE_EVENT *fe) } #if defined(HAS_IO_URING) -static int iocp_wait_write(FILE_EVENT *fe) + +static int uring_wait_write(FILE_EVENT *fe) { while (1) { int err; @@ -73,22 +78,22 @@ static int iocp_wait_write(FILE_EVENT *fe) fiber_save_errno(err); if (!error_again(err)) { - if (!(fe->type & TYPE_EVENTABLE)) { - fiber_file_free(fe); - } + // Don't free fe here, which will be freed in close. + //if (!(fe->type & TYPE_EVENTABLE)) { + // fiber_file_free(fe); + //} return -1; } } } -#endif -#if defined(HAS_IO_URING) -int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len) +int fiber_uring_write(FILE_EVENT *fe, const char *buf, int len) { fe->out.write_ctx.buf = buf; fe->out.write_ctx.len = len; - return iocp_wait_write(fe); + return uring_wait_write(fe); } + #endif // HAS_IO_URING #define CHECK_WRITE_RESULT(_fe, _n) do { \ @@ -112,34 +117,34 @@ int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len) (__fe)->type = TYPE_EVENTABLE; \ } while (0) - #ifdef SYS_UNIX + ssize_t fiber_write(FILE_EVENT *fe, const void *buf, size_t count) { CLR_POLLING(fe); #if defined(HAS_IO_URING) - if (EVENT_IS_IO_URING(fiber_io_event()) && !(fe->mask & EVENT_SYSIO)) { -#define SET_WRITE(f) do { \ +# define SET_WRITE(f) do { \ (f)->out.write_ctx.buf = buf; \ (f)->out.write_ctx.len = (unsigned) count; \ (f)->mask |= EVENT_WRITE; \ } while (0) + if (EVENT_IS_IO_URING(fiber_io_event()) && !(fe->mask & EVENT_SYSIO)) { int ret; if (!(fe->busy & EVENT_BUSY_WRITE)) { SET_WRITE(fe); fe->busy |= EVENT_BUSY_WRITE; - ret = iocp_wait_write(fe); + ret = uring_wait_write(fe); fe->busy &= ~EVENT_BUSY_WRITE; } else { FILE_ALLOC(fe, 0, fe->fd); SET_WRITE(fe); - ret = iocp_wait_write(fe); + ret = uring_wait_write(fe); file_event_unrefer(fe); } return ret; @@ -164,28 +169,28 @@ ssize_t fiber_writev(FILE_EVENT *fe, const struct iovec *iov, int iovcnt) CLR_POLLING(fe); #if defined(HAS_IO_URING) - if (EVENT_IS_IO_URING(fiber_io_event())) { -#define SET_WRITEV(f) do { \ +# define SET_WRITEV(f) do { \ (f)->out.writev_ctx.iov = iov; \ (f)->out.writev_ctx.cnt = iovcnt; \ (f)->out.writev_ctx.off = 0; \ (f)->mask |= EVENT_WRITEV; \ } while (0) + if (EVENT_IS_IO_URING(fiber_io_event())) { int ret; if (!(fe->busy & EVENT_BUSY_WRITE)) { SET_WRITEV(fe); fe->busy |= EVENT_BUSY_WRITE; - ret = iocp_wait_write(fe); + ret = uring_wait_write(fe); fe->busy &= ~EVENT_BUSY_WRITE; } else { FILE_ALLOC(fe, 0, fe->fd); SET_WRITEV(fe); - ret = iocp_wait_write(fe); + ret = uring_wait_write(fe); file_event_unrefer(fe); } return ret; @@ -211,28 +216,28 @@ ssize_t fiber_send(FILE_EVENT *fe, const void *buf, size_t len, int flags) CLR_POLLING(fe); #if defined(HAS_IO_URING) - if (EVENT_IS_IO_URING(fiber_io_event())) { -#define SET_SEND(f) do { \ +# define SET_SEND(f) do { \ (f)->out.send_ctx.buf = buf; \ (f)->out.send_ctx.len = (unsigned) len; \ (f)->out.send_ctx.flags = flags; \ (f)->mask |= EVENT_SEND; \ } while (0) + if (EVENT_IS_IO_URING(fiber_io_event())) { int ret; if (!(fe->busy & EVENT_BUSY_WRITE)) { SET_SEND(fe); fe->busy |= EVENT_BUSY_WRITE; - ret = iocp_wait_write(fe); + ret = uring_wait_write(fe); fe->busy &= ~EVENT_BUSY_WRITE; } else { FILE_ALLOC(fe, 0, fe->fd); SET_SEND(fe); - ret = iocp_wait_write(fe); + ret = uring_wait_write(fe); file_event_unrefer(fe); } return ret; @@ -262,9 +267,8 @@ ssize_t fiber_sendto(FILE_EVENT *fe, const void *buf, size_t len, CLR_POLLING(fe); #if defined(HAS_IO_URING) && defined(IO_URING_HAS_SENDTO) - if (EVENT_IS_IO_URING(fiber_io_event())) { -#define SET_SENDTO(f) do { \ +# define SET_SENDTO(f) do { \ (f)->out.sendto_ctx.buf = buf; \ (f)->out.sendto_ctx.len = (unsigned) len; \ (f)->out.sendto_ctx.flags = flags; \ @@ -273,19 +277,20 @@ ssize_t fiber_sendto(FILE_EVENT *fe, const void *buf, size_t len, (f)->mask |= EVENT_SENDTO; \ } while (0) + if (EVENT_IS_IO_URING(fiber_io_event())) { int ret; if (!(fe->busy & EVENT_BUSY_WRITE)) { SET_SENDTO(fe); fe->busy |= EVENT_BUSY_WRITE; - ret = iocp_wait_write(fe); + ret = uring_wait_write(fe); fe->busy &= ~EVENT_BUSY_WRITE; } else { FILE_ALLOC(fe, 0, fe->fd); SET_SENDTO(fe); - ret = iocp_wait_write(fe); + ret = uring_wait_write(fe); file_event_unrefer(fe); } return ret; @@ -312,32 +317,33 @@ ssize_t fiber_sendto(FILE_EVENT *fe, const void *buf, size_t len, } #ifdef SYS_UNIX + ssize_t fiber_sendmsg(FILE_EVENT *fe, const struct msghdr *msg, int flags) { CLR_POLLING(fe); #if defined(HAS_IO_URING) - if (EVENT_IS_IO_URING(fiber_io_event())) { -#define SET_SENDMSG(f) do { \ +# define SET_SENDMSG(f) do { \ (f)->out.sendmsg_ctx.msg = msg; \ (f)->out.sendmsg_ctx.flags = flags; \ (f)->mask |= EVENT_SENDMSG; \ } while (0) + if (EVENT_IS_IO_URING(fiber_io_event())) { int ret; if (!(fe->busy & EVENT_BUSY_WRITE)) { SET_SENDMSG(fe); fe->busy |= EVENT_BUSY_WRITE; - ret = iocp_wait_write(fe); + ret = uring_wait_write(fe); fe->busy &= ~EVENT_BUSY_WRITE; } else { FILE_ALLOC(fe, 0, fe->fd); SET_SENDMSG(fe); - ret = iocp_wait_write(fe); + ret = uring_wait_write(fe); file_event_unrefer(fe); } return ret; @@ -358,6 +364,7 @@ ssize_t fiber_sendmsg(FILE_EVENT *fe, const struct msghdr *msg, int flags) } # ifdef HAS_MMSG + int fiber_sendmmsg(FILE_EVENT *fe, struct mmsghdr *msgvec, unsigned int vlen, int flags) { diff --git a/lib_fiber/c/src/hook/file.c b/lib_fiber/c/src/hook/file.c index fcc0798ad..353eab068 100644 --- a/lib_fiber/c/src/hook/file.c +++ b/lib_fiber/c/src/hook/file.c @@ -504,7 +504,7 @@ ssize_t pwrite(int fd, const void *buf, size_t count, off_t offset) fe->fd = fd; fe->out.write_ctx.off = offset; - ret = fiber_iocp_write(fe, buf, (int) count); + ret = fiber_uring_write(fe, buf, (int) count); file_event_unrefer(fe); return ret; diff --git a/lib_fiber/c/src/hook/io.h b/lib_fiber/c/src/hook/io.h index 9e0001200..451ec1cb9 100644 --- a/lib_fiber/c/src/hook/io.h +++ b/lib_fiber/c/src/hook/io.h @@ -34,7 +34,7 @@ ssize_t fiber_recvmmsg(FILE_EVENT *fe, struct mmsghdr *msgvec, # endif // in fiber_write.c -int fiber_iocp_write(FILE_EVENT *fe, const char *buf, int len); +int fiber_uring_write(FILE_EVENT *fe, const char *buf, int len); ssize_t fiber_write(FILE_EVENT *fe, const void *buf, size_t count); ssize_t fiber_writev(FILE_EVENT *fe, const struct iovec *iov, int iovcnt); diff --git a/lib_fiber/samples-c/Makefile.in b/lib_fiber/samples-c/Makefile.in index 53d71743e..c4198f1bf 100644 --- a/lib_fiber/samples-c/Makefile.in +++ b/lib_fiber/samples-c/Makefile.in @@ -83,10 +83,10 @@ ifeq ($(findstring Linux, $(UNIXNAME)), Linux) # CFLAGS += -DLINUX2 SYSLIB += -lcrypt -rdynamic -ldl -lrt ifeq ($(HAS_IO_URING), yes) - SYSLIB += -luring-ffi + SYSLIB += -luring-ffi -Wl,-rpath,/usr/lib endif ifeq ($(has_io_uring), yes) - SYSLIB += -luring-ffi + SYSLIB += -luring-ffi -Wl,-rpath,/usr/lib endif ifeq ($(DEBUG_STACK), yes) SYSLIB += -lunwind -lunwind-generic diff --git a/lib_fiber/samples-c/client2/main.c b/lib_fiber/samples-c/client2/main.c index 92d5b5065..ac979de83 100644 --- a/lib_fiber/samples-c/client2/main.c +++ b/lib_fiber/samples-c/client2/main.c @@ -160,7 +160,7 @@ static void echo_client(SOCKET fd) ret = read(fd, buf, BUF_SIZE - 1); #endif if (ret <= 0) { - printf("read error: %s\r\n", acl_last_serror()); + printf("read error: %s, fd=%d\r\n", acl_last_serror(), fd); break; }