Processing time of blockApply and DelayedArray
1
1
Entering edit mode
@plbaldoni
Last seen 3 days ago
Melbourne, Australia

Hi all,

I have been trying to use the framework from DelayedArray to compute some row-wise statistics from a relatively large HDF5Matrix. However, I am running into some efficiency issues in terms of computing time.

For example, below I am trying to apply a custom row-wise function on a HDF5Matrix mat of 100,000 rows and 10,000 columns. To illustrate performance, I am computing it on a subset of the array (first 10,000 rows). In summary, it takes about 13 sec to compute the necessary statistics on a subset of 10% of the data (13 sec because I looped the array twice in the example below).

I have tried different configurations of blocking and chunking strategies, but the computing times end up in the same ballpark of 10-20s.

Are these figures somewhat expected from a computing time perspective? Besides parallel computing, does anyone have any suggestions of strategies that would help speed up computations in the example below?

Thanks!


> mat
<100000 x 10000> matrix of class HDF5Matrix and type "integer":
[,1]     [,2]     [,3]     [,4] ...  [,9997]  [,9998]  [,9999] [,10000]
[1,]        0        0        0        0   .        0        0        0        0
[2,]        0        0        1        0   .        0        0        0        0
[3,]        0        0        0        1   .        1        0        0        1
[4,]        0        0        0        0   .        0        0        0        1
[5,]        0        0        0        0   .        1        0        0        0
...        .        .        .        .   .        .        .        .        .
[99996,]        0        0        0        0   .        1        0        1        0
[99997,]        0        0        0        0   .        0        0        1        0
[99998,]        0        0        0        0   .        1        0        0        0
[99999,]        1        0        1        0   .        0        0        0        0
[100000,]        0        0        0        0   .        0        0        0        0
> seed(mat)
An object of class "HDF5ArraySeed"
Slot "filepath":
[1] "/home/simulation/mockdata/mockdata_consensus_counts.h5"

Slot "name":
[1] "/counts"

Slot "as_sparse":
[1] FALSE

Slot "type":
[1] NA

Slot "dim":
[1] 100000  10000

Slot "chunkdim":
[1] 3162  316

Slot "first_val":
[1] 0

>
> foo <- function(x,mu){
+   rowSums(dpois(x,lambda = mu, log = TRUE))
+ }
>
> blockFoo <- function(mu,x,grid,FUN = foo) {
+   unlist(blockApply(x,FUN,grid = grid,mu = mu,verbose = TRUE))
+ }
>
> blockCombine <- function(x,mu,...) {
+   grid <- rowAutoGrid(x,...)
+   res <- lapply(X = mu,FUN = blockFoo,x = x,grid = grid)
+   do.call(cbind,res)
+ }
>
> microbenchmark(output = blockCombine(x = mat[1:10000,],mu = c(1,3)),times = 1)
/ Reading and realizing block 1/4 ... OK
\ Processing it ... OK
/ Reading and realizing block 2/4 ... OK
\ Processing it ... OK
/ Reading and realizing block 3/4 ... OK
\ Processing it ... OK
/ Reading and realizing block 4/4 ... OK
\ Processing it ... OK
/ Reading and realizing block 1/4 ... OK
\ Processing it ... OK
/ Reading and realizing block 2/4 ... OK
/ Reading and realizing block 2/4 ... OK
\ Processing it ... OK
/ Reading and realizing block 3/4 ... OK
\ Processing it ... OK
/ Reading and realizing block 4/4 ... OK
\ Processing it ... OK
Unit: seconds
expr      min       lq     mean   median       uq      max neval
output 26.61289 26.61289 26.61289 26.61289 26.61289 26.61289     1
> microbenchmark(output = blockCombine(x = mat[1:10000,],mu = c(1,3),nrow = 5000),times = 1)
/ Reading and realizing block 1/2 ... OK
\ Processing it ... OK
/ Reading and realizing block 2/2 ... OK
\ Processing it ... OK
/ Reading and realizing block 1/2 ... OK
\ Processing it ... OK
/ Reading and realizing block 2/2 ... OK
\ Processing it ... OK
Unit: seconds
expr      min       lq     mean   median       uq      max neval
output 21.82795 21.82795 21.82795 21.82795 21.82795 21.82795     1
> sessionInfo()
R version 4.1.0 (2021-05-18)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: CentOS Linux 7 (Core)

