Skip to content

Commit

Permalink
Fix memory leaks reported by valgrind (#109)
Browse files Browse the repository at this point in the history
* fix some memory leaks

* opps, min_mapq was missing 🤦‍♂️

* dont pass full sites gr to each worker

* clean up more memory leaks reported by valgrind
  • Loading branch information
kriemo authored Sep 15, 2023
1 parent 8ab8443 commit 22cfd50
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 43 deletions.
2 changes: 1 addition & 1 deletion R/differential_editing.R
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ find_scde_sites <- function(
assay.type = "nAlt",
BPPARAM = BPPARAM)

grps <- unique(sce[[group]])
grps <- unique(as.character(sce[[group]]))
grp_pairs <- as.data.frame(t(utils::combn(grps, 2)))
colnames(grp_pairs) <- c("first", "second")

Expand Down
2 changes: 1 addition & 1 deletion R/pileup.R
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ setup_valid_regions <- function(bam, chroms, region = NULL, fasta = NULL) {
msg <- paste(missing_chroms, collapse = "\n")
cli::cli_warn(c(
"the following chromosomes are not present in the fasta file:",
"msg"
"{msg}"
))
chroms_to_process <- setdiff(chroms_to_process, missing_chroms)
}
Expand Down
8 changes: 5 additions & 3 deletions R/sc-pileup.R
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,7 @@ pileup_cells <- function(bamfiles,
umi_tag <- check_tag(umi_tag)

valid_regions <- setup_valid_regions(bamfiles[[1]], chroms)
chroms_to_process <- valid_regions$chroms

chroms_to_process <- intersect(valid_regions$chroms, unique(seqnames(sites)))
sites <- sites[seqnames(sites) %in% chroms_to_process, ]

if (verbose) cli::cli_alert("Beginning pileup")
Expand Down Expand Up @@ -197,13 +196,15 @@ pileup_cells <- function(bamfiles,
)
} else {
# operate in parallel over each chromosome
sites_grl <- split(sites, seqnames(sites))
sites_grl <- sites_grl[intersect(chroms_to_process, names(sites_grl))]
sces <- bpmapply(get_sc_pileup,
chrom = chroms_to_process,
id = chroms_to_process,
site = sites_grl,
MoreArgs = list(
bamfn = bf,
index = bfi,
sites = sites,
barcodes = cell_barcodes,
outfile_prefix = output_directory,
cb_tag = cb_tag,
Expand Down Expand Up @@ -292,6 +293,7 @@ get_sc_pileup <- function(bamfn, index, id, sites, barcodes,
plp_outfns,
umi_tag,
pe,
fp[["min_mapq"]],
max(fp$int_args["min_variant_reads"], fp$int_args["min_depth"])
)

Expand Down
4 changes: 2 additions & 2 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
/* .Call calls */
extern SEXP get_region(SEXP);
extern SEXP pileup(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP scpileup(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
extern SEXP scpileup(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);

static const R_CallMethodDef CallEntries[] = {
{".get_region", (DL_FUNC) &get_region, 1},
{".pileup",(DL_FUNC) &pileup, 13},
{".scpileup",(DL_FUNC) &scpileup, 13},
{".scpileup",(DL_FUNC) &scpileup, 14},
{NULL, NULL, 0}
};

Expand Down
57 changes: 38 additions & 19 deletions src/plp.c
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,8 @@ static int store_counts(PLP_DATA pd, pcounts* pc, const char* ctig,
nv = kh_value((pc + i)->pc->var, k);
vf = (double) nv / (pc + i)->pc->total;
if (vf < conf->min_af) {
char *key = (char *)kh_key((pc + i)->pc->var, k);
free(key);
kh_del(str2intmap, (pc + i)->pc->var, k);
}
}
Expand All @@ -387,6 +389,8 @@ static int store_counts(PLP_DATA pd, pcounts* pc, const char* ctig,
nv = kh_value((pc + i)->mc->var, k);
vf = (double) nv / (pc + i)->mc->total;
if (vf < conf->min_af) {
char *key = (char *)kh_key((pc + i)->mc->var, k);
free(key);
kh_del(str2intmap, (pc + i)->mc->var, k);
}
}
Expand Down Expand Up @@ -631,7 +635,7 @@ static int run_pileup(char** cbampaths, char** cindexes,

const bam_pileup1_t** plp;
mplp_ref_t mp_ref = MPLP_REF_INIT;
bam_mplp_t iter;
bam_mplp_t iter = NULL;
bam_hdr_t* h = NULL; /* header of first file in input list */
char* ref;

Expand Down Expand Up @@ -664,33 +668,46 @@ static int run_pileup(char** cbampaths, char** cindexes,
data[i] = calloc(1, sizeof(mplp_aux_t));
data[i]->fp = sam_open(cbampaths[i], "rb");
if (!data[i]->fp) {
Rf_error("[raer internal] failed to open %s: %s\n",
REprintf("[raer internal] failed to open %s: %s\n",
cbampaths[i], strerror(errno));
ret = -1;
goto fail;
}
if (conf->fai_fname) {
if (hts_set_fai_filename(data[i]->fp, conf->fai_fname) != 0) {
Rf_error("[raer internal] failed to process %s: %s\n",
REprintf("[raer internal] failed to process %s: %s\n",
conf->fai_fname, strerror(errno));
ret = -1;
goto fail;
}
}
data[i]->conf = conf;
data[i]->ref = &mp_ref;

h_tmp = sam_hdr_read(data[i]->fp);
if (!h_tmp) {
Rf_error("[raer internal] fail to read the header of %s\n", cbampaths[i]);
REprintf("[raer internal] fail to read the header of %s\n", cbampaths[i]);
ret = -1;
goto fail;
}

if (conf->reg) {
hts_idx_t* idx = NULL;
idx = sam_index_load2(data[i]->fp, cbampaths[i], cindexes[i]) ;

if (idx == NULL) {
Rf_error("[raer internal] fail to load bamfile index for %s\n", cbampaths[i]);
bam_hdr_destroy(h_tmp);
REprintf("[raer internal] fail to load bamfile index for %s\n", cbampaths[i]);
ret = -1;
goto fail;
}

if ((data[i]->iter=sam_itr_querys(idx, h_tmp, conf->reg)) == 0) {
Rf_error("[raer internal] fail to parse region '%s' with %s\n", conf->reg, cbampaths[i]);
bam_hdr_destroy(h_tmp);
hts_idx_destroy(idx);
REprintf("[raer internal] fail to parse region '%s' with %s\n", conf->reg, cbampaths[i]);
ret = -1;
goto fail;
}

if (i == 0) {
Expand Down Expand Up @@ -889,13 +906,13 @@ static int run_pileup(char** cbampaths, char** cindexes,
}

fail:
if (ret >= 0) finish_PLP_DATA(pd);
finish_PLP_DATA(pd);

bam_mplp_destroy(iter);
bam_hdr_destroy(h);
if(iter) bam_mplp_destroy(iter);
if(h) bam_hdr_destroy(h);

for (i = 0; i < conf->nbam; ++i) {
sam_close(data[i]->fp);
if(data[i]->fp) sam_close(data[i]->fp);
if (data[i]->iter) hts_itr_destroy(data[i]->iter);
free(data[i]);
clear_pcounts(&plpc[i]);
Expand All @@ -908,25 +925,27 @@ static int run_pileup(char** cbampaths, char** cindexes,

}

free(data);
free(plp);
free(n_plp);
if(data) free(data);
if(plp) free(plp);
if(n_plp) free(n_plp);
free(mp_ref.ref[0]);
free(mp_ref.ref[1]);

if (pall) {
R_Free(pall->p_ref_pos);
R_Free(pall->p_alt_pos);
R_Free(pall->m_ref_pos);
R_Free(pall->m_alt_pos);
R_Free(pall->m_alt_pos);
R_Free(pall->s);
R_Free(pall);
}

R_Free(plpc);
if (pd->fps) R_Free(pd->fps);
R_Free(pd->pdat);

if (site_stats) R_Free(site_stats);
if(plpc) R_Free(plpc);
if(pd->fps) R_Free(pd->fps);
if(pd->pdat) R_Free(pd->pdat);
if(pd->sdat) R_Free(pd->sdat);
if(pd) R_Free(pd);
if(site_stats) R_Free(site_stats);

return ret;
}
Expand Down
37 changes: 22 additions & 15 deletions src/sc-plp.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ static void clear_umi(umimap_t uhash) {
}

/*! @function
@abstract free keys, values and clear cbumi_map_t hashmap
@abstract free UMI values and clear cbumi_map_t hashmap. Keys are retained to
be freed at end of pileup loop.
*/
static void clear_cb_umiset(cbumi_map_t cbhash) {
khint_t k;
Expand Down Expand Up @@ -100,6 +101,7 @@ static void free_hashmaps(cbumi_map_t cbhash, str2intmap_t cbidx) {
if (cbhash) {
for (k = kh_begin(cbhash); k < kh_end(cbhash); ++k) {
if (!kh_exist(cbhash, k)) continue;
free((char*) kh_key(cbhash, k));
cdat = kh_value(cbhash, k);
clear_umi(cdat->umi);
kh_destroy(umimap, cdat->umi);
Expand Down Expand Up @@ -248,14 +250,13 @@ static int count_record(const bam_pileup1_t* p, sc_mplp_conf_t* conf, payload_t*

if (pld->strand != strand) return (1);

cb_cpy = strdup(cb);

k = kh_get(str2intmap, conf->cbidx, cb_cpy);
k = kh_get(str2intmap, conf->cbidx, cb);
if (k == kh_end(conf->cbidx)) {
free(cb_cpy);
return (0);
}

cb_cpy = strdup(cb);

k = kh_put(cbumimap, conf->cbmap, cb_cpy, &cret);
if (cret < 0) {
free(cb_cpy);
Expand Down Expand Up @@ -729,17 +730,17 @@ static int run_scpileup(sc_mplp_conf_t* conf, char* bamfn, char* index, char* ba

fail:
if (iter) bam_mplp_destroy(iter);
bam_hdr_destroy(h);
if(h) bam_hdr_destroy(h);

for (i = 0; i < nbam; ++i) {
sam_close(data[0]->fp);
if (data[0]->iter) hts_itr_destroy(data[0]->iter);
if (data[0]->idx) hts_idx_destroy(data[0]->idx);
free(data[0]);
}
free(data);
free(plp);
free(n_plp);
if(data) free(data);
if(plp) free(plp);
if(n_plp) free(n_plp);

return ret;
}
Expand All @@ -751,7 +752,7 @@ static int set_sc_mplp_conf(sc_mplp_conf_t* conf, int nbams,
int n_outfns, char** outfns, char* qregion, regidx_t* idx,
int* i_args, double* d_args, int libtype,
int n_bcs, char** bcs, char* cbtag, char* umi,
int pe, int min_counts) {
int pe, int min_mapq, int min_counts) {
conf->is_ss2 = nbams > 1 ? 1 : 0;

conf->fps = R_Calloc(n_outfns, FILE*);
Expand Down Expand Up @@ -795,7 +796,6 @@ static int set_sc_mplp_conf(sc_mplp_conf_t* conf, int nbams,
conf->read_qual.pct = d_args[3];
conf->read_qual.minq = d_args[4];

conf->min_mq = (conf->min_mq < 0) ? 0 : conf->min_mq;
conf->min_bq = (conf->min_bq < 0) ? 0 : conf->min_bq;
conf->max_depth = (!conf->max_depth ) ? 10000: conf->max_depth ;

Expand Down Expand Up @@ -832,6 +832,7 @@ static int set_sc_mplp_conf(sc_mplp_conf_t* conf, int nbams,
}
conf->cb_tag = cbtag;
conf->pe = pe;
conf->min_mq = (min_mapq < 0) ? 0 : min_mapq;
conf->min_counts = min_counts < 0 ? 0 : min_counts;
conf->site_idx = 1;

Expand Down Expand Up @@ -859,6 +860,7 @@ static int write_all_sites(sc_mplp_conf_t* conf) {
pld->alt);

}
regitr_destroy(itr);
return (1);
}

Expand All @@ -867,7 +869,7 @@ static int write_all_sites(sc_mplp_conf_t* conf) {
*/
static void check_sc_plp_args(SEXP bampaths, SEXP indexes, SEXP qregion, SEXP lst,
SEXP barcodes, SEXP cbtag, SEXP int_args, SEXP dbl_args,
SEXP libtype, SEXP outfns, SEXP umi, SEXP pe,
SEXP libtype, SEXP outfns, SEXP umi, SEXP pe, SEXP min_mapq,
SEXP min_counts) {

if (!IS_CHARACTER(bampaths) || (LENGTH(bampaths) < 1)) {
Expand Down Expand Up @@ -923,6 +925,10 @@ static void check_sc_plp_args(SEXP bampaths, SEXP indexes, SEXP qregion, SEXP ls
Rf_error("'pe' must be logical(1)");
}

if (!IS_INTEGER(min_mapq) || (LENGTH(min_mapq) != 1)) {
Rf_error("'min_mapq' must be integer(1)");
}

if (!IS_INTEGER(min_counts) || (LENGTH(min_counts) != 1)) {
Rf_error("'min_counts' must be integer(1)");
}
Expand All @@ -935,11 +941,11 @@ static void check_sc_plp_args(SEXP bampaths, SEXP indexes, SEXP qregion, SEXP ls
SEXP scpileup(SEXP bampaths, SEXP indexes, SEXP query_region, SEXP lst,
SEXP barcodes, SEXP cbtag, SEXP int_args, SEXP dbl_args,
SEXP libtype, SEXP outfns, SEXP umi,
SEXP pe, SEXP min_counts) {
SEXP pe, SEXP min_mapq, SEXP min_counts) {

check_sc_plp_args(bampaths, indexes, query_region, lst,
barcodes, cbtag, int_args, dbl_args, libtype,
outfns, umi, pe, min_counts);
outfns, umi, pe, min_mapq, min_counts);

regidx_t* idx = regidx_build(lst, 1);
if (!idx) Rf_error("Failed to build region index");
Expand Down Expand Up @@ -985,7 +991,8 @@ SEXP scpileup(SEXP bampaths, SEXP indexes, SEXP query_region, SEXP lst,
cq_region, idx,
INTEGER(int_args), REAL(dbl_args),
INTEGER(libtype)[0], nbcs, bcs, c_cbtag, c_umi,
LOGICAL(pe)[0], INTEGER(min_counts)[0]);
LOGICAL(pe)[0], INTEGER(min_mapq)[0],
INTEGER(min_counts)[0]);

if (ret >= 0) {
// write barcodes file, all barcodes will be reported in matrix
Expand Down
4 changes: 2 additions & 2 deletions tests/testthat/test_pileup_sites.R
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ test_that("pileup regional query works", {
expect_equal(end(res), c(203, 204, 205))

# chr1 does not exist
expect_error(suppressWarnings(pileup_sites(bamfn, fafn, region = "chr1")))
expect_error(pileup_sites(bamfn, fafn, region = "chr1"))

res <- pileup_sites(bamfn, fafn, chrom = "SSR3")
expect_equal(nrow(res), 529)
})

test_that("incorrect regional query is caught", {
# will produce warning that chrHello is not in bamfile and an error
expect_error(suppressWarnings(pileup_sites(bamfn, fafn, region = "chrHello")))
expect_error(pileup_sites(bamfn, fafn, region = "chrHello"))
})

test_that("missing files are caught", {
Expand Down

0 comments on commit 22cfd50

Please sign in to comment.