Skip to content

Commit

Permalink
Support AIX operating system
Browse files Browse the repository at this point in the history
AIX doesn't have epoll or kqueue API. Instead, it includes a mechanism
called 'pollset'.
  • Loading branch information
ecnelises committed Sep 26, 2022
1 parent 26941af commit e2000b7
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 6 deletions.
6 changes: 4 additions & 2 deletions src/sys/unix/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
target_os = "openbsd",
target_os = "aix"
))]
sin_len: 0,
};
Expand All @@ -120,7 +121,8 @@ pub(crate) fn socket_addr(addr: &SocketAddr) -> (SocketAddrCRepr, libc::socklen_
target_os = "ios",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd"
target_os = "openbsd",
target_os = "aix"
))]
sin6_len: 0,
#[cfg(target_os = "illumos")]
Expand Down
1 change: 1 addition & 0 deletions src/sys/unix/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ pub fn new() -> io::Result<(Sender, Receiver)> {
target_os = "macos",
target_os = "illumos",
target_os = "redox",
target_os = "aix",
)))]
compile_error!("unsupported target for `mio::unix::pipe`");

Expand Down
7 changes: 7 additions & 0 deletions src/sys/unix/selector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ mod kqueue;
))]
pub(crate) use self::kqueue::{event, Event, Events, Selector};

#[cfg(target_os = "aix")]
mod pollset;

#[cfg(target_os = "aix")]
pub(crate) use self::pollset::{event, Event, Events, Selector};

/// Lowest file descriptor used in `Selector::try_clone`.
///
/// # Notes
Expand All @@ -42,4 +48,5 @@ pub(crate) use self::kqueue::{event, Event, Events, Selector};
/// blindly assume this to be true, which means using any one of those a select
/// could result in some interesting and unexpected errors. Avoid that by using
/// an fd that doesn't have a pre-determined usage.
#[warn(unused)]
const LOWEST_FD: libc::c_int = 3;
163 changes: 163 additions & 0 deletions src/sys/unix/selector/pollset.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use crate::{Interest, Token};
use log::error;
use std::os::unix::io::{AsRawFd, RawFd};
use std::time::Duration;
use std::io;
#[cfg(debug_assertions)]
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

#[cfg(debug_assertions)]
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);

#[derive(Debug)]
pub struct Selector {
#[cfg(debug_assertions)]
id: usize,
ps: RawFd,
#[cfg(debug_assertions)]
has_waker: AtomicBool,
}

impl Selector {
pub fn new() -> io::Result<Selector> {
syscall!(pollset_create(1024)).map(|ps| Selector {
#[cfg(debug_assertions)]
id: NEXT_ID.fetch_add(1, Ordering::Relaxed),
ps,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(false),
})
}

pub fn try_clone(&self) -> io::Result<Selector> {
syscall!(fcntl(self.ps, libc::F_DUPFD_CLOEXEC, super::LOWEST_FD)).map(|ps| Selector {
// It's the same selector, so we use the same id.
#[cfg(debug_assertions)]
id: self.id,
ps,
#[cfg(debug_assertions)]
has_waker: AtomicBool::new(self.has_waker.load(Ordering::Acquire)),
})
}

pub fn select(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
events.clear();
syscall!(pollset_poll(
self.ps,
events.as_mut_ptr(),
events.capacity() as i32,
-1,
))
.map(|n_events| {
unsafe { events.set_len(n_events as usize) };
})
}

pub fn register(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
let mut control: [libc::poll_ctl; 1] = [libc::poll_ctl {
cmd: libc::PS_ADD as i16,
events: interests_to_pollset(interests),
fd: fd as i32,
}; 1];
syscall!(pollset_ctl(self.ps, control.as_mut_ptr(), 1)).map(|_| ())
}

// TODO: PS_MOD or PS_REPLACE?
pub fn reregister(&self, fd: RawFd, token: Token, interests: Interest) -> io::Result<()> {
let mut control: [libc::poll_ctl; 1] = [libc::poll_ctl {
cmd: libc::PS_MOD as i16,
events: interests_to_pollset(interests),
fd: fd as i32,
}; 1];
syscall!(pollset_ctl(self.ps, control.as_mut_ptr(), 1)).map(|_| ())
}

pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
let mut control: [libc::poll_ctl; 1] = [libc::poll_ctl {
cmd: libc::PS_DELETE as i16,
events: 0,
fd: fd as i32,
}; 1];
syscall!(pollset_ctl(self.ps, control.as_mut_ptr(), 1)).map(|_| ())
}

#[cfg(debug_assertions)]
pub fn register_waker(&self) -> bool {
self.has_waker.swap(true, Ordering::AcqRel)
}
}

impl AsRawFd for Selector {
fn as_raw_fd(&self) -> RawFd {
self.ps
}
}

