Skip to content

Commit

Permalink
simplify and include a parallel block iterator in binsize>1 calcs
Browse files Browse the repository at this point in the history
  • Loading branch information
WardDeb committed Jan 27, 2025
1 parent fe8b70b commit 3515c83
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 87 deletions.
72 changes: 11 additions & 61 deletions src/covcalc.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use rust_htslib::bam::{self, Read, IndexedReader, record::Cigar};
use rust_htslib::bam::ext::BamRecordExtensions;
use std::collections::HashMap;
use tempfile::{Builder, TempPath};
use std::io::{BufWriter, Write};
use std::cmp::min;
use std::fmt;
use ndarray::Array1;
use rayon::prelude::*;
use std::collections::HashSet;

pub fn parse_regions(regions: &Vec<(String, u32, u32)>, bam_ifile: Vec<&str>) -> (Vec<Region>, HashMap<String, u32>) {
// Takes a vector of regions, and a bam reference
Expand Down Expand Up @@ -172,41 +175,14 @@ pub fn bam_pileup<'a>(
readlens.push(record.seq_len() as u32);
}
}

let blocks = bam_blocks(record);
// keep a record of bin indices that have been updated already
let mut changedbins: Vec<u32> = Vec::new();
for block in blocks.into_iter() {
// Don't count blocks that exceed the chromosome
if block.0 >= region.2 {
continue;
}
binix = block.0 / binsize;
if !changedbins.contains(&binix) {
counts[binix as usize] += 1.0;
changedbins.push(binix);
}
// Don't count blocks that exceed the chromosome
if block.1 >= region.2 {
continue;
}
// Note that our blocks are strictly less then region ends, hence no problem with bin spewing at end:
// binix = [a,b) where b == region.2 would divide into binix+1 (doesn't exist).
binix = block.1 / binsize;
if !changedbins.contains(&binix) {
counts[binix as usize] += 1.0;
changedbins.push(binix);
}
}
// if changedbins contains non-continuous blocks (perhaps read length spans multiple bins), update the inbetweens
if let (Some(&min), Some(&max)) = (changedbins.iter().min(), changedbins.iter().max()) {
for ix in min..=max {
if !changedbins.contains(&ix) {
counts[ix as usize] += 1.0;
changedbins.push(ix);
}
}
}
let indices: HashSet<usize> = record.aligned_blocks()
.par_bridge()
.filter(|x| (x[1] as u32) < region.2)
.flat_map(|x| x[0] as u32..x[1] as u32)
.map(|x| (x / binsize) as usize)
.collect();
indices.into_iter()
.for_each(|ix| counts[ix] += 1.0);
}
let mut writer = BufWriter::new(&bg);
// There are two scenarios:
Expand Down Expand Up @@ -372,32 +348,6 @@ pub fn bam_pileup<'a>(
return (tmpvec, mapped_reads, unmapped_reads, readlens, fraglens);
}

/// Extract contigious blocks from a bam record
/// Blocks are split per insertion, padding, deletion and ref skip
fn bam_blocks(rec: bam::Record) -> Vec<(u32, u32)> {
let mut blocks: Vec<(u32, u32)> = Vec::new();
let mut pos = rec.pos();

for c in rec.cigar().iter() {
match c {
Cigar::Match(len) | Cigar::Equal(len) | Cigar::Diff(len) => {
let start = pos;
let end = pos + *len as i64 -1;
blocks.push((start as u32, end as u32));
pos = end;
}
Cigar::Ins(len) | Cigar::Pad(len) => {
pos += *len as i64;
}
Cigar::Del(len) | Cigar::RefSkip(len) => {
pos += *len as i64;
}
_ => (),
}
}
return blocks;
}