Matrix products: default
BLAS:   /home/System/data/apps/R/R-4.1.0/lib64/R/lib/libRblas.so
LAPACK: /home/System/data/apps/R/R-4.1.0/lib64/R/lib/libRlapack.so

locale:
[1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8     LC_MONETARY=en_US.UTF-8
[6] LC_MESSAGES=en_US.UTF-8    LC_PAPER=en_US.UTF-8       LC_NAME=C                  LC_ADDRESS=C               LC_TELEPHONE=C
[11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] parallel  stats4    stats     graphics  grDevices utils     datasets  methods   base

other attached packages:
[1] microbenchmark_1.4-7 HDF5Array_1.20.0     rhdf5_2.34.0         DelayedArray_0.18.0  IRanges_2.26.0       S4Vectors_0.30.0
[7] MatrixGenerics_1.4.0 matrixStats_0.58.0   BiocGenerics_0.38.0  Matrix_1.3-3         devtools_2.4.1       usethis_2.0.1

loaded via a namespace (and not attached):
[1] compiler_4.1.0     prettyunits_1.1.1  rhdf5filters_1.4.0 remotes_2.3.0      tools_4.1.0        testthat_3.0.2     pkgbuild_1.2.0
[8] pkgload_1.2.1      memoise_2.0.0      lifecycle_1.0.0    lattice_0.20-44    rlang_0.4.11       cli_2.5.0          fastmap_1.1.0
[15] withr_2.4.2        desc_1.3.0         fs_1.5.0           rprojroot_2.0.2    grid_4.1.0         glue_1.4.2         R6_2.5.0
[22] processx_3.5.2     sessioninfo_1.1.1  callr_3.7.0        purrr_0.3.4        Rhdf5lib_1.12.1    magrittr_2.0.1     ps_1.6.0
[29] ellipsis_0.3.2     cachem_1.0.5       crayon_1.4.1

DelayedArray HDF5Array • 376 views
3
Entering edit mode
Mike Smith ★ 5.2k
@mike-smith
Last seen 10 hours ago
EMBL Heidelberg / de.NBI

I won't pretend to be an expert in DelayedArray and I find the various block processing options a bit opaque, so maybe there's a clever strategy that will accelerate this. However, to me it seems like you're over complicating things with the blockFoo() step. dpois() and rowSums() are both DelayArray-aware so I think you can use foo() directly in you lapply(). For me this already achieves an ~ 45% reduction in the run time. Here's an example:

library(HDF5Array)

## create a matrix consisting of mostly zeros
nrow <- 10000
ncol <- 10000
mat <- matrix(rnbinom(n = nrow * ncol, 0.1, 0.1), nrow = nrow)

## convert to HDF5Array with HDF5 chunk sizes the same as the example
mat_disk <- writeHDF5Array(mat, chunkdim = c(3162, 316))

## original function definitions
foo <- function(x,mu){
rowSums(dpois(x,lambda = mu, log = TRUE))
}

blockFoo <- function(mu,x,grid,FUN = foo) {
unlist(blockApply(x,FUN,grid = grid,mu = mu,verbose = TRUE))
}

blockCombine <- function(x,mu,...) {
grid <- rowAutoGrid(x,...)
res <- lapply(X = mu,FUN = blockFoo,x = x,grid = grid)
do.call(cbind,res)
}

## alternative applying foo() over multiple mu values
fooCombine <- function(x, mu) {
res <- lapply(X = mu, FUN = foo, x = x)
do.call(cbind,res)
}

## compare times
microbenchmark::microbenchmark(
blockCombine(x = mat_disk, mu = c(1,3)),
fooCombine(x = mat_disk, mu = c(1,3)),
times = 3
)
#>...
#> Unit: seconds
#>                                      expr      min       lq     mean   median       uq      max neval
#>  blockCombine(x = mat_disk, mu = c(1, 3)) 19.24674 19.25481 19.35729 19.26287 19.41256 19.56226     3
#>    fooCombine(x = mat_disk, mu = c(1, 3)) 11.66849 11.77222 11.94350 11.87595 12.08100 12.28606     3

## check for the same output
all.equal(
blockCombine(x = mat_disk, mu = c(1,3)),
fooCombine(x = mat_disk, mu = c(1,3))
)
#> ...
#> [1] TRUE


We can also think about whether these are "good" speeds. If we consider just the dpois() step, that alone on an in memory matrix takes ~5 seconds on my computer. Reading the matrix from disk also takes ~1.5 seconds.

system.time(dpois(mat, lambda = 1))
#>   user  system elapsed
#>  4.665   0.450   5.115

## there must be a better way to get the file path of an HDF5Array!
#>   user  system elapsed
#>  1.660   0.107   1.768


So 11 seconds for reading and calculating dpois() twice looks pretty good, and the bottleneck doesn't seem to be the disk access.

0
Entering edit mode

I had more of a think about this, and realised that the duplication of the Reading and realizing block 1/4 messages implied that the blocks were being processed once for each value of mu. Reading from disk is the slow part here, so we want to minimise the number of time the blocks are "processed" and perform the calculation for all mu at the same time. One way to do this is to move the lapply() step inside the block processing function.

Here's some code that does something like that.

foo2 <- function(block, mu){
## for each block we use all values of mu
lapply(mu, function(mu, x) {
rowSums(dpois(x, lambda = mu, log = TRUE))
}, x = block) |>
{\(y) do.call(cbind, y)}()
}

blockFoo2 <- function(mu, x, ...) {
grid <- rowAutoGrid(x,...)
tmp <- blockApply(x, FUN = foo2, grid = grid,
mu = mu, verbose = TRUE)
do.call(rbind, tmp)
}

## check this is still consistent with the original version
all.equal(
blockCombine(x = mat_disk, mu = c(1,3)),
blockFoo2(x = mat_disk, mu = c(1,3))
)
#> ...
#> [1] TRUE


It actually still turns out to still be slower than my suggestion above, but if we align the block size with a multiple of the HDF5 chunk size we get slightly better performance.

bench::mark(
blockCombine(x = mat_disk, mu = c(1,3)),
foo_combine(x = mat_disk, mu = c(1,3)),
blockFoo2(x = mat_disk, mu = c(1,3)),
blockFoo2(x = mat_disk, mu = c(1,3), nrow = 2*3162)
)
#> # A tibble: 4 x 13
#>   expression                                                 min median itr/sec mem_alloc gc/sec n_itr  n_gc total_time result memory time  gc
#>   <bch:expr>                                             <bch:t> <bch:>     <dbl> <bch:byt>    <dbl> <int> <dbl>   <bch:tm> <list> <list> <lis> <lis>
#> 1 blockCombine(x = mat_disk, mu = c(1, 3))                18.98s 18.98s    0.0527    3.73GB    0.105     1     2     18.98s <dbl … <Rpro… <ben… <tib…
#> 2 fooCombine(x = mat_disk, mu = c(1, 3))                  11.58s 11.58s    0.0864    3.73GB    0.173     1     2     11.58s <dbl … <Rpro… <ben… <tib…
#> 3 blockFoo2(x = mat_disk, mu = c(1, 3))                   13.42s 13.42s    0.0745    3.35GB    0.298     1     4     13.42s <dbl … <Rpro… <ben… <tib…
#> 4 blockFoo2(x = mat_disk, mu = c(1, 3), nrow = 2 * 3162)   9.99s  9.99s    0.100     3.35GB    0.200     1     2      9.99s <dbl … <Rpro… <ben… <tib…

0
Entering edit mode

Given the timings above, it looks like dpois() is a bottleneck. If the data are very sparse, perhaps there's a route to speeding it up since I suspect it's spending a long time calculating dpois(0) many times.

Here's one possible strategy where we calculate the dpois() output for the range of values in the input matrix, and then build a matrix from those (x2 in faster_dpois()). This assumes x is +ve integers but I think that has to be the case for dpois() anyway.

faster_dpois <- function(x, lambda, log) {
idx <- dpois(0:max(x), lambda = lambda, log = log)
x2 <- idx[x+1]
dim(x2) <- dim(x)
x2
}

faster_foo <- function(block, mu){
## for each block we use all values of mu
lapply(mu, function(mu, x) {
rowSums(faster_dpois(x, lambda = mu, log = TRUE))
}, x = block) |>
{\(y) do.call(cbind, y)}()
}

faster_blockFoo <- function(mu, x, ...) {
grid <- rowAutoGrid(x,...)
tmp <- blockApply(x, FUN = faster_foo, grid = grid,
mu = mu, verbose = TRUE)
do.call(rbind, tmp)
}

bench::mark(
blockCombine(x = mat_disk, mu = c(1,3)),
fooCombine(x = mat_disk, mu = c(1,3)),
blockFoo2(x = mat_disk, mu = c(1,3)),
blockFoo2(x = mat_disk, mu = c(1,3), nrow = 2*3162),
faster_blockFoo(x = mat_disk, mu = c(1,3)),
faster_blockFoo(x = mat_disk, mu = c(1,3), nrow = 2*3162)
)

#> # A tibble: 8 x 13
#>   expression                                                                min   median itr/sec mem_alloc gc/sec n_itr
#>   <bch:expr>                                                           <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl> <int>
#> 1 blockCombine(x = mat_disk, mu = c(1, 3))                               18.69s   18.69s    0.0535    3.73GB    0.107     1
#> 2 fooCombine(x = mat_disk, mu = c(1, 3))                                 11.02s   11.02s    0.0907    3.73GB    0.272     1
#> 3 blockFoo2(x = mat_disk, mu = c(1, 3))                                  13.18s   13.18s    0.0759    3.35GB    0.304     1
#> 4 blockFoo2(x = mat_disk, mu = c(1, 3), nrow = 2 * 3162)                  9.39s    9.39s    0.106     3.35GB    0.213     1
#> 5 faster_blockFoo(x = mat_disk, mu = c(1, 3))                             9.27s    9.27s    0.108      4.1GB    0.432     1
#> 6 faster_blockFoo(x = mat_disk, mu = c(1, 3), nrow = 2 * 3162)            6.05s    6.05s    0.165      4.1GB    0.827     1


This gets the run time down to a third of the original approach when combined with the other strategies. However, I also think this may end up using more memory, so it may be self defeating if you're trying to use on-disk arrays in a memory limited environment.

0
Entering edit mode

Thanks Mike, great tips there! I liked the idea you used in faster_dpois.

2
Entering edit mode

Hi Mike, Pedro,

I don't have much to add to Mike's great answer. Thanks Mike!

Maybe 3 things worth mentioning:

1. dpois() and rowSums() can be used directly on a DelayedArray object:

For the particular situation where you just want to compute rowSums(dpois(mat_disk, lambda=3, log=TRUE)), you don't really need to implement your own block processing solution. You can do rowSums(dpois(mat_disk, lambda=3, log=TRUE)) directly on HDF5Array object mat_disk, and this will use block processing internally to compute the result:

> mat_disk
<100000 x 10000> sparse matrix of class HDF5Matrix and type "integer":
[,1]     [,2]     [,3]     [,4] ...  [,9997]  [,9998]  [,9999] [,10000]
[1,]        0        0        0        0   .        1        0        0        0
[2,]        0        0        1        0   .        0        0        0        0
[3,]        0        0        0        1   .        0        0        0        0
[4,]        0        0        0        0   .        0        0        0        0
[5,]        1        0        0        0   .        0        1        0        0
...        .        .        .        .   .        .        .        .        .
[99996,]        0        0        0        0   .        0        0        0        0
[99997,]        0        0        0        0   .        1        0        0        0
[99998,]        1        0        0        0   .        0        0        0        0
[99999,]        0        0        0        0   .        0        0        0        0
[100000,]        1        0        0        0   .        0        0        1        0

> DelayedArray:::set_verbose_block_processing(TRUE)

> system.time(res <- rowSums(dpois(mat_disk, lambda=3, log=TRUE)))
Processing block [[1/29, 1/3]] ... OK
Processing block [[1/29, 2/3]] ... OK
Processing block [[1/29, 3/3]] ... OK
Processing block [[2/29, 1/3]] ... OK
Processing block [[2/29, 2/3]] ... OK
Processing block [[2/29, 3/3]] ... OK
...
Processing block [[28/29, 1/3]] ... OK
Processing block [[28/29, 2/3]] ... OK
Processing block [[28/29, 3/3]] ... OK
Processing block [[29/29, 1/3]] ... OK
Processing block [[29/29, 2/3]] ... OK
Processing block [[29/29, 3/3]] ... OK
user  system elapsed
39.035   5.852  44.868


Note that dpois() is implemented as a delayed operation. This means that when you call dpois() on a DelayedArray object, the operation is not performed on the moment. Instead a new DelayedArray object is returned that carries the delayed dpois() operation.

2. Estimating the cost of IO:

It can be useful to get an idea of the cost of IO. This cost can vary a lot from one machine to another depending on the performance of the disk. An easy way to do this is to call blockApply() on the HDF5Array object and to apply a function that does nothing. This will walk on the grid and load all the blocks sequentially but won't do anything to them:

grid <- rowAutoGrid(mat_disk, nrow=2000)
system.time(blockApply(mat_disk, function(block) NULL, grid=grid, verbose=TRUE))
/ Reading and realizing sparse block 1/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 2/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 3/50 as dense block ... OK
\ Processing it ... OK
...
/ Reading and realizing sparse block 48/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 49/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 50/50 as dense block ... OK
\ Processing it ... OK
user  system elapsed
9.806   1.344  11.133


So on my machine the cost of IO is 11.133 sec. Think of it as the minimal cost that will have to be paid no matter what we'll do on each block.

Now if we process the blocks by applying a function that actually performs some operations on them, then we pay the additional cost of these operations:

> FUN1 <- function(block) rowSums(dpois(block, lambda=3, log=TRUE))
> system.time(res1 <- blockApply(mat_disk, FUN1, grid=grid, verbose=TRUE))
/ Reading and realizing sparse block 1/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 2/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 3/50 as dense block ... OK
\ Processing it ... OK
...
/ Reading and realizing sparse block 48/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 49/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 50/50 as dense block ... OK
\ Processing it ... OK
user  system elapsed
42.513   5.576  48.082


This confirms Mike's observation that much more time is spent on computing rowSums(dpois()) on each block than on loading them from the disk.

3. Using precomputed dpois() values:

Replacing dpois() with Mike's faster_dpois():

> FUN2 <- function(block) rowSums(faster_dpois(block, lambda=3, log=TRUE))
> system.time(res2 <- blockApply(mat_disk, FUN2, grid=grid, verbose=TRUE))
/ Reading and realizing sparse block 1/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 2/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 3/50 as dense block ... OK
\ Processing it ... OK
...
/ Reading and realizing sparse block 48/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 49/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 50/50 as dense block ... OK
\ Processing it ... OK
user  system elapsed
31.448   7.524  38.964


Note that this can be made even more efficient by computing in advance all the possible dpois() values for a given lambda. This requires that we know an upper limit for the count values in mat_disk:

## Assuming we know that the count values are >= 0 and <= 255 (in our case we know this
## because the H5 type of the dataset is H5T_STD_U8LE):

> h5ls(path(mat_disk), all=TRUE)
group   name         ltype cset       otype num_attrs  dclass        dtype
0     / counts H5L_TYPE_HARD    0 H5I_DATASET         1 INTEGER H5T_STD_U8LE
stype rank            dim         maxdim
0 SIMPLE    2 100000 x 10000 100000 x 10000

precomputed_dpois_values <- dpois(0:255, lambda=3, log=TRUE)

precomputed_dpois <- function(x)
{
x2 <- precomputed_dpois_values[x + 1L]
dim(x2) <- dim(x)
x2
}

> FUN3 <- function(block) rowSums(precomputed_dpois(block))
> system.time(res3 <- blockApply(mat_disk, FUN3, grid=grid, verbose=TRUE))
/ Reading and realizing sparse block 1/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 2/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 3/50 as dense block ... OK
\ Processing it ... OK
...
/ Reading and realizing sparse block 48/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 49/50 as dense block ... OK
\ Processing it ... OK
/ Reading and realizing sparse block 50/50 as dense block ... OK
\ Processing it ... OK
user  system elapsed
20.739   5.623  26.348

> identical(res1, res2)
[1] TRUE
> identical(res1, res3)
[1] TRUE


Cheers,

H.

1
Entering edit mode

FWIW here is the code I used to generate the mock data:

library(HDF5Array)

writeRandomSparseCounts <- function(filepath, name, dim,
chunkdim=c(250, 250), density=0.1)
{
sink <- HDF5RealizationSink(dim, type="integer",
filepath=filepath, name=name,
H5type="H5T_STD_U8LE", chunkdim=chunkdim)
sink_grid <- defaultSinkAutoGrid(sink)
nblock <- length(sink_grid)
for (bid in seq_len(nblock)) {
viewport <- sink_grid[[bid]]
block_dim <- dim(viewport)
block <- matrix(rpois(prod(block_dim), density),
nrow=block_dim[[1L]], ncol=block_dim[[2L]])
message("writing counts for block ", bid, "/", nblock, " ... ",
appendLF=FALSE)
sink <- write_block(sink, viewport, block)
message("ok")
}
close(sink)
}

filepath <- "mockdata_consensus_counts.h5"
name <- "counts"
mockdata_dim <- c(100000L, 10000L)
writeRandomSparseCounts(filepath, name, mockdata_dim)  # took about 65 sec on my machine

0
Entering edit mode

This is neat. I hadn't thought of using the datatype of the HDF5 dataset to set an upper limit. I actually wrote a further version that precomputed dpois() based on the max value of mat, but in practice was slower to iterate once through the whole matrix to find the maximum and then again to compute rowSums() vs doing everything on a per-block basis. I suppose your approach might be considerably different if the datatype were 32-bit integers.

0
Entering edit mode

If the H5 datatype of the count values in mat_disk were 32-bit integers and we don't have an upper limit for these values, then I guess I would use your faster_dpois().

FWIW a few months ago I started to work on fast implementations of basic summarization functions like max(), min(), sum(), etc... for H5 datasets:

> system.time(max_count <- HDF5Array:::h5summarize("mockdata_consensus_counts.h5", "counts", as.integer=TRUE, op="max"))
user  system elapsed
5.657   0.028   5.685


This is still work-in-progress and the code is not fully optimized yet. When this is ready, it will be used behind the scene when the user calls things like max() on an HDF5Array object or a subset of it.

Being able to quickly compute the max() will make this kind of 2-pass approach (1st pass to get the max count, 2nd pass to compute rowSums(dpois())) more appealing.