Question: Unable to calculate in parallel what can be calculated in a for loop
Lluís R (European Union) wrote, 12 months ago:

I am using registerDoParallel(2) to run some analyses in parallel using foreach and %dopar%. However, I can't manage to calculate in parallel one thing that I can calculate sequentially.

The function I want to use in parallel is this one:

# Calculates the degree of overlap of the GO BP ontologies of two Entrez IDs.
# Needs GOstats (simLL) and org.Hs.eg.db (org.Hs.egGO2EG) to be available.
go_cor <- function(e_a, e_b, chip = "hgu133plus2.db", mapfun = FALSE,
                   Ontology = "BP", ...) {
  # https://support.bioconductor.org/p/85702/#85732

  if (is.na(e_a) || is.na(e_b)) {
    return(NA)
  }
  # Ensure proper format
  e_a <- as.character(e_a)
  e_b <- as.character(e_b)

  if (isTRUE(mapfun)) {
    # Map Entrez IDs to GO terms via org.Hs.eg.db instead of a chip package
    mapfunc <- function(z) {
      mget(z, revmap(org.Hs.egGO2EG), ifnotfound = NA)
    }

    LP <- simLL(e_a, e_b, Ontology, measure = "LP", mapfun = mapfunc)
    UI <- simLL(e_a, e_b, Ontology, measure = "UI", mapfun = mapfunc)
  } else {
    LP <- simLL(e_a, e_b, Ontology, measure = "LP", chip = chip)
    UI <- simLL(e_a, e_b, Ontology, measure = "UI", chip = chip)
  }
  out <- NA
  # As the checks below assume, simLL() returns a list (sim, g1, g2, ...)
  # on success and NA otherwise
  if (length(LP) > 1 || length(UI) > 1) {
    if (is.na(LP["sim"]) || is.na(UI["sim"])) {
      return(NA)
    }
    mean.g1 <- s.path(LP$g1)
    mean.g2 <- s.path(LP$g2)
    out <- (UI$sim / LP$sim) * max(mean.g1, mean.g2, na.rm = TRUE)
  }
  # Still NA when either identifier could not be mapped
  out
}

# Length of the longest shortest path from the leaf terms to the root
# of a GO graph
s.path <- function(ig) {
  lfi <- graph::leaves(ig, "in")
  degs <- graph::degree(ig)
  root <- names(degs$outDegree)[degs$outDegree == 0]
  paths <- RBGL::sp.between(ig, lfi, root)
  plens <- Biobase::subListExtract(paths, "length", simplify = TRUE)
  max(plens)
}

This code is called with code similar to:

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"),
    c("12", "280"), c("28", "52"), c("28", "159"), c("28", "175"),
    c("28", "280"), c("52", "159"), c("52", "175"), c("52", "280"),
    c("159", "175"), c("159", "280"), c("175", "280"))

go.mat <- foreach(i = seq_len(length(comb)),
                  .combine = c, verbose = TRUE) %dopar% {
    ## message("new comb ", paste(comb))
    combi <- comb[i]
    go_cor(combi[1], combi[2], mapfun = TRUE, Ontology = "BP")
}

When I use %dopar%, or %do% with registerDoSEQ(), I just get the first score, but when I use a for loop, `for (i in 1:length(comb)) {....}`, it calculates more than one score.
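Independently of the parallel backend, the loop body above indexes the list with single brackets. In R, `comb[i]` returns a one-element sub-list, so `combi[1]` is that sub-list and `combi[2]` is `list(NULL)` rather than the second Entrez ID; `comb[[i]]`, or iterating over the list directly with `foreach(combi = comb)`, yields the character pair. A minimal base R check on a shortened `comb` (the `pairs` vector is only for illustration):

```r
# A shortened version of the list of Entrez ID pairs from the question
comb <- list(c("12", "28"), c("12", "52"), c("12", "159"))

# Single brackets: a sub-list of length 1, not the pair itself
combi <- comb[1]
str(combi[1])   # still a list wrapping c("12", "28")
str(combi[2])   # list(NULL): index 2 is out of range for a length-1 list

# Double brackets: the character vector stored in the list
combi <- comb[[1]]
combi[1]        # "12"
combi[2]        # "28"

# Sequential loop over all pairs using [[ ]]
pairs <- character(0)
for (i in seq_along(comb)) {
  combi <- comb[[i]]
  pairs <- c(pairs, paste(combi[1], combi[2], sep = "-"))
}
pairs
```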

Is there anything in the Bioconductor packages I use inside the go_cor function that prevents me from running it in parallel? I thought it could be due to the right libraries not being loaded on the workers, but I used `.packages = c("org.Hs.eg.db", "RBGL", "Rgraphviz", "graph", "Biobase", "AnnotationDbi")` and it still didn't work. By "didn't work" I mean that I get neither an error nor a message explaining why it fails, just one plain value in go.mat and nothing else.

> sessionInfo()
R version 3.2.3 (2015-12-10)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 14.04.2 LTS

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
 [3] LC_TIME=es_ES.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=es_ES.UTF-8    LC_MESSAGES=en_US.UTF-8   
 [7] LC_PAPER=es_ES.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=es_ES.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
 [1] grid      stats4    parallel  stats     graphics  grDevices utils    
 [8] datasets  methods   base     

other attached packages:
 [1] topGO_2.22.0         SparseM_1.72         Rgraphviz_2.14.0    
 [4] RBGL_1.46.0          org.Hs.eg.db_3.2.3   GOstats_2.36.0      
 [7] graph_1.48.0         Category_2.36.0      GO.db_3.2.2         
[10] RSQLite_1.0.0        DBI_0.5-1            AnnotationDbi_1.32.3
[13] IRanges_2.4.8        S4Vectors_0.8.11     Matrix_1.2-7.1      
[16] Biobase_2.30.0       BiocGenerics_0.16.1  doParallel_1.0.10   
[19] iterators_1.0.8      foreach_1.4.3        BiocParallel_1.4.3  

loaded via a namespace (and not attached):
 [1] splines_3.2.3          xtable_1.8-2           lattice_0.20-34       
 [4] tools_3.2.3            AnnotationForge_1.12.2 genefilter_1.52.1     
 [7] lambda.r_1.1.9         futile.logger_1.4.3    survival_2.39-5       
[10] GSEABase_1.32.0        futile.options_1.0.0   codetools_0.2-15      
[13] XML_3.98-1.4           annotate_1.48.0      
> registered()
$MulticoreParam
class: MulticoreParam
  bpjobname:BPJOB; bpworkers:10; bptasks:0; bptimeout:Inf; bpRNGseed:; bpisup:FALSE
  bplog:FALSE; bpthreshold:INFO; bplogdir:NA
  bpstopOnError:FALSE; bpprogressbar:FALSE
  bpresultdir:NA
cluster type: FORK

$SnowParam
class: SnowParam
  bpjobname:BPJOB; bpworkers:10; bptasks:0; bptimeout:Inf; bpRNGseed:; bpisup:FALSE
  bplog:FALSE; bpthreshold:INFO; bplogdir:NA
  bpstopOnError:FALSE; bpprogressbar:FALSE
  bpresultdir:NA
cluster type: SOCK

$SerialParam
class: SerialParam
  bplog:FALSE; bpthreshold:INFO
  bpcatchErrors:FALSE

EDIT: After some modifications (following Mike's answer) I can see all the iterations, but there is an error:

task 2 failed - "rsqlite_query_fetch: failed first step: database disk image is malformed"
(written 12 months ago by Lluís R; modified 11 months ago by Martin Morgan)
Martin Morgan (United States) wrote, 11 months ago:

I made your example as minimal as possible by removing parts until I had just what was needed to reproduce the error:

suppressPackageStartupMessages({
    library("GOstats")
    library("org.Hs.eg.db")

    library("foreach")
    library("doParallel")
    registerDoParallel()
})

go_cor <- function(e_a, e_b) {
    mapfunc <- function(z)
        mget(z, revmap(org.Hs.egGO2EG), ifnotfound = NA)
    simLL(e_a, e_b, "BP", measure = "LP", mapfun = mapfunc)
}

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"),
    c("12", "280"), c("28", "52"), c("28", "159"), c("28", "175"),
    c("28", "280"), c("52", "159"), c("52", "175"), c("52", "280"),
    c("159", "175"), c("159", "280"), c("175", "280"))

foreach(combi = comb) %dopar%
    go_cor(combi[1], combi[2])

I agree with Robert that the likely problem is that multiple processes are using the same database connection, and I tried to solve this by loading the database packages in the workers only:

suppressPackageStartupMessages({
    library("foreach")
    library("doParallel")
    registerDoParallel()
})

go_cor <- function(e_a, e_b) {
    suppressPackageStartupMessages({
        library("GOstats")
        library("org.Hs.eg.db")
    })
    mapfunc <- function(z)
        mget(z, revmap(org.Hs.egGO2EG), ifnotfound = NA)
    simLL(e_a, e_b, "BP", measure = "LP", mapfun = mapfunc)
}

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"),
    c("12", "280"), c("28", "52"), c("28", "159"), c("28", "175"),
    c("28", "280"), c("52", "159"), c("52", "175"), c("52", "280"),
    c("159", "175"), c("159", "280"), c("175", "280"))

foreach(combi = comb) %dopar%
    go_cor(combi[1], combi[2])

This seems to work.

(written 11 months ago by Martin Morgan; modified 11 months ago)
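A sketch of a variant of the same idea, assuming a socket (PSOCK) cluster from base R's parallel package rather than the fork-based default: each PSOCK worker is a fresh R process, so whatever clusterEvalQ() loads is created inside that worker and stays there for later tasks. In the real analysis the clusterEvalQ() body would load GOstats and org.Hs.eg.db; here the hypothetical `worker_state` object stands in for the per-worker database connection so that the sketch runs without Bioconductor packages.

```r
library(parallel)

# A few Entrez ID pairs from the question
comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"))

# Two fresh R processes (PSOCK): nothing is inherited from this session
cl <- makeCluster(2, type = "PSOCK")

# Initialise every worker once. In the real analysis the body would be
#   suppressPackageStartupMessages({library(GOstats); library(org.Hs.eg.db)})
# so that each worker opens its own SQLite connection. Here the hypothetical
# `worker_state` only records which process created it.
invisible(clusterEvalQ(cl, {
  worker_state <- Sys.getpid()
  NULL
}))

# Each task reports where it ran and which process created the state it used
res <- parLapply(cl, comb, function(combi) {
  c(pair = paste(combi, collapse = "-"),
    state_owner = as.character(worker_state),
    ran_in = as.character(Sys.getpid()))
})
stopCluster(cl)

res_mat <- do.call(rbind, res)
res_mat
```

Passing such a cluster to registerDoParallel(cl) should let %dopar% reuse these initialised workers; loading the packages inside the task, as in Martin Morgan's answer, gives the same one-connection-per-process separation without managing a cluster object.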
Mike Smith (EMBL Heidelberg / de.NBI) wrote, 12 months ago:

A couple of thoughts on this bit of code:

go.mat <- foreach(i = seq_len(length(comb),
                      .combine = c, verbose = TRUE) %dopar% {
                        # message("new comb ", paste(comb))
                        combi <- comb[i]
                        go_cor(combi[1], combi[2], mapfun = TRUE, Ontology = "BP")
                      }

Are you missing a closing parenthesis for seq_len() before the .combine? Also, the verbose argument to foreach() is written with a leading period, i.e. .verbose = TRUE. Without the period, foreach() treats verbose = TRUE as a second loop variable rather than an option, and because foreach() stops as soon as its shortest loop variable is exhausted, a single TRUE limits the loop to one iteration.

Does this work any better?

go.mat <- foreach(i = seq_len(length(comb)),
                  .combine = c,
                  .verbose = TRUE) %dopar% {
                      # message("new comb ", paste(comb))
                      # [[ ]] extracts the character pair; comb[i] would be a sub-list
                      combi <- comb[[i]]
                      go_cor(combi[1], combi[2], mapfun = TRUE, Ontology = "BP")
                  }
(written 12 months ago by Mike Smith)
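The effect of the missing period can be checked without any parallel backend by using %do%. foreach() treats every named argument that does not start with a period as a loop variable and stops when the shortest one is exhausted, so verbose = TRUE (length one) caps the loop at a single iteration, which matches the single value in go.mat. A small sketch, assuming only the foreach package is installed:

```r
library(foreach)

# Without the period, `verbose` is a second loop variable of length 1,
# so the loop stops after the first value of i
res_bug <- foreach(i = 1:5, verbose = TRUE, .combine = c) %do% i
res_bug

# With the period, `.verbose` is an option of foreach(); all five
# iterations run (FALSE here only to keep the output short)
res_ok <- foreach(i = 1:5, .combine = c, .verbose = FALSE) %do% i
res_ok
```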

The missing parenthesis was a slip I made while writing the reproducible example (RStudio makes that kind of slip easy). The .verbose might be the reason; at least now it takes longer to finish. I hope that's not it, it would be too embarrassing. Thanks, Mike.

(reply written 12 months ago by Lluís R)

The .verbose fix helped but did not solve everything: I can now see all the iterations, but there is an error:

task 2 failed - "rsqlite_query_fetch: failed first step: database disk image is malformed"

And in the verbose output:

numValues: 843, numResults: 0, stopped: TRUE
got results for task 1
numValues: 843, numResults: 1, stopped: TRUE
returning status FALSE
got results for task 2
accumulate got an error result
numValues: 843, numResults: 2, stopped: TRUE
returning status FALSE

(reply written 12 months ago by Lluís R; modified 12 months ago)

I would suggest updating to the latest versions of R and the various packages. Yours are nearly a year old, and this runs without errors for me. Here's the output from go.mat and my sessionInfo():

> go.mat
 [1] 0.2516854 0.2146341 0.2335456 0.2400000 0.2400000 0.5490196 0.2777778 0.4949495 0.2424242 0.1730769 0.8115942 0.5555556 0.2222222
[14] 0.2040816 0.5072464
> sessionInfo()
R Under development (unstable) (2016-08-31 r71184)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 14.04.4 LTS

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8     LC_MONETARY=de_DE.UTF-8   
 [6] LC_MESSAGES=en_US.UTF-8    LC_PAPER=de_DE.UTF-8       LC_NAME=C                  LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=de_DE.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats4    parallel  stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
 [1] BiocParallel_1.8.1   GO.db_3.4.0          org.Hs.eg.db_3.4.0   GOstats_2.40.0       graph_1.52.0         Category_2.40.0     
 [7] Matrix_1.2-7.1       AnnotationDbi_1.36.0 IRanges_2.8.0        S4Vectors_0.12.0     Biobase_2.34.0       BiocGenerics_0.20.0 
[13] BiocInstaller_1.24.0 foreach_1.4.3       

loaded via a namespace (and not attached):
 [1] splines_3.4.0          xtable_1.8-2           lattice_0.20-34        tools_3.4.0            grid_3.4.0            
 [6] AnnotationForge_1.16.0 DBI_0.5-1              genefilter_1.56.0      iterators_1.0.8        survival_2.39-5       
[11] RBGL_1.50.0            GSEABase_1.36.0        bitops_1.0-6           codetools_0.2-15       RCurl_1.95-4.8        
[16] RSQLite_1.0.0          compiler_3.4.0         XML_3.98-1.4           annotate_1.52.0       
(reply written 12 months ago by Mike Smith; modified 12 months ago)

Sometimes the error is in task 2, but if I run it again it may occur in task X or Y instead; this could already be fixed in newer versions. Thanks for testing it.

(reply written 12 months ago by Lluís R; modified 12 months ago)

I have updated to the latest R and Bioconductor versions, but the problem is still there. It is hard to provide a useful reproducible example because the error is different each time. The verbose output reports "accumulate got an error result", after which foreach stops with either "INTEGER() can only be applied to a 'integer', not a 'double'" or "rsqlite_query_fetch: failed: database disk image is malformed", and it happens at different combinations.

(reply written 11 months ago by Lluís R)

Lluis,

This error suggests to me that at some point more than one process is trying to access the SQLite database behind an annotation package through the same connection, that is, through the same annotation object. Instead, each process should load the annotation package independently, which will create its own SQLite connection within that process.

cheers,

robert.

(reply written 11 months ago by Robert Castelo)

It might be, but if I load the annotation packages independently in each worker via foreach(..., .packages = c("GOstats", "org.Hs.eg.db")) I get the same error.

(reply written 11 months ago by Lluís R)
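A plausible explanation for why .packages did not help, assuming the default backend that registerDoParallel(2) selects on Linux (forked workers through the parallel package): forked workers start as copies of the master process, so packages already attached there are inherited, library() in a worker does not reload them, and the workers presumably keep sharing the SQLite connection the master opened when it loaded the annotation packages. That would also explain why Martin Morgan's answer, which never loads those packages in the master, works. The base R sketch below (Unix-alikes only, since mclapply() cannot fork on Windows) shows forked workers seeing an object created in the master before the fork; `master_conn` is a hypothetical stand-in for that connection.

```r
library(parallel)

# Hypothetical stand-in for a database connection opened when an annotation
# package is loaded in the master: it records the PID that created it
master_conn <- list(opened_by = Sys.getpid())

# Forked workers (Unix-alikes only) start as copies of the master process
res <- mclapply(1:4, function(i) {
  # library() does not reload an already-attached package; by analogy,
  # library(org.Hs.eg.db) in a forked worker would not open a new connection
  library(stats)
  c(conn_opened_by = master_conn$opened_by, worker = Sys.getpid())
}, mc.cores = 2)

res_mat <- do.call(rbind, res)
# Every worker sees the object created in the master ...
all(res_mat[, "conn_opened_by"] == Sys.getpid())
# ... even though each one runs in a process other than the master
all(res_mat[, "worker"] != Sys.getpid())
```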