pub struct Alignmentfilters {
pub minmappingquality: u8,
pub samflaginclude: u16,
Expand Down
53 changes: 27 additions & 26 deletions src/multibamsummary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use itertools::{multizip, multiunzip, izip};
use std::io::prelude::*;
use std::io::BufReader;
use std::io::{BufReader, BufWriter};
use std::fs::File;
use ndarray::{Array1, Array2, stack, Axis};
use std::io;
Expand Down Expand Up @@ -167,6 +167,18 @@ pub fn r_mbams(
println!("Define output file");
}

let mut cfile: Option<BufWriter<File>> = None;
if out_raw_counts != "None" {
cfile = Some(BufWriter::new(File::create(out_raw_counts).unwrap()));
// Write the header to the file.
let mut headstr = String::new();
headstr.push_str("#\'chr\'\t\'start\'\t\'end\'");
for label in bamlabels.iter() {
headstr.push_str(&format!("\t\'{}\'", label));
}
writeln!(cfile.as_mut().unwrap(), "{}", headstr).unwrap();
}

// Collate the coverage files into a matrix.
let its: Vec<_> = covcalcs.iter().map(|x| x.into_iter()).collect();
let zips = TempZip { iterators: its };
Expand All @@ -176,10 +188,11 @@ pub fn r_mbams(
let zips_vec: Vec<_> = zips.collect();
let matvec: Array2<f32> = pool.install(|| {
let matvecs: Vec<Array1<f32>> = zips_vec
.par_iter()
.iter()
.flat_map(|c| {
let readers: Vec<_> = c.iter().map(|x| BufReader::new(File::open(x).unwrap()).lines()).collect();
let readers: Vec<_> = c.par_iter().map(|x| BufReader::new(File::open(x).unwrap()).lines()).collect();
let mut _matvec: Vec<Array1<f32>> = Vec::new();
let mut _regions: Vec<(String, u32, u32)> = Vec::new();
for mut _l in (TempZip { iterators: readers }) {
// unwrap all lines in _l
let lines: Vec<_> = _l
Expand All @@ -189,6 +202,16 @@ pub fn r_mbams(
.map(|x: Vec<&str> | ( x[0].to_string(), x[1].parse::<u32>().unwrap(), x[2].parse::<u32>().unwrap(), x[3].parse::<f32>().unwrap() ) )
.collect();
let arrline = Array1::from(lines.iter().map(|x| x.3).collect::<Vec<_>>());
// If cfile is define, write the raw counts to file.
if let Some(ref mut file) = cfile {
let mut regstr = String::new();
regstr.push_str(&format!("{}\t{}\t{}",lines[0].0, lines[0].1, lines[0].2));
// push values from array
for val in arrline.iter() {
regstr.push_str(&format!("\t{}", val));
}
writeln!(file, "{}", regstr).unwrap();
}
_matvec.push(arrline);
}
_matvec
Expand All @@ -207,36 +230,14 @@ pub fn r_mbams(
writeln!(sf_file, "{}\t{}", label, sf).unwrap();
}
}
let mut npz = NpzWriter::new(File::create(ofile).unwrap());
let mut npz = NpzWriter::new_compressed(File::create(ofile).unwrap());
npz.add_array("matrix", &matvec).unwrap();
npz.add_array("labels", &bamlabels_arr).unwrap();
npz.finish().unwrap();
if verbose {
println!("Matrix written.");
}

// If raw counts are required, write them to file.
if out_raw_counts != "None" {
let mut cfile = File::create(out_raw_counts).unwrap();
// Create the header
let mut headstr = String::new();
headstr.push_str("#\'chr\'\t\'start\'\t\'end\'");
for label in bamlabels.iter() {
headstr.push_str(&format!("\t\'{}\'", label));
}
writeln!(cfile, "{}", headstr).unwrap();
for (i, region) in regions.iter().enumerate() {
let mut line = String::new();
line.push_str(&format!("{}\t{}\t{}", region.chrom, region.get_startu(), region.get_endu()));
for j in 0..bamlabels.len() {
line.push_str(&format!("\t{}", matvec[[i, j]]));
}
writeln!(cfile, "{}", line).unwrap();
}
}
if verbose {
println!("Counts written.");
}
Ok(())
}

Expand Down

0 comments on commit 3515c83

Please sign in to comment.