Skip to content

Commit

Permalink
feat: support asyncfd and refactor uring based poller
Browse files Browse the repository at this point in the history
  • Loading branch information
ihciah committed Jun 7, 2024
1 parent 1344092 commit e34d058
Show file tree
Hide file tree
Showing 37 changed files with 920 additions and 432 deletions.
117 changes: 33 additions & 84 deletions monoio/src/driver/legacy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ use std::{

use super::{
op::{CompletionMeta, Op, OpAble},
poll::Poll as LegacyPoll,
ready::{self, Ready},
scheduled_io::ScheduledIo,
Driver, Inner, CURRENT,
};
use crate::utils::slab::Slab;

#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
#[cfg(windows)]
Expand All @@ -26,15 +25,7 @@ mod waker;
pub(crate) use waker::UnparkHandle;

pub(crate) struct LegacyInner {
pub(crate) io_dispatch: Slab<ScheduledIo>,
#[cfg(unix)]
events: mio::Events,
#[cfg(unix)]
poll: mio::Poll,
#[cfg(windows)]
events: iocp::Events,
#[cfg(windows)]
poll: iocp::Poller,
pub(crate) poller: LegacyPoll,

#[cfg(feature = "sync")]
shared_waker: std::sync::Arc<waker::EventWaker>,
Expand Down Expand Up @@ -66,14 +57,10 @@ impl LegacyDriver {
}

pub(crate) fn new_with_entries(entries: u32) -> io::Result<Self> {
#[cfg(unix)]
let poll = mio::Poll::new()?;
#[cfg(windows)]
let poll = iocp::Poller::new()?;

let poller = LegacyPoll::with_capacity(entries as usize)?;
#[cfg(all(unix, feature = "sync"))]
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
poll.registry(),
poller.poll.registry(),
TOKEN_WAKEUP,
)?));
#[cfg(all(windows, feature = "sync"))]
Expand All @@ -87,15 +74,7 @@ impl LegacyDriver {
let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id);

