I am using the R package foreach with %dopar% to do long (~days) calculations in parallel. I would like the ability to stop the entire set of calculations if one of them produces an error. However, I have not found a way to achieve this, and from the documentation and various forums I have found no indication that it is possible. In particular, break does not work and stop() only stops the current calculation, not the whole foreach loop.
Note that I cannot use a simple for loop, because ultimately I want to parallelize this using the doRNG package.
Here is a simplified, reproducible version of what I am attempting (shown here in serial with %do%, but I have the same problem when using doRNG and %dopar%). Note that in reality I want to run all of the elements of this loop (here 10) in parallel.
library(foreach)

myfunc <- function() {
  x <- foreach(k = 1:10, .combine = "cbind", .errorhandling = "stop") %do% {
    cat("Element ", k, "\n")
    Sys.sleep(0.5)  # just to show that stop() does not cause exit from foreach
    if (is.element(k, 2:6)) {
      cat("Should stop\n")
      stop("Has stopped")
    }
    k
  }
  return(x)
}
x <- myfunc()
# stop() halts the processing of k=2:6, but it does not stop the foreach loop itself.
# x is not returned. The execution produces the error message
# Error in { : task 2 failed - "Has stopped"
What I would like to achieve is that the entire foreach loop can be exited immediately upon some condition (here, when the stop() is encountered). I have found no way to achieve this with foreach. It seems that I would need a way to send a message to all the other processes to make them stop too.
If it is not possible with foreach, does anyone know of alternatives? I have also tried to achieve this with parallel::mclapply, but that does not work either.
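For reference, here is a minimal sketch of the mclapply behavior just mentioned (the toy function and values are illustrative, not the real computation). With forked workers, mclapply catches errors and returns them as "try-error" objects for the failed elements while the remaining elements still run to completion, so it likewise offers no way to abort the whole set early (forking requires a non-Windows platform):

```r
library(parallel)

# Illustrative only: element 2 fails, but the other elements still complete.
res <- mclapply(1:4, function(k) {
  if (k == 2) stop("Has stopped")
  k
}, mc.cores = 2)

# The failed element comes back as a "try-error" object; no early abort.
sapply(res, function(x) inherits(x, "try-error"))
```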
> sessionInfo()
R version 3.0.0 (2013-04-03)
Platform: x86_64-apple-darwin10.8.0 (64-bit)
locale:
[1] C/UTF-8/C/C/C/C
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] foreach_1.4.0
loaded via a namespace (and not attached):
[1] codetools_0.2-8 compiler_3.0.0 iterators_1.0.6
It sounds like you want an impatient version of the "stop" error handling. You could implement that by writing a custom combine function and arranging for foreach to call it as soon as each result is returned. To do that you need to:

- Use a backend that supports calling combine on-the-fly, like doMPI or doRedis
- Disable .multicombine
- Set .inorder to FALSE
- Set .init to something (like NULL)

Here's an example that does that:
library(foreach)

parfun <- function(errval, n) {
  abortable <- function(errfun) {
    comb <- function(x, y) {
      if (inherits(y, 'error')) {
        warning('This will leave your parallel backend in an inconsistent state')
        errfun(y)
      }
      c(x, y)
    }
    foreach(i = seq_len(n), .errorhandling = 'pass', .export = 'errval',
            .combine = 'comb', .inorder = FALSE, .init = NULL) %dopar% {
      if (i == errval)
        stop('testing abort')
      Sys.sleep(10)
      i
    }
  }
  callCC(abortable)
}
Note that I also set the error handling to "pass" so foreach will call the combine function with an error object. The callCC function is used to return from the foreach loop regardless of the error handling used within foreach and the backend. In this case, callCC calls the abortable function, passing it a function object that forces callCC to return immediately. By calling that function from the combine function, we can escape from the foreach loop when we detect an error object and have callCC return that object. See ?callCC for more information.
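To see the mechanism in isolation, here is a minimal, self-contained sketch of the callCC early exit (the function and message names are made up for illustration). callCC passes an "escape" function to its argument; calling that escape function returns from callCC immediately with the given value, skipping the rest of the computation:

```r
# Illustrative only: exit() is the escape function supplied by callCC.
searcher <- function(exit) {
  for (i in 1:10) {
    if (i == 3) exit("bailed out at 3")  # non-local return from callCC
  }
  "finished all iterations"
}

callCC(searcher)
# -> "bailed out at 3"; the loop never reaches i = 4
```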
You can actually use parfun without a parallel backend registered and verify that the foreach loop "breaks" as soon as it executes a task that throws an error, but that could take a while, since the tasks are executed sequentially. For example, this takes 20 seconds to execute if no backend is registered:
print(system.time(parfun(3, 4)))
When executing parfun in parallel, we need to do more than simply break out of the foreach loop: we also need to stop the workers, otherwise they will continue to compute their assigned tasks. With doMPI, the workers can be stopped using mpi.abort:
library(doMPI)
cl <- startMPIcluster()
registerDoMPI(cl)
r <- parfun(getDoParWorkers(), getDoParWorkers())
if (inherits(r, 'error')) {
  cat(sprintf('Caught error: %s\n', conditionMessage(r)))
  mpi.abort(cl$comm)
}
Note that the cluster object can't be used after the loop aborts, because things weren't properly cleaned up, which is why the normal "stop" error handling doesn't work this way.