Skip to content

Commit

Permalink
feat: Introduce PrintSender type
Browse files Browse the repository at this point in the history
Wrap the underlying Tokio sender so we can do our own error type on it.
This updates all callers to only use the new wrapper type.
  • Loading branch information
alilleybrinker committed Nov 14, 2024
1 parent 772a9e6 commit dccbfd9
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 58 deletions.
16 changes: 7 additions & 9 deletions omnibor-cli/src/cmd/artifact/find.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ use crate::{
cli::{Config, FindArgs, SelectedHash},
error::{Error, Result},
fs::*,
print::{error::ErrorMsg, find_file::FindFileMsg, PrinterCmd},
print::{error::ErrorMsg, find_file::FindFileMsg, PrintSender, PrinterCmd},
};
use async_walkdir::WalkDir;
use futures_lite::stream::StreamExt as _;
use tokio::sync::mpsc::Sender;

/// Run the `artifact find` subcommand.
pub async fn run(tx: &Sender<PrinterCmd>, config: &Config, args: &FindArgs) -> Result<()> {
pub async fn run(tx: &PrintSender, config: &Config, args: &FindArgs) -> Result<()> {
let FindArgs { aid, path } = args;

let url = aid.url();
Expand All @@ -21,16 +20,16 @@ pub async fn run(tx: &Sender<PrinterCmd>, config: &Config, args: &FindArgs) -> R
loop {
match entries.next().await {
None => break,
Some(Err(source)) => tx
.send(PrinterCmd::msg(
Some(Err(source)) => {
tx.send(PrinterCmd::msg(
ErrorMsg::new(Error::WalkDirFailed {
path: path.to_path_buf(),
source,
}),
config.format(),
))
.await
.map_err(|_| Error::PrintChannelClose)?,
.await?
}
Some(Ok(entry)) => {
let path = &entry.path();

Expand All @@ -49,8 +48,7 @@ pub async fn run(tx: &Sender<PrinterCmd>, config: &Config, args: &FindArgs) -> R
},
config.format(),
))
.await
.map_err(|_| Error::PrintChannelClose)?;
.await?;
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions omnibor-cli/src/cmd/artifact/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ use crate::{
cli::{Config, IdArgs},
error::Result,
fs::*,
print::PrinterCmd,
print::PrintSender,
};
use tokio::sync::mpsc::Sender;

/// Run the `artifact id` subcommand.
pub async fn run(tx: &Sender<PrinterCmd>, config: &Config, args: &IdArgs) -> Result<()> {
pub async fn run(tx: &PrintSender, config: &Config, args: &IdArgs) -> Result<()> {
let mut file = open_async_file(&args.path).await?;

if file_is_dir(&file, &args.path).await? {
Expand Down
8 changes: 3 additions & 5 deletions omnibor-cli/src/cmd/debug/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
use crate::{
cli::Config,
error::{Error, Result},
print::{root_dir::RootDirMsg, PrinterCmd},
print::{root_dir::RootDirMsg, PrintSender, PrinterCmd},
};
use tokio::sync::mpsc::Sender;

/// Run the `debug config` subcommand.
pub async fn run(tx: &Sender<PrinterCmd>, config: &Config) -> Result<()> {
pub async fn run(tx: &PrintSender, config: &Config) -> Result<()> {
let root = config.dir().ok_or(Error::NoRoot)?.to_path_buf();

tx.send(PrinterCmd::msg(RootDirMsg { path: root }, config.format()))
.await
.map_err(|_| Error::PrintChannelClose)?;
.await?;

Ok(())
}
9 changes: 2 additions & 7 deletions omnibor-cli/src/cmd/manifest/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@
use crate::{
cli::{Config, ManifestAddArgs},
error::Result,
print::PrinterCmd,
print::PrintSender,
};
use tokio::sync::mpsc::Sender;

/// Run the `manifest add` subcommand.
pub async fn run(
_tx: &Sender<PrinterCmd>,
_config: &Config,
_args: &ManifestAddArgs,
) -> Result<()> {
pub async fn run(_tx: &PrintSender, _config: &Config, _args: &ManifestAddArgs) -> Result<()> {
todo!()
}
9 changes: 2 additions & 7 deletions omnibor-cli/src/cmd/manifest/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,16 @@
use crate::{
cli::{Config, ManifestCreateArgs},
error::{Error, Result},
print::PrinterCmd,
print::PrintSender,
};
use omnibor::{
embedding::NoEmbed, hashes::Sha256, storage::FileSystemStorage, InputManifestBuilder,
IntoArtifactId, RelationKind,
};
use tokio::sync::mpsc::Sender;
use tracing::info;

/// Run the `manifest create` subcommand.
pub async fn run(
_tx: &Sender<PrinterCmd>,
config: &Config,
args: &ManifestCreateArgs,
) -> Result<()> {
pub async fn run(_tx: &PrintSender, config: &Config, args: &ManifestCreateArgs) -> Result<()> {
let root = config.dir().ok_or_else(|| Error::NoRoot)?;

info!(root = %root.display());
Expand Down
9 changes: 2 additions & 7 deletions omnibor-cli/src/cmd/manifest/remove.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@
use crate::{
cli::{Config, ManifestRemoveArgs},
error::Result,
print::PrinterCmd,
print::PrintSender,
};
use tokio::sync::mpsc::Sender;

/// Run the `manifest remove` subcommand.
pub async fn run(
_tx: &Sender<PrinterCmd>,
_config: &Config,
_args: &ManifestRemoveArgs,
) -> Result<()> {
pub async fn run(_tx: &PrintSender, _config: &Config, _args: &ManifestRemoveArgs) -> Result<()> {
todo!()
}
19 changes: 9 additions & 10 deletions omnibor-cli/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
use crate::{
cli::{Format, SelectedHash},
error::{Error, Result},
print::{error::ErrorMsg, id_file::IdFileMsg, PrinterCmd},
print::{error::ErrorMsg, id_file::IdFileMsg, PrintSender, PrinterCmd},
};
use async_walkdir::{DirEntry as AsyncDirEntry, WalkDir};
use futures_lite::stream::StreamExt as _;
use omnibor::{hashes::Sha256, ArtifactId};
use std::path::Path;
use tokio::{fs::File as AsyncFile, sync::mpsc::Sender};
use tokio::fs::File as AsyncFile;
use url::Url;

// Identify, recursively, all the files under a directory.
pub async fn id_directory(
tx: &Sender<PrinterCmd>,
tx: &PrintSender,
path: &Path,
format: Format,
hash: SelectedHash,
Expand All @@ -24,16 +24,16 @@ pub async fn id_directory(
loop {
match entries.next().await {
None => break,
Some(Err(source)) => tx
.send(PrinterCmd::msg(
Some(Err(source)) => {
tx.send(PrinterCmd::msg(
ErrorMsg::new(Error::WalkDirFailed {
path: path.to_path_buf(),
source,
}),
format,
))
.await
.map_err(|_| Error::PrintChannelClose)?,
.await?
}
Some(Ok(entry)) => {
let path = &entry.path();

Expand All @@ -52,7 +52,7 @@ pub async fn id_directory(

/// Identify a single file.
pub async fn id_file(
tx: &Sender<PrinterCmd>,
tx: &PrintSender,
file: &mut AsyncFile,
path: &Path,
format: Format,
Expand All @@ -67,8 +67,7 @@ pub async fn id_file(
},
format,
))
.await
.map_err(|_| Error::PrintChannelClose)?;
.await?;

Ok(())
}
Expand Down
11 changes: 4 additions & 7 deletions omnibor-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ mod print;
use crate::{
cli::{ArtifactCommand, Command, Config, DebugCommand, ManifestCommand},
cmd::{artifact, debug, manifest},
error::{Error, Result},
print::{error::ErrorMsg, Printer, PrinterCmd},
error::Result,
print::{error::ErrorMsg, PrintSender, Printer, PrinterCmd},
};
use clap::Parser as _;
use clap_verbosity_flag::{InfoLevel, Verbosity};
use std::process::ExitCode;
use tokio::sync::mpsc::Sender;
use tracing::trace;
use tracing_subscriber::filter::EnvFilter;

Expand Down Expand Up @@ -76,7 +75,7 @@ fn adapt_level_filter(
}

/// Select and run the chosen command.
async fn run(tx: &Sender<PrinterCmd>, config: &Config) -> Result<()> {
async fn run(tx: &PrintSender, config: &Config) -> Result<()> {
match config.command() {
Command::Artifact(ref args) => match args.command() {
ArtifactCommand::Id(ref args) => artifact::id::run(tx, config, args).await?,
Expand All @@ -93,9 +92,7 @@ async fn run(tx: &Sender<PrinterCmd>, config: &Config) -> Result<()> {
}

// Ensure we always send the "End" printer command.
tx.send(PrinterCmd::End)
.await
.map_err(|_| Error::PrintChannelClose)?;
tx.send(PrinterCmd::End).await?;

Ok(())
}
17 changes: 14 additions & 3 deletions omnibor-cli/src/print/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tracing::{debug, error};
/// A handle to assist in interacting with the printer.
pub struct Printer {
/// The transmitter to send message to the task.
tx: Sender<PrinterCmd>,
tx: PrintSender,

/// The actual future to be awaited.
task: Box<dyn Future<Output = StdResult<(), JoinError>> + Unpin>,
Expand Down Expand Up @@ -69,7 +69,7 @@ impl Printer {
});

Printer {
tx,
tx: PrintSender(tx),
task: Box::new(printer),
}
}
Expand Down Expand Up @@ -98,11 +98,22 @@ impl Printer {
}

/// Get a reference to the task transmitter.
pub fn tx(&self) -> &Sender<PrinterCmd> {
pub fn tx(&self) -> &PrintSender {
&self.tx
}
}

pub struct PrintSender(Sender<PrinterCmd>);

impl PrintSender {
pub async fn send(&self, value: PrinterCmd) -> Result<()> {
self.0
.send(value)
.await
.map_err(|_| Error::PrintChannelClose)
}
}

/// A print queue message, either an actual message or a signals to end printing.
#[derive(Debug, Clone)]
pub enum PrinterCmd {
Expand Down

0 comments on commit dccbfd9

Please sign in to comment.