diff --git a/monoio-compat/src/lib.rs b/monoio-compat/src/lib.rs index 39e87af0..c79e580a 100644 --- a/monoio-compat/src/lib.rs +++ b/monoio-compat/src/lib.rs @@ -22,7 +22,7 @@ pub type UnixStreamCompat = StreamWrapper; #[cfg(test)] mod tests { - #[monoio::test_all] + #[monoio::test_all(internal = true)] async fn test_rw() { let listener = monoio::net::TcpListener::bind("127.0.0.1:0").unwrap(); let addr = listener.local_addr().unwrap(); @@ -48,7 +48,7 @@ mod tests { client.await; } - #[monoio::test_all] + #[monoio::test_all(internal = true)] async fn test_rw_unsafe() { let listener = monoio::net::TcpListener::bind("127.0.0.1:0").unwrap(); let addr = listener.local_addr().unwrap(); diff --git a/monoio-macros/Cargo.toml b/monoio-macros/Cargo.toml index f7edcc48..54605ccf 100644 --- a/monoio-macros/Cargo.toml +++ b/monoio-macros/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0" name = "monoio-macros" readme = "README.md" repository = "https://github.com/bytedance/monoio" -version = "0.1.0" +version = "0.1.1" [lib] proc-macro = true diff --git a/monoio-macros/src/entry.rs b/monoio-macros/src/entry.rs index 7efa0e69..bd8d731f 100644 --- a/monoio-macros/src/entry.rs +++ b/monoio-macros/src/entry.rs @@ -15,6 +15,7 @@ struct FinalConfig { timer_enabled: Option, threads: Option, driver: DriverType, + internal: Option, } /// Config used in case of the attribute not being able to build a valid config @@ -23,6 +24,7 @@ const DEFAULT_ERROR_CONFIG: FinalConfig = FinalConfig { timer_enabled: None, threads: None, driver: DriverType::Fusion, + internal: None, }; struct Configuration { @@ -30,6 +32,7 @@ struct Configuration { timer_enabled: Option<(bool, Span)>, threads: Option<(u32, Span)>, driver: Option<(DriverType, Span)>, + internal: Option<(bool, Span)>, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -46,6 +49,7 @@ impl Configuration { timer_enabled: None, threads: None, driver: None, + internal: None, } } @@ -59,6 +63,16 @@ impl Configuration { Ok(()) } + fn set_internal(&mut self, enabled: syn::Lit, span: Span) -> Result<(), syn::Error> { + if self.internal.is_some() { + return Err(syn::Error::new(span, "`internal` set multiple times.")); + } + + let enabled = parse_bool(enabled, span, "internal")?; + self.internal = Some((enabled, span)); + Ok(()) + } + fn set_threads(&mut self, threads: syn::Lit, span: Span) -> Result<(), syn::Error> { if self.threads.is_some() { return Err(syn::Error::new(span, "`threads` set multiple times.")); @@ -98,6 +112,7 @@ impl Configuration { timer_enabled: self.timer_enabled.map(|(t, _)| t), threads: self.threads.map(|(t, _)| t), driver: self.driver.map(|(d, _)| d).unwrap_or(DriverType::Fusion), + internal: self.internal.map(|(t, _)| t), }) } } @@ -199,6 +214,10 @@ fn build_config(input: syn::ItemFn, args: AttributeArgs) -> Result config.set_driver(lit.clone(), syn::spanned::Spanned::span(lit))?, + "internal" => { + config.set_internal(lit.clone(), syn::spanned::Spanned::span(lit))? + } + name => { let msg = format!( "Unknown attribute {name} is specified; expected one of: \ @@ -349,7 +368,7 @@ fn parse_knobs(mut input: syn::ItemFn, is_test: bool, config: FinalConfig) -> To } else { quote! {} }; - let cfg_attr = if is_test { + let cfg_attr = if is_test && config.internal == Some(true) { match config.driver { DriverType::Legacy => quote! { #[cfg(feature = "legacy")] diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index e7cbc77c..88a74c3a 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0" name = "monoio" readme = "../README.md" repository = "https://github.com/bytedance/monoio" -version = "0.2.3" +version = "0.2.4" # common dependencies [dependencies] @@ -44,7 +44,7 @@ windows-sys = { version = "0.48.0", features = [ "Win32_Networking_WinSock", "Win32_System_IO", "Win32_Storage_FileSystem", - "Win32_Security" + "Win32_Security", ] } # unix dependencies @@ -89,4 +89,12 @@ poll-io = ["tokio", "mio"] signal = ["ctrlc", "sync"] signal-termination = ["signal", "ctrlc/termination"] # by default both iouring and legacy are enabled -default = ["async-cancel", "bytes", "iouring", "legacy", "macros", "utils"] +default = [ + "async-cancel", + "bytes", + "iouring", + "legacy", + "macros", + "utils", + "poll-io", +] diff --git a/monoio/src/driver/legacy/iocp/core.rs b/monoio/src/driver/legacy/iocp/core.rs index d871fd10..0c5cb4f1 100644 --- a/monoio/src/driver/legacy/iocp/core.rs +++ b/monoio/src/driver/legacy/iocp/core.rs @@ -47,7 +47,7 @@ impl CompletionPort { GetQueuedCompletionStatusEx( self.handle, entries.as_mut_ptr(), - std::cmp::min(entries.len(), u32::max_value() as usize) as u32, + std::cmp::min(entries.len(), u32::MAX as usize) as u32, &mut count, duration_millis(timeout), 0, diff --git a/monoio/src/driver/legacy/mod.rs b/monoio/src/driver/legacy/mod.rs index 9d0f796e..ed19db75 100644 --- a/monoio/src/driver/legacy/mod.rs +++ b/monoio/src/driver/legacy/mod.rs @@ -10,11 +10,10 @@ use std::{ use super::{ op::{CompletionMeta, Op, OpAble}, + poll::Poll as LegacyPoll, ready::{self, Ready}, - scheduled_io::ScheduledIo, Driver, Inner, CURRENT, }; -use crate::utils::slab::Slab; #[allow(missing_docs, unreachable_pub, dead_code, unused_imports)] #[cfg(windows)] @@ -26,15 +25,7 @@ mod waker; pub(crate) use waker::UnparkHandle; pub(crate) struct LegacyInner { - pub(crate) io_dispatch: Slab, - #[cfg(unix)] - events: mio::Events, - #[cfg(unix)] - poll: mio::Poll, - #[cfg(windows)] - events: iocp::Events, - #[cfg(windows)] - poll: iocp::Poller, + pub(crate) poller: LegacyPoll, #[cfg(feature = "sync")] shared_waker: std::sync::Arc, @@ -66,14 +57,10 @@ impl LegacyDriver { } pub(crate) fn new_with_entries(entries: u32) -> io::Result { - #[cfg(unix)] - let poll = mio::Poll::new()?; - #[cfg(windows)] - let poll = iocp::Poller::new()?; - + let poller = LegacyPoll::with_capacity(entries as usize)?; #[cfg(all(unix, feature = "sync"))] let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new( - poll.registry(), + poller.poll.registry(), TOKEN_WAKEUP, )?)); #[cfg(all(windows, feature = "sync"))] @@ -87,15 +74,7 @@ impl LegacyDriver { let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id); let inner = LegacyInner { - io_dispatch: Slab::new(), - #[cfg(unix)] - events: mio::Events::with_capacity(entries as usize), - #[cfg(unix)] - poll, - #[cfg(windows)] - events: iocp::Events::with_capacity(entries as usize), - #[cfg(windows)] - poll, + poller, #[cfg(feature = "sync")] shared_waker, #[cfg(feature = "sync")] @@ -150,13 +129,12 @@ impl LegacyDriver { timeout = Some(Duration::ZERO); } - // here we borrow 2 mut self, but its safe. - let events = unsafe { &mut (*self.inner.get()).events }; - match inner.poll.poll(events, timeout) { + match inner.poller.poll_inside(timeout) { Ok(_) => {} Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} Err(e) => return Err(e), } + let events = &inner.poller.events; #[cfg(unix)] let iter = events.iter(); #[cfg(windows)] @@ -166,103 +144,69 @@ impl LegacyDriver { #[cfg(feature = "sync")] if token != TOKEN_WAKEUP { - inner.dispatch(token, Ready::from_mio(event)); + LegacyPoll::dispatch( + &mut inner.poller.io_dispatch, + token.0, + Ready::from_mio(event), + ); } #[cfg(not(feature = "sync"))] - inner.dispatch(token, Ready::from_mio(event)); + LegacyPoll::dispatch( + &mut inner.poller.io_dispatch, + token.0, + Ready::from_mio(event), + ); } Ok(()) } #[cfg(windows)] + #[inline] pub(crate) fn register( this: &Rc>, state: &mut iocp::SocketState, interest: mio::Interest, ) -> io::Result { let inner = unsafe { &mut *this.get() }; - let io = ScheduledIo::default(); - let token = inner.io_dispatch.insert(io); - - match inner.poll.register(state, mio::Token(token), interest) { - Ok(_) => Ok(token), - Err(e) => { - inner.io_dispatch.remove(token); - Err(e) - } - } + inner.poller.register(state, interest) } #[cfg(windows)] + #[inline] pub(crate) fn deregister( this: &Rc>, token: usize, state: &mut iocp::SocketState, ) -> io::Result<()> { let inner = unsafe { &mut *this.get() }; - - // try to deregister fd first, on success we will remove it from slab. - match inner.poll.deregister(state) { - Ok(_) => { - inner.io_dispatch.remove(token); - Ok(()) - } - Err(e) => Err(e), - } + inner.poller.deregister(token, state) } #[cfg(unix)] + #[inline] pub(crate) fn register( this: &Rc>, source: &mut impl mio::event::Source, interest: mio::Interest, ) -> io::Result { let inner = unsafe { &mut *this.get() }; - let token = inner.io_dispatch.insert(ScheduledIo::new()); - - let registry = inner.poll.registry(); - match registry.register(source, mio::Token(token), interest) { - Ok(_) => Ok(token), - Err(e) => { - inner.io_dispatch.remove(token); - Err(e) - } - } + inner.poller.register(source, interest) } #[cfg(unix)] + #[inline] pub(crate) fn deregister( this: &Rc>, token: usize, source: &mut impl mio::event::Source, ) -> io::Result<()> { let inner = unsafe { &mut *this.get() }; - - // try to deregister fd first, on success we will remove it from slab. - match inner.poll.registry().deregister(source) { - Ok(_) => { - inner.io_dispatch.remove(token); - Ok(()) - } - Err(e) => Err(e), - } + inner.poller.deregister(source, token) } } impl LegacyInner { - fn dispatch(&mut self, token: mio::Token, ready: Ready) { - let mut sio = match self.io_dispatch.get(token.0) { - Some(io) => io, - None => { - return; - } - }; - let ref_mut = sio.as_mut(); - ref_mut.set_readiness(|curr| curr | ready); - ref_mut.wake(ready); - } - pub(crate) fn poll_op( this: &Rc>, data: &mut T, @@ -282,7 +226,11 @@ impl LegacyInner { }; // wait io ready and do syscall - let mut scheduled_io = inner.io_dispatch.get(index).expect("scheduled_io lost"); + let mut scheduled_io = inner + .poller + .io_dispatch + .get(index) + .expect("scheduled_io lost"); let ref_mut = scheduled_io.as_mut(); let readiness = ready!(ref_mut.poll_readiness(cx, direction)); @@ -316,15 +264,16 @@ impl LegacyInner { pub(crate) fn cancel_op( this: &Rc>, - index: usize, + token: usize, direction: ready::Direction, ) { let inner = unsafe { &mut *this.get() }; let ready = match direction { ready::Direction::Read => Ready::READ_CANCELED, ready::Direction::Write => Ready::WRITE_CANCELED, + ready::Direction::ReadOrWrite => Ready::CANCELED, }; - inner.dispatch(mio::Token(index), ready); + LegacyPoll::dispatch(&mut inner.poller.io_dispatch, token, ready); } pub(crate) fn submit_with_data( diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index 3e7c86cd..3773f689 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -1,7 +1,7 @@ /// Monoio Driver. #[allow(dead_code)] pub(crate) mod op; -#[cfg(all(feature = "poll-io", unix))] +#[cfg(any(feature = "legacy", feature = "poll-io"))] pub(crate) mod poll; #[cfg(any(feature = "legacy", feature = "poll-io"))] pub(crate) mod ready; diff --git a/monoio/src/driver/op.rs b/monoio/src/driver/op.rs index 3a1e60ef..b136b384 100644 --- a/monoio/src/driver/op.rs +++ b/monoio/src/driver/op.rs @@ -13,7 +13,7 @@ mod accept; mod connect; mod fsync; mod open; -mod poll; +pub(crate) mod poll; mod read; mod recv; mod send; diff --git a/monoio/src/driver/op/accept.rs b/monoio/src/driver/op/accept.rs index 0b17fd50..f379d923 100644 --- a/monoio/src/driver/op/accept.rs +++ b/monoio/src/driver/op/accept.rs @@ -76,7 +76,7 @@ impl OpAble for Accept { let addr = self.addr.0.as_mut_ptr() as *mut _; let len = &mut self.addr.1; - syscall!(accept(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET) + unsafe { syscall!(accept(fd as _, addr, len), PartialEq::eq, INVALID_SOCKET) } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] @@ -102,12 +102,14 @@ impl OpAble for Accept { target_os = "netbsd", target_os = "openbsd" ))] - return syscall_u32!(accept4( - fd, - addr, - len, - libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK, - )); + return unsafe { + syscall_u32!(accept4( + fd, + addr, + len, + libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK, + )) + }; // But not all platforms have the `accept4(2)` call. Luckily BSD (derived) // OSes inherit the non-blocking flag from the listener, so we just have to @@ -119,14 +121,16 @@ impl OpAble for Accept { target_os = "redox" ))] return { - let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32; - syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC)) - .and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK))) - .map_err(|e| { - let _ = syscall_u32!(close(stream_fd)); - e - })?; - Ok(stream_fd as _) + unsafe { + let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32; + syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC)) + .and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK))) + .map_err(|e| { + let _ = syscall_u32!(close(stream_fd)); + e + })?; + Ok(stream_fd as _) + } }; } } diff --git a/monoio/src/driver/op/close.rs b/monoio/src/driver/op/close.rs index 0ac7a731..714b4138 100644 --- a/monoio/src/driver/op/close.rs +++ b/monoio/src/driver/op/close.rs @@ -47,9 +47,9 @@ impl OpAble for Close { #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { #[cfg(unix)] - return crate::syscall_u32!(close(self.fd)); + return unsafe { crate::syscall_u32!(close(self.fd)) }; #[cfg(windows)] - return syscall!(closesocket(self.fd as _), PartialEq::ne, 0); + return unsafe { syscall!(closesocket(self.fd as _), PartialEq::ne, 0) }; } } diff --git a/monoio/src/driver/op/connect.rs b/monoio/src/driver/op/connect.rs index 1fa923e9..151b355c 100644 --- a/monoio/src/driver/op/connect.rs +++ b/monoio/src/driver/op/connect.rs @@ -70,27 +70,31 @@ impl OpAble for Connect { endpoints.sae_dstaddr = self.socket_addr.as_ptr(); endpoints.sae_dstaddrlen = self.socket_addr_len; - return match crate::syscall_u32!(connectx( - self.fd.raw_fd(), - &endpoints as *const _, - libc::SAE_ASSOCID_ANY, - libc::CONNECT_DATA_IDEMPOTENT | libc::CONNECT_RESUME_ON_READ_WRITE, - std::ptr::null(), - 0, - std::ptr::null_mut(), - std::ptr::null_mut(), - )) { + return match unsafe { + crate::syscall_u32!(connectx( + self.fd.raw_fd(), + &endpoints as *const _, + libc::SAE_ASSOCID_ANY, + libc::CONNECT_DATA_IDEMPOTENT | libc::CONNECT_RESUME_ON_READ_WRITE, + std::ptr::null(), + 0, + std::ptr::null_mut(), + std::ptr::null_mut(), + )) + } { Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), _ => Ok(self.fd.raw_fd() as u32), }; } #[cfg(unix)] - match crate::syscall_u32!(connect( - self.fd.raw_fd(), - self.socket_addr.as_ptr(), - self.socket_addr_len, - )) { + match unsafe { + crate::syscall_u32!(connect( + self.fd.raw_fd(), + self.socket_addr.as_ptr(), + self.socket_addr_len, + )) + } { Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), _ => Ok(self.fd.raw_fd() as u32), } @@ -159,11 +163,13 @@ impl OpAble for ConnectUnix { #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { - match crate::syscall_u32!(connect( - self.fd.raw_fd(), - &self.socket_addr.0 as *const _ as *const _, - self.socket_addr.1 - )) { + match unsafe { + crate::syscall_u32!(connect( + self.fd.raw_fd(), + &self.socket_addr.0 as *const _ as *const _, + self.socket_addr.1 + )) + } { Err(err) if err.raw_os_error() != Some(libc::EINPROGRESS) => Err(err), _ => Ok(self.fd.raw_fd() as u32), } diff --git a/monoio/src/driver/op/fsync.rs b/monoio/src/driver/op/fsync.rs index 0dfd1707..dcafbc8d 100644 --- a/monoio/src/driver/op/fsync.rs +++ b/monoio/src/driver/op/fsync.rs @@ -57,22 +57,26 @@ impl OpAble for Fsync { #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { - syscall!( - FlushFileBuffers(self.fd.as_raw_handle() as _), - PartialEq::eq, - 0 - ) + unsafe { + syscall!( + FlushFileBuffers(self.fd.as_raw_handle() as _), + PartialEq::eq, + 0 + ) + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { - #[cfg(target_os = "linux")] - if self.data_sync { - syscall_u32!(fdatasync(self.fd.raw_fd())) - } else { + unsafe { + #[cfg(target_os = "linux")] + if self.data_sync { + syscall_u32!(fdatasync(self.fd.raw_fd())) + } else { + syscall_u32!(fsync(self.fd.raw_fd())) + } + #[cfg(not(target_os = "linux"))] syscall_u32!(fsync(self.fd.raw_fd())) } - #[cfg(not(target_os = "linux"))] - syscall_u32!(fsync(self.fd.raw_fd())) } } diff --git a/monoio/src/driver/op/open.rs b/monoio/src/driver/op/open.rs index 1cb81dbd..78a43212 100644 --- a/monoio/src/driver/op/open.rs +++ b/monoio/src/driver/op/open.rs @@ -70,27 +70,31 @@ impl OpAble for Open { #[cfg(all(any(feature = "legacy", feature = "poll-io"), not(windows)))] fn legacy_call(&mut self) -> io::Result { - syscall_u32!(open( - self.path.as_c_str().as_ptr(), - self.flags, - self.mode as libc::c_int - )) + unsafe { + syscall_u32!(open( + self.path.as_c_str().as_ptr(), + self.flags, + self.mode as libc::c_int + )) + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { - syscall!( - CreateFileW( - self.path.as_c_str().as_ptr().cast(), - self.opts.access_mode()?, - self.opts.share_mode, - self.opts.security_attributes, - self.opts.creation_mode()?, - self.opts.get_flags_and_attributes(), - 0, - ), - PartialEq::eq, - INVALID_HANDLE_VALUE - ) + unsafe { + syscall!( + CreateFileW( + self.path.as_c_str().as_ptr().cast(), + self.opts.access_mode()?, + self.opts.share_mode, + self.opts.security_attributes, + self.opts.creation_mode()?, + self.opts.get_flags_and_attributes(), + 0, + ), + PartialEq::eq, + INVALID_HANDLE_VALUE + ) + } } } diff --git a/monoio/src/driver/op/poll.rs b/monoio/src/driver/op/poll.rs index 56aaae0e..c272f44b 100644 --- a/monoio/src/driver/op/poll.rs +++ b/monoio/src/driver/op/poll.rs @@ -16,13 +16,49 @@ use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; +/// Interest for PollAdd and AsyncFd. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub(crate) enum PollAddInterest { + Read, + Write, + ReadOrWrite, +} + +impl PollAddInterest { + #[cfg(unix)] + pub(crate) fn to_flags(self) -> i16 { + match self { + PollAddInterest::Read => libc::POLLIN, + PollAddInterest::Write => libc::POLLOUT, + PollAddInterest::ReadOrWrite => libc::POLLIN | libc::POLLOUT, + } + } + + #[cfg(windows)] + pub(crate) fn to_flags(self) -> i16 { + match self { + PollAddInterest::Read => POLLIN, + PollAddInterest::Write => POLLOUT, + PollAddInterest::ReadOrWrite => POLLIN | POLLOUT, + } + } + + #[cfg(any(feature = "legacy", feature = "poll-io"))] + pub(crate) fn to_direction(self) -> Direction { + match self { + PollAddInterest::Read => Direction::Read, + PollAddInterest::Write => Direction::Write, + PollAddInterest::ReadOrWrite => Direction::ReadOrWrite, + } + } +} + pub(crate) struct PollAdd { /// Holds a strong ref to the FD, preventing the file from being closed /// while the operation is in-flight. #[allow(unused)] fd: SharedFd, - // true: read; false: write - is_read: bool, + interest: PollAddInterest, #[cfg(any(feature = "legacy", feature = "poll-io"))] relaxed: bool, } @@ -31,7 +67,7 @@ impl Op { pub(crate) fn poll_read(fd: &SharedFd, _relaxed: bool) -> io::Result> { Op::submit_with(PollAdd { fd: fd.clone(), - is_read: true, + interest: PollAddInterest::Read, #[cfg(any(feature = "legacy", feature = "poll-io"))] relaxed: _relaxed, }) @@ -40,7 +76,20 @@ impl Op { pub(crate) fn poll_write(fd: &SharedFd, _relaxed: bool) -> io::Result> { Op::submit_with(PollAdd { fd: fd.clone(), - is_read: false, + interest: PollAddInterest::Write, + #[cfg(any(feature = "legacy", feature = "poll-io"))] + relaxed: _relaxed, + }) + } + + pub(crate) fn poll_with_interest( + fd: &SharedFd, + interest: PollAddInterest, + _relaxed: bool, + ) -> io::Result> { + Op::submit_with(PollAdd { + fd: fd.clone(), + interest, #[cfg(any(feature = "legacy", feature = "poll-io"))] relaxed: _relaxed, }) @@ -55,47 +104,28 @@ impl Op { impl OpAble for PollAdd { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { - opcode::PollAdd::new( - types::Fd(self.fd.raw_fd()), - if self.is_read { - libc::POLLIN as _ - } else { - libc::POLLOUT as _ - }, - ) - .build() + opcode::PollAdd::new(types::Fd(self.fd.raw_fd()), self.interest.to_flags() as _).build() } #[cfg(any(feature = "legacy", feature = "poll-io"))] #[inline] fn legacy_interest(&self) -> Option<(Direction, usize)> { - self.fd.registered_index().map(|idx| { - ( - if self.is_read { - Direction::Read - } else { - Direction::Write - }, - idx, - ) - }) + self.fd + .registered_index() + .map(|idx| (self.interest.to_direction(), idx)) } - #[cfg(all(any(feature = "legacy", feature = "poll-io"), not(windows)))] + #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { if !self.relaxed { use std::{io::ErrorKind, os::fd::AsRawFd}; let mut pollfd = libc::pollfd { fd: self.fd.as_raw_fd(), - events: if self.is_read { - libc::POLLIN as _ - } else { - libc::POLLOUT as _ - }, + events: self.interest.to_flags(), revents: 0, }; - let ret = crate::syscall_u32!(poll(&mut pollfd as *mut _, 1, 0))?; + let ret = unsafe { crate::syscall_u32!(poll(&mut pollfd as *mut _, 1, 0))? }; if ret == 0 { return Err(ErrorKind::WouldBlock.into()); } @@ -108,11 +138,7 @@ impl OpAble for PollAdd { if !self.relaxed { let mut pollfd = WSAPOLLFD { fd: self.fd.as_raw_socket() as _, - events: if self.is_read { - POLLIN as _ - } else { - POLLOUT as _ - }, + events: self.interest.to_flags(), revents: 0, }; let ret = unsafe { WSAPoll(&mut pollfd as *mut _, 1, 0) }; diff --git a/monoio/src/driver/op/read.rs b/monoio/src/driver/op/read.rs index eff9d727..fedd435a 100644 --- a/monoio/src/driver/op/read.rs +++ b/monoio/src/driver/op/read.rs @@ -85,21 +85,23 @@ impl OpAble for Read { let fd = self.fd.as_raw_fd(); let seek_offset = libc::off_t::try_from(self.offset) .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; - #[cfg(not(target_os = "macos"))] - return syscall_u32!(pread64( - fd, - self.buf.write_ptr() as _, - self.buf.bytes_total(), - seek_offset as _ - )); - - #[cfg(target_os = "macos")] - return syscall_u32!(pread( - fd, - self.buf.write_ptr() as _, - self.buf.bytes_total(), - seek_offset - )); + unsafe { + #[cfg(not(target_os = "macos"))] + return syscall_u32!(pread64( + fd, + self.buf.write_ptr() as _, + self.buf.bytes_total(), + seek_offset as _ + )); + + #[cfg(target_os = "macos")] + return syscall_u32!(pread( + fd, + self.buf.write_ptr() as _, + self.buf.bytes_total(), + seek_offset + )); + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] @@ -177,11 +179,13 @@ impl OpAble for ReadVec { #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { - syscall_u32!(readv( - self.fd.raw_fd(), - self.buf_vec.write_iovec_ptr(), - self.buf_vec.write_iovec_len().min(i32::MAX as usize) as _ - )) + unsafe { + syscall_u32!(readv( + self.fd.raw_fd(), + self.buf_vec.write_iovec_ptr(), + self.buf_vec.write_iovec_len().min(i32::MAX as usize) as _ + )) + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index a995695d..e6f1ff27 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -99,27 +99,31 @@ impl OpAble for Recv { #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); - syscall_u32!(recv( - fd, - self.buf.write_ptr() as _, - self.buf.bytes_total().min(u32::MAX as usize), - 0 - )) + unsafe { + syscall_u32!(recv( + fd, + self.buf.write_ptr() as _, + self.buf.bytes_total().min(u32::MAX as usize), + 0 + )) + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_socket(); - syscall!( - recv( - fd as _, - self.buf.write_ptr(), - self.buf.bytes_total().min(i32::MAX as usize) as _, + unsafe { + syscall!( + recv( + fd as _, + self.buf.write_ptr(), + self.buf.bytes_total().min(i32::MAX as usize) as _, + 0 + ), + PartialOrd::lt, 0 - ), - PartialOrd::lt, - 0 - ) + ) + } } } @@ -238,7 +242,7 @@ impl OpAble for RecvMsg { #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); - syscall_u32!(recvmsg(fd, &mut *self.info.2, 0)) + unsafe { syscall_u32!(recvmsg(fd, &mut *self.info.2, 0)) } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] @@ -356,6 +360,6 @@ impl OpAble for RecvMsgUnix { #[cfg(any(feature = "legacy", feature = "poll-io"))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); - syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) + unsafe { syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) } } } diff --git a/monoio/src/driver/op/send.rs b/monoio/src/driver/op/send.rs index 3c5f99bc..3b746772 100644 --- a/monoio/src/driver/op/send.rs +++ b/monoio/src/driver/op/send.rs @@ -101,22 +101,26 @@ impl OpAble for Send { #[cfg(not(target_os = "linux"))] let flags = 0; - syscall_u32!(send( - fd, - self.buf.read_ptr() as _, - self.buf.bytes_init(), - flags - )) + unsafe { + syscall_u32!(send( + fd, + self.buf.read_ptr() as _, + self.buf.bytes_init(), + flags + )) + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_socket(); - syscall!( - send(fd as _, self.buf.read_ptr(), self.buf.bytes_init() as _, 0), - PartialOrd::lt, - 0 - ) + unsafe { + syscall!( + send(fd as _, self.buf.read_ptr(), self.buf.bytes_init() as _, 0), + PartialOrd::lt, + 0 + ) + } } } @@ -212,7 +216,7 @@ impl OpAble for SendMsg { #[cfg(not(target_os = "linux"))] const FLAGS: libc::c_int = 0; let fd = self.fd.as_raw_fd(); - syscall_u32!(sendmsg(fd, &*self.info.2, FLAGS)) + unsafe { syscall_u32!(sendmsg(fd, &*self.info.2, FLAGS)) } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] @@ -316,6 +320,6 @@ impl OpAble for SendMsgUnix { #[cfg(not(target_os = "linux"))] const FLAGS: libc::c_int = 0; let fd = self.fd.as_raw_fd(); - syscall_u32!(sendmsg(fd, &mut self.info.2 as *mut _, FLAGS)) + unsafe { syscall_u32!(sendmsg(fd, &mut self.info.2 as *mut _, FLAGS)) } } } diff --git a/monoio/src/driver/op/splice.rs b/monoio/src/driver/op/splice.rs index 21940576..fb3bfc83 100644 --- a/monoio/src/driver/op/splice.rs +++ b/monoio/src/driver/op/splice.rs @@ -94,13 +94,15 @@ impl OpAble for Splice { let fd_out = self.fd_out.as_raw_fd(); let off_in = std::ptr::null_mut::(); let off_out = std::ptr::null_mut::(); - syscall_u32!(splice( - fd_in, - off_in, - fd_out, - off_out, - self.len as usize, - FLAG - )) + unsafe { + syscall_u32!(splice( + fd_in, + off_in, + fd_out, + off_out, + self.len as usize, + FLAG + )) + } } } diff --git a/monoio/src/driver/op/write.rs b/monoio/src/driver/op/write.rs index f32295ce..1761e887 100644 --- a/monoio/src/driver/op/write.rs +++ b/monoio/src/driver/op/write.rs @@ -69,21 +69,23 @@ impl OpAble for Write { let fd = self.fd.as_raw_fd(); let seek_offset = libc::off_t::try_from(self.offset) .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; - #[cfg(not(target_os = "macos"))] - return syscall_u32!(pwrite64( - fd, - self.buf.read_ptr() as _, - self.buf.bytes_init(), - seek_offset as _ - )); - - #[cfg(target_os = "macos")] - return syscall_u32!(pwrite( - fd, - self.buf.read_ptr() as _, - self.buf.bytes_init(), - seek_offset - )); + unsafe { + #[cfg(not(target_os = "macos"))] + return syscall_u32!(pwrite64( + fd, + self.buf.read_ptr() as _, + self.buf.bytes_init(), + seek_offset as _ + )); + + #[cfg(target_os = "macos")] + return syscall_u32!(pwrite( + fd, + self.buf.read_ptr() as _, + self.buf.bytes_init(), + seek_offset + )); + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] @@ -166,25 +168,29 @@ impl OpAble for WriteVec { #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { - syscall_u32!(writev( - self.fd.raw_fd(), - self.buf_vec.read_iovec_ptr(), - self.buf_vec.read_iovec_len().min(i32::MAX as usize) as _ - )) + unsafe { + syscall_u32!(writev( + self.fd.raw_fd(), + self.buf_vec.read_iovec_ptr(), + self.buf_vec.read_iovec_len().min(i32::MAX as usize) as _ + )) + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { let mut bytes_sent = 0; - syscall_u32!(WSASend( - self.fd.raw_socket() as _, - self.buf_vec.read_wsabuf_ptr(), - self.buf_vec.read_wsabuf_len() as _, - &mut bytes_sent, - 0, - std::ptr::null_mut(), - None, - )) - .map(|_| bytes_sent) + unsafe { + syscall_u32!(WSASend( + self.fd.raw_socket() as _, + self.buf_vec.read_wsabuf_ptr(), + self.buf_vec.read_wsabuf_len() as _, + &mut bytes_sent, + 0, + std::ptr::null_mut(), + None, + )) + .map(|_| bytes_sent) + } } } diff --git a/monoio/src/driver/poll.rs b/monoio/src/driver/poll.rs index e0ae0b41..5f942f01 100644 --- a/monoio/src/driver/poll.rs +++ b/monoio/src/driver/poll.rs @@ -1,14 +1,27 @@ use std::{io, task::Context, time::Duration}; -use super::{ready::Direction, scheduled_io::ScheduledIo}; -use crate::{driver::op::CompletionMeta, utils::slab::Slab}; +#[cfg(windows)] +use super::legacy::iocp; +use super::{ + ready::{Direction, Ready}, + scheduled_io::ScheduledIo, +}; +#[cfg(feature = "poll-io")] +use crate::driver::op::CompletionMeta; +use crate::utils::slab::Slab; /// Poller with io dispatch. // TODO: replace legacy impl with this Poll. pub(crate) struct Poll { pub(crate) io_dispatch: Slab, - poll: mio::Poll, - events: mio::Events, + #[cfg(unix)] + pub(crate) events: mio::Events, + #[cfg(unix)] + pub(crate) poll: mio::Poll, + #[cfg(windows)] + pub(crate) events: iocp::Events, + #[cfg(windows)] + pub(crate) poll: iocp::Poller, } impl Poll { @@ -16,11 +29,49 @@ impl Poll { pub(crate) fn with_capacity(capacity: usize) -> io::Result { Ok(Self { io_dispatch: Slab::new(), - poll: mio::Poll::new()?, + #[cfg(unix)] events: mio::Events::with_capacity(capacity), + #[cfg(windows)] + events: iocp::Events::with_capacity(capacity), + #[cfg(unix)] + poll: mio::Poll::new()?, + #[cfg(windows)] + poll: iocp::Poller::new()?, }) } + #[allow(unused)] + #[inline] + pub(crate) fn clear_readiness(io_dispatch: &mut Slab, token: usize, clear: Ready) { + let mut sio = match io_dispatch.get(token) { + Some(io) => io, + None => { + return; + } + }; + let ref_mut = sio.as_mut(); + ref_mut.set_readiness(|curr| curr - clear); + } + + #[inline] + pub(crate) fn dispatch(io_dispatch: &mut Slab, token: usize, ready: Ready) { + let mut sio = match io_dispatch.get(token) { + Some(io) => io, + None => { + return; + } + }; + let ref_mut = sio.as_mut(); + ref_mut.set_readiness(|curr| curr | ready); + ref_mut.wake(ready); + } + + #[inline] + pub(crate) fn poll_inside(&mut self, timeout: Option) -> io::Result<()> { + self.poll.poll(&mut self.events, timeout) + } + + #[cfg(all(feature = "poll-io", target_os = "linux"))] #[inline] pub(crate) fn tick(&mut self, timeout: Option) -> io::Result<()> { match self.poll.poll(&mut self.events, timeout) { @@ -41,6 +92,41 @@ impl Poll { Ok(()) } + #[cfg(windows)] + pub(crate) fn register( + &mut self, + state: &mut iocp::SocketState, + interest: mio::Interest, + ) -> io::Result { + let io = ScheduledIo::default(); + let token = self.io_dispatch.insert(io); + + match self.poll.register(state, mio::Token(token), interest) { + Ok(_) => Ok(token), + Err(e) => { + self.io_dispatch.remove(token); + Err(e) + } + } + } + + #[cfg(windows)] + pub(crate) fn deregister( + &mut self, + token: usize, + state: &mut iocp::SocketState, + ) -> io::Result<()> { + // try to deregister fd first, on success we will remove it from slab. + match self.poll.deregister(state) { + Ok(_) => { + self.io_dispatch.remove(token); + Ok(()) + } + Err(e) => Err(e), + } + } + + #[cfg(unix)] pub(crate) fn register( &mut self, source: &mut impl mio::event::Source, @@ -57,6 +143,7 @@ impl Poll { } } + #[cfg(unix)] pub(crate) fn deregister( &mut self, source: &mut impl mio::event::Source, @@ -71,6 +158,7 @@ impl Poll { } } + #[cfg(feature = "poll-io")] #[inline] pub(crate) fn poll_syscall( &mut self, @@ -98,6 +186,19 @@ impl Poll { }), } } + + #[allow(unused)] + #[inline] + pub(crate) fn poll_readiness( + &mut self, + cx: &mut Context<'_>, + token: usize, + direction: Direction, + ) -> std::task::Poll { + let mut scheduled_io = self.io_dispatch.get(token).expect("scheduled_io lost"); + let ref_mut = scheduled_io.as_mut(); + ref_mut.poll_readiness(cx, direction) + } } #[cfg(unix)] diff --git a/monoio/src/driver/ready.rs b/monoio/src/driver/ready.rs index 6262359a..6f38026f 100644 --- a/monoio/src/driver/ready.rs +++ b/monoio/src/driver/ready.rs @@ -13,37 +13,41 @@ const WRITE_CANCELED: u8 = 0b10_0000; /// Describes the readiness state of an I/O resources. /// /// `Ready` tracks which operation an I/O resource is ready to perform. -#[cfg_attr(docsrs, doc(cfg(feature = "net")))] #[derive(Clone, Copy, PartialEq, PartialOrd, Eq)] -pub(crate) struct Ready(u8); +#[allow(unreachable_pub)] +pub struct Ready(u8); +#[allow(unreachable_pub)] impl Ready { /// Returns the empty `Ready` set. - pub(crate) const EMPTY: Ready = Ready(0); + pub const EMPTY: Ready = Ready(0); /// Returns a `Ready` representing readable readiness. - pub(crate) const READABLE: Ready = Ready(READABLE); + pub const READABLE: Ready = Ready(READABLE); /// Returns a `Ready` representing writable readiness. - pub(crate) const WRITABLE: Ready = Ready(WRITABLE); + pub const WRITABLE: Ready = Ready(WRITABLE); /// Returns a `Ready` representing read closed readiness. - pub(crate) const READ_CLOSED: Ready = Ready(READ_CLOSED); + pub const READ_CLOSED: Ready = Ready(READ_CLOSED); /// Returns a `Ready` representing write closed readiness. - pub(crate) const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); + pub const WRITE_CLOSED: Ready = Ready(WRITE_CLOSED); /// Returns a `Ready` representing read canceled readiness. - pub(crate) const READ_CANCELED: Ready = Ready(READ_CANCELED); + pub const READ_CANCELED: Ready = Ready(READ_CANCELED); /// Returns a `Ready` representing write canceled readiness. - pub(crate) const WRITE_CANCELED: Ready = Ready(WRITE_CANCELED); + pub const WRITE_CANCELED: Ready = Ready(WRITE_CANCELED); /// Returns a `Ready` representing read or write canceled readiness. - pub(crate) const CANCELED: Ready = Ready(READ_CANCELED | WRITE_CANCELED); + pub const CANCELED: Ready = Ready(READ_CANCELED | WRITE_CANCELED); - pub(crate) const READ_ALL: Ready = Ready(READABLE | READ_CLOSED | READ_CANCELED); - pub(crate) const WRITE_ALL: Ready = Ready(WRITABLE | WRITE_CLOSED | WRITE_CANCELED); + /// All read related readiness. + pub const READ_ALL: Ready = Ready(READABLE | READ_CLOSED | READ_CANCELED); + + /// All write related readiness. + pub const WRITE_ALL: Ready = Ready(WRITABLE | WRITE_CLOSED | WRITE_CANCELED); #[cfg(windows)] pub(crate) fn from_mio(event: &super::legacy::iocp::Event) -> Ready { @@ -73,7 +77,7 @@ impl Ready { pub(crate) fn from_mio(event: &mio::event::Event) -> Ready { let mut ready = Ready::EMPTY; - #[cfg(all(target_os = "freebsd", feature = "net"))] + #[cfg(target_os = "freebsd")] { if event.is_aio() { ready |= Ready::READABLE; @@ -104,32 +108,32 @@ impl Ready { } /// Returns true if `Ready` is the empty set. - pub(crate) fn is_empty(self) -> bool { + pub fn is_empty(self) -> bool { self == Ready::EMPTY } /// Returns `true` if the value includes `readable`. - pub(crate) fn is_readable(self) -> bool { + pub fn is_readable(self) -> bool { !(self & Ready::READ_ALL).is_empty() } /// Returns `true` if the value includes writable `readiness`. - pub(crate) fn is_writable(self) -> bool { + pub fn is_writable(self) -> bool { !(self & Ready::WRITE_ALL).is_empty() } /// Returns `true` if the value includes read-closed `readiness`. - pub(crate) fn is_read_closed(self) -> bool { + pub fn is_read_closed(self) -> bool { self.contains(Ready::READ_CLOSED) } /// Returns `true` if the value includes write-closed `readiness`. - pub(crate) fn is_write_closed(self) -> bool { + pub fn is_write_closed(self) -> bool { self.contains(Ready::WRITE_CLOSED) } - #[allow(dead_code)] - pub(crate) fn is_canceled(self) -> bool { + /// Returns `true` if the value includes canceled. + pub fn is_canceled(self) -> bool { !(self & Ready::CANCELED).is_empty() } @@ -138,7 +142,7 @@ impl Ready { /// `other` may represent more than one readiness operations, in which case /// the function only returns true if `self` contains all readiness /// specified in `other`. - pub(crate) fn contains>(self, other: T) -> bool { + pub fn contains>(self, other: T) -> bool { let other = other.into(); (self & other) == other } @@ -231,6 +235,7 @@ impl fmt::Debug for Ready { pub(crate) enum Direction { Read, Write, + ReadOrWrite, } impl Direction { @@ -238,6 +243,14 @@ impl Direction { match self { Direction::Read => Ready::READABLE | Ready::READ_CLOSED | Ready::READ_CANCELED, Direction::Write => Ready::WRITABLE | Ready::WRITE_CLOSED | Ready::WRITE_CANCELED, + Direction::ReadOrWrite => { + Ready::READABLE + | Ready::WRITABLE + | Ready::READ_CLOSED + | Ready::WRITE_CLOSED + | Ready::READ_CANCELED + | Ready::WRITE_CANCELED + } } } } diff --git a/monoio/src/driver/scheduled_io.rs b/monoio/src/driver/scheduled_io.rs index d164a1d3..5ac75e31 100644 --- a/monoio/src/driver/scheduled_io.rs +++ b/monoio/src/driver/scheduled_io.rs @@ -6,9 +6,11 @@ pub(crate) struct ScheduledIo { readiness: Ready, /// Waker used for AsyncRead. - reader: Option, + r_waiter: Option, /// Waker used for AsyncWrite. - writer: Option, + w_waiter: Option, + /// Waker requires read or write. + rw_waiter: Option, } impl Default for ScheduledIo { @@ -22,8 +24,9 @@ impl ScheduledIo { pub(crate) const fn new() -> Self { Self { readiness: Ready::EMPTY, - reader: None, - writer: None, + r_waiter: None, + w_waiter: None, + rw_waiter: None, } } @@ -40,15 +43,28 @@ impl ScheduledIo { #[inline] pub(crate) fn wake(&mut self, ready: Ready) { - if ready.is_readable() { - if let Some(waker) = self.reader.take() { - waker.wake(); - } + macro_rules! try_wake { + ($w: expr) => { + if let Some(waker) = $w.take() { + waker.wake(); + } + }; } - if ready.is_writable() { - if let Some(waker) = self.writer.take() { - waker.wake(); + match (ready.is_readable(), ready.is_writable()) { + (true, true) => { + try_wake!(self.r_waiter); + try_wake!(self.w_waiter); + try_wake!(self.rw_waiter); + } + (true, false) => { + try_wake!(self.r_waiter); + try_wake!(self.rw_waiter); + } + (false, true) => { + try_wake!(self.w_waiter); + try_wake!(self.rw_waiter); } + _ => (), } } @@ -74,19 +90,19 @@ impl ScheduledIo { #[inline] pub(crate) fn set_waker(&mut self, cx: &mut Context<'_>, direction: Direction) { - let slot = match direction { - Direction::Read => &mut self.reader, - Direction::Write => &mut self.writer, - }; - match slot { - Some(existing) => { - if !existing.will_wake(cx.waker()) { + macro_rules! set_waker_slot { + ($slot: expr) => { + if let Some(existing) = $slot { existing.clone_from(cx.waker()); + } else { + *$slot = Some(cx.waker().clone()); } - } - None => { - *slot = Some(cx.waker().clone()); - } + }; } + match direction { + Direction::Read => set_waker_slot!(&mut self.r_waiter), + Direction::Write => set_waker_slot!(&mut self.w_waiter), + Direction::ReadOrWrite => set_waker_slot!(&mut self.rw_waiter), + }; } } diff --git a/monoio/src/driver/shared_fd.rs b/monoio/src/driver/shared_fd.rs index 0cf550c4..d0891971 100644 --- a/monoio/src/driver/shared_fd.rs +++ b/monoio/src/driver/shared_fd.rs @@ -24,6 +24,8 @@ struct Inner { // Waker to notify when the close operation completes. state: UnsafeCell, + + close_on_drop: UnsafeCell, } enum State { @@ -45,7 +47,7 @@ impl State { // TODO: only Init state can convert? if matches!(state, UringState::Init) { let mut source = mio::unix::SourceFd(&fd); - crate::syscall!(fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK))?; + unsafe { crate::syscall!(fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK))? }; let reg = CURRENT .with(|inner| match inner { #[cfg(all(target_os = "linux", feature = "iouring"))] @@ -58,7 +60,7 @@ impl State { crate::driver::Inner::Legacy(_) => panic!("unexpected legacy runtime"), }) .map_err(|e| { - let _ = crate::syscall!(fcntl(fd, libc::F_SETFL, 0)); + let _ = unsafe { crate::syscall!(fcntl(fd, libc::F_SETFL, 0)) }; e })?; *state = UringState::Legacy(Some(reg)); @@ -87,7 +89,7 @@ impl State { return Err(io::Error::new(io::ErrorKind::Other, "empty token")); }; let mut source = mio::unix::SourceFd(&fd); - crate::syscall!(fcntl(fd, libc::F_SETFL, 0))?; + unsafe { crate::syscall!(fcntl(fd, libc::F_SETFL, 0))? }; CURRENT .with(|inner| match inner { #[cfg(all(target_os = "linux", feature = "iouring"))] @@ -98,7 +100,7 @@ impl State { crate::driver::Inner::Legacy(_) => panic!("unexpected legacy runtime"), }) .map_err(|e| { - let _ = crate::syscall!(fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK)); + let _ = unsafe { crate::syscall!(fcntl(fd, libc::F_SETFL, libc::O_NONBLOCK)) }; e })?; *self = State::Uring(UringState::Init); @@ -161,7 +163,7 @@ impl AsRawHandle for SharedFd { impl SharedFd { #[cfg(unix)] #[allow(unreachable_code, unused)] - pub(crate) fn new(fd: RawFd) -> io::Result { + pub(crate) fn new(fd: RawFd) -> io::Result { enum Reg { Uring, #[cfg(feature = "poll-io")] @@ -171,7 +173,7 @@ impl SharedFd { #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] let state = match CURRENT.with(|inner| match inner { - super::Inner::Uring(inner) => match FORCE_LEGACY { + super::Inner::Uring(inner) => match PREFER_LEGACY { false => Reg::Uring, true => { #[cfg(feature = "poll-io")] @@ -237,12 +239,13 @@ impl SharedFd { inner: Rc::new(Inner { fd, state: UnsafeCell::new(state), + close_on_drop: UnsafeCell::new(true), }), }) } #[cfg(windows)] - pub(crate) fn new(fd: RawSocket) -> io::Result { + pub(crate) fn new(fd: RawSocket) -> io::Result { const RW_INTERESTS: mio::Interest = mio::Interest::READABLE.add(mio::Interest::WRITABLE); let mut fd = RawFd::new(fd); @@ -262,6 +265,7 @@ impl SharedFd { inner: Rc::new(Inner { fd, state: UnsafeCell::new(state), + close_on_drop: UnsafeCell::new(true), }), }) } @@ -287,10 +291,17 @@ impl SharedFd { inner: Rc::new(Inner { fd, state: UnsafeCell::new(state), + close_on_drop: UnsafeCell::new(true), }), } } + pub(crate) fn set_close_on_drop(&self, close: bool) { + unsafe { + *self.inner.close_on_drop.get() = close; + } + } + #[cfg(windows)] #[allow(unreachable_code, unused)] pub(crate) fn new_without_register(fd: RawSocket) -> SharedFd { @@ -302,6 +313,7 @@ impl SharedFd { inner: Rc::new(Inner { fd: RawFd::new(fd), state: UnsafeCell::new(state), + close_on_drop: UnsafeCell::new(true), }), } } @@ -454,23 +466,26 @@ impl SharedFd { #[cfg(feature = "poll-io")] #[inline] pub(crate) fn cvt_poll(&mut self) -> io::Result<()> { - let state = unsafe { &mut *self.inner.state.get() }; #[cfg(unix)] - let r = state.cvt_uring_poll(self.inner.fd); + { + let state = unsafe { &mut *self.inner.state.get() }; + state.cvt_uring_poll(self.inner.fd) + } + #[cfg(windows)] - let r = Ok(()); - r + Ok(()) } #[cfg(feature = "poll-io")] #[inline] pub(crate) fn cvt_comp(&mut self) -> io::Result<()> { - let state = unsafe { &mut *self.inner.state.get() }; #[cfg(unix)] - let r = state.cvt_comp(self.inner.fd); + { + let state = unsafe { &mut *self.inner.state.get() }; + state.cvt_comp(self.inner.fd) + } #[cfg(windows)] - let r = Ok(()); - r + Ok(()) } } @@ -526,18 +541,22 @@ impl Drop for Inner { fn drop(&mut self) { let fd = self.fd; let state = unsafe { &mut *self.state.get() }; + let close_on_drop = unsafe { *self.close_on_drop.get() }; #[allow(unreachable_patterns)] match state { #[cfg(all(target_os = "linux", feature = "iouring"))] State::Uring(UringState::Init) | State::Uring(UringState::Waiting(..)) => { + if !close_on_drop { + return; + } if super::op::Op::close(fd).is_err() { let _ = unsafe { std::fs::File::from_raw_fd(fd) }; }; } #[cfg(feature = "legacy")] - State::Legacy(idx) => drop_legacy(fd, *idx), + State::Legacy(idx) => drop_legacy(fd, *idx, close_on_drop), #[cfg(all(target_os = "linux", feature = "iouring", feature = "poll-io"))] - State::Uring(UringState::Legacy(idx)) => drop_uring_legacy(fd, *idx), + State::Uring(UringState::Legacy(idx)) => drop_uring_legacy(fd, *idx, close_on_drop), _ => {} } } @@ -545,7 +564,7 @@ impl Drop for Inner { #[allow(unused_mut)] #[cfg(feature = "legacy")] -fn drop_legacy(mut fd: RawFd, idx: Option) { +fn drop_legacy(mut fd: RawFd, idx: Option, close_on_drop: bool) { if CURRENT.is_set() { CURRENT.with(|inner| { #[cfg(any(all(target_os = "linux", feature = "iouring"), feature = "legacy"))] @@ -569,6 +588,9 @@ fn drop_legacy(mut fd: RawFd, idx: Option) { } }) } + if !close_on_drop { + return; + } #[cfg(all(unix, feature = "legacy"))] let _ = unsafe { std::fs::File::from_raw_fd(fd) }; #[cfg(all(windows, feature = "legacy"))] @@ -576,7 +598,7 @@ fn drop_legacy(mut fd: RawFd, idx: Option) { } #[cfg(feature = "poll-io")] -fn drop_uring_legacy(fd: RawFd, idx: Option) { +fn drop_uring_legacy(fd: RawFd, _idx: Option, close_on_drop: bool) { if CURRENT.is_set() { CURRENT.with(|inner| { match inner { @@ -587,7 +609,7 @@ fn drop_uring_legacy(fd: RawFd, idx: Option) { #[cfg(all(target_os = "linux", feature = "iouring"))] super::Inner::Uring(inner) => { // deregister it from driver(Poll and slab) and close fd - if let Some(idx) = idx { + if let Some(idx) = _idx { let mut source = mio::unix::SourceFd(&fd); let _ = super::IoUringDriver::deregister_poll_io(inner, &mut source, idx); } @@ -595,6 +617,9 @@ fn drop_uring_legacy(fd: RawFd, idx: Option) { } }) } + if !close_on_drop { + return; + } #[cfg(unix)] let _ = unsafe { std::fs::File::from_raw_fd(fd) }; #[cfg(windows)] diff --git a/monoio/src/driver/uring/mod.rs b/monoio/src/driver/uring/mod.rs index e170d12a..49ba23f8 100644 --- a/monoio/src/driver/uring/mod.rs +++ b/monoio/src/driver/uring/mod.rs @@ -61,7 +61,7 @@ pub(crate) struct UringInner { ops: Ops, #[cfg(feature = "poll-io")] - poll: super::poll::Poll, + pub(crate) poller: super::poll::Poll, #[cfg(feature = "poll-io")] poller_installed: bool, @@ -106,7 +106,7 @@ impl IoUringDriver { let inner = Rc::new(UnsafeCell::new(UringInner { #[cfg(feature = "poll-io")] - poll: super::poll::Poll::with_capacity(entries as usize)?, + poller: super::poll::Poll::with_capacity(entries as usize)?, #[cfg(feature = "poll-io")] poller_installed: false, ops: Ops::new(), @@ -129,7 +129,7 @@ impl IoUringDriver { // Create eventfd and register it to the ring. let waker = { - let fd = crate::syscall!(eventfd(0, libc::EFD_CLOEXEC))?; + let fd = unsafe { crate::syscall!(eventfd(0, libc::EFD_CLOEXEC))? }; unsafe { use std::os::unix::io::FromRawFd; std::fs::File::from_raw_fd(fd) @@ -142,7 +142,7 @@ impl IoUringDriver { #[cfg(feature = "poll-io")] poller_installed: false, #[cfg(feature = "poll-io")] - poll: super::poll::Poll::with_capacity(entries as usize)?, + poller: super::poll::Poll::with_capacity(entries as usize)?, ops: Ops::new(), ext_arg: uring.params().is_feature_ext_arg(), uring, @@ -269,7 +269,7 @@ impl IoUringDriver { // 2.1 install poller #[cfg(feature = "poll-io")] if !inner.poller_installed { - self.install_poller(inner, inner.poll.as_raw_fd()); + self.install_poller(inner, inner.poller.as_raw_fd()); } // 2.2 install eventfd and timeout @@ -329,7 +329,7 @@ impl IoUringDriver { interest: mio::Interest, ) -> io::Result { let inner = unsafe { &mut *this.get() }; - inner.poll.register(source, interest) + inner.poller.register(source, interest) } #[cfg(feature = "poll-io")] @@ -340,7 +340,7 @@ impl IoUringDriver { token: usize, ) -> io::Result<()> { let inner = unsafe { &mut *this.get() }; - inner.poll.deregister(source, token) + inner.poller.deregister(source, token) } } @@ -388,7 +388,7 @@ impl UringInner { #[cfg(feature = "poll-io")] POLLER_USERDATA => { self.poller_installed = false; - self.poll.tick(Some(Duration::ZERO))?; + self.poller.tick(Some(Duration::ZERO))?; } _ if index >= MIN_REVERSED_USERDATA => (), _ => self.ops.complete(index as _, resultify(&cqe), cqe.flags()), @@ -501,7 +501,7 @@ impl UringInner { // wait io ready and do syscall inner - .poll + .poller .poll_syscall(cx, index, direction, || OpAble::legacy_call(data)) } diff --git a/monoio/src/driver/util.rs b/monoio/src/driver/util.rs index 20f16fc5..f770877d 100644 --- a/monoio/src/driver/util.rs +++ b/monoio/src/driver/util.rs @@ -32,7 +32,7 @@ pub(super) fn timespec(duration: std::time::Duration) -> io_uring::types::Timesp #[macro_export] macro_rules! syscall { ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ - let res = unsafe { libc::$fn($($arg, )*) }; + let res = libc::$fn($($arg, )*); if res == -1 { Err(std::io::Error::last_os_error()) } else { @@ -46,7 +46,7 @@ macro_rules! syscall { #[macro_export] macro_rules! syscall { ($fn: ident ( $($arg: expr),* $(,)* ), $err_test: path, $err_value: expr) => {{ - let res = unsafe { $fn($($arg, )*) }; + let res = $fn($($arg, )*); if $err_test(&res, &$err_value) { Err(std::io::Error::last_os_error()) } else { @@ -60,9 +60,9 @@ macro_rules! syscall { macro_rules! syscall_u32 { ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ #[cfg(windows)] - let res = unsafe { $fn($($arg, )*) }; + let res = $fn($($arg, )*); #[cfg(unix)] - let res = unsafe { libc::$fn($($arg, )*) }; + let res = libc::$fn($($arg, )*); if res < 0 { Err(std::io::Error::last_os_error()) } else { diff --git a/monoio/src/io/async_fd.rs b/monoio/src/io/async_fd.rs new file mode 100644 index 00000000..a96c920d --- /dev/null +++ b/monoio/src/io/async_fd.rs @@ -0,0 +1,297 @@ +use std::{ + io, + os::fd::AsRawFd, + task::{Context, Poll}, +}; + +pub use crate::driver::ready::Ready; +use crate::driver::{poll::Poll as LegacyPoll, ready::Direction, shared_fd::SharedFd, CURRENT}; + +/// Associates an IO object backed by a Unix file descriptor with the async runtime. +pub struct AsyncFd { + _fd: SharedFd, + token: usize, + inner: Option, +} + +/// Represents an IO-ready event detected on a particular file descriptor that +/// has not yet been acknowledged. This is a `must_use` structure to help ensure +/// that you do not forget to explicitly clear (or not clear) the event. +#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method \ + on ReadyGuard"] +pub struct AsyncFdReadyGuard<'a, T: AsRawFd> { + async_fd: &'a AsyncFd, + event: Ready, + token: usize, +} + +/// Represents an IO-ready event detected on a particular file descriptor that +/// has not yet been acknowledged. This is a `must_use` structure to help ensure +/// that you do not forget to explicitly clear (or not clear) the event. +#[must_use = "You must explicitly choose whether to clear the readiness state by calling a method \ + on ReadyGuard"] +pub struct AsyncFdReadyMutGuard<'a, T: AsRawFd> { + async_fd: &'a mut AsyncFd, + event: Ready, + token: usize, +} + +impl AsyncFd { + /// Creates an [`AsyncFd`] backed by (and taking ownership of) an object + /// implementing [`AsRawFd`]. The backing file descriptor is cached at the + /// time of creation. + #[inline] + pub fn new(inner: T) -> io::Result { + let fd = SharedFd::new::(inner.as_raw_fd())?; + let token = fd.registered_index().expect("registered index must exist"); + fd.set_close_on_drop(false); + Ok(Self { + _fd: fd, + token, + inner: Some(inner), + }) + } + + /// Returns a shared reference to the backing object of this [`AsyncFd`]. + #[inline] + pub fn get_ref(&self) -> &T { + self.inner.as_ref().unwrap() + } + + /// Returns a mutable reference to the backing object of this [`AsyncFd`]. + #[inline] + pub fn get_mut(&mut self) -> &mut T { + self.inner.as_mut().unwrap() + } + + /// Deregisters this file descriptor and returns ownership of the backing + /// object. + pub fn into_inner(mut self) -> T { + self.inner.take().unwrap() + } + + /// Polls for read readiness. + #[inline] + pub fn poll_read_ready<'a>( + &'a self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.poll_ready_with_direction(cx, Direction::Read) + } + + /// Polls for read readiness. + #[inline] + pub fn poll_read_ready_mut<'a>( + &'a mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.poll_ready_with_direction_mut(cx, Direction::Read) + } + + /// Polls for write readiness. + #[inline] + pub fn poll_write_ready<'a>( + &'a self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.poll_ready_with_direction(cx, Direction::Write) + } + + /// Polls for write readiness. + #[inline] + pub fn poll_write_ready_mut<'a>( + &'a mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + self.poll_ready_with_direction_mut(cx, Direction::Write) + } + + #[inline] + fn poll_ready_with_direction<'a>( + &'a self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll>> { + let r = CURRENT.with(|c| match c { + #[cfg(all(target_os = "linux", feature = "iouring"))] + crate::driver::Inner::Uring(inner) => unsafe { + (*inner.get()) + .poller + .poll_readiness(cx, self.token, direction) + }, + #[cfg(feature = "legacy")] + crate::driver::Inner::Legacy(inner) => unsafe { + (*inner.get()) + .poller + .poll_readiness(cx, self.token, direction) + }, + }); + let event = ready!(r); + + Poll::Ready(Ok(AsyncFdReadyGuard { + async_fd: self, + event, + token: self.token, + })) + } + + #[inline] + fn poll_ready_with_direction_mut<'a>( + &'a mut self, + cx: &mut Context<'_>, + direction: Direction, + ) -> Poll>> { + let token = self.token; + let r = CURRENT.with(|c| match c { + #[cfg(all(target_os = "linux", feature = "iouring"))] + crate::driver::Inner::Uring(inner) => unsafe { + (*inner.get()).poller.poll_readiness(cx, token, direction) + }, + #[cfg(feature = "legacy")] + crate::driver::Inner::Legacy(inner) => unsafe { + (*inner.get()).poller.poll_readiness(cx, token, direction) + }, + }); + let event = ready!(r); + + Poll::Ready(Ok(AsyncFdReadyMutGuard { + async_fd: self, + event, + token, + })) + } +} + +impl<'a, T: AsRawFd> AsyncFdReadyGuard<'a, T> { + /// Indicates to the rumtime that the file descriptor is no longer ready. + /// All internal readiness flags will be cleared. + pub fn clear_ready(&mut self) { + CURRENT.with(|c| match c { + #[cfg(all(target_os = "linux", feature = "iouring"))] + crate::driver::Inner::Uring(inner) => unsafe { + LegacyPoll::clear_readiness( + &mut (*inner.get()).poller.io_dispatch, + self.token, + self.event, + ) + }, + #[cfg(feature = "legacy")] + crate::driver::Inner::Legacy(inner) => unsafe { + LegacyPoll::clear_readiness( + &mut (*inner.get()).poller.io_dispatch, + self.token, + self.event, + ) + }, + }); + } + + /// Indicates to the rumtime that the file descriptor is no longer ready. + /// The internal readiness flag will be cleared + pub fn clear_ready_matching(&mut self, clear: Ready) { + CURRENT.with(|c| match c { + #[cfg(all(target_os = "linux", feature = "iouring"))] + crate::driver::Inner::Uring(inner) => unsafe { + LegacyPoll::clear_readiness( + &mut (*inner.get()).poller.io_dispatch, + self.token, + clear, + ) + }, + #[cfg(feature = "legacy")] + crate::driver::Inner::Legacy(inner) => unsafe { + LegacyPoll::clear_readiness( + &mut (*inner.get()).poller.io_dispatch, + self.token, + clear, + ) + }, + }); + } + + /// Performs the provided IO operation. + pub fn try_io( + &mut self, + f: impl FnOnce(&'a AsyncFd) -> io::Result, + ) -> Result, TryIoError> { + let result = f(self.async_fd); + + match result { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { + self.clear_ready(); + Err(TryIoError(())) + } + result => Ok(result), + } + } +} + +impl<'a, T: AsRawFd> AsyncFdReadyMutGuard<'a, T> { + /// Indicates to the rumtime that the file descriptor is no longer ready. + /// All internal readiness flags will be cleared. + pub fn clear_ready(&mut self) { + CURRENT.with(|c| match c { + #[cfg(all(target_os = "linux", feature = "iouring"))] + crate::driver::Inner::Uring(inner) => unsafe { + LegacyPoll::clear_readiness( + &mut (*inner.get()).poller.io_dispatch, + self.token, + self.event, + ) + }, + #[cfg(feature = "legacy")] + crate::driver::Inner::Legacy(inner) => unsafe { + LegacyPoll::clear_readiness( + &mut (*inner.get()).poller.io_dispatch, + self.token, + self.event, + ) + }, + }); + } + + /// Indicates to the rumtime that the file descriptor is no longer ready. + /// The internal readiness flag will be cleared + pub fn clear_ready_matching(&mut self, clear: Ready) { + CURRENT.with(|c| match c { + #[cfg(all(target_os = "linux", feature = "iouring"))] + crate::driver::Inner::Uring(inner) => unsafe { + LegacyPoll::clear_readiness( + &mut (*inner.get()).poller.io_dispatch, + self.token, + clear, + ) + }, + #[cfg(feature = "legacy")] + crate::driver::Inner::Legacy(inner) => unsafe { + LegacyPoll::clear_readiness( + &mut (*inner.get()).poller.io_dispatch, + self.token, + clear, + ) + }, + }); + } + + /// Performs the provided IO operation. + pub fn try_io( + &mut self, + f: impl FnOnce(&mut AsyncFd) -> io::Result, + ) -> Result, TryIoError> { + let result = f(self.async_fd); + + match result { + Err(err) if err.kind() == io::ErrorKind::WouldBlock => { + self.clear_ready(); + Err(TryIoError(())) + } + result => Ok(result), + } + } +} + +/// The error type returned by [`try_io`]. +/// +/// This error indicates that the IO resource returned a [`WouldBlock`] error. +#[derive(Debug)] +pub struct TryIoError(()); diff --git a/monoio/src/io/mod.rs b/monoio/src/io/mod.rs index 7330c94b..f59131b4 100644 --- a/monoio/src/io/mod.rs +++ b/monoio/src/io/mod.rs @@ -2,6 +2,8 @@ mod async_buf_read; mod async_buf_read_ext; +#[cfg(all(unix, feature = "poll-io"))] +mod async_fd; mod async_read_rent; mod async_read_rent_ext; mod async_rent_cancelable; @@ -18,6 +20,8 @@ pub mod splice; pub use async_buf_read::AsyncBufRead; pub use async_buf_read_ext::AsyncBufReadExt; +#[cfg(all(unix, feature = "poll-io"))] +pub use async_fd::{AsyncFd, AsyncFdReadyGuard, Ready, TryIoError}; pub use async_read_rent::{AsyncReadRent, AsyncReadRentAt}; pub use async_read_rent_ext::AsyncReadRentExt; pub use async_rent_cancelable::{CancelableAsyncReadRent, CancelableAsyncWriteRent}; diff --git a/monoio/src/macros/select.rs b/monoio/src/macros/select.rs index 8736ac8f..e34dbab9 100644 --- a/monoio/src/macros/select.rs +++ b/monoio/src/macros/select.rs @@ -38,11 +38,11 @@ /// polled. /// 3. Concurrently await on the results for all remaining ``s. 4. Once an `` returns a value, attempt to -/// apply the value to the provided ``, if the pattern matches, -/// evaluate `` and return. If the pattern **does not** match, -/// disable the current branch and for the remainder of the current call to -/// `select!`. Continue from step 3. 5. If **all** branches are disabled, -/// evaluate the `else` expression. If no else branch is provided, panic. +/// apply the value to the provided ``, if the pattern matches, +/// evaluate `` and return. If the pattern **does not** match, +/// disable the current branch and for the remainder of the current call to +/// `select!`. Continue from step 3. 5. If **all** branches are disabled, +/// evaluate the `else` expression. If no else branch is provided, panic. /// /// # Runtime characteristics /// diff --git a/monoio/src/net/mod.rs b/monoio/src/net/mod.rs index 6f4eb5c9..3007c3bf 100644 --- a/monoio/src/net/mod.rs +++ b/monoio/src/net/mod.rs @@ -53,11 +53,11 @@ pub(crate) fn new_socket( // Gives a warning for platforms without SOCK_NONBLOCK. #[allow(clippy::let_and_return)] #[cfg(unix)] - let socket = crate::syscall!(socket(domain, socket_type, 0)); + let socket = unsafe { crate::syscall!(socket(domain, socket_type, 0)) }; // Mimic `libstd` and set `SO_NOSIGPIPE` on apple systems. #[cfg(target_vendor = "apple")] - let socket = socket.and_then(|socket| { + let socket = socket.and_then(|socket| unsafe { crate::syscall!(setsockopt( socket, libc::SOL_SOCKET, @@ -71,18 +71,20 @@ pub(crate) fn new_socket( // Darwin doesn't have SOCK_NONBLOCK or SOCK_CLOEXEC. #[cfg(any(target_os = "ios", target_os = "macos"))] let socket = socket.and_then(|socket| { - // For platforms that don't support flags in socket, we need to - // set the flags ourselves. - crate::syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK)) - .and_then(|_| { - crate::syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| socket) - }) - .map_err(|e| { - // If either of the `fcntl` calls failed, ensure the socket is - // closed and return the error. - let _ = crate::syscall!(close(socket)); - e - }) + unsafe { + // For platforms that don't support flags in socket, we need to + // set the flags ourselves. + crate::syscall!(fcntl(socket, libc::F_SETFL, libc::O_NONBLOCK)) + .and_then(|_| { + crate::syscall!(fcntl(socket, libc::F_SETFD, libc::FD_CLOEXEC)).map(|_| socket) + }) + .map_err(|e| { + // If either of the `fcntl` calls failed, ensure the socket is + // closed and return the error. + let _ = crate::syscall!(close(socket)); + e + }) + } }); socket @@ -100,29 +102,29 @@ pub(crate) fn new_socket( domain: ADDRESS_FAMILY, socket_type: WINSOCK_SOCKET_TYPE, ) -> std::io::Result { - let _: i32 = crate::syscall!( - WSAStartup(MAKEWORD(2, 2), std::ptr::null_mut()), - PartialEq::eq, - NO_ERROR as _ - )?; - let socket = crate::syscall!( - socket(domain as _, socket_type, 0), - PartialEq::eq, - INVALID_SOCKET - )?; - crate::syscall!( - ioctlsocket(socket, FIONBIO, &mut 1), - PartialEq::ne, - NO_ERROR as _ - ) - .map(|_: i32| socket as RawSocket) - .map_err(|e| { - // If either of the `ioctlsocket` calls failed, ensure the socket is - // closed and return the error. - unsafe { + unsafe { + let _: i32 = crate::syscall!( + WSAStartup(MAKEWORD(2, 2), std::ptr::null_mut()), + PartialEq::eq, + NO_ERROR as _ + )?; + let socket = crate::syscall!( + socket(domain as _, socket_type, 0), + PartialEq::eq, + INVALID_SOCKET + )?; + crate::syscall!( + ioctlsocket(socket, FIONBIO, &mut 1), + PartialEq::ne, + NO_ERROR as _ + ) + .map(|_: i32| socket as RawSocket) + .map_err(|e| { + // If either of the `ioctlsocket` calls failed, ensure the socket is + // closed and return the error. closesocket(socket); WSACleanup(); - } - e - }) + e + }) + } } diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs index c1bfb2dc..eb16e86f 100644 --- a/monoio/src/net/tcp/stream.rs +++ b/monoio/src/net/tcp/stream.rs @@ -147,7 +147,7 @@ impl TcpStream { crate::driver::Inner::Legacy(inner) => { let idx = stream.fd.registered_index().unwrap(); if let Some(mut readiness) = - unsafe { &mut *inner.get() }.io_dispatch.get(idx) + unsafe { &mut *inner.get() }.poller.io_dispatch.get(idx) { readiness.set_writable(); } diff --git a/monoio/src/net/tcp/tfo/linux.rs b/monoio/src/net/tcp/tfo/linux.rs index bea85ed6..f34c2215 100644 --- a/monoio/src/net/tcp/tfo/linux.rs +++ b/monoio/src/net/tcp/tfo/linux.rs @@ -11,13 +11,15 @@ thread_local! { /// Call before listen. pub(crate) fn set_tcp_fastopen(fd: &S, fast_open: i32) -> io::Result<()> { - crate::syscall!(setsockopt( - fd.as_raw_fd(), - libc::SOL_TCP, - libc::TCP_FASTOPEN, - &fast_open as *const _ as *const libc::c_void, - std::mem::size_of::() as libc::socklen_t - ))?; + unsafe { + crate::syscall!(setsockopt( + fd.as_raw_fd(), + libc::SOL_TCP, + libc::TCP_FASTOPEN, + &fast_open as *const _ as *const libc::c_void, + std::mem::size_of::() as libc::socklen_t + ))? + }; Ok(()) } @@ -26,13 +28,15 @@ pub(crate) fn set_tcp_fastopen(fd: &S, fast_open: i32) -> io::Result pub(crate) fn set_tcp_fastopen_connect(fd: &S) -> io::Result<()> { const ENABLED: libc::c_int = 0x1; - crate::syscall!(setsockopt( - fd.as_raw_fd(), - libc::SOL_TCP, - libc::TCP_FASTOPEN_CONNECT, - &ENABLED as *const _ as *const libc::c_void, - std::mem::size_of::() as libc::socklen_t - ))?; + unsafe { + crate::syscall!(setsockopt( + fd.as_raw_fd(), + libc::SOL_TCP, + libc::TCP_FASTOPEN_CONNECT, + &ENABLED as *const _ as *const libc::c_void, + std::mem::size_of::() as libc::socklen_t + ))? + }; Ok(()) } diff --git a/monoio/src/net/tcp/tfo/macos.rs b/monoio/src/net/tcp/tfo/macos.rs index 3465fe54..ed3f66e9 100644 --- a/monoio/src/net/tcp/tfo/macos.rs +++ b/monoio/src/net/tcp/tfo/macos.rs @@ -3,13 +3,15 @@ use std::{io, os::fd::AsRawFd}; /// Call before listen. pub(crate) fn set_tcp_fastopen(fd: &S) -> io::Result<()> { const ENABLED: libc::c_int = 0x1; - crate::syscall!(setsockopt( - fd.as_raw_fd(), - libc::IPPROTO_TCP, - libc::TCP_FASTOPEN, - &ENABLED as *const _ as *const libc::c_void, - std::mem::size_of::() as libc::socklen_t - ))?; + unsafe { + crate::syscall!(setsockopt( + fd.as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_FASTOPEN, + &ENABLED as *const _ as *const libc::c_void, + std::mem::size_of::() as libc::socklen_t + ))? + }; Ok(()) } @@ -19,12 +21,14 @@ pub(crate) fn set_tcp_fastopen_force_enable(fd: &S) -> io::Result<() const TCP_FASTOPEN_FORCE_ENABLE: libc::c_int = 0x218; const ENABLED: libc::c_int = 0x1; - crate::syscall!(setsockopt( - fd.as_raw_fd(), - libc::IPPROTO_TCP, - TCP_FASTOPEN_FORCE_ENABLE, - &ENABLED as *const _ as *const libc::c_void, - std::mem::size_of::() as libc::socklen_t - ))?; + unsafe { + crate::syscall!(setsockopt( + fd.as_raw_fd(), + libc::IPPROTO_TCP, + TCP_FASTOPEN_FORCE_ENABLE, + &ENABLED as *const _ as *const libc::c_void, + std::mem::size_of::() as libc::socklen_t + ))? + }; Ok(()) } diff --git a/monoio/src/net/unix/async_fd.rs b/monoio/src/net/unix/async_fd.rs new file mode 100644 index 00000000..e69de29b diff --git a/monoio/src/net/unix/pipe.rs b/monoio/src/net/unix/pipe.rs index 760a0332..51c5510b 100644 --- a/monoio/src/net/unix/pipe.rs +++ b/monoio/src/net/unix/pipe.rs @@ -29,9 +29,11 @@ pub fn new_pipe() -> io::Result<(Pipe, Pipe)> { 0 } }; - #[cfg(target_os = "linux")] - crate::syscall!(pipe2(pipes.as_mut_ptr() as _, flag))?; - #[cfg(not(target_os = "linux"))] - crate::syscall!(pipe(pipes.as_mut_ptr() as _))?; - Ok((Pipe::from_raw_fd(pipes[0]), Pipe::from_raw_fd(pipes[1]))) + unsafe { + #[cfg(target_os = "linux")] + crate::syscall!(pipe2(pipes.as_mut_ptr() as _, flag))?; + #[cfg(not(target_os = "linux"))] + crate::syscall!(pipe(pipes.as_mut_ptr() as _))?; + Ok((Pipe::from_raw_fd(pipes[0]), Pipe::from_raw_fd(pipes[1]))) + } } diff --git a/monoio/src/net/unix/seq_packet/listener.rs b/monoio/src/net/unix/seq_packet/listener.rs index 0fa3ca5c..fe419f2c 100644 --- a/monoio/src/net/unix/seq_packet/listener.rs +++ b/monoio/src/net/unix/seq_packet/listener.rs @@ -26,8 +26,11 @@ impl UnixSeqpacketListener { pub fn bind_with_backlog>(path: P, backlog: libc::c_int) -> io::Result { let (addr, addr_len) = socket_addr(path.as_ref())?; let socket = new_socket(libc::AF_UNIX, libc::SOCK_SEQPACKET)?; - crate::syscall!(bind(socket, &addr as *const _ as *const _, addr_len))?; - crate::syscall!(listen(socket, backlog))?; + unsafe { + crate::syscall!(bind(socket, &addr as *const _ as *const _, addr_len))?; + crate::syscall!(listen(socket, backlog))?; + } + Ok(Self { fd: SharedFd::new::(socket)?, }) diff --git a/monoio/src/net/unix/socket_addr.rs b/monoio/src/net/unix/socket_addr.rs index 38db6bb3..272f4ba8 100644 --- a/monoio/src/net/unix/socket_addr.rs +++ b/monoio/src/net/unix/socket_addr.rs @@ -233,15 +233,19 @@ where }; let mut fds = [-1; 2]; - crate::syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))?; + unsafe { crate::syscall!(socketpair(libc::AF_UNIX, flags, 0, fds.as_mut_ptr()))? }; let pair = unsafe { (T::from_raw_fd(fds[0]), T::from_raw_fd(fds[1])) }; Ok(pair) } pub(crate) fn local_addr(socket: RawFd) -> io::Result { - SocketAddr::new(|sockaddr, socklen| crate::syscall!(getsockname(socket, sockaddr, socklen))) + SocketAddr::new(|sockaddr, socklen| unsafe { + crate::syscall!(getsockname(socket, sockaddr, socklen)) + }) } pub(crate) fn peer_addr(socket: RawFd) -> io::Result { - SocketAddr::new(|sockaddr, socklen| crate::syscall!(getpeername(socket, sockaddr, socklen))) + SocketAddr::new(|sockaddr, socklen| unsafe { + crate::syscall!(getpeername(socket, sockaddr, socklen)) + }) } diff --git a/monoio/src/time/driver/entry.rs b/monoio/src/time/driver/entry.rs index 83555749..9b912f0b 100644 --- a/monoio/src/time/driver/entry.rs +++ b/monoio/src/time/driver/entry.rs @@ -525,6 +525,7 @@ impl TimerHandle { unsafe { self.inner.as_ref().sync_when() } } + #[allow(unused)] pub(super) unsafe fn is_pending(&self) -> bool { unsafe { self.inner.as_ref().state.is_pending() } } diff --git a/monoio/src/time/driver/mod.rs b/monoio/src/time/driver/mod.rs index c270d4d7..9ab4c9f0 100644 --- a/monoio/src/time/driver/mod.rs +++ b/monoio/src/time/driver/mod.rs @@ -2,7 +2,6 @@ // in the future, this will change to the reverse. For now, suppress this // warning and generally stick with being explicit about unsafety. #![allow(unused_unsafe)] -#![cfg_attr(not(feature = "rt"), allow(dead_code))] //! Time driver @@ -179,6 +178,7 @@ where /// can either be created directly or the `Handle` instance can be passed to /// `with_default`, setting the timer as the default timer for the execution /// context. + #[allow(unused)] pub(crate) fn handle(&self) -> Handle { self.handle.clone() } diff --git a/monoio/src/time/driver/wheel/level.rs b/monoio/src/time/driver/wheel/level.rs index 3d53aec1..28414ea3 100644 --- a/monoio/src/time/driver/wheel/level.rs +++ b/monoio/src/time/driver/wheel/level.rs @@ -257,7 +257,7 @@ fn slot_for(duration: u64, level: usize) -> usize { ((duration >> (level * 6)) % LEVEL_MULT as u64) as usize } -#[cfg(all(test, not(loom)))] +#[cfg(test)] mod test { use super::*; diff --git a/monoio/src/time/driver/wheel/mod.rs b/monoio/src/time/driver/wheel/mod.rs index 5003e83b..24d0f8a6 100644 --- a/monoio/src/time/driver/wheel/mod.rs +++ b/monoio/src/time/driver/wheel/mod.rs @@ -303,7 +303,7 @@ fn level_for(elapsed: u64, when: u64) -> usize { significant / 6 } -#[cfg(all(test, not(loom)))] +#[cfg(test)] mod test { use super::*; diff --git a/monoio/src/utils/slab.rs b/monoio/src/utils/slab.rs index 58461137..651823dd 100644 --- a/monoio/src/utils/slab.rs +++ b/monoio/src/utils/slab.rs @@ -336,7 +336,7 @@ impl Drop for Page { } else { // slow drop to_drop.set_len(self.initialized); - std::mem::transmute::<_, Vec>>(to_drop); + std::mem::transmute::>>, Vec>>(to_drop); } } } diff --git a/monoio/tests/async_fd.rs b/monoio/tests/async_fd.rs new file mode 100644 index 00000000..f41e0a10 --- /dev/null +++ b/monoio/tests/async_fd.rs @@ -0,0 +1,7 @@ +#[cfg(all(feature = "poll-io", unix))] +#[monoio::test_all(internal = true)] +async fn test_async_fn() { + // use libc to create a eventfd + use libc::{eventfd, EFD_NONBLOCK}; + let eventfd = unsafe { eventfd(0, EFD_NONBLOCK) }; +} diff --git a/monoio/tests/buf_writter.rs b/monoio/tests/buf_writter.rs index c1f9e00a..35b36f29 100644 --- a/monoio/tests/buf_writter.rs +++ b/monoio/tests/buf_writter.rs @@ -3,7 +3,7 @@ use monoio::{ net::{TcpListener, TcpStream}, }; -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn ensure_buf_writter_write_properly() { let srv = TcpListener::bind("127.0.0.1:0").unwrap(); let addr = srv.local_addr().unwrap(); diff --git a/monoio/tests/ctrlc_legacy.rs b/monoio/tests/ctrlc_legacy.rs index 2a0fd53b..156ba9ec 100644 --- a/monoio/tests/ctrlc_legacy.rs +++ b/monoio/tests/ctrlc_legacy.rs @@ -1,5 +1,5 @@ #[cfg(feature = "signal")] -#[monoio::test(driver = "legacy")] +#[monoio::test(driver = "legacy", internal = true)] async fn test_ctrlc_legacy() { use libc::{getpid, kill, SIGINT}; use monoio::utils::CtrlC; diff --git a/monoio/tests/ctrlc_uring.rs b/monoio/tests/ctrlc_uring.rs index 602b963d..7e0f0207 100644 --- a/monoio/tests/ctrlc_uring.rs +++ b/monoio/tests/ctrlc_uring.rs @@ -1,5 +1,5 @@ #[cfg(feature = "signal")] -#[monoio::test(driver = "uring")] +#[monoio::test(driver = "uring", internal = true)] async fn test_ctrlc_uring() { use libc::{getpid, kill, SIGINT}; use monoio::utils::CtrlC; diff --git a/monoio/tests/fs_file.rs b/monoio/tests/fs_file.rs index 13fe492e..947dca57 100644 --- a/monoio/tests/fs_file.rs +++ b/monoio/tests/fs_file.rs @@ -20,7 +20,7 @@ async fn read_hello(file: &File) { assert_eq!(&buf, &HELLO[..n]); } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn basic_read() { let mut tempfile = tempfile(); tempfile.write_all(HELLO).unwrap(); @@ -30,7 +30,7 @@ async fn basic_read() { read_hello(&file).await; } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn basic_read_exact() { let mut tempfile = tempfile(); tempfile.write_all(HELLO).unwrap(); @@ -47,7 +47,7 @@ async fn basic_read_exact() { assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::UnexpectedEof); } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn basic_write() { let tempfile = tempfile(); @@ -59,7 +59,7 @@ async fn basic_write() { assert_eq!(file, HELLO); } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn basic_write_all() { let tempfile = tempfile(); @@ -71,7 +71,7 @@ async fn basic_write_all() { assert_eq!(file, HELLO); } -#[monoio::test(driver = "uring")] +#[monoio::test(driver = "uring", internal = true)] async fn cancel_read() { let mut tempfile = tempfile(); tempfile.write_all(HELLO).unwrap(); @@ -85,7 +85,7 @@ async fn cancel_read() { read_hello(&file).await; } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn explicit_close() { let mut tempfile = tempfile(); tempfile.write_all(HELLO).unwrap(); @@ -102,7 +102,7 @@ async fn explicit_close() { assert_invalid_fd(fd, tempfile.as_file().metadata().unwrap()); } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn drop_open() { let tempfile = tempfile(); @@ -137,7 +137,7 @@ fn drop_off_runtime() { assert_invalid_fd(fd, tempfile.as_file().metadata().unwrap()); } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn sync_doesnt_kill_anything() { let tempfile = tempfile(); diff --git a/monoio/tests/tcp_accept.rs b/monoio/tests/tcp_accept.rs index df6236c3..f28dec4a 100644 --- a/monoio/tests/tcp_accept.rs +++ b/monoio/tests/tcp_accept.rs @@ -5,7 +5,7 @@ use monoio::net::{TcpListener, TcpStream}; macro_rules! test_accept { ($(($ident:ident, $target:expr),)*) => { $( - #[monoio::test_all] + #[monoio::test_all(internal = true)] async fn $ident() { let listener = TcpListener::bind($target).unwrap(); let addr = listener.local_addr().unwrap(); diff --git a/monoio/tests/tcp_connect.rs b/monoio/tests/tcp_connect.rs index 29c9a191..10780c00 100644 --- a/monoio/tests/tcp_connect.rs +++ b/monoio/tests/tcp_connect.rs @@ -5,7 +5,7 @@ use monoio::net::{TcpListener, TcpStream}; macro_rules! test_connect_ip { ($(($ident:ident, $target:expr, $addr_f:path),)*) => { $( - #[monoio::test_all] + #[monoio::test_all(internal = true)] async fn $ident() { let listener = TcpListener::bind($target).unwrap(); let addr = listener.local_addr().unwrap(); @@ -50,7 +50,7 @@ test_connect_ip! { macro_rules! test_connect { ($(($ident:ident, $mapping:tt),)*) => { $( - #[monoio::test_all] + #[monoio::test_all(internal = true)] async fn $ident() { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); #[allow(clippy::redundant_closure_call)] @@ -94,7 +94,7 @@ test_connect! { })), } -#[monoio::test_all(timer_enabled = true)] +#[monoio::test_all(timer_enabled = true, internal = true)] async fn connect_timeout_dst() { let drop_flag = DropFlag::default(); let drop_flag_copy = drop_flag.clone(); @@ -113,12 +113,12 @@ async fn connect_timeout_dst() { drop_flag.assert_dropped(); } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn connect_invalid_dst() { assert!(TcpStream::connect("127.0.0.1:1").await.is_err()); } -#[monoio::test_all(timer_enabled = true)] +#[monoio::test_all(timer_enabled = true, internal = true)] async fn cancel_read() { use monoio::io::CancelableAsyncReadRent; @@ -135,7 +135,7 @@ async fn cancel_read() { assert!(res.is_err()); } -#[monoio::test_all(timer_enabled = true)] +#[monoio::test_all(timer_enabled = true, internal = true)] async fn cancel_select() { use std::pin::pin; diff --git a/monoio/tests/tcp_echo.rs b/monoio/tests/tcp_echo.rs index d3aca3a1..5645964e 100644 --- a/monoio/tests/tcp_echo.rs +++ b/monoio/tests/tcp_echo.rs @@ -3,7 +3,7 @@ use monoio::{ net::{TcpListener, TcpStream}, }; -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn echo_server() { const ITER: usize = 1024; @@ -66,7 +66,7 @@ async fn echo_server() { } } -#[monoio::test_all(timer_enabled = true)] +#[monoio::test_all(timer_enabled = true, internal = true)] async fn rw_able() { let listener = TcpListener::bind("127.0.0.1:0").unwrap(); let listener_addr = listener.local_addr().unwrap(); @@ -100,7 +100,7 @@ async fn rw_able() { assert!(conn.readable(false).await.is_ok()); } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn echo_tfo() { use std::net::SocketAddr; diff --git a/monoio/tests/tcp_into_split.rs b/monoio/tests/tcp_into_split.rs index 4fc10124..f4ca556f 100644 --- a/monoio/tests/tcp_into_split.rs +++ b/monoio/tests/tcp_into_split.rs @@ -9,7 +9,7 @@ use monoio::{ try_join, }; -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn split() -> Result<()> { const MSG: &[u8] = b"split"; @@ -50,7 +50,7 @@ async fn split() -> Result<()> { Ok(()) } -#[monoio::test_all(enable_timer = true)] +#[monoio::test_all(enable_timer = true, internal = true)] async fn reunite() -> Result<()> { let listener = net::TcpListener::bind("127.0.0.1:0")?; let addr = listener.local_addr()?; @@ -78,7 +78,7 @@ async fn reunite() -> Result<()> { } /// Test that dropping the write half actually closes the stream. -#[monoio::test_all(enable_timer = true, entries = 1024)] +#[monoio::test_all(enable_timer = true, entries = 1024, internal = true)] async fn drop_write() -> Result<()> { const MSG: &[u8] = b"split"; diff --git a/monoio/tests/tcp_split.rs b/monoio/tests/tcp_split.rs index 4342b456..389c6219 100644 --- a/monoio/tests/tcp_split.rs +++ b/monoio/tests/tcp_split.rs @@ -8,7 +8,7 @@ use monoio::{ net::TcpStream, }; -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn split() -> Result<()> { const MSG: &[u8] = b"split"; diff --git a/monoio/tests/udp.rs b/monoio/tests/udp.rs index 595e6398..c2863cc7 100644 --- a/monoio/tests/udp.rs +++ b/monoio/tests/udp.rs @@ -1,6 +1,6 @@ use monoio::net::udp::UdpSocket; -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn connect() { const MSG: &str = "foo bar baz"; @@ -20,7 +20,7 @@ async fn connect() { assert_eq!(active.peer_addr().unwrap(), passive_addr); } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn send_to() { const MSG: &str = "foo bar baz"; @@ -57,7 +57,7 @@ async fn send_to() { must_success!(passive3.recv_from(vec![0; 20]).await, active_addr); } -#[monoio::test_all(timer_enabled = true)] +#[monoio::test_all(timer_enabled = true, internal = true)] async fn rw_able() { const MSG: &str = "foo bar baz"; @@ -79,7 +79,7 @@ async fn rw_able() { assert!(passive.readable(false).await.is_ok()); } -#[monoio::test_all(timer_enabled = true)] +#[monoio::test_all(timer_enabled = true, internal = true)] async fn cancel_recv_from() { let passive = UdpSocket::bind("127.0.0.1:0").unwrap(); let canceller = monoio::io::Canceller::new(); diff --git a/monoio/tests/uds_cred.rs b/monoio/tests/uds_cred.rs index 717d6589..dbd4c012 100644 --- a/monoio/tests/uds_cred.rs +++ b/monoio/tests/uds_cred.rs @@ -2,7 +2,7 @@ use libc::{getegid, geteuid}; use monoio::net::UnixStream; -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn test_socket_pair() { let (a, b) = UnixStream::pair().unwrap(); let cred_a = a.peer_cred().unwrap(); diff --git a/monoio/tests/uds_split.rs b/monoio/tests/uds_split.rs index 350a51c4..bdf87df4 100644 --- a/monoio/tests/uds_split.rs +++ b/monoio/tests/uds_split.rs @@ -10,7 +10,7 @@ use monoio::{ /// Verifies that the implementation of `AsyncWrite::poll_shutdown` shutdowns /// the stream for writing by reading to the end of stream on the other side of /// the connection. -#[monoio::test_all(entries = 1024)] +#[monoio::test_all(entries = 1024, internal = true)] async fn split() -> std::io::Result<()> { let (a, b) = UnixStream::pair()?; diff --git a/monoio/tests/uds_stream.rs b/monoio/tests/uds_stream.rs index 5d6c3772..56c9ea01 100644 --- a/monoio/tests/uds_stream.rs +++ b/monoio/tests/uds_stream.rs @@ -5,7 +5,7 @@ use monoio::{ net::{UnixListener, UnixStream}, }; -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn accept_read_write() -> std::io::Result<()> { let dir = tempfile::Builder::new() .prefix("monoio-uds-tests") @@ -42,7 +42,7 @@ async fn accept_read_write() -> std::io::Result<()> { Ok(()) } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn shutdown() -> std::io::Result<()> { let dir = tempfile::Builder::new() .prefix("monoio-uds-tests") diff --git a/monoio/tests/unix_datagram.rs b/monoio/tests/unix_datagram.rs index 67fec883..d64aecbd 100644 --- a/monoio/tests/unix_datagram.rs +++ b/monoio/tests/unix_datagram.rs @@ -1,7 +1,7 @@ #![cfg(unix)] use monoio::net::unix::UnixDatagram; -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn accept_send_recv() -> std::io::Result<()> { let dir = tempfile::Builder::new() .prefix("monoio-unix-datagram-tests") @@ -25,7 +25,7 @@ async fn accept_send_recv() -> std::io::Result<()> { Ok(()) } -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn addr_type() -> std::io::Result<()> { let dir = tempfile::Builder::new() .prefix("monoio-unix-datagram-tests") diff --git a/monoio/tests/unix_seqpacket.rs b/monoio/tests/unix_seqpacket.rs index 65fc21ef..5e133d76 100644 --- a/monoio/tests/unix_seqpacket.rs +++ b/monoio/tests/unix_seqpacket.rs @@ -1,5 +1,5 @@ #[cfg(target_os = "linux")] -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn test_seqpacket() -> std::io::Result<()> { use monoio::net::unix::{UnixSeqpacket, UnixSeqpacketListener}; diff --git a/monoio/tests/zero_copy.rs b/monoio/tests/zero_copy.rs index 6abed191..f63011d3 100644 --- a/monoio/tests/zero_copy.rs +++ b/monoio/tests/zero_copy.rs @@ -1,5 +1,5 @@ #[cfg(all(target_os = "linux", feature = "splice"))] -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn zero_copy_for_tcp() { use monoio::{ buf::IoBufMut, @@ -29,7 +29,7 @@ async fn zero_copy_for_tcp() { } #[cfg(all(target_os = "linux", feature = "splice"))] -#[monoio::test_all] +#[monoio::test_all(internal = true)] async fn zero_copy_for_uds() { use monoio::{ buf::IoBufMut,