impl Drop for Selector {
fn drop(&mut self) {
if let Err(err) = syscall!(pollset_destroy(self.ps)) {
error!("error closing pollset: {}", err);
}
}
}

fn interests_to_pollset(interests: Interest) -> i16 {
let mut kind: i16 = 0;
if interests.is_readable() {
kind |= libc::POLLIN;
}
if interests.is_writable() {
kind |= libc::POLLOUT;
}
kind
}

pub type Event = libc::pollfd;
pub type Events = Vec<Event>;

pub mod event {
use std::fmt;

use crate::sys::Event;
use crate::Token;

pub fn token(event: &Event) -> Token {
Token(event.fd as usize)
}

pub fn is_readable(event: &Event) -> bool {
(event.revents & libc::POLLIN) != 0
}

pub fn is_writable(event: &Event) -> bool {
(event.revents & libc::POLLOUT) != 0
}

pub fn is_error(event: &Event) -> bool {
false
}

pub fn is_read_closed(event: &Event) -> bool {
false
}

pub fn is_write_closed(event: &Event) -> bool {
false
}

pub fn is_priority(event: &Event) -> bool {
(event.revents & libc::POLLPRI) != 0
}

pub fn is_aio(_: &Event) -> bool {
false
}

pub fn is_lio(_: &Event) -> bool {
false
}

pub fn debug_details(f: &mut fmt::Formatter<'_>, event: &Event) -> fmt::Result {
unimplemented!()
}
}
3 changes: 2 additions & 1 deletion src/sys/unix/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ pub(crate) fn accept(listener: &net::TcpListener) -> io::Result<(net::TcpStream,
all(target_arch = "x86", target_os = "android"),
target_os = "ios",
target_os = "macos",
target_os = "redox"
target_os = "redox",
target_os = "aix",
))]
let stream = {
syscall!(accept(
Expand Down
2 changes: 2 additions & 0 deletions src/sys/unix/uds/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So
target_os = "macos",
target_os = "netbsd",
target_os = "redox",
target_os = "aix",
// Android x86's seccomp profile forbids calls to `accept4(2)`
// See https://github.com/tokio-rs/mio/issues/1445 for details
all(
Expand All @@ -66,6 +67,7 @@ pub(crate) fn accept(listener: &net::UnixListener) -> io::Result<(UnixStream, So
target_os = "macos",
target_os = "netbsd",
target_os = "redox",
target_os = "aix",
all(target_arch = "x86", target_os = "android")
))]
let socket = syscall!(accept(
Expand Down
6 changes: 3 additions & 3 deletions src/sys/unix/uds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,20 +77,20 @@ cfg_os_poll! {
fn pair<T>(flags: libc::c_int) -> io::Result<(T, T)>
where T: FromRawFd,
{
#[cfg(not(any(target_os = "ios", target_os = "macos")))]
#[cfg(not(any(target_os = "ios", target_os = "macos", target_os = "aix")))]
let flags = flags | libc::SOCK_NONBLOCK | libc::SOCK_CLOEXEC;

let mut fds = [-1; 2];
syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?;
let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) };

// Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC.
// Darwin and AIX don't have SOCK_NONBLOCK or SOCK_CLOEXEC.
//
// In order to set those flags, additional `fcntl` sys calls must be
// performed. If a `fnctl` fails after the sockets have been created,
// the file descriptors will leak. Creating `pair` above ensures that if
// there is an error, the file descriptors are closed.
#[cfg(any(target_os = "ios", target_os = "macos"))]
#[cfg(any(target_os = "ios", target_os = "macos", target_os = "aix"))]
{
syscall!(fcntl(fds[0], libc::F_SETFL, libc::O_NONBLOCK))?;
syscall!(fcntl(fds[0], libc::F_SETFD, libc::FD_CLOEXEC))?;
Expand Down
5 changes: 5 additions & 0 deletions src/sys/unix/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub use self::kqueue::Waker;
target_os = "netbsd",
target_os = "openbsd",
target_os = "redox",
target_os = "aix",
))]
mod pipe {
use crate::sys::unix::Selector;
Expand All @@ -126,7 +127,10 @@ mod pipe {
impl Waker {
pub fn new(selector: &Selector, token: Token) -> io::Result<Waker> {
let mut fds = [-1; 2];
#[cfg(not(target_os = "aix"))]
syscall!(pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC))?;
#[cfg(target_os = "aix")]
syscall!(pipe(fds.as_mut_ptr()))?;
// Turn the file descriptors into files first so we're ensured
// they're closed when dropped, e.g. when register below fails.
let sender = unsafe { File::from_raw_fd(fds[1]) };
Expand Down Expand Up @@ -176,5 +180,6 @@ mod pipe {
target_os = "netbsd",
target_os = "openbsd",
target_os = "redox",
target_os = "aix",
))]
pub use self::pipe::Waker;

0 comments on commit e2000b7

Please sign in to comment.