From 76e26e9c3d7ba0345762819e78a5df7e34145398 Mon Sep 17 00:00:00 2001 From: Qiu Chaofan Date: Mon, 26 Sep 2022 13:24:57 +0800 Subject: [PATCH] Support AIX operating system AIX doesn't have epoll or kqueue API. Instead, it includes a mechanism called 'pollset'. --- src/sys/unix/net.rs | 6 +- src/sys/unix/pipe.rs | 1 + src/sys/unix/selector/mod.rs | 7 ++ src/sys/unix/selector/pollset.rs | 163 +++++++++++++++++++++++++++++++ src/sys/unix/tcp.rs | 3 +- src/sys/unix/uds/listener.rs | 2 + src/sys/unix/uds/mod.rs | 6 +- src/sys/unix/waker.rs | 5 + 8 files changed, 187 insertions(+), 6 deletions(-) create mode 100644 src/sys/unix/selector/pollset.rs diff --git a/src/sys/unix/net.rs b/src/sys/unix/net.rs index 78f1387b1..6e83d7f10 100644 --- a/src/sys/unix/net.rs +++ b/src/sys/unix/net.rs @@ -96,7 +96,8 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_ target_os = "ios", target_os = "macos", target_os = "netbsd", - target_os = "openbsd" + target_os = "openbsd", + target_os = "aix" ))] sin_len: 0, }; @@ -120,7 +121,8 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_ target_os = "ios", target_os = "macos", target_os = "netbsd", - target_os = "openbsd" + target_os = "openbsd", + target_os = "aix" ))] sin6_len: 0, #[cfg(target_os = "illumos")] diff --git a/src/sys/unix/pipe.rs b/src/sys/unix/pipe.rs index 7a95b9697..4c962a9d9 100644 --- a/src/sys/unix/pipe.rs +++ b/src/sys/unix/pipe.rs @@ -195,6 +195,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> { target_os = "macos", target_os = "illumos", target_os = "redox", + target_os = "aix", )))] compile_error!("unsupported target for `mio::unix::pipe`"); diff --git a/src/sys/unix/selector/mod.rs b/src/sys/unix/selector/mod.rs index 9ae4c1416..39d65dd78 100644 --- a/src/sys/unix/selector/mod.rs +++ b/src/sys/unix/selector/mod.rs @@ -34,6 +34,12 @@ mod kqueue; ))] pub(crate) use self::kqueue::{event, Event, Events, Selector}; +#[cfg(target_os = "aix")] +mod pollset; + +#[cfg(target_os = "aix")] +pub(crate) use self::pollset::{event, Event, Events, Selector}; + /// Lowest file descriptor used in `Selector::try_clone`. /// /// # Notes @@ -42,4 +48,5 @@ pub(crate) use self::kqueue::{event, Event, Events, Selector}; /// blindly assume this to be true, which means using any one of those a select /// could result in some interesting and unexpected errors. Avoid that by using /// an fd that doesn't have a pre-determined usage. +#[warn(unused)] const LOWEST_FD: libc::c_int = 3; diff --git a/src/sys/unix/selector/pollset.rs b/src/sys/unix/selector/pollset.rs new file mode 100644 index 000000000..bf14de45a --- /dev/null +++ b/src/sys/unix/selector/pollset.rs @@ -0,0 +1,163 @@ +use crate::{Interest, Token}; +use log::error; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::time::Duration; +use std::io; +#[cfg(debug_assertions)] +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; + +#[cfg(debug_assertions)] +static NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[derive(Debug)] +pub struct Selector { + #[cfg(debug_assertions)] + id: usize, + ps: RawFd, + #[cfg(debug_assertions)] + has_waker: AtomicBool, +} + +impl Selector { + pub fn new() -> io::Result { + syscall!(pollset_create(1024)).map(|ps| Selector { + #[cfg(debug_assertions)] + id: NEXT_ID.fetch_add(1, Ordering::Relaxed), + ps, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(false), + }) + } + + pub fn try_clone(&self) -> io::Result { + syscall!(fcntl(self.ps, libc::F_DUPFD_CLOEXEC, super::LOWEST_FD)).map(|ps| Selector { + // It's the same selector, so we use the same id. + #[cfg(debug_assertions)] + id: self.id, + ps, + #[cfg(debug_assertions)] + has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)), + }) + } + + pub fn select(&self, events: &mut Events, timeout: Option) -> io::Result<()> { + events.clear(); + syscall!(pollset_poll( + self.ps, + events.as_mut_ptr(), + events.capacity() as i32, + -1, + )) + .map(|n_events| { + unsafe { events.set_len(n_events as usize) }; + }) + } + + pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + let mut control: [libc::poll_ctl; 1] = [libc::poll_ctl { + cmd: libc::PS_ADD as i16, + events: interests_to_pollset(interests), + fd: fd as i32, + }; 1]; + syscall!(pollset_ctl(self.ps, control.as_mut_ptr(), 1)).map(|_| ()) + } + + // TODO: PS_MOD or PS_REPLACE? + pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> { + let mut control: [libc::poll_ctl; 1] = [libc::poll_ctl { + cmd: libc::PS_MOD as i16, + events: interests_to_pollset(interests), + fd: fd as i32, + }; 1]; + syscall!(pollset_ctl(self.ps, control.as_mut_ptr(), 1)).map(|_| ()) + } + + pub fn deregister(&self, fd: RawFd) -> io::Result<()> { + let mut control: [libc::poll_ctl; 1] = [libc::poll_ctl { + cmd: libc::PS_DELETE as i16, + events: 0, + fd: fd as i32, + }; 1]; + syscall!(pollset_ctl(self.ps, control.as_mut_ptr(), 1)).map(|_| ()) + } + + #[cfg(debug_assertions)] + pub fn register_waker(&self) -> bool { + self.has_waker.swap(true, Ordering::AcqRel) + } +} + +impl AsRawFd for Selector { + fn as_raw_fd(&self) -> RawFd { + self.ps + } +} + +impl Drop for Selector { + fn drop(&mut self) { + if let Err(err) = syscall!(pollset_destroy(self.ps)) { + error!("error closing pollset: {}", err); + } + } +} + +fn interests_to_pollset(interests: Interest) -> i16 { + let mut kind: i16 = 0; + if interests.is_readable() { + kind |= libc::POLLIN; + } + if interests.is_writable() { + kind |= libc::POLLOUT; + } + kind +} + +pub type Event = libc::pollfd; +pub type Events = Vec; + +pub mod event { + use std::fmt; + + use crate::sys::Event; + use crate::Token; + + pub fn token(event: &Event) -> Token { + Token(event.fd as usize) + } + + pub fn is_readable(event: &Event) -> bool { + (event.revents & libc::POLLIN) != 0 + } + + pub fn is_writable(event: &Event) -> bool { + (event.revents & libc::POLLOUT) != 0 + } + + pub fn is_error(event: &Event) -> bool { + false + } + + pub fn is_read_closed(event: &Event) -> bool { + false + } + + pub fn is_write_closed(event: &Event) -> bool { + false + } + + pub fn is_priority(event: &Event) -> bool { + (event.revents & libc::POLLPRI) != 0 + } + + pub fn is_aio(_: &Event) -> bool { + false + } + + pub fn is_lio(_: &Event) -> bool { + false + } + + pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result { + unimplemented!() + } +} diff --git a/src/sys/unix/tcp.rs b/src/sys/unix/tcp.rs index c4d7e9469..616fb9bda 100644 --- a/src/sys/unix/tcp.rs +++ b/src/sys/unix/tcp.rs @@ -88,7 +88,8 @@ pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream, all(target_arch = "x86", target_os = "android"), target_os = "ios", target_os = "macos", - target_os = "redox" + target_os = "redox", + target_os = "aix", ))] let stream = { syscall!(accept( diff --git a/src/sys/unix/uds/listener.rs b/src/sys/unix/uds/listener.rs index 79bd14ee0..32662f60c 100644 --- a/src/sys/unix/uds/listener.rs +++ b/src/sys/unix/uds/listener.rs @@ -43,6 +43,7 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So target_os = "macos", target_os = "netbsd", target_os = "redox", + target_os = "aix", // Android x86's seccomp profile forbids calls to `accept4(2)` // See https://github.com/tokio-rs/mio/issues/1445 for details all( @@ -66,6 +67,7 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So target_os = "macos", target_os = "netbsd", target_os = "redox", + target_os = "aix", all(target_arch = "x86", target_os = "android") ))] let socket = syscall!(accept( diff --git a/src/sys/unix/uds/mod.rs b/src/sys/unix/uds/mod.rs index 526bbdfd0..ed6971093 100644 --- a/src/sys/unix/uds/mod.rs +++ b/src/sys/unix/uds/mod.rs @@ -77,20 +77,20 @@ cfg_os_poll! { fn pair(flags: libc::c_int) -> io::Result<(T, T)> where T: FromRawFd, { - #[cfg(not(any(target_os = "ios", target_os = "macos")))] + #[cfg(not(any(target_os = "ios", target_os = "macos", target_os = "aix")))] let flags = flags | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC; let mut fds = [-1; 2]; syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?; let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) }; - // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. + // Darwin and AIX don't have SOCK_NONBLOCK or SOCK_CLOEXEC. // // In order to set those flags, additional `fcntl` sys calls must be // performed. If a `fnctl` fails after the sockets have been created, // the file descriptors will leak. Creating `pair` above ensures that if // there is an error, the file descriptors are closed. - #[cfg(any(target_os = "ios", target_os = "macos"))] + #[cfg(any(target_os = "ios", target_os = "macos", target_os = "aix"))] { syscall!(fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK))?; syscall!(fcntl(fds[0], libc::F_SETFD, libc::FD_CLOEXEC))?; diff --git a/src/sys/unix/waker.rs b/src/sys/unix/waker.rs index de88e3181..2307266fb 100644 --- a/src/sys/unix/waker.rs +++ b/src/sys/unix/waker.rs @@ -104,6 +104,7 @@ pub use self::kqueue::Waker; target_os = "netbsd", target_os = "openbsd", target_os = "redox", + target_os = "aix", ))] mod pipe { use crate::sys::unix::Selector; @@ -126,7 +127,10 @@ mod pipe { impl Waker { pub fn new(selector: &Selector, token: Token) -> io::Result { let mut fds = [-1; 2]; + #[cfg(not(target_os = "aix"))] syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?; + #[cfg(target_os = "aix")] + syscall!(pipe(fds.as_mut_ptr()))?; // Turn the file descriptors into files first so we're ensured // they're closed when dropped, e.g. when register below fails. let sender = unsafe { File::from_raw_fd(fds[1]) }; @@ -176,5 +180,6 @@ mod pipe { target_os = "netbsd", target_os = "openbsd", target_os = "redox", + target_os = "aix", ))] pub use self::pipe::Waker;