Is there any way to break out of a foreach loop?

Coryn Bailer-Jones · Apr 18, 2013

I am using the R package foreach with %dopar% to run long calculations (on the order of days) in parallel. I would like to be able to stop the entire set of calculations if any one of them produces an error. However, I have not found a way to do this, and nothing in the documentation or on various forums indicates that it is possible. In particular, break does not work, and stop() only stops the current calculation, not the whole foreach loop.

Note that I cannot use a simple for loop, because ultimately I want to parallelize this using the doRNG package.

Here is a simplified, reproducible version of what I am attempting (shown here in serial with %do%, but I have the same problem when using doRNG and %dopar%). Note that in reality I want to run all of the elements of this loop (here 10) in parallel.

library(foreach)
myfunc <- function() {
  x <- foreach(k = 1:10, .combine="cbind", .errorhandling="stop") %do% {
    cat("Element ", k, "\n")
    Sys.sleep(0.5) # just to show that stop does not cause exit from foreach
    if(is.element(k, 2:6)) {
      cat("Should stop\n")
      stop("Has stopped")
    }
    k
  }
  return(x)
}
x <- myfunc()
# stop() halts the processing of k=2:6, but it does not stop the foreach loop itself.
# x is not returned. The execution produces the error message
# Error in { : task 2 failed - "Has stopped"

What I would like to achieve is that the entire foreach loop can be exited immediately upon some condition (here, when the stop() is encountered).

I have found no way to achieve this with foreach. It seems that I would need a way to send a message to all the other processes to make them stop too.

If not possible with foreach, does anyone know of alternatives? I have also tried to achieve this with parallel::mclapply, but that does not work either.

> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-apple-darwin10.8.0 (64-bit)

locale:
[1] C/UTF-8/C/C/C/C

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base

other attached packages:
[1] foreach_1.4.0

loaded via a namespace (and not attached):
[1] codetools_0.2-8 compiler_3.0.0  iterators_1.0.6

Answer

Steve Weston · Apr 19, 2013

It sounds like you want an impatient version of the "stop" error handling. You could implement that by writing a custom combine function, and arranging for foreach to call it as soon as each result is returned. To do that you need to:

  • Use a backend that supports calling combine on-the-fly, like doMPI or doRedis
  • Don't enable .multicombine
  • Set .inorder to FALSE
  • Set .init to something (like NULL)

Here's an example that does that:

library(foreach)
parfun <- function(errval, n) {
  abortable <- function(errfun) {
    comb <- function(x, y) {
      if (inherits(y, 'error')) {
        warning('This will leave your parallel backend in an inconsistent state')
        errfun(y)
      }
      c(x, y)
    }
    foreach(i=seq_len(n), .errorhandling='pass', .export='errval',
            .combine='comb', .inorder=FALSE, .init=NULL) %dopar% {
      if (i == errval)
        stop('testing abort')
      Sys.sleep(10)
      i
    }
  }
  callCC(abortable)
}

Note that I also set the error handling to "pass" so that foreach will call the combine function with an error object. The callCC function is used to return from the foreach loop regardless of the error handling used within foreach and the backend. In this case, callCC will call the abortable function, passing it a function object that is used to force callCC to return immediately. By calling that function from the combine function, we can escape from the foreach loop as soon as we detect an error object, and have callCC return that object. See ?callCC for more information.
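To make the callCC mechanism easier to see in isolation, here is a minimal sketch that uses only base R, with Reduce standing in for foreach's on-the-fly combining. The names collect_until_error and escape are hypothetical and are not part of foreach or of the code above.

```r
# Hypothetical helper: fold a list with a combine function, but leave the
# whole fold immediately when an error object is encountered.
collect_until_error <- function(items) {
  callCC(function(escape) {
    comb <- function(acc, item) {
      # Calling escape() makes callCC return `item` right away,
      # skipping the rest of the Reduce call.
      if (inherits(item, "error")) escape(item)
      c(acc, item)
    }
    Reduce(comb, items, NULL)  # NULL plays the role of .init
  })
}

ok  <- collect_until_error(list(1, 2, 3))
bad <- collect_until_error(list(1, 2, simpleError("boom"), 4))

print(ok)                     # the combined values 1 2 3
print(conditionMessage(bad))  # "boom"; the element 4 was never combined
```

The escape is a non-local exit rather than an error, so it is not intercepted by error handlers such as tryCatch(..., error = ...) that sit between the combine function and callCC.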

You can actually use parfun without a parallel backend registered and verify that the foreach loop "breaks" as soon as it executes a task that throws an error, but that could take a while since the tasks are executed sequentially. For example, this takes about 20 seconds to execute if no backend is registered, because tasks 1 and 2 each sleep for 10 seconds before task 3 throws the error:

print(system.time(parfun(3, 4)))
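If waiting 20 seconds is inconvenient, a scaled-down variant can be used to check the behaviour in serial. The following is only a sketch: quick_parfun and its delay argument are hypothetical names introduced here, and it uses %do% so that no backend is needed. It assumes, as described above, that the sequential backend calls the combine function as each result arrives.

```r
library(foreach)

# Hypothetical scaled-down variant of parfun with a configurable delay.
quick_parfun <- function(errval, n, delay = 0.1) {
  abortable <- function(errfun) {
    comb <- function(x, y) {
      # Escape from the loop as soon as a task returns an error object.
      if (inherits(y, "error")) errfun(y)
      c(x, y)
    }
    foreach(i = seq_len(n), .errorhandling = "pass",
            .combine = comb, .inorder = FALSE, .init = NULL) %do% {
      if (i == errval) stop("testing abort")
      Sys.sleep(delay)
      i
    }
  }
  callCC(abortable)
}

aborted  <- quick_parfun(3, 5)  # task 3 fails, so an error object comes back
finished <- quick_parfun(6, 5)  # no task fails, so all results are combined

print(inherits(aborted, "error"))
print(finished)
```

Checking inherits(result, "error") on the return value is how the caller distinguishes an aborted loop from a completed one.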

When executing parfun in parallel, we need to do more than simply break out of the foreach loop: we also need to stop the workers, otherwise they will continue to compute their assigned tasks. With doMPI, the workers can be stopped using mpi.abort:

library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
r <- parfun(getDoParWorkers(), getDoParWorkers())
if (inherits(r, 'error')) {
  cat(sprintf('Caught error: %s\n', conditionMessage(r)))
  mpi.abort(cl$comm)
}

Note that the cluster object can't be used after the loop aborts, because things weren't properly cleaned up, which is why the normal "stop" error handling doesn't work this way.