Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Added the option to output contaminant reads to file #44

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 57 additions & 19 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,12 @@ use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use std::io::Write;
use std::sync::Mutex;

mod utils;
use utils::file_reader;
use utils::file_writer;

// The arguments end up in the Cli struct
#[derive(Parser, Debug)]
Expand Down Expand Up @@ -49,6 +53,10 @@ struct Cli {
#[arg(short, long, value_parser)]
contam: Option<String>,

/// Output contaminants to a file
#[arg(long, value_parser)]
output_contam: Option<String>,

/// Output the opposite of the normal results
#[arg(long)]
inverse: bool,
Expand All @@ -57,6 +65,10 @@ struct Cli {
#[arg(short = 'i', long = "input", value_parser)]
input: Option<String>,

/// Output filename [default: write to stdout]
#[arg(short = 'o', long = "output", value_parser)]
output: Option<String>,

/// Filter max GC content
#[arg(long, value_parser, default_value_t = 1.0)]
maxgc: f64,
Expand All @@ -83,15 +95,21 @@ fn main() -> Result<(), Box<dyn Error>> {
.expect("Error: Unable to build threadpool");

let mut reader = file_reader(args.input.as_ref())?;
filter(&mut reader, args);
let contam_writer = file_writer(args.output_contam.as_ref())?;
let output_writer = file_writer(args.output.as_ref())?;
filter(&mut reader, output_writer, contam_writer, args);

Ok(())
}

/// This function filters fastq on stdin based on quality, maxlength and minlength
/// and applies trimming before writting to stdout
fn filter<T>(input: &mut T, args: Cli)
where
fn filter<T>(
input: &mut T,
output_writer: Arc<Mutex<Box<dyn Write + Send>>>,
contam_writer: Arc<Mutex<Box<dyn Write + Send>>>,
args: Cli,
) where
T: Read + std::marker::Send,
{
match args.contam {
Expand Down Expand Up @@ -130,17 +148,19 @@ where
&& read_gc >= args.mingc
&& read_gc <= args.maxgc
&& read_len >= args.minlength
&& read_len <= args.maxlength
&& !is_contamination(&record.seq(), &aligner))
&& read_len <= args.maxlength)
|| (args.inverse
&& (average_quality < args.minqual
|| average_quality > args.maxqual
|| read_len < args.minlength
|| read_len > args.maxlength
|| is_contamination(&record.seq(), &aligner)))
|| read_len > args.maxlength))
{
write_record(record, &args, read_len);
output_reads_.fetch_add(1, Ordering::SeqCst);
if is_contamination(&record.seq(), &aligner) {
write_record(record, &args, contam_writer.clone(), read_len);
} else {
write_record(record, &args, output_writer.clone(), read_len);
output_reads_.fetch_add(1, Ordering::SeqCst);
}
}
}
}
Expand Down Expand Up @@ -189,7 +209,7 @@ where
|| read_len < args.minlength
|| read_len > args.maxlength))
{
write_record(record, &args, read_len);
write_record(record, &args, output_writer.clone(), read_len);
output_reads_.fetch_add(1, Ordering::SeqCst);
}
}
Expand All @@ -203,22 +223,32 @@ where
}
}

