From 3515c835724c9a12fac99b44843e46737c86089e Mon Sep 17 00:00:00 2001 From: WardDeb Date: Mon, 27 Jan 2025 12:43:13 +0100 Subject: [PATCH] simplify and include a parallel block iterator in binsize>1 calcs --- src/covcalc.rs | 72 +++++++----------------------------------- src/multibamsummary.rs | 53 ++++++++++++++++--------------- 2 files changed, 38 insertions(+), 87 deletions(-) diff --git a/src/covcalc.rs b/src/covcalc.rs index 4b2b06c5c4..0bbf510c88 100644 --- a/src/covcalc.rs +++ b/src/covcalc.rs @@ -1,10 +1,13 @@ use rust_htslib::bam::{self, Read, IndexedReader, record::Cigar}; +use rust_htslib::bam::ext::BamRecordExtensions; use std::collections::HashMap; use tempfile::{Builder, TempPath}; use std::io::{BufWriter, Write}; use std::cmp::min; use std::fmt; use ndarray::Array1; +use rayon::prelude::*; +use std::collections::HashSet; pub fn parse_regions(regions: &Vec<(String, u32, u32)>, bam_ifile: Vec<&str>) -> (Vec, HashMap) { // Takes a vector of regions, and a bam reference @@ -172,41 +175,14 @@ pub fn bam_pileup<'a>( readlens.push(record.seq_len() as u32); } } - - let blocks = bam_blocks(record); - // keep a record of bin indices that have been updated already - let mut changedbins: Vec = Vec::new(); - for block in blocks.into_iter() { - // Don't count blocks that exceed the chromosome - if block.0 >= region.2 { - continue; - } - binix = block.0 / binsize; - if !changedbins.contains(&binix) { - counts[binix as usize] += 1.0; - changedbins.push(binix); - } - // Don't count blocks that exceed the chromosome - if block.1 >= region.2 { - continue; - } - // Note that our blocks are strictly less then region ends, hence no problem with bin spewing at end: - // binix = [a,b) where b == region.2 would divide into binix+1 (doesn't exist). - binix = block.1 / binsize; - if !changedbins.contains(&binix) { - counts[binix as usize] += 1.0; - changedbins.push(binix); - } - } - // if changedbins contains non-continuous blocks (perhaps read length spans multiple bins), update the inbetweens - if let (Some(&min), Some(&max)) = (changedbins.iter().min(), changedbins.iter().max()) { - for ix in min..=max { - if !changedbins.contains(&ix) { - counts[ix as usize] += 1.0; - changedbins.push(ix); - } - } - } + let indices: HashSet = record.aligned_blocks() + .par_bridge() + .filter(|x| (x[1] as u32) < region.2) + .flat_map(|x| x[0] as u32..x[1] as u32) + .map(|x| (x / binsize) as usize) + .collect(); + indices.into_iter() + .for_each(|ix| counts[ix] += 1.0); } let mut writer = BufWriter::new(&bg); // There are two scenarios: @@ -372,32 +348,6 @@ pub fn bam_pileup<'a>( return (tmpvec, mapped_reads, unmapped_reads, readlens, fraglens); } -/// Extract contigious blocks from a bam record -/// Blocks are split per insertion, padding, deletion and ref skip -fn bam_blocks(rec: bam::Record) -> Vec<(u32, u32)> { - let mut blocks: Vec<(u32, u32)> = Vec::new(); - let mut pos = rec.pos(); - - for c in rec.cigar().iter() { - match c { - Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => { - let start = pos; - let end = pos + *len as i64 -1; - blocks.push((start as u32, end as u32)); - pos = end; - } - Cigar::Ins(len) | Cigar::Pad(len) => { - pos += *len as i64; - } - Cigar::Del(len) | Cigar::RefSkip(len) => { - pos += *len as i64; - } - _ => (), - } - } - return blocks; -} - pub struct Alignmentfilters { pub minmappingquality: u8, pub samflaginclude: u16, diff --git a/src/multibamsummary.rs b/src/multibamsummary.rs index aa109c54d1..7e96b43f04 100644 --- a/src/multibamsummary.rs +++ b/src/multibamsummary.rs @@ -4,7 +4,7 @@ use rayon::prelude::*; use rayon::ThreadPoolBuilder; use itertools::{multizip, multiunzip, izip}; use std::io::prelude::*; -use std::io::BufReader; +use std::io::{BufReader, BufWriter}; use std::fs::File; use ndarray::{Array1, Array2, stack, Axis}; use std::io; @@ -167,6 +167,18 @@ pub fn r_mbams( println!("Define output file"); } + let mut cfile: Option> = None; + if out_raw_counts != "None" { + cfile = Some(BufWriter::new(File::create(out_raw_counts).unwrap())); + // Write the header to the file. + let mut headstr = String::new(); + headstr.push_str("#\'chr\'\t\'start\'\t\'end\'"); + for label in bamlabels.iter() { + headstr.push_str(&format!("\t\'{}\'", label)); + } + writeln!(cfile.as_mut().unwrap(), "{}", headstr).unwrap(); + } + // Collate the coverage files into a matrix. let its: Vec<_> = covcalcs.iter().map(|x| x.into_iter()).collect(); let zips = TempZip { iterators: its }; @@ -176,10 +188,11 @@ pub fn r_mbams( let zips_vec: Vec<_> = zips.collect(); let matvec: Array2 = pool.install(|| { let matvecs: Vec> = zips_vec - .par_iter() + .iter() .flat_map(|c| { - let readers: Vec<_> = c.iter().map(|x| BufReader::new(File::open(x).unwrap()).lines()).collect(); + let readers: Vec<_> = c.par_iter().map(|x| BufReader::new(File::open(x).unwrap()).lines()).collect(); let mut _matvec: Vec> = Vec::new(); + let mut _regions: Vec<(String, u32, u32)> = Vec::new(); for mut _l in (TempZip { iterators: readers }) { // unwrap all lines in _l let lines: Vec<_> = _l @@ -189,6 +202,16 @@ pub fn r_mbams( .map(|x: Vec<&str> | ( x[0].to_string(), x[1].parse::().unwrap(), x[2].parse::().unwrap(), x[3].parse::().unwrap() ) ) .collect(); let arrline = Array1::from(lines.iter().map(|x| x.3).collect::>()); + // If cfile is define, write the raw counts to file. + if let Some(ref mut file) = cfile { + let mut regstr = String::new(); + regstr.push_str(&format!("{}\t{}\t{}",lines[0].0, lines[0].1, lines[0].2)); + // push values from array + for val in arrline.iter() { + regstr.push_str(&format!("\t{}", val)); + } + writeln!(file, "{}", regstr).unwrap(); + } _matvec.push(arrline); } _matvec @@ -207,7 +230,7 @@ pub fn r_mbams( writeln!(sf_file, "{}\t{}", label, sf).unwrap(); } } - let mut npz = NpzWriter::new(File::create(ofile).unwrap()); + let mut npz = NpzWriter::new_compressed(File::create(ofile).unwrap()); npz.add_array("matrix", &matvec).unwrap(); npz.add_array("labels", &bamlabels_arr).unwrap(); npz.finish().unwrap(); @@ -215,28 +238,6 @@ pub fn r_mbams( println!("Matrix written."); } - // If raw counts are required, write them to file. - if out_raw_counts != "None" { - let mut cfile = File::create(out_raw_counts).unwrap(); - // Create the header - let mut headstr = String::new(); - headstr.push_str("#\'chr\'\t\'start\'\t\'end\'"); - for label in bamlabels.iter() { - headstr.push_str(&format!("\t\'{}\'", label)); - } - writeln!(cfile, "{}", headstr).unwrap(); - for (i, region) in regions.iter().enumerate() { - let mut line = String::new(); - line.push_str(&format!("{}\t{}\t{}", region.chrom, region.get_startu(), region.get_endu())); - for j in 0..bamlabels.len() { - line.push_str(&format!("\t{}", matvec[[i, j]])); - } - writeln!(cfile, "{}", line).unwrap(); - } - } - if verbose { - println!("Counts written."); - } Ok(()) }