Question: Using iterators and foreach with doMC, using DNAStringSets
0
gravatar for ben.ward
3.8 years ago by
ben.ward30
United Kingdom
ben.ward30 wrote:

Hi, This is the first time I've programmed a parallel solution in R and Bioconductor, so I wanted to make a post and see if I've done this optimally/right!

My problem was to do compute a series of sliding window based sequence identity scans between many pairs two sequences. To do this, I programmed two iterators. One iterates sliding windows:

windows <- function(obj, ...){
  UseMethod('windows')
}

getIterVal <- function(obj, ...){
  UseMethod('getIterVal')
}

windows.default <- function(obj, width, step, checkFunc = function(...) TRUE){
  n <- length(obj)
  if(width < 1){stop("Window width must be ≥ 1.")}
  if(step < 1){stop("step must be ≥ 1.")}
  if(any(width > n)){stop("The window size cannot be greater than number of data elements.")}
  
  state <- new.env()
  state$i <- 1L
  state$obj <- obj
  state$width <- width
  state$step <- step
  
  it <- list(state=state, length=n, checkFunc=checkFunc)
  class(it) <- c("containerwindow", "window", "iter")
  return(it)
}

getIterVal.containerwindow <- function(obj, ...) {
  i <- obj$state$i
  if (i > obj$length)
    stop('SubscriptOutOfBounds', call.=FALSE)
  start <- i
  end <- start + obj$state$width - 1
  return(obj$state$obj[start:end])
}

nextElem.containerwindow <- function(obj, ...) {
  repeat {
    tryCatch({
      val <- getIterVal(obj)
      if(obj$checkFunc(val)){
        obj$state$i <- obj$state$i + obj$state$step
        return(val)
      }
      obj$state$i <- obj$state$i + obj$state$step
    }, error = function(e){
      if(any(nzchar(e$message))){
        if(identical(e$message, "SubscriptOutOfBounds")){
          stop("StopIteration", call.=FALSE)
        } else {
          stop(e$message, call.=FALSE)
        }
      } else {
        stop("Abort", call.=e)
      }
    })
  }
}

This iterator can then be used in a function that takes a DNAStringSet of two sequences:

slidingSimilarity <- function(dna, windowSize, stepSize){
  conMat <- colSums(consensusMatrix(dna) != 0) > 1
  itr <- windows(conMat,
                 width = windowSize,
                 step = stepSize,
                 checkFunc = function(i) !any(is.na(i)));
  dists <- foreach(x = itr, .combine = c) %do% {
    sum(x)
  }
  dists <- 100 - round((dists / windowSize) * 100)
  windowStarts <- seq(from = 1, to = width(dna)[1], by = stepSize)
  windowEnds <- seq(from = windowSize, to = width(dna)[1], by = stepSize)
  ranges <- IRanges(start = windowStarts[1:length(windowEnds)], end = windowEnds)
  data <- RangedData(ranges = ranges, rawSS = dists)
  return(data)
}

From two aligned sequences in a DNAStringSet - I get a neat sliding window scan of sequence identity!

But now say I wanted to do many pairs of sequences, this may take some time, so I'd want to parallelise. However, I want to avoid passing the entire DNAStringSet object (or copies) around, as this could be memory intensive. The idea came to me this can be done with an iterator too, which should only pass around one or two sequences, and not the entire dataset. That's the idea anyway, and I try to accomplish it by defining an iterator as follows. 

pairsRef <- function(obj, ...){
  UseMethod('pairsRef')
}

pairsRef.DNAStringSet <- function(obj, ref = NULL, checkFunc = function(...) TRUE){
  state <- new.env()
  state$i <- 0L
  state$obj <- obj
  if(is.null(ref)){
    state$ref <- names(obj)[1]
  } else {
    state$ref <- ref
  }
  state$nonRefs <- names(obj)
  state$nonRefs <- state$nonRefs[state$nonRefs != state$ref]
  it <- list(state=state, checkFunc=checkFunc)
  class(it) <- c("pairsRef", "iter")
  return(it)
}

getIterVal.pairsRef <- function(obj, plus = 0L, ...) {
  i <- obj$state$i + plus
  if(i > length(obj$state$nonRefs))
    stop('SubscriptOutOfBounds', call.=FALSE)
  names <- c(obj$state$ref, obj$state$nonRefs[i])
  return(obj$state$obj[names])
}

nextElem.pairsRef <- function(obj, ...) {
  repeat {
    tryCatch({
      val <- getIterVal(obj, 1L)
      if(obj$checkFunc(val)){
        obj$state$i <- obj$state$i + 1L
        return(val)
      }
      obj$state$i <- obj$state$i + 1L
    }, error = function(e){
      if(any(nzchar(e$message))){
        if(identical(e$message, "SubscriptOutOfBounds")){
          stop("StopIteration", call.=FALSE)
        } else {
          stop(e$message, call.=FALSE)
        }
      } else {
        stop("Abort", call.=e)
      }
    })
  }
}​

Using the iterator many analyses of pairs of sequences can be done using multiple cores:

library(Biostrings)
library(iterators)
library(foreach)
library(doMC)

nworkers <- 3
registerDoMC(nworkers)


sequences <- readDNAStringSet(fastaFile, format = "fasta", use.names = TRUE)


results <- foreach(x = pairsRef(sequences)) %dopar% {
    slidingSimilarity(x, windowSize, stepSize)
  }​

So, does this work for avoiding copying whole objects, and is it the right sort of thing to do? Comments, suggestions, and advice is very much appreciated.

Thanks,

Ben.

ADD COMMENTlink modified 3.8 years ago • written 3.8 years ago by ben.ward30

That's a lot of code and it would be interesting to actually run it on an appropriate fasta file that we all have access to, e.g., one available in dir(system.file(package="Biostrings", "extdata")).

 

ADD REPLYlink written 3.8 years ago by Martin Morgan ♦♦ 23k

If you want your package to be portable/cross-platform, don't use doMC, it does not work on windows.

Use the parallel package, or better yet, BiocParallel, which will automatically select the appropriate back end for you depending on your platform.

ADD REPLYlink written 3.8 years ago by Dan Tenenbaum8.2k

Thanks Dan! This looks like it might be exactly what I need. One question if anyone knows this - does the BiocPar methods or apply, mapply and so on work like I believe my own iterators do, in that they only send a subset of the data to a worker, to avoid the overhead of copying a large data object.

ADD REPLYlink modified 3.7 years ago • written 3.7 years ago by ben.ward30

Yes, the typical approach is to separate data into 'tasks' chunks (argument to *Param() constructors), and send each chunk to a worker. The default tasks=0 divides the data into as many chunks as workers. tasks=length(X) would divide X into chunks of size 1; this tends to be quite inefficient, but would be appropriate if each worker did a lot of work, the work load varied considerably between tasks, or one was very concerned about managing memory.

But one of the funny things is that you can often get much better performance increase, much less complexity, and much more robust code by writing efficient vectorized code. So really your first stop is, of course IMHO, to add to your question with a runnable example and expected output.

ADD REPLYlink written 3.7 years ago by Martin Morgan ♦♦ 23k
Please log in to add an answer.

Help
Access

Use of this site constitutes acceptance of our User Agreement and Privacy Policy.
Powered by Biostar version 16.09
Traffic: 222 users visited in the last hour