Skip to content

Commit

Permalink
- refactored the pairwise distance calculation code, by pre-computing…
Browse files Browse the repository at this point in the history
… the

unidimensional histograms and store them instead of recalculating them each
time a distance between 2 samples is calculated. This improves CPU time and
memory consumption.
- bumped version to 0.99.7
  • Loading branch information
phauchamps committed Jan 12, 2024
1 parent affe602 commit b30e947
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 80 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Package: CytoMDS
Title: Low Dimensions projection of cytometry samples
Version: 0.99.6
Version: 0.99.7
Authors@R:
c(person(given = "Philippe",
family = "Hauchamps",
Expand Down
8 changes: 7 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,10 @@ for not loading the whole flowSet in memory at once.

### CytoMDS 0.99.6
- added `subset` argument in `ggplotSampleMDS()` and
`ggplotSampleMDSWrapBiplots`
`ggplotSampleMDSWrapBiplots`

### CytoMDS 0.99.7
- refactored the pairwise distance calculation code, by pre-computing the
unidimensional histograms and store them instead of recalculating them each
time a distance between 2 samples is calculated. This improves CPU time and
memory consumption.
2 changes: 1 addition & 1 deletion R/ggplots.R
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@
#' extVars <- getChannelSummaryStats(
#' fsAll,
#' channels = c("FSC-A", "SSC-A"),
#' statsFUN = stats::median)
#' statFUNs = stats::median)
#'
#'
#' bp_12 <- ggplotSampleMDS(
Expand Down
215 changes: 146 additions & 69 deletions R/stats.R
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ EMDDist <- function(
# activate newer version of code
# which works only when EMD distance is based on unidimensional
# distribution distances
unidimHistograms <- FALSE
unidimHistograms <- TRUE

# validate nSamples, rowSeq and colSeq
if (!is.numeric(nSamples) || nSamples <= 0) {
Expand Down Expand Up @@ -494,11 +494,12 @@ EMDDist <- function(
}

if (!is.infinite(memSize)) {
if (!is.numeric(memSize) || memSize < 1) {
stop("memSize should be an integer >= 2")
}
if (memSize == 1){
stop("sorry but memSize of minimum 2 is required!")
minMemSize <- ifelse(
unidimHistograms,
1,
2)
if (!is.numeric(memSize) || memSize < minMemSize) {
stop("memSize should be an integer >= ", minMemSize)
}
}

Expand All @@ -510,12 +511,17 @@ EMDDist <- function(
nAvailableCores <- 1
}

#browser()

# calculate the block2D task allocations
nRowBlock <- .optimizeRowBlockNb(
rowRange = rowRange,
colRange = colRange,
nCores = nAvailableCores,
memSize = memSize
memSize = ifelse(
unidimHistograms,
Inf,
memSize)
)

blocks2D <- .generateBlocks2D(
Expand All @@ -525,18 +531,21 @@ EMDDist <- function(

if (unidimHistograms) {

rowColRange <- union(
if (verbose){
message("Pre-calculating all histograms...")
}
rowColSeqUnion <- union(
seq(rowRange[1], rowRange[2]),
seq(colRange[1], colRange[2]))

# load all flowFrame in blocks and generate histograms
if (nAvailableCores == 1){
blocks1D <- list(rowColRange)
blocks1D <- list(rowColSeqUnion)
} else {
blocks1D <- split(rowColRange,
cut(rowColRange),
blocks1D <- split(rowColSeqUnion,
cut(rowColSeqUnion,
nAvailableCores,
labels = FALSE)
labels = FALSE))
}

breaks <- seq(
Expand All @@ -551,23 +560,35 @@ EMDDist <- function(
loadFlowFrameFUN,
loadFlowFrameFUNArgs,
channels,
breaks) {
breaks,
verbose) {

#browser()

distrs <- list()
ind <- 0
for (ffIndex in block){
if (verbose) {
message("Loading file ", ffIndex, "...")
}
ind <- ind + 1
ff <- do.call(
loadFlowFrameFUN,
args = c(list(ffIndex = ffIndex),
loadFlowFrameFUNArgs))
invisible(gc()) # clean memory for ff stored in previous loop
if(!inherits(ff, "flowFrame")) {
stop("object returned by loadFlowFrameFUN function ",
"should inherit from flowCore::flowFrame")
}
if (ffIndex == block[1]) {
# check & update channels
if (ind == 1) {
# check channels
if (is.null(channels)) {
channels <- flowCore::colnames(ff)[areSignalCols(ff)]
channels <-
flowCore::colnames(ff)[areSignalCols(ff)]
} else if (is.numeric(channels)) {
channels <- flowCore::colnames(ff)[channels]
channels <-
flowCore::colnames(ff)[channels]
} else {
channels <- vapply(
channels,
Expand All @@ -576,78 +597,131 @@ EMDDist <- function(
},
FUN.VALUE = "c")
}
}

# check that all channels are present in flow frame
wrongCh <- which(! channels %in% flowCore::colnames(ff))
if (length(wrongCh) > 0) {
stop(
"found some channels that are non existent ",
"in flowframe ", ffIndex,": ",
channels[wrongCh])
} else {
wrongCh <- which(! channels%in% flowCore::colnames(ff))
if (length(wrongCh) > 0) {
stop(
"found some channels that are non existent in flowframe #",
ffIndex, ": ",
channels[wrongCh])
}
}

# take only the channels of interest for the following,
# for performance
ff <- ff[,channels]

distrs <- .unidimHistograms(
if (verbose) {
message("Calculating histogram for file ", ffIndex, "...")
}
distrs[[ind]] <- .unidimHistograms(
ff,
breaks = breaks
)

}
distrs
}

if (useBiocParallel){
distribBlockList <- bplapply(
distribBlockList <- BiocParallel::bplapply(
blocks1D,
FUN = loadFFAndCalcHistograms,
BPPARAM = BPPARAM,
BPOPTIONS = BPOPTIONS,
loadFlowFrameFUN = loadFlowFrameFUN,
loadFlowFrameFUNArgs = loadFlowFrameFUN,
loadFlowFrameFUNArgs = loadFlowFrameFUNArgs,
channels = channels,
breaks = breaks)
breaks = breaks,
verbose = verbose)
} else {
distribBlockList <- lapply(
blocks1D,
FUN = loadFFAndCalcHistograms,
loadFlowFrameFUN = loadFlowFrameFUN,
loadFlowFrameFUNArgs = loadFlowFrameFUN,
loadFlowFrameFUNArgs = loadFlowFrameFUNArgs,
channels = channels,
breaks = breaks)
breaks = breaks,
verbose = verbose)
}

# reorganize multivariate distributions in a single list
distrs <- list()
ind <- 0
for (b in seq_along(blocks1D)) {
for(i in seq_along(block1D[[b]])) {
for(i in seq_along(blocks1D[[b]])) {
ind <- ind+1
distrs[[ind]] <- distribBlockList[[b]][[i]]
}
}

if (verbose){
message("Calculating pairwise distances between histograms...")
}

handleOneBlockWithHistograms <- function(
block,
rowColSeqUnion,
breaks,
distrs,
verbose) {

#browser()

rowSeq <- seq(block$rowMin, block$rowMax)
colSeq <- seq(block$colMin, block$colMax)
nRows <- length(rowSeq)
nCols <- length(colSeq)
ffIndexes <- union(rowSeq, colSeq)

pwDist <- matrix(rep(0., nRows * nCols),
nrow = nRows)

for (i in seq_along(rowSeq)) {
for (j in seq_along(colSeq)) {
if (colSeq[j] > rowSeq[i]) {
pwDist[i,j] <- sum(.distFromUnidimHistograms(
breaks = breaks,
distr1 =
distrs[[which(rowColSeqUnion == rowSeq[i])]],
distr2 =
distrs[[which(rowColSeqUnion == colSeq[j])]]))
if (verbose) {
message(
"i = ", rowSeq[i],
"; j = ", colSeq[j],
"; dist = ", round(pwDist[i,j], 12))
}
}
}
}

# apply symmetry for block elements that are in the lower triangle
for (i in seq_along(rowSeq)) {
for (j in seq_along(colSeq)) {
if (colSeq[j] < rowSeq[i]) {
pwDist[i,j] <- pwDist[j,i]
}
}
}
pwDist
}

if (useBiocParallel) {
pwDistByBlock <- BiocParallel::bplapply(
blocks2D,
BPPARAM = BPPARAM,
BPOPTIONS = BPOPTIONS,
rowColSeqUnion = rowColSeqUnion,
FUN = handleOneBlockWithHistograms,
breaks = breaks,
distrs = distrs,
verbose = verbose)
} else {
pwDistByBlock <- lapply(
blocks2D,
rowColSeqUnion = rowColSeqUnion,
FUN = handleOneBlockWithHistograms,
breaks = breaks,
distrs = distrs,
verbose = verbose)
}
Expand All @@ -667,47 +741,52 @@ EMDDist <- function(

ffList <- list()

i <- 0
ind <- 0
for (ffIndex in ffIndexes){
i <- i+1
ffList[[i]] <- do.call(
if (verbose) {
message("Loading file ", ffIndex, "...")
}
ind <- ind+1
ff <- do.call(
loadFlowFrameFUN,
args = c(list(ffIndex = ffIndex),
loadFlowFrameFUNArgs))
if(!inherits(ffList[[i]], "flowFrame")) {
invisible(gc()) # clean memory for ff stored in previous loop
if(!inherits(ff, "flowFrame")) {
stop("object returned by loadFlowFrameFUN function should inherit ",
"from flowCore::flowFrame")
}
if (ind == 1) {
# check channels
if (is.null(channels)) {
channels <-
flowCore::colnames(ff)[areSignalCols(ff)]
} else if (is.numeric(channels)) {
channels <-
flowCore::colnames(ff)[channels]
} else {
channels <- vapply(
channels,
FUN = function(ch) {
flowCore::getChannelMarker(ff, ch)$name
},
FUN.VALUE = "c")
}
} else {
wrongCh <- which(! channels%in% flowCore::colnames(ff))
if (length(wrongCh) > 0) {
stop(
"found some channels that are non existent in flowframe #",
ffIndex, ": ",
channels[wrongCh])
}
}

# take only the channels of interest for the following,
# for performance
ffList[[ind]] <- ff[, channels]
}

fs <- methods::as(ffList,"flowSet")

# check channels
if (is.null(channels)) {
channels <- flowCore::colnames(fs)[areSignalCols(fs[[1]])]
} else if (is.numeric(channels)) {
channels <- flowCore::colnames(fs)[channels]
} else {
channels <- vapply(
channels,
FUN = function(ch) {
flowCore::getChannelMarker(fs[[1]], ch)$name
},
FUN.VALUE = "c")
}

# check that all channels are present in flowSet
wrongCh <- which(! channels %in% flowCore::colnames(fs))
if (length(wrongCh) > 0) {
stop(
"found some channels that are non existent in flowSet: ",
channels[wrongCh])
}

# take only the channels of interest for the following,
# for performance
fs <- fs[,channels]

pwDist <- matrix(rep(0., nRows * nCols),
nrow = nRows)

Expand Down Expand Up @@ -970,7 +1049,6 @@ pairwiseEMDDist <- function(
#' functions to call to calculate the statistics, or a simple function
#' This list can be named, in that case, these names will be transfered to the
#' returned value.
#' @param ... additional parameters passed to `EMDDist()`
#' @return a list of named statistic matrices.
#' In each stat matrix, the columns are the channel statistics
#' for all flowFrames of the flowSet.
Expand Down Expand Up @@ -1020,8 +1098,7 @@ getChannelSummaryStats <- function(
fs,
channels = NULL,
statFUNs = stats::median,
verbose = FALSE,
...){
verbose = FALSE){

if(!inherits(fs, "flowSet")) {
stop("fs object should inherit from flowCore::flowSet")
Expand Down
Loading

0 comments on commit b30e947

Please sign in to comment.