Question: Unable to calculate in parallel what can be calculated in a for loop
23 months ago, Lluís Revilla Sancho (European Union) wrote:

I am using registerDoParallel(2) to run some analyses in parallel with foreach and %dopar%. However, I can't manage to calculate in parallel something that I can calculate sequentially.

The function I want to use in parallel is this one:

# Calculates the degree of overlap of the GO BP ontologies of two Entrez IDs.
go_cor <- function(e_a, e_b, chip = "hgu133plus2.db", mapfun = FALSE,
                   Ontology = "BP", ...) {
  # https://support.bioconductor.org/p/85702/#85732

  if (is.na(e_a) || is.na(e_b)) {
    return(NA)
  }
  # Ensure proper format
  e_a <- as.character(e_a)
  e_b <- as.character(e_b)

  if (isTRUE(mapfun)) {
    mapfunc <- function(z) {
      mget(z, revmap(org.Hs.egGO2EG), ifnotfound = NA)
    }

    LP <- simLL(e_a, e_b, Ontology, measure = "LP", mapfun = mapfunc)
    UI <- simLL(e_a, e_b, Ontology, measure = "UI", mapfun = mapfunc)
  } else {
    LP <- simLL(e_a, e_b, Ontology, measure = "LP", chip = chip)
    UI <- simLL(e_a, e_b, Ontology, measure = "UI", chip = chip)
  }
  out <- NA
  if (length(LP) > 1 || length(UI) > 1) {
    if (is.na(LP["sim"]) || is.na(UI["sim"])) {
      return(NA)
    } else {
      mean.g1 <- s.path(LP$g1)
      mean.g2 <- s.path(LP$g2)
      out <- (UI$sim / LP$sim) * max(mean.g1, mean.g2, na.rm = TRUE)
      return(out)
    }
  } else if (is.na(LP) || is.na(UI)) {
    return(out)
  }
  return(out)
}

# Length of the longest shortest path from the leaves to the root of a GO graph.
s.path <- function(ig) {
  lfi <- graph::leaves(ig, "in")
  degs <- graph::degree(ig)
  root <- names(degs$outDegree)[degs$outDegree == 0]
  paths <- RBGL::sp.between(ig, lfi, root)
  plens <- Biobase::subListExtract(paths, "length", simplify = TRUE)
  max(plens)
}

This code is called with code similar to:

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"),
             c("12", "280"), c("28", "52"), c("28", "159"), c("28", "175"),
             c("28", "280"), c("52", "159"), c("52", "175"), c("52", "280"),
             c("159", "175"), c("159", "280"), c("175", "280"))

go.mat <- foreach(i = seq_len(length(comb)),
                  .combine = c, verbose = TRUE) %dopar% {
  ## message("new comb ", paste(comb))
  combi <- comb[[i]]  # [[ ]] returns the character vector, e.g. c("12", "28")
  go_cor(combi[1], combi[2], mapfun = TRUE, Ontology = "BP")
}
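The loop body needs `comb[[i]]` rather than `comb[i]`: on a list, `[` returns a one-element *list*, so `combi[1]` would be that whole list and `combi[2]` would be `list(NULL)`, whereas `[[` returns the character pair itself. A quick base-R check, using a shortened copy of `comb`:

```r
comb <- list(c("12", "28"), c("12", "52"), c("12", "159"))

# `[` keeps the list wrapper: a list of length 1 that holds the pair
combi_list <- comb[1]

# `[[` extracts the element itself: the character vector c("12", "28")
combi_vec <- comb[[1]]
combi_vec[1]  # "12"
combi_vec[2]  # "28"

# Indexing the one-element list past its end gives list(NULL), not "28"
combi_list[2]
```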


When I use %dopar%, or %do% with registerDoSEQ(), I get only the first score, but a plain for loop, for (i in seq_along(comb)) {...}, calculates more than one score.

Is there anything in the Bioconductor packages I use inside the go_cor function that prevents me from running it in parallel? I thought it could be due to not loading the right libraries on the workers, but I used .packages = c("org.Hs.eg.db", "RBGL", "Rgraphviz", "graph", "Biobase", "AnnotationDbi") and it still didn't work. By "didn't work" I mean that I get no error or message explaining why it fails, just one plain value in go.mat and nothing else.

> sessionInfo()
R version 3.2.3 (2015-12-10)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 14.04.2 LTS

locale:
[1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C
[3] LC_TIME=es_ES.UTF-8        LC_COLLATE=en_US.UTF-8
[5] LC_MONETARY=es_ES.UTF-8    LC_MESSAGES=en_US.UTF-8
[7] LC_PAPER=es_ES.UTF-8       LC_NAME=C
[11] LC_MEASUREMENT=es_ES.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] grid      stats4    parallel  stats     graphics  grDevices utils
[8] datasets  methods   base

other attached packages:
[1] topGO_2.22.0         SparseM_1.72         Rgraphviz_2.14.0
[4] RBGL_1.46.0          org.Hs.eg.db_3.2.3   GOstats_2.36.0
[7] graph_1.48.0         Category_2.36.0      GO.db_3.2.2
[10] RSQLite_1.0.0        DBI_0.5-1            AnnotationDbi_1.32.3
[13] IRanges_2.4.8        S4Vectors_0.8.11     Matrix_1.2-7.1
[16] Biobase_2.30.0       BiocGenerics_0.16.1  doParallel_1.0.10
[19] iterators_1.0.8      foreach_1.4.3        BiocParallel_1.4.3

loaded via a namespace (and not attached):
[1] splines_3.2.3          xtable_1.8-2           lattice_0.20-34
[4] tools_3.2.3            AnnotationForge_1.12.2 genefilter_1.52.1
[7] lambda.r_1.1.9         futile.logger_1.4.3    survival_2.39-5
[10] GSEABase_1.32.0        futile.options_1.0.0   codetools_0.2-15
[13] XML_3.98-1.4           annotate_1.48.0      
> registered()
$MulticoreParam
class: MulticoreParam
bpjobname:BPJOB; bpworkers:10; bptasks:0; bptimeout:Inf; bpRNGseed:; bpisup:FALSE
bplog:FALSE; bpthreshold:INFO; bplogdir:NA
bpstopOnError:FALSE; bpprogressbar:FALSE
bpresultdir:NA
cluster type: FORK

$SnowParam
class: SnowParam
bpjobname:BPJOB; bpworkers:10; bptasks:0; bptimeout:Inf; bpRNGseed:; bpisup:FALSE
bplog:FALSE; bpthreshold:INFO; bplogdir:NA
bpstopOnError:FALSE; bpprogressbar:FALSE
bpresultdir:NA
cluster type: SOCK

$SerialParam
class: SerialParam
bplog:FALSE; bpthreshold:INFO
bpcatchErrors:FALSE

EDIT: After some modifications (per Mike's answer) I can see all the iterations, but there is an error:

task 2 failed - "rsqlite_query_fetch: failed first step: database disk image is malformed"
Modified 23 months ago by Martin Morgan; written 23 months ago by Lluís Revilla Sancho.
Answer: 23 months ago, Martin Morgan (United States) wrote:

I made your example as minimal as possible by removing parts until I had just what was needed to reproduce the error:

suppressPackageStartupMessages({
    library("GOstats")
    library("org.Hs.eg.db")

    library("foreach")
    library("doParallel")
    registerDoParallel()
})

go_cor <- function(e_a, e_b) {
    mapfunc <- function(z)
        mget(z, revmap(org.Hs.egGO2EG), ifnotfound = NA)
    simLL(e_a, e_b, "BP", measure = "LP", mapfun = mapfunc)
}

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"),
             c("12", "280"), c("28", "52"), c("28", "159"), c("28", "175"),
             c("28", "280"), c("52", "159"), c("52", "175"), c("52", "280"),
             c("159", "175"), c("159", "280"), c("175", "280"))

foreach(combi = comb) %dopar%
    go_cor(combi[1], combi[2])

I agree with Robert that the likely problem is that multiple processes are using the same database connection, and I tried to solve this by loading the database packages in the workers only:

suppressPackageStartupMessages({
    library("foreach")
    library("doParallel")
    registerDoParallel()
})

go_cor <- function(e_a, e_b) {
    suppressPackageStartupMessages({
        library("GOstats")
        library("org.Hs.eg.db")
    })
    mapfunc <- function(z)
        mget(z, revmap(org.Hs.egGO2EG), ifnotfound = NA)
    simLL(e_a, e_b, "BP", measure = "LP", mapfun = mapfunc)
}

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"),
             c("12", "280"), c("28", "52"), c("28", "159"), c("28", "175"),
             c("28", "280"), c("52", "159"), c("52", "175"), c("52", "280"),
             c("159", "175"), c("159", "280"), c("175", "280"))

foreach(combi = comb) %dopar%
    go_cor(combi[1], combi[2])

This seems to work.
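The same idea can be sketched with the base `parallel` package: start the workers first, then run the package loading *inside* each worker with `clusterEvalQ()`, so every worker opens its own connection instead of inheriting one from the master. This assumes a PSOCK cluster (`makeCluster(2)`), which starts fresh R sessions rather than forking. To keep it runnable without Bioconductor, the `library(org.Hs.eg.db)` call is only a comment, and a hypothetical `worker_state` object (holding the worker's process id) stands in for the annotation connection.

```r
library(parallel)

cl <- makeCluster(2)  # PSOCK: fresh R sessions, nothing inherited from the master

# Runs once in every worker. In the real analysis, library("GOstats") and
# library("org.Hs.eg.db") would go here so that each worker opens its own
# annotation database connection.
invisible(clusterEvalQ(cl, {
  worker_state <- list(pid = Sys.getpid())  # hypothetical stand-in for the connection
  NULL
}))

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"))

# Each task reports which worker's state it used
res <- parLapply(cl, comb, function(pair) {
  c(pair = paste(pair, collapse = "-"), pid = worker_state$pid)
})

stopCluster(cl)

res_pids <- vapply(res, function(r) r[["pid"]], character(1))
```

With foreach, registering such a cluster via `registerDoParallel(cl)` and passing the packages through `.packages` follows the same principle.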

Answer: 23 months ago, Mike Smith (EMBL Heidelberg / de.NBI) wrote:

A couple of thoughts on this bit of code:

go.mat <- foreach(i = seq_len(length(comb),
                  .combine = c, verbose = TRUE) %dopar% {
  # message("new comb ", paste(comb))
  combi <- comb[[i]]
  go_cor(combi[1], combi[2], mapfun = TRUE, Ontology = "BP")
}

Are you missing a bracket to close seq_len() before .combine? Also, the verbose argument to foreach() needs a leading period, i.e. .verbose = TRUE. I think that without it you're passing verbose into the evaluation environment, and that may be influencing how many times the body is executed.

Does this work any better?

go.mat <- foreach(i = seq_len(length(comb)),
                  .combine = c,
                  .verbose = TRUE) %dopar% {
  # message("new comb ", paste(comb))
  combi <- comb[[i]]
  go_cor(combi[1], combi[2], mapfun = TRUE, Ontology = "BP")
}
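Why the missing period matters: foreach treats every named argument that does not start with a dot as a loop variable, and the body is evaluated only as many times as the shortest of them. With `verbose = TRUE` (length 1) next to `i = seq_len(length(comb))` (length 15), only one iteration runs, which is consistent with the single value reported in the question. The `toy_foreach()` function below is a hypothetical base-R illustration of that rule, not foreach's actual implementation:

```r
# Hypothetical toy version of foreach's iteration rule (illustration only):
# named arguments whose names do not start with "." are loop variables,
# and `fun` is called once per element of the shortest of them.
toy_foreach <- function(..., fun) {
  args <- list(...)
  is_option <- startsWith(names(args), ".")
  loop_vars <- args[!is_option]
  n <- min(lengths(loop_vars))
  lapply(seq_len(n), function(k) {
    # Bind the k-th value of every loop variable, then call the body
    vals <- lapply(loop_vars, function(v) v[[k]])
    do.call(fun, vals)
  })
}

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"))

# Typo version: `verbose` becomes a loop variable of length 1
with_typo <- toy_foreach(i = seq_len(length(comb)), verbose = TRUE,
                         fun = function(i, verbose) i)

# Fixed version: `.verbose` is an option, so only `i` drives the loop
fixed <- toy_foreach(i = seq_len(length(comb)), .verbose = TRUE,
                     fun = function(i) i)

length(with_typo)  # 1
length(fixed)      # 3
```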

The bracket was a mistake when writing the reproducible example (RStudio makes it so easy to miss). The .verbose might be the reason; at least now it takes longer to finish. I hope it isn't that, it would be too embarrassing. Thanks, Mike.

Although fixing .verbose helped, it didn't solve everything: I can see all the iterations, but there is an error:

task 2 failed - "rsqlite_query_fetch: failed first step: database disk image is malformed"

And in the verbose output:

numValues: 843, numResults: 0, stopped: TRUE
numValues: 843, numResults: 1, stopped: TRUE
returning status FALSE
accumulate got an error result
numValues: 843, numResults: 2, stopped: TRUE
returning status FALSE



I would suggest updating to the latest version of R and the various packages. Yours is nearly a year old, and this runs without errors for me. Here's the output of go.mat and my sessionInfo():

> go.mat
[1] 0.2516854 0.2146341 0.2335456 0.2400000 0.2400000 0.5490196 0.2777778 0.4949495 0.2424242 0.1730769 0.8115942 0.5555556 0.2222222
[14] 0.2040816 0.5072464
> sessionInfo()
R Under development (unstable) (2016-08-31 r71184)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 14.04.4 LTS

locale:
[1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8     LC_MONETARY=de_DE.UTF-8
[6] LC_MESSAGES=en_US.UTF-8    LC_PAPER=de_DE.UTF-8       LC_NAME=C                  LC_ADDRESS=C               LC_TELEPHONE=C
[11] LC_MEASUREMENT=de_DE.UTF-8 LC_IDENTIFICATION=C

attached base packages:
[1] stats4    parallel  stats     graphics  grDevices utils     datasets  methods   base

other attached packages:
[1] BiocParallel_1.8.1   GO.db_3.4.0          org.Hs.eg.db_3.4.0   GOstats_2.40.0       graph_1.52.0         Category_2.40.0
[7] Matrix_1.2-7.1       AnnotationDbi_1.36.0 IRanges_2.8.0        S4Vectors_0.12.0     Biobase_2.34.0       BiocGenerics_0.20.0
[13] BiocInstaller_1.24.0 foreach_1.4.3

loaded via a namespace (and not attached):
[1] splines_3.4.0          xtable_1.8-2           lattice_0.20-34        tools_3.4.0            grid_3.4.0
[6] AnnotationForge_1.16.0 DBI_0.5-1              genefilter_1.56.0      iterators_1.0.8        survival_2.39-5
[11] RBGL_1.50.0            GSEABase_1.36.0        bitops_1.0-6           codetools_0.2-15       RCurl_1.95-4.8
[16] RSQLite_1.0.0          compiler_3.4.0         XML_3.98-1.4           annotate_1.52.0       

Sometimes the error is in task 2, but if I run it again it may appear in a different task; perhaps this is already addressed in newer versions. Thanks for testing it.

I have updated to the latest R and Bioconductor versions, but the problem is still there. It is hard to provide a useful reproducible example because the error is different each time. The task error is "accumulate got an error result", which makes foreach stop with either "INTEGER() can only be applied to a 'integer', not a 'double'" or "rsqlite_query_fetch: failed: database disk image is malformed", and it happens at different combinations.

Lluis,

this error suggests to me that at some point more than one process is trying to access the SQLite database behind an annotation package through the same connection, i.e., through the same annotation object. Instead, each process should load the annotation package independently, which creates its own SQLite connection within that process.

cheers,

robert.
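For fork-based backends (registerDoParallel() on Linux, or mclapply()), one general way to follow this advice is to open the connection lazily and record which process opened it, reopening whenever the current process id differs. SQLite's own documentation warns against carrying an open connection across fork(), which is what happens when the master has already loaded the annotation package. The sketch below assumes a Unix-like system; `open_resource()` and `get_conn()` are hypothetical helpers, not AnnotationDbi functions, and the "connection" is a plain list standing in for a real SQLite handle.

```r
library(parallel)

# Hypothetical stand-in for opening an SQLite / annotation connection
open_resource <- function() {
  list(owner_pid = Sys.getpid())
}

# Per-process cache. A forked child inherits this environment, so the
# owner's PID is checked and a fresh resource is opened if it belongs
# to another process.
.conn_cache <- new.env()
get_conn <- function() {
  conn <- .conn_cache$conn
  if (is.null(conn) || conn$owner_pid != Sys.getpid()) {
    conn <- open_resource()
    .conn_cache$conn <- conn
  }
  conn
}

# Opened once in the master, as loading an annotation package would do
master_conn <- get_conn()

# Each forked worker ends up using a resource it opened itself
res <- mclapply(1:4, function(i) {
  conn <- get_conn()
  c(worker = Sys.getpid(), owner = conn$owner_pid)
}, mc.cores = 2)
```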