let inner = LegacyInner {
io_dispatch: Slab::new(),
#[cfg(unix)]
events: mio::Events::with_capacity(entries as usize),
#[cfg(unix)]
poll,
#[cfg(windows)]
events: iocp::Events::with_capacity(entries as usize),
#[cfg(windows)]
poll,
poller,
#[cfg(feature = "sync")]
shared_waker,
#[cfg(feature = "sync")]
Expand Down Expand Up @@ -150,13 +129,12 @@ impl LegacyDriver {
timeout = Some(Duration::ZERO);
}

// here we borrow 2 mut self, but its safe.
let events = unsafe { &mut (*self.inner.get()).events };
match inner.poll.poll(events, timeout) {
match inner.poller.poll_inside(timeout) {
Ok(_) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
let events = &inner.poller.events;
#[cfg(unix)]
let iter = events.iter();
#[cfg(windows)]
Expand All @@ -166,103 +144,69 @@ impl LegacyDriver {

#[cfg(feature = "sync")]
if token != TOKEN_WAKEUP {
inner.dispatch(token, Ready::from_mio(event));
LegacyPoll::dispatch(
&mut inner.poller.io_dispatch,
token.0,
Ready::from_mio(event),
);
}

#[cfg(not(feature = "sync"))]
inner.dispatch(token, Ready::from_mio(event));
LegacyPoll::dispatch(
&mut inner.poller.io_dispatch,
token.0,
Ready::from_mio(event),
);
}
Ok(())
}

#[cfg(windows)]
#[inline]
pub(crate) fn register(
this: &Rc<UnsafeCell<LegacyInner>>,
state: &mut iocp::SocketState,
interest: mio::Interest,
) -> io::Result<usize> {
let inner = unsafe { &mut *this.get() };
let io = ScheduledIo::default();
let token = inner.io_dispatch.insert(io);

match inner.poll.register(state, mio::Token(token), interest) {
Ok(_) => Ok(token),
Err(e) => {
inner.io_dispatch.remove(token);
Err(e)
}
}
inner.poller.register(state, interest)
}

#[cfg(windows)]
#[inline]
pub(crate) fn deregister(
this: &Rc<UnsafeCell<LegacyInner>>,
token: usize,
state: &mut iocp::SocketState,
) -> io::Result<()> {
let inner = unsafe { &mut *this.get() };

// try to deregister fd first, on success we will remove it from slab.
match inner.poll.deregister(state) {
Ok(_) => {
inner.io_dispatch.remove(token);
Ok(())
}
Err(e) => Err(e),
}
inner.poller.deregister(token, state)
}

#[cfg(unix)]
#[inline]
pub(crate) fn register(
this: &Rc<UnsafeCell<LegacyInner>>,
source: &mut impl mio::event::Source,
interest: mio::Interest,
) -> io::Result<usize> {
let inner = unsafe { &mut *this.get() };
let token = inner.io_dispatch.insert(ScheduledIo::new());

let registry = inner.poll.registry();
match registry.register(source, mio::Token(token), interest) {
Ok(_) => Ok(token),
Err(e) => {
inner.io_dispatch.remove(token);
Err(e)
}
}
inner.poller.register(source, interest)
}

#[cfg(unix)]
#[inline]
pub(crate) fn deregister(
this: &Rc<UnsafeCell<LegacyInner>>,
token: usize,
source: &mut impl mio::event::Source,
) -> io::Result<()> {
let inner = unsafe { &mut *this.get() };

// try to deregister fd first, on success we will remove it from slab.
match inner.poll.registry().deregister(source) {
Ok(_) => {
inner.io_dispatch.remove(token);
Ok(())
}
Err(e) => Err(e),
}
inner.poller.deregister(source, token)
}
}

impl LegacyInner {
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let mut sio = match self.io_dispatch.get(token.0) {
Some(io) => io,
None => {
return;
}
};
let ref_mut = sio.as_mut();
ref_mut.set_readiness(|curr| curr | ready);
ref_mut.wake(ready);
}

pub(crate) fn poll_op<T: OpAble>(
this: &Rc<UnsafeCell<Self>>,
data: &mut T,
Expand All @@ -282,7 +226,11 @@ impl LegacyInner {
};

// wait io ready and do syscall
let mut scheduled_io = inner.io_dispatch.get(index).expect("scheduled_io lost");
let mut scheduled_io = inner
.poller
.io_dispatch
.get(index)
.expect("scheduled_io lost");
let ref_mut = scheduled_io.as_mut();

let readiness = ready!(ref_mut.poll_readiness(cx, direction));
Expand Down Expand Up @@ -316,15 +264,16 @@ impl LegacyInner {

pub(crate) fn cancel_op(
this: &Rc<UnsafeCell<LegacyInner>>,
index: usize,
token: usize,
direction: ready::Direction,
) {
let inner = unsafe { &mut *this.get() };
let ready = match direction {
ready::Direction::Read => Ready::READ_CANCELED,
ready::Direction::Write => Ready::WRITE_CANCELED,
ready::Direction::ReadOrWrite => Ready::CANCELED,
};
inner.dispatch(mio::Token(index), ready);
LegacyPoll::dispatch(&mut inner.poller.io_dispatch, token, ready);
}

pub(crate) fn submit_with_data<T>(
Expand Down
2 changes: 1 addition & 1 deletion monoio/src/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// Monoio Driver.
#[allow(dead_code)]
pub(crate) mod op;
#[cfg(all(feature = "poll-io", unix))]
#[cfg(any(feature = "legacy", feature = "poll-io"))]
pub(crate) mod poll;
#[cfg(any(feature = "legacy", feature = "poll-io"))]
pub(crate) mod ready;
Expand Down
2 changes: 1 addition & 1 deletion monoio/src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod accept;
mod connect;
mod fsync;
mod open;
mod poll;
pub(crate) mod poll;
mod read;
mod recv;
mod send;
Expand Down
34 changes: 19 additions & 15 deletions monoio/src/driver/op/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl OpAble for Accept {
let addr = self.addr.0.as_mut_ptr() as *mut _;
let len = &mut self.addr.1;

syscall!(accept(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET)
unsafe { syscall!(accept(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET) }
}

#[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))]
Expand All @@ -102,12 +102,14 @@ impl OpAble for Accept {
target_os = "netbsd",
target_os = "openbsd"
))]
return syscall_u32!(accept4(
fd,
addr,
len,
libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
));
return unsafe {
syscall_u32!(accept4(
fd,
addr,
len,
libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
))
};

// But not all platforms have the `accept4(2)` call. Luckily BSD (derived)
// OSes inherit the non-blocking flag from the listener, so we just have to
Expand All @@ -119,14 +121,16 @@ impl OpAble for Accept {
target_os = "redox"
))]
return {
let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32;
syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC))
.and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK)))
.map_err(|e| {
let _ = syscall_u32!(close(stream_fd));
e
})?;
Ok(stream_fd as _)
unsafe {
let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32;
syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC))
.and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK)))
.map_err(|e| {
let _ = syscall_u32!(close(stream_fd));
e
})?;
Ok(stream_fd as _)
}
};
}
}
4 changes: 2 additions & 2 deletions monoio/src/driver/op/close.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ impl OpAble for Close {
#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_call(&mut self) -> io::Result<u32> {
#[cfg(unix)]
return crate::syscall_u32!(close(self.fd));
return unsafe { crate::syscall_u32!(close(self.fd)) };

#[cfg(windows)]
return syscall!(closesocket(self.fd as _), PartialEq::ne, 0);
return unsafe { syscall!(closesocket(self.fd as _), PartialEq::ne, 0) };
}
}
46 changes: 26 additions & 20 deletions monoio/src/driver/op/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,31 @@ impl OpAble for Connect {
endpoints.sae_dstaddr = self.socket_addr.as_ptr();
endpoints.sae_dstaddrlen = self.socket_addr_len;

return match crate::syscall_u32!(connectx(
self.fd.raw_fd(),
&endpoints as *const _,
libc::SAE_ASSOCID_ANY,
libc::CONNECT_DATA_IDEMPOTENT | libc::CONNECT_RESUME_ON_READ_WRITE,
std::ptr::null(),
0,
std::ptr::null_mut(),
std::ptr::null_mut(),
)) {
return match unsafe {
crate::syscall_u32!(connectx(
self.fd.raw_fd(),
&endpoints as *const _,
libc::SAE_ASSOCID_ANY,
libc::CONNECT_DATA_IDEMPOTENT | libc::CONNECT_RESUME_ON_READ_WRITE,
std::ptr::null(),
0,
std::ptr::null_mut(),
std::ptr::null_mut(),
))
} {
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
_ => Ok(self.fd.raw_fd() as u32),
};
}

#[cfg(unix)]
match crate::syscall_u32!(connect(
self.fd.raw_fd(),
self.socket_addr.as_ptr(),
self.socket_addr_len,
)) {
match unsafe {
crate::syscall_u32!(connect(
self.fd.raw_fd(),
self.socket_addr.as_ptr(),
self.socket_addr_len,
))
} {
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
_ => Ok(self.fd.raw_fd() as u32),
}
Expand Down Expand Up @@ -159,11 +163,13 @@ impl OpAble for ConnectUnix {

#[cfg(any(feature = "legacy", feature = "poll-io"))]
fn legacy_call(&mut self) -> io::Result<u32> {
match crate::syscall_u32!(connect(
self.fd.raw_fd(),
&self.socket_addr.0 as *const _ as *const _,
self.socket_addr.1
)) {
match unsafe {
crate::syscall_u32!(connect(
self.fd.raw_fd(),
&self.socket_addr.0 as *const _ as *const _,
self.socket_addr.1
))
} {
Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err),
_ => Ok(self.fd.raw_fd() as u32),
}
Expand Down
Loading

0 comments on commit e34d058

Please sign in to comment.