Question: BiocParallel: What its the best way to pass large a dataset for parallel processing?
1
gravatar for gabriel.hoffman
12 weeks ago by
United States
gabriel.hoffman80 wrote:

I have a large gene expression matrix and would like to perform some analysis on each row (i.e. gene) at a time using parallel processing from BiocParallel. Since the matrix can be very large, I figured that using bplapply() with SnowParam(workers=12, "SOCK") and a custom iterator would enable parallel processing with out copying the entire dataset in to each forked thread. Also I figured that it would be much slower to copy the entire dataset and pass the row in index withseq_len().

It seems like my intuition was wrong. Do you have an intuition about what is the best way to analyzed a large data in parallel while avoiding copying the entire session to each thread? Why is using DoparParam() so much faster than SnowParam()? I would like to use SnowParam() because of the nice progress bar and should avoid memory copying.

Here is my code with comments and timings:

library(BiocParallel)
library(doParallel)
library(iterators)

# Simulate dataset of n samples with p genes
# Evaluate regression Y[j,] ~ x
n = 400
p = 100000
Y = matrix(rnorm(n*p), nrow=p, ncol=n)
x = rnorm(n)

# iterator for rows of matrix
exprIter = function( exprObj ){
    n_features = nrow(exprObj)
    xit <- icount( n_features )
    nextEl <- function() {
        j <- nextElem(xit)
        if( is.null(j) || j > n_features){
            res = NULL
        }else{      
            res = list(y = exprObj[j,])
         }
         res
    }
    it <- list(nextElem = nextEl)
    class(it) <- c("abstractiter", "iter")
    it
}

# lapply
#   user  system elapsed
# 59.564   0.495  60.205
system.time({
res1 <- lapply( exprIter( Y ), function(data){
    y = data$y
    fit = lm( y ~ x)
    coef(fit)
}) 
})

# bplapply SerialParam
#   user  system elapsed
# 65.808   0.883  66.851
system.time({
res2 <- bplapply( exprIter( Y ), function(data){
    y = data$y
    fit = lm( y ~ x)
    coef(fit)
}, BPPARAM=SerialParam(progressbar=TRUE)) 
})

# bplapply SnowParam
# user  system elapsed
# 66.276   0.757  67.195
system.time({
res3 <- bplapply( exprIter( Y ), function(data){
    y = data$y
    fit = lm( y ~ x)
    coef(fit)
}, BPPARAM=SnowParam(workers=12, "SOCK", progressbar=TRUE)) 
})

# bplapply MulticoreParam
# user  system elapsed
# 63.276   0.762  64.193
system.time({
res4 <- bplapply( exprIter( Y ), function(data){
    y = data$y
    fit = lm( y ~ x)
    coef(fit)
}, BPPARAM=MulticoreParam(workers=12, progressbar=TRUE)) 
})

# bplapply DoparParam
#   user  system elapsed
# 95.607  10.180  22.896
system.time({
registerDoParallel(12)
res5 <- bplapply( exprIter( Y ), function(data){
    y = data$y
    fit = lm( y ~ x)
    coef(fit)
}, BPPARAM=DoparParam()) 
})


# bplapply SnowParam with seq_len() iterator
#   Updates progress bar with each iterator
# user  system elapsed
#  user  system elapsed
# 4.732   1.912  21.603
system.time({
res6 <- bplapply( seq_len( nrow(Y) ), function(j, Y, x){
y = Y[j,]
fit = lm( y ~ x)
coef(fit)
}, BPPARAM=SnowParam(workers=12, "SOCK", progressbar=TRUE), Y=Y, x=x) 
})


# try with bpiterate()
# iterator for rows of matrix
exprIter2 = function( exprObj ){
    n_features = nrow(exprObj)
    xit <- icountn( n_features )
    nextEl <- function() {
        j <- nextElem(xit)
        if( is.null(j) || j > n_features){
            res = NULL
        }else{      
            res = list(y = exprObj[j,])
         }
         res
    }
    it <- list(nextElem = nextEl)
    class(it) <- c("abstractiter", "iter")
    it
}

