diff --git a/airupfx/airupfx-ace/src/lib.rs b/airupfx/airupfx-ace/src/lib.rs index 8390139..5e9d77f 100644 --- a/airupfx/airupfx-ace/src/lib.rs +++ b/airupfx/airupfx-ace/src/lib.rs @@ -221,26 +221,6 @@ impl Child { }, } } - - #[inline] - pub fn stdout(&self) -> Option> { - match self { - Self::Async(child) => child.stdout(), - Self::AlwaysSuccess(child) => child.stdout(), - Self::Process(proc) => proc.stdout(), - Self::Builtin(_) => None, - } - } - - #[inline] - pub fn stderr(&self) -> Option> { - match self { - Self::Async(child) => child.stderr(), - Self::AlwaysSuccess(child) => child.stderr(), - Self::Process(proc) => proc.stderr(), - Self::Builtin(_) => None, - } - } } impl From for Child { fn from(value: airupfx_process::Child) -> Self { diff --git a/airupfx/airupfx-io/src/line_piper.rs b/airupfx/airupfx-io/src/line_piper.rs index 2db5ea8..66d432b 100644 --- a/airupfx/airupfx-io/src/line_piper.rs +++ b/airupfx/airupfx-io/src/line_piper.rs @@ -1,6 +1,6 @@ use std::{future::Future, pin::Pin}; use tokio::{ - io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader}, + io::{AsyncRead, AsyncReadExt}, sync::mpsc, task::JoinHandle, }; @@ -18,20 +18,7 @@ pub struct LinePiper { impl LinePiper { pub fn new(reader: impl AsyncRead + Unpin + Send + 'static) -> Self { let (tx, rx) = mpsc::channel(4); - let join_handle = LinePiperEntity::new(reader, tx).start(); - Self { - rx: rx.into(), - join_handle, - } - } - - pub fn with_callback( - reader: impl AsyncRead + Unpin + Send + 'static, - callback: Box, - ) -> Self { - let (tx, rx) = mpsc::channel(4); - let join_handle: JoinHandle<()> = - LinePiperEntity::with_callback(reader, tx, callback).start(); + let join_handle = LinePiperEntity::new(reader, Box::new(ChannelCallback::new(tx))).start(); Self { rx: rx.into(), join_handle, @@ -48,26 +35,17 @@ impl Drop for LinePiper { } } +pub fn set_callback(reader: impl AsyncRead + Unpin + Send + 'static, callback: Box) { + LinePiperEntity::new(reader, callback).start(); +} + struct LinePiperEntity { reader: R, - tx: mpsc::Sender>, - callback: Option>, + callback: Box, } impl LinePiperEntity { - fn new(reader: R, tx: mpsc::Sender>) -> Self { - Self { - reader, - tx, - callback: None, - } - } - - fn with_callback(reader: R, tx: mpsc::Sender>, callback: Box) -> Self { - Self { - reader, - tx, - callback: Some(callback), - } + fn new(reader: R, callback: Box) -> Self { + Self { reader, callback } } fn start(self) -> JoinHandle<()> { @@ -76,21 +54,44 @@ impl LinePiperEntity { }) } - async fn run(self) -> Option<()> { - let mut buf = Vec::new(); - let mut buf_reader = BufReader::new(self.reader); + async fn run(mut self) -> Option<()> { + let mut buf = Box::new([0u8; 4096]); + let mut position = 0; loop { - let mut limited = (&mut buf_reader).take(1024 * 4); - limited.read_until(b'\n', &mut buf).await.ok()?; - match &self.callback { - Some(callback) => { - callback.invoke(&buf).await; + let pos = loop { + let count = self.reader.read(&mut buf[position..]).await.ok()?; + position += count; + if let Some(pos) = &buf[..position].iter().position(|x| b"\n\r\0".contains(x)) { + break *pos; } - None => { - self.tx.send(buf.clone()).await.ok()?; + assert!(position <= 4096); + if position == 4096 { + break 4096; } - } - buf.clear(); + }; + self.callback.invoke(&buf[..pos]).await; + position = 0; } } } + +#[derive(Clone)] +struct ChannelCallback { + tx: mpsc::Sender>, +} +impl ChannelCallback { + fn new(tx: mpsc::Sender>) -> Self { + Self { tx } + } +} +impl Callback for ChannelCallback { + fn clone_boxed(&self) -> Box { + Box::new(self.clone()) + } + + fn invoke<'a>(&'a self, a: &'a [u8]) -> Pin + Send + 'a>> { + Box::pin(async { + self.tx.send(a.to_vec()).await.ok(); + }) + } +} diff --git a/airupfx/airupfx-process/src/lib.rs b/airupfx/airupfx-process/src/lib.rs index e749250..b157785 100644 --- a/airupfx/airupfx-process/src/lib.rs +++ b/airupfx/airupfx-process/src/lib.rs @@ -9,13 +9,12 @@ cfg_if::cfg_if! { } } -use airupfx_io::line_piper::{Callback as LinePiperCallback, LinePiper}; +use airupfx_io::line_piper::Callback as LinePiperCallback; use std::{ convert::Infallible, ffi::OsString, ops::{Deref, DerefMut}, path::PathBuf, - sync::Arc, }; /// Returns `true` if supervising `forking` services are supported on the system. @@ -136,16 +135,6 @@ impl Child { pub fn kill(&self) -> std::io::Result<()> { self.0.kill() } - - /// Returns a reference to the `stdout` piper handle of the child process. - pub fn stdout(&self) -> Option> { - self.0.stdout() - } - - /// Returns a reference to the `stderr` piper handle of the child process. - pub fn stderr(&self) -> Option> { - self.0.stderr() - } } impl From for Child { fn from(inner: sys::Child) -> Self { @@ -159,9 +148,6 @@ pub enum Stdio { #[default] Inherit, - /// A new pipe should be arranged to connect the parent and child processes. - Piped, - /// Similar to [`Stdio::Piped`], but a callback is called on each line. Callback(Box), @@ -172,7 +158,6 @@ impl Clone for Stdio { fn clone(&self) -> Self { match self { Self::Inherit => Self::Inherit, - Self::Piped => Self::Piped, Self::Callback(c) => Self::Callback(c.clone_boxed()), Self::File(f) => Self::File(f.clone()), } @@ -182,7 +167,6 @@ impl Stdio { pub async fn to_std(&self) -> std::io::Result { Ok(match self { Self::Inherit => std::process::Stdio::inherit(), - Self::Piped => std::process::Stdio::piped(), Self::Callback(_) => std::process::Stdio::piped(), Self::File(path) => tokio::fs::File::options() .append(true) diff --git a/airupfx/airupfx-process/src/unix.rs b/airupfx/airupfx-process/src/unix.rs index 274ed34..fe1de16 100644 --- a/airupfx/airupfx-process/src/unix.rs +++ b/airupfx/airupfx-process/src/unix.rs @@ -9,12 +9,12 @@ use super::{CommandEnv, ExitStatus, Stdio, Wait}; use ahash::AHashMap; -use airupfx_io::LinePiper; +use airupfx_io::line_piper; use std::{ cmp, convert::Infallible, os::unix::process::CommandExt as _, - sync::{Arc, OnceLock, RwLock}, + sync::{OnceLock, RwLock}, }; use tokio::{signal::unix::SignalKind, sync::watch}; @@ -111,9 +111,8 @@ impl ExitStatusExt for ExitStatus { macro_rules! map_stdio { ($fx:expr, $std:expr) => { match &$fx { - Stdio::Piped => LinePiper::new($std), - Stdio::Callback(c) => LinePiper::with_callback($std, c.clone_boxed()), - _ => LinePiper::new($std), + Stdio::Callback(c) => line_piper::set_callback($std, c.clone_boxed()), + _ => (), } }; } @@ -123,8 +122,6 @@ macro_rules! map_stdio { pub(crate) struct Child { pid: Pid, wait_queue: watch::Receiver>, - stdout: Option>, - stderr: Option>, } impl Child { pub(crate) const fn id(&self) -> Pid { @@ -133,21 +130,21 @@ impl Child { fn from_std(env: &CommandEnv, c: std::process::Child) -> Self { let pid = c.id(); - let stdout = c + if let Some(x) = c .stdout .and_then(|x| tokio::process::ChildStdout::from_std(x).ok()) - .map(|x| map_stdio!(env.stdout, x)) - .map(Arc::new); - let stderr = c + { + map_stdio!(env.stdout, x); + } + if let Some(x) = c .stderr .and_then(|x| tokio::process::ChildStderr::from_std(x).ok()) - .map(|x| map_stdio!(env.stderr, x)) - .map(Arc::new); + { + map_stdio!(env.stderr, x); + } Self { pid: pid as _, wait_queue: child_queue().subscribe(pid as _), - stdout, - stderr, } } @@ -159,8 +156,6 @@ impl Child { Self { pid, wait_queue: child_queue().subscribe(pid), - stdout: None, - stderr: None, } } @@ -189,13 +184,13 @@ impl Child { self.send_signal(libc::SIGKILL) } - pub(crate) fn stdout(&self) -> Option> { + /*pub(crate) fn stdout(&self) -> Option> { self.stdout.clone() } pub(crate) fn stderr(&self) -> Option> { self.stderr.clone() - } + }*/ } impl Drop for Child { fn drop(&mut self) {