fn write_record(record: fastq::Record, args: &Cli, read_len: usize) {
fn write_record(
record: fastq::Record,
args: &Cli,
writer: Arc<Mutex<Box<dyn std::io::Write + Send>>>,
read_len: usize,
) {
// Check if a description attribute is present, taken from the bio-rust code to format fastq
let header = match record.desc() {
Some(d) => format!("{} {}", record.id(), d),
None => record.id().to_owned(),
};
// Print out the records passing the filters, applying trimming on seq and qual
// Could consider to use unsafe `from_utf8_unchecked`
println!(
"@{}\n{}\n+\n{}",
let output_str = format!(
"@{}\n{}\n+\n{}\n",
header,
std::str::from_utf8(&record.seq()[args.headcrop..read_len - args.tailcrop])
.expect("ERROR: problem writing fastq seq"),
std::str::from_utf8(&record.qual()[args.headcrop..read_len - args.tailcrop])
.expect("ERROR: problem writing fastq qual")
);
writer
.lock()
.unwrap()
.write_all(output_str.as_bytes())
.unwrap();
}

/// This function calculates the average quality of a read, and does this correctly
Expand Down Expand Up @@ -290,19 +320,23 @@ fn test_ave_qual() {
fn test_filter() {
filter(
&mut std::fs::File::open("test-data/test.fastq").unwrap(),
Arc::new(Mutex::new(Box::new(std::io::stdout()))),
Arc::new(Mutex::new(Box::new(std::io::stdout()))),
Cli {
minlength: 100,
maxlength: 100000,
minqual: 5.0,
maxqual: 200.0,
minlength: 100,
maxlength: 100000,
headcrop: 10,
tailcrop: 10,
threads: 1,
contam: None,
output_contam: None,
inverse: false,
input: None,
mingc: 0.0,
output: None,
maxgc: 1.0,
mingc: 0.0,
},
);
}
Expand Down Expand Up @@ -335,19 +369,23 @@ fn test_no_contam() {
fn test_filter_with_contam() {
filter(
&mut std::fs::File::open("test-data/test.fastq").unwrap(),
Arc::new(Mutex::new(Box::new(std::io::stdout()))),
Arc::new(Mutex::new(Box::new(std::io::stdout()))),
Cli {
minlength: 100,
maxlength: 100000,
minqual: 5.0,
maxqual: 100.0,
minlength: 100,
maxlength: 100000,
headcrop: 10,
tailcrop: 10,
threads: 1,
contam: Some("test-data/random_contam.fa".to_owned()),
output_contam: None,
inverse: false,
input: None,
mingc: 0.0,
output: None,
maxgc: 1.0,
mingc: 0.0,
},
);
}
Expand Down
46 changes: 43 additions & 3 deletions src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
use flate2::{write, Compression};
use std::ffi::OsStr;
use std::fs::File;
use std::io::{BufWriter, Read, Write};
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::{
error::Error,
fs::File,
io::{self, prelude::*, BufRead, BufReader},
path::Path,
io::{self, BufRead, BufReader},
};

const MAGIC_MAX_LEN: usize = 6;
Expand Down Expand Up @@ -55,6 +59,42 @@ fn is_xz<P: AsRef<Path> + Copy>(file_name: P) -> Result<bool, Box<dyn Error>> {
.is_some_and(|ext| ext == "xz"))
}

// Creates a handy writer to output to either a file or stdout (and automatically compresses if the file extension is .gz)
pub fn file_writer<P>(
file_out: Option<P>,
) -> Result<Arc<Mutex<Box<dyn Write + Send>>>, Box<dyn Error>>
where
P: AsRef<Path> + Copy,
{
if let Some(file_name) = file_out {
let file_name = file_name.as_ref();
let file = match File::create(&file_name) {
Err(why) => panic!("couldn't open {}: {}", file_name.display(), why.to_string()),
Ok(file) => file,
};

if file_name.extension() == Some(OsStr::new("gz")) {
Ok(Arc::new(Mutex::new(Box::new(BufWriter::with_capacity(
128 * 1024,
write::GzEncoder::new(file, Compression::default()),
)))))
} else {
Ok(Arc::new(Mutex::new(Box::new(BufWriter::with_capacity(
128 * 1024,
file,
)))))
}
} else {
if atty::is(atty::Stream::Stdout) {
eprintln!("Warning: no redirection detected, not writing anywhere");
Ok(Arc::new(Mutex::new(Box::new(io::sink()))))
} else {
let fp = BufWriter::new(io::stdout());
Ok(Arc::new(Mutex::new(Box::new(fp))))
}
}
}

pub fn file_reader<P>(file_in: Option<P>) -> Result<Box<dyn BufRead + Send>, Box<dyn Error>>
where
P: AsRef<Path> + Copy,
Expand Down
Loading