# bpiterate
# Uses 12 cores, but usage of each core is often < 15%
#   user  system elapsed
# 59.564   0.495  60.205
system.time({
it = exprIter2( Y )
res1 <- bpiterate( it$nextElem, function(data, x){
    y = data$y
    fit = lm( y ~ x)
    coef(fit)
}, BPPARAM=SnowParam(workers=12, "SOCK", progressbar=TRUE), x=x) 
})
sessionInfo()
R version 3.5.3 (2019-03-11)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

Matrix products: default
BLAS/LAPACK: /hpc/packages/minerva-centos7/intel/parallel_studio_xe_2019/compilers_and_libraries_2019.0.117/linux/mkl/lib/intel64_lin/libmkl_gf_lp64.so

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C
 [3] LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8
 [5] LC_MONETARY=en_US.UTF-8    LC_MESSAGES=en_US.UTF-8
 [7] LC_PAPER=en_US.UTF-8       LC_NAME=C
 [9] LC_ADDRESS=C               LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] parallel  stats     graphics  grDevices datasets  utils     methods
[8] base

other attached packages:
[1] BiocParallel_1.16.6 doParallel_1.0.14   iterators_1.0.10
[4] foreach_1.4.4

loaded via a namespace (and not attached):
[1] compiler_3.5.3   codetools_0.2-16

Results are similar on OS X

R version 3.5.1 (2018-07-02)
Platform: x86_64-apple-darwin15.6.0 (64-bit)
Running under: macOS  10.14.5
ADD COMMENTlink modified 12 weeks ago by Martin Morgan ♦♦ 23k • written 12 weeks ago by gabriel.hoffman80
Answer: BiocParallel: What its the best way to pass large a dataset for parallel process
0
gravatar for Martin Morgan
12 weeks ago by
Martin Morgan ♦♦ 23k
United States
Martin Morgan ♦♦ 23k wrote:

As a first step, and realizing that your example could well be only a placeholder for your actual function, it usually pays to figure out whether you are doing things well-enough in R. For the case of fitting the same design matrix to different observations, using lm.fit() is about 10x faster than using lm() directly; of course limma::lmFit() is a flexible and fast approach.

fun1 <- function(y, x) {
    coef(lm(y ~ x))
}

fun2 <- function(y, X) {
    lm.fit(X, y)$coefficients
}
n = 400
p = 100000
Y = matrix(rnorm(n*p), nrow=p, ncol=n)
x = rnorm(n)
X <- cbind(1L, x)

and then

> system.time(res3 <- apply(Y, 1, fun1, x))

   user  system elapsed 
 87.737   0.219  88.018 
> system.time(res4 <- apply(Y, 1, fun2, X))

   user  system elapsed 
  8.842   0.470   9.318 
> all.equal(res3, res4, check.attributes=FALSE)
[1] TRUE

Using parallel evaluation, even with twelve cores, the speed-up is only a modest 2x

> library(parallel)
> cl <- makeCluster(detectCores())
> system.time(res5 <- parApply(cl, Y, 1, fun2, X))

   user  system elapsed 
  3.296   0.912   4.830 
> stopCluster(cl)

This is because of the cost of communicating each row of Y to the cluster, and retrieving the result from the worker is large compared to the cost of the calculation on the worker.

It's actually counter-productive to use mclapply(), because the cost of creating a 'list of rows'

    Ylist <- lapply(seq_len(nrow(Y)), function(i, Y) Y[i,], Y)
    res6 <- mclapply(Ylist, fun2, X)

outweighs the cost of sending individual rows to the workers. Nonetheless, this approach is the path to take for bplapply(), though again the speed-up is disappointing because of the cost of making a list of rows

> library(BiocParallel)
> system.time({
+     Ylist <- lapply(seq_len(nrow(Y)), function(i, Y) Y[i,], Y)
+     res7 <- simplify2array(bplapply(Ylist, fun2, X))
+ })
> 
   user  system elapsed 
 29.296   4.437   6.163 
> all.equal(res3, res7, check.attributes=FALSE)

[1] TRUE

A bpiterate() solution

