Skip to content

Commit

Permalink
[Fix] Corrupt implementation of line_piper
Browse files Browse the repository at this point in the history
  • Loading branch information
sisungo committed Feb 26, 2024
1 parent 4e5977e commit d38c808
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 99 deletions.
20 changes: 0 additions & 20 deletions airupfx/airupfx-ace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,26 +221,6 @@ impl Child {
},
}
}

#[inline]
pub fn stdout(&self) -> Option<Arc<airupfx_io::LinePiper>> {
match self {
Self::Async(child) => child.stdout(),
Self::AlwaysSuccess(child) => child.stdout(),
Self::Process(proc) => proc.stdout(),
Self::Builtin(_) => None,
}
}

#[inline]
pub fn stderr(&self) -> Option<Arc<airupfx_io::LinePiper>> {
match self {
Self::Async(child) => child.stderr(),
Self::AlwaysSuccess(child) => child.stderr(),
Self::Process(proc) => proc.stderr(),
Self::Builtin(_) => None,
}
}
}
impl From<airupfx_process::Child> for Child {
fn from(value: airupfx_process::Child) -> Self {
Expand Down
87 changes: 44 additions & 43 deletions airupfx/airupfx-io/src/line_piper.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{future::Future, pin::Pin};
use tokio::{
io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader},
io::{AsyncRead, AsyncReadExt},
sync::mpsc,
task::JoinHandle,
};
Expand All @@ -18,20 +18,7 @@ pub struct LinePiper {
impl LinePiper {
pub fn new(reader: impl AsyncRead + Unpin + Send + 'static) -> Self {
let (tx, rx) = mpsc::channel(4);
let join_handle = LinePiperEntity::new(reader, tx).start();
Self {
rx: rx.into(),
join_handle,
}
}

pub fn with_callback(
reader: impl AsyncRead + Unpin + Send + 'static,
callback: Box<dyn Callback>,
) -> Self {
let (tx, rx) = mpsc::channel(4);
let join_handle: JoinHandle<()> =
LinePiperEntity::with_callback(reader, tx, callback).start();
let join_handle = LinePiperEntity::new(reader, Box::new(ChannelCallback::new(tx))).start();
Self {
rx: rx.into(),
join_handle,
Expand All @@ -48,26 +35,17 @@ impl Drop for LinePiper {
}
}

pub fn set_callback(reader: impl AsyncRead + Unpin + Send + 'static, callback: Box<dyn Callback>) {
LinePiperEntity::new(reader, callback).start();
}

struct LinePiperEntity<R> {
reader: R,
tx: mpsc::Sender<Vec<u8>>,
callback: Option<Box<dyn Callback>>,
callback: Box<dyn Callback>,
}
impl<R: AsyncRead + Unpin + Send + 'static> LinePiperEntity<R> {
fn new(reader: R, tx: mpsc::Sender<Vec<u8>>) -> Self {
Self {
reader,
tx,
callback: None,
}
}

fn with_callback(reader: R, tx: mpsc::Sender<Vec<u8>>, callback: Box<dyn Callback>) -> Self {
Self {
reader,
tx,
callback: Some(callback),
}
fn new(reader: R, callback: Box<dyn Callback>) -> Self {
Self { reader, callback }
}

fn start(self) -> JoinHandle<()> {
Expand All @@ -76,21 +54,44 @@ impl<R: AsyncRead + Unpin + Send + 'static> LinePiperEntity<R> {
})
}

async fn run(self) -> Option<()> {
let mut buf = Vec::new();
let mut buf_reader = BufReader::new(self.reader);
async fn run(mut self) -> Option<()> {
let mut buf = Box::new([0u8; 4096]);
let mut position = 0;
loop {
let mut limited = (&mut buf_reader).take(1024 * 4);
limited.read_until(b'\n', &mut buf).await.ok()?;
match &self.callback {
Some(callback) => {
callback.invoke(&buf).await;
let pos = loop {
let count = self.reader.read(&mut buf[position..]).await.ok()?;
position += count;
if let Some(pos) = &buf[..position].iter().position(|x| b"\n\r\0".contains(x)) {
break *pos;
}
None => {
self.tx.send(buf.clone()).await.ok()?;
assert!(position <= 4096);
if position == 4096 {
break 4096;
}
}
buf.clear();
};
self.callback.invoke(&buf[..pos]).await;
position = 0;
}
}
}

#[derive(Clone)]
struct ChannelCallback {
tx: mpsc::Sender<Vec<u8>>,
}
impl ChannelCallback {
fn new(tx: mpsc::Sender<Vec<u8>>) -> Self {
Self { tx }
}
}
impl Callback for ChannelCallback {
fn clone_boxed(&self) -> Box<dyn Callback> {
Box::new(self.clone())
}

fn invoke<'a>(&'a self, a: &'a [u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
Box::pin(async {
self.tx.send(a.to_vec()).await.ok();
})
}
}
18 changes: 1 addition & 17 deletions airupfx/airupfx-process/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ cfg_if::cfg_if! {
}
}

use airupfx_io::line_piper::{Callback as LinePiperCallback, LinePiper};
use airupfx_io::line_piper::Callback as LinePiperCallback;
use std::{
convert::Infallible,
ffi::OsString,
ops::{Deref, DerefMut},
path::PathBuf,
sync::Arc,
};

/// Returns `true` if supervising `forking` services are supported on the system.
Expand Down Expand Up @@ -136,16 +135,6 @@ impl Child {
pub fn kill(&self) -> std::io::Result<()> {
self.0.kill()
}

/// Returns a reference to the `stdout` piper handle of the child process.
pub fn stdout(&self) -> Option<Arc<LinePiper>> {
self.0.stdout()
}

/// Returns a reference to the `stderr` piper handle of the child process.
pub fn stderr(&self) -> Option<Arc<LinePiper>> {
self.0.stderr()
}
}
impl From<sys::Child> for Child {
fn from(inner: sys::Child) -> Self {
Expand All @@ -159,9 +148,6 @@ pub enum Stdio {
#[default]
Inherit,

/// A new pipe should be arranged to connect the parent and child processes.
Piped,

/// Similar to [`Stdio::Piped`], but a callback is called on each line.
Callback(Box<dyn LinePiperCallback>),

Expand All @@ -172,7 +158,6 @@ impl Clone for Stdio {
fn clone(&self) -> Self {
match self {
Self::Inherit => Self::Inherit,
Self::Piped => Self::Piped,
Self::Callback(c) => Self::Callback(c.clone_boxed()),
Self::File(f) => Self::File(f.clone()),
}
Expand All @@ -182,7 +167,6 @@ impl Stdio {
pub async fn to_std(&self) -> std::io::Result<std::process::Stdio> {
Ok(match self {
Self::Inherit => std::process::Stdio::inherit(),
Self::Piped => std::process::Stdio::piped(),
Self::Callback(_) => std::process::Stdio::piped(),
Self::File(path) => tokio::fs::File::options()
.append(true)
Expand Down
33 changes: 14 additions & 19 deletions airupfx/airupfx-process/src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

use super::{CommandEnv, ExitStatus, Stdio, Wait};
use ahash::AHashMap;
use airupfx_io::LinePiper;
use airupfx_io::line_piper;
use std::{
cmp,
convert::Infallible,
os::unix::process::CommandExt as _,
sync::{Arc, OnceLock, RwLock},
sync::{OnceLock, RwLock},
};
use tokio::{signal::unix::SignalKind, sync::watch};

Expand Down Expand Up @@ -111,9 +111,8 @@ impl ExitStatusExt for ExitStatus {
macro_rules! map_stdio {
($fx:expr, $std:expr) => {
match &$fx {
Stdio::Piped => LinePiper::new($std),
Stdio::Callback(c) => LinePiper::with_callback($std, c.clone_boxed()),
_ => LinePiper::new($std),
Stdio::Callback(c) => line_piper::set_callback($std, c.clone_boxed()),
_ => (),
}
};
}
Expand All @@ -123,8 +122,6 @@ macro_rules! map_stdio {
pub(crate) struct Child {
pid: Pid,
wait_queue: watch::Receiver<Option<Wait>>,
stdout: Option<Arc<LinePiper>>,
stderr: Option<Arc<LinePiper>>,
}
impl Child {
pub(crate) const fn id(&self) -> Pid {
Expand All @@ -133,21 +130,21 @@ impl Child {

fn from_std(env: &CommandEnv, c: std::process::Child) -> Self {
let pid = c.id();
let stdout = c
if let Some(x) = c
.stdout
.and_then(|x| tokio::process::ChildStdout::from_std(x).ok())
.map(|x| map_stdio!(env.stdout, x))
.map(Arc::new);
let stderr = c
{
map_stdio!(env.stdout, x);
}
if let Some(x) = c
.stderr
.and_then(|x| tokio::process::ChildStderr::from_std(x).ok())
.map(|x| map_stdio!(env.stderr, x))
.map(Arc::new);
{
map_stdio!(env.stderr, x);
}
Self {
pid: pid as _,
wait_queue: child_queue().subscribe(pid as _),
stdout,
stderr,
}
}

Expand All @@ -159,8 +156,6 @@ impl Child {
Self {
pid,
wait_queue: child_queue().subscribe(pid),
stdout: None,
stderr: None,
}
}

Expand Down Expand Up @@ -189,13 +184,13 @@ impl Child {
self.send_signal(libc::SIGKILL)
}

pub(crate) fn stdout(&self) -> Option<Arc<LinePiper>> {
/*pub(crate) fn stdout(&self) -> Option<Arc<LinePiper>> {
self.stdout.clone()
}
pub(crate) fn stderr(&self) -> Option<Arc<LinePiper>> {
self.stderr.clone()
}
}*/
}
impl Drop for Child {
fn drop(&mut self) {
Expand Down

0 comments on commit d38c808

Please sign in to comment.