diff --git a/src/main.rs b/src/main.rs index 581033f..b1b6af5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,8 +9,12 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use std::io::Write; +use std::sync::Mutex; + mod utils; use utils::file_reader; +use utils::file_writer; // The arguments end up in the Cli struct #[derive(Parser, Debug)] @@ -49,6 +53,10 @@ struct Cli { #[arg(short, long, value_parser)] contam: Option, + /// Output contaminants to a file + #[arg(long, value_parser)] + output_contam: Option, + /// Output the opposite of the normal results #[arg(long)] inverse: bool, @@ -57,6 +65,10 @@ struct Cli { #[arg(short = 'i', long = "input", value_parser)] input: Option, + /// Output filename [default: write to stdout] + #[arg(short = 'o', long = "output", value_parser)] + output: Option, + /// Filter max GC content #[arg(long, value_parser, default_value_t = 1.0)] maxgc: f64, @@ -83,15 +95,21 @@ fn main() -> Result<(), Box> { .expect("Error: Unable to build threadpool"); let mut reader = file_reader(args.input.as_ref())?; - filter(&mut reader, args); + let contam_writer = file_writer(args.output_contam.as_ref())?; + let output_writer = file_writer(args.output.as_ref())?; + filter(&mut reader, output_writer, contam_writer, args); Ok(()) } /// This function filters fastq on stdin based on quality, maxlength and minlength /// and applies trimming before writting to stdout -fn filter(input: &mut T, args: Cli) -where +fn filter( + input: &mut T, + output_writer: Arc>>, + contam_writer: Arc>>, + args: Cli, +) where T: Read + std::marker::Send, { match args.contam { @@ -130,17 +148,19 @@ where && read_gc >= args.mingc && read_gc <= args.maxgc && read_len >= args.minlength - && read_len <= args.maxlength - && !is_contamination(&record.seq(), &aligner)) + && read_len <= args.maxlength) || (args.inverse && (average_quality < args.minqual || average_quality > args.maxqual || read_len < args.minlength - || read_len > args.maxlength - || is_contamination(&record.seq(), &aligner))) + || read_len > args.maxlength)) { - write_record(record, &args, read_len); - output_reads_.fetch_add(1, Ordering::SeqCst); + if is_contamination(&record.seq(), &aligner) { + write_record(record, &args, contam_writer.clone(), read_len); + } else { + write_record(record, &args, output_writer.clone(), read_len); + output_reads_.fetch_add(1, Ordering::SeqCst); + } } } } @@ -189,7 +209,7 @@ where || read_len < args.minlength || read_len > args.maxlength)) { - write_record(record, &args, read_len); + write_record(record, &args, output_writer.clone(), read_len); output_reads_.fetch_add(1, Ordering::SeqCst); } } @@ -203,7 +223,12 @@ where } } -fn write_record(record: fastq::Record, args: &Cli, read_len: usize) { +fn write_record( + record: fastq::Record, + args: &Cli, + writer: Arc>>, + read_len: usize, +) { // Check if a description attribute is present, taken from the bio-rust code to format fastq let header = match record.desc() { Some(d) => format!("{} {}", record.id(), d), @@ -211,14 +236,19 @@ fn write_record(record: fastq::Record, args: &Cli, read_len: usize) { }; // Print out the records passing the filters, applying trimming on seq and qual // Could consider to use unsafe `from_utf8_unchecked` - println!( - "@{}\n{}\n+\n{}", + let output_str = format!( + "@{}\n{}\n+\n{}\n", header, std::str::from_utf8(&record.seq()[args.headcrop..read_len - args.tailcrop]) .expect("ERROR: problem writing fastq seq"), std::str::from_utf8(&record.qual()[args.headcrop..read_len - args.tailcrop]) .expect("ERROR: problem writing fastq qual") ); + writer + .lock() + .unwrap() + .write_all(output_str.as_bytes()) + .unwrap(); } /// This function calculates the average quality of a read, and does this correctly @@ -290,19 +320,23 @@ fn test_ave_qual() { fn test_filter() { filter( &mut std::fs::File::open("test-data/test.fastq").unwrap(), + Arc::new(Mutex::new(Box::new(std::io::stdout()))), + Arc::new(Mutex::new(Box::new(std::io::stdout()))), Cli { - minlength: 100, - maxlength: 100000, minqual: 5.0, maxqual: 200.0, + minlength: 100, + maxlength: 100000, headcrop: 10, tailcrop: 10, threads: 1, contam: None, + output_contam: None, inverse: false, input: None, - mingc: 0.0, + output: None, maxgc: 1.0, + mingc: 0.0, }, ); } @@ -335,19 +369,23 @@ fn test_no_contam() { fn test_filter_with_contam() { filter( &mut std::fs::File::open("test-data/test.fastq").unwrap(), + Arc::new(Mutex::new(Box::new(std::io::stdout()))), + Arc::new(Mutex::new(Box::new(std::io::stdout()))), Cli { - minlength: 100, - maxlength: 100000, minqual: 5.0, maxqual: 100.0, + minlength: 100, + maxlength: 100000, headcrop: 10, tailcrop: 10, threads: 1, contam: Some("test-data/random_contam.fa".to_owned()), + output_contam: None, inverse: false, input: None, - mingc: 0.0, + output: None, maxgc: 1.0, + mingc: 0.0, }, ); } diff --git a/src/utils.rs b/src/utils.rs index dcb1b9f..20d4d54 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,8 +1,12 @@ +use flate2::{write, Compression}; +use std::ffi::OsStr; +use std::fs::File; +use std::io::{BufWriter, Read, Write}; +use std::path::Path; +use std::sync::{Arc, Mutex}; use std::{ error::Error, - fs::File, - io::{self, prelude::*, BufRead, BufReader}, - path::Path, + io::{self, BufRead, BufReader}, }; const MAGIC_MAX_LEN: usize = 6; @@ -55,6 +59,42 @@ fn is_xz + Copy>(file_name: P) -> Result> { .is_some_and(|ext| ext == "xz")) } +// Creates a handy writer to output to either a file or stdout (and automatically compresses if the file extension is .gz) +pub fn file_writer

( + file_out: Option

, +) -> Result>>, Box> +where + P: AsRef + Copy, +{ + if let Some(file_name) = file_out { + let file_name = file_name.as_ref(); + let file = match File::create(&file_name) { + Err(why) => panic!("couldn't open {}: {}", file_name.display(), why.to_string()), + Ok(file) => file, + }; + + if file_name.extension() == Some(OsStr::new("gz")) { + Ok(Arc::new(Mutex::new(Box::new(BufWriter::with_capacity( + 128 * 1024, + write::GzEncoder::new(file, Compression::default()), + ))))) + } else { + Ok(Arc::new(Mutex::new(Box::new(BufWriter::with_capacity( + 128 * 1024, + file, + ))))) + } + } else { + if atty::is(atty::Stream::Stdout) { + eprintln!("Warning: no redirection detected, not writing anywhere"); + Ok(Arc::new(Mutex::new(Box::new(io::sink())))) + } else { + let fp = BufWriter::new(io::stdout()); + Ok(Arc::new(Mutex::new(Box::new(fp)))) + } + } +} + pub fn file_reader

(file_in: Option

) -> Result, Box> where P: AsRef + Copy,