iter <- function(Y) {
    force(Y)
    i <- 0L
    function() {
        if (i == nrow(Y)) {
            NULL
        } else {
            i <<- i + 1L
            Y[i,]
        }
    }
}
system.time(bpiterate(iter(Y[1:1000,]), fun2, X))

is slow, because each row is sent to the works separately, whereas bplapply(), mclapply(), and parApply() all by default divide rows into a number of blocks equal to the number of workers, sends the blocks to the workers, and then uses lapply() on the worker to process each row. This means that there are only ncore communications from the manager to the worker; I'm not exactly sure whether X is sent once to each worker, or nrow(Y) times.

For your own approach, bplapply(exprIter(Y), ...) actually results in serial evaluation. This is because bplapply() doesn't know anything special about the object returned by exprIter(), so it asks it's length(exprIter(Y)), which returns 1. Since there is only one task to do, it uses lapply() for the evaluation. This is the reason for the otherwise puzzling (on deep reflection) observation that x is not passed in most of your examples, yet is 'found' by the worker -- the worker is the master, so finds x!

ADD COMMENTlink modified 12 weeks ago • written 12 weeks ago by Martin Morgan ♦♦ 23k

Hi Martin, Thanks for your quick response! Yes, this was just example code, so lmFit() would solve this specific problem, but not my real problem.

Your last paragraph gets at my underlying question: How do I get bplapply() to run in parallel when I pass it an iterator? Can I define my iterator function exprIter() so that bplapply() behaves as expected? What method does bplapply() use to determine the length of the iterator?

Cheers, - Gabriel

ADD REPLYlink written 12 weeks ago by gabriel.hoffman80

bplapply() is really expecting a list or vector, so creating a Ylist-like object is the best bet. This makes at least two copies of the matrix in the manager, as well as one, collectively, in the workers. The worker memory consumption could be reduced, at the expense of increased communication time, using the tasks= argument.

For bpiterate(), a (BiocParallel) iterator might look like

iter <- function(Y, n_chunks = snowWorkers()) {
    idx <- parallel::splitIndices(nrow(Y), min(nrow(Y), n_chunks))
    i <- 0L
    function() {
        if (i == length(idx))
            return(NULL)
        i <<- i + 1L
        Y[ idx[[i]],, drop = FALSE ]
    }
}

This sends row-wise slices to the workers, so the worker function has to cope with that, e.g.,

fun <- function(Y, X)
    apply(Y, 1, function(y, X) {
        stats::lm.fit(X, y)$coefficients        
    }, X)

Putting this together gives

result <- bpiterate(iter(Y), fun, X, BPPARAM = SnowParam(progress = TRUE))

The second argument to iter() can be used to control the amount of data sent to each worker; the default is to divide all the rows as evenly as possible, so each worker gets one slice of the matrix. This would mean that the manager has a copy of Y, and the workers collectively have a copy of Y. Something like

result <- bpiterate(iter(Y, 1000), fun, X, BPPARAM = SnowParam(progress = TRUE))

would mean that Y was distributed in 1000 approximately equal sized chunks over the course of the computation, with the memory in use at any one time equal to Y on the master, plus slices of Y with nrow(Y) / 1000 on each of the workers. This seems like a moderately effective way to manage overall memory use and to provide interactive feedback (the progress bar iterates when each slice finishes, so there are 1000 progress bar updates for the scenario with iter(Y, 1000); the smaller slices do imply more traffic between manager and worker, and there is an optimal size for throughput.

Likely one wants to add reduce.in.order = TRUE to bpiterate(), otherwise results are presented in the order received. A complete solution might be

result <- bpiterate(
    iter(Y, 100), fun, X,
    REDUCE=cbind,
    reduce.in.order=TRUE,
    BPPARAM = SnowParam(progressbar=TRUE)
)
ADD REPLYlink modified 12 weeks ago • written 12 weeks ago by Martin Morgan ♦♦ 23k

I really appreciate your detailed answer

Cheers, - Gabriel

ADD REPLYlink written 12 weeks ago by gabriel.hoffman80
Please log in to add an answer.

Help
Access

Use of this site constitutes acceptance of our User Agreement and Privacy Policy.
Powered by Biostar version 16.09
Traffic: 352 users visited in the last hour