I have a large gene expression matrix and would like to perform some analysis on each row (i.e. gene) at a time using parallel processing from BiocParallel
. Since the matrix can be very large, I figured that using bplapply()
with SnowParam(workers=12, "SOCK")
and a custom iterator would enable parallel processing with out copying the entire dataset in to each forked thread. Also I figured that it would be much slower to copy the entire dataset and pass the row in index withseq_len()
.
It seems like my intuition was wrong. Do you have an intuition about what is the best way to analyzed a large data in parallel while avoiding copying the entire session to each thread? Why is using DoparParam()
so much faster than SnowParam()
? I would like to use SnowParam()
because of the nice progress bar and should avoid memory copying.
Here is my code with comments and timings:
library(BiocParallel)
library(doParallel)
library(iterators)
# Simulate dataset of n samples with p genes
# Evaluate regression Y[j,] ~ x
n = 400
p = 100000
Y = matrix(rnorm(n*p), nrow=p, ncol=n)
x = rnorm(n)
# iterator for rows of matrix
exprIter = function( exprObj ){
n_features = nrow(exprObj)
xit <- icount( n_features )
nextEl <- function() {
j <- nextElem(xit)
if( is.null(j) || j > n_features){
res = NULL
}else{
res = list(y = exprObj[j,])
}
res
}
it <- list(nextElem = nextEl)
class(it) <- c("abstractiter", "iter")
it
}
# lapply
# user system elapsed
# 59.564 0.495 60.205
system.time({
res1 <- lapply( exprIter( Y ), function(data){
y = data$y
fit = lm( y ~ x)
coef(fit)
})
})
# bplapply SerialParam
# user system elapsed
# 65.808 0.883 66.851
system.time({
res2 <- bplapply( exprIter( Y ), function(data){
y = data$y
fit = lm( y ~ x)
coef(fit)
}, BPPARAM=SerialParam(progressbar=TRUE))
})
# bplapply SnowParam
# user system elapsed
# 66.276 0.757 67.195
system.time({
res3 <- bplapply( exprIter( Y ), function(data){
y = data$y
fit = lm( y ~ x)
coef(fit)
}, BPPARAM=SnowParam(workers=12, "SOCK", progressbar=TRUE))
})
# bplapply MulticoreParam
# user system elapsed
# 63.276 0.762 64.193
system.time({
res4 <- bplapply( exprIter( Y ), function(data){
y = data$y
fit = lm( y ~ x)
coef(fit)
}, BPPARAM=MulticoreParam(workers=12, progressbar=TRUE))
})
# bplapply DoparParam
# user system elapsed
# 95.607 10.180 22.896
system.time({
registerDoParallel(12)
res5 <- bplapply( exprIter( Y ), function(data){
y = data$y
fit = lm( y ~ x)
coef(fit)
}, BPPARAM=DoparParam())
})
# bplapply SnowParam with seq_len() iterator
# Updates progress bar with each iterator
# user system elapsed
# user system elapsed
# 4.732 1.912 21.603
system.time({
res6 <- bplapply( seq_len( nrow(Y) ), function(j, Y, x){
y = Y[j,]
fit = lm( y ~ x)
coef(fit)
}, BPPARAM=SnowParam(workers=12, "SOCK", progressbar=TRUE), Y=Y, x=x)
})
# try with bpiterate()
# iterator for rows of matrix
exprIter2 = function( exprObj ){
n_features = nrow(exprObj)
xit <- icountn( n_features )
nextEl <- function() {
j <- nextElem(xit)
if( is.null(j) || j > n_features){
res = NULL
}else{
res = list(y = exprObj[j,])
}
res
}
it <- list(nextElem = nextEl)
class(it) <- c("abstractiter", "iter")
it
}
# bpiterate
# Uses 12 cores, but usage of each core is often < 15%
# user system elapsed
# 59.564 0.495 60.205
system.time({
it = exprIter2( Y )
res1 <- bpiterate( it$nextElem, function(data, x){
y = data$y
fit = lm( y ~ x)
coef(fit)
}, BPPARAM=SnowParam(workers=12, "SOCK", progressbar=TRUE), x=x)
})
sessionInfo()
R version 3.5.3 (2019-03-11)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)
Matrix products: default
BLAS/LAPACK: /hpc/packages/minerva-centos7/intel/parallel_studio_xe_2019/compilers_and_libraries_2019.0.117/linux/mkl/lib/intel64_lin/libmkl_gf_lp64.so
locale:
[1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C
[3] LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8
[5] LC_MONETARY=en_US.UTF-8 LC_MESSAGES=en_US.UTF-8
[7] LC_PAPER=en_US.UTF-8 LC_NAME=C
[9] LC_ADDRESS=C LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
attached base packages:
[1] parallel stats graphics grDevices datasets utils methods
[8] base
other attached packages:
[1] BiocParallel_1.16.6 doParallel_1.0.14 iterators_1.0.10
[4] foreach_1.4.4
loaded via a namespace (and not attached):
[1] compiler_3.5.3 codetools_0.2-16
Results are similar on OS X
R version 3.5.1 (2018-07-02)
Platform: x86_64-apple-darwin15.6.0 (64-bit)
Running under: macOS 10.14.5
Hi Martin, Thanks for your quick response! Yes, this was just example code, so
lmFit()
would solve this specific problem, but not my real problem.Your last paragraph gets at my underlying question: How do I get
bplapply()
to run in parallel when I pass it an iterator? Can I define my iterator functionexprIter()
so thatbplapply()
behaves as expected? What method doesbplapply()
use to determine the length of the iterator?Cheers, - Gabriel
bplapply()
is really expecting a list or vector, so creating aYlist
-like object is the best bet. This makes at least two copies of the matrix in the manager, as well as one, collectively, in the workers. The worker memory consumption could be reduced, at the expense of increased communication time, using thetasks=
argument.For
bpiterate()
, a (BiocParallel) iterator might look likeThis sends row-wise slices to the workers, so the worker function has to cope with that, e.g.,
Putting this together gives
The second argument to
iter()
can be used to control the amount of data sent to each worker; the default is to divide all the rows as evenly as possible, so each worker gets one slice of the matrix. This would mean that the manager has a copy of Y, and the workers collectively have a copy of Y. Something likewould mean that Y was distributed in 1000 approximately equal sized chunks over the course of the computation, with the memory in use at any one time equal to Y on the master, plus slices of Y with nrow(Y) / 1000 on each of the workers. This seems like a moderately effective way to manage overall memory use and to provide interactive feedback (the progress bar iterates when each slice finishes, so there are 1000 progress bar updates for the scenario with
iter(Y, 1000)
; the smaller slices do imply more traffic between manager and worker, and there is an optimal size for throughput.Likely one wants to add
reduce.in.order = TRUE
tobpiterate()
, otherwise results are presented in the order received. A complete solution might beI really appreciate your detailed answer
Cheers, - Gabriel