Scala waiting for sequence of futures

matanster picture matanster · Mar 30, 2015 · Viewed 26.3k times · Source

I was hoping code like follows would wait for both futures, but it does not.

object Fiddle {
  val f1 = Future {
    throw new Throwable("baaa") // emulating a future that bumped into an exception
  }

  val f2 = Future {
    Thread.sleep(3000L) // emulating a future that takes a bit longer to complete
    2
  }

  val lf = List(f1, f2) // in the general case, this would be a dynamically sized list

  val seq = Future.sequence(lf) 

  seq.onComplete {
    _ => lf.foreach(f => println(f.isCompleted))
  }
}

val a = FuturesSequence

I assumed seq.onComplete would wait for them all to complete before completing itself, but not so; it results in:

true
false

.sequence was a bit hard to follow in the source of scala.concurrent.Future, I wonder how I would implement a parallel that waits for all original futures of a (dynamically sized) sequence, or what might be the problem here.

Edit: A related question: https://worldbuilding.stackexchange.com/questions/12348/how-do-you-prove-youre-from-the-future :)

Answer

Travis Brown picture Travis Brown · Mar 30, 2015

One common approach to waiting for all results (failed or not) is to "lift" failures into a new representation inside the future, so that all futures complete with some result (although they may complete with a result that represents failure). One natural way to get that is lifting to a Try.

Twitter's implementation of futures provides a liftToTry method that makes this trivial, but you can do something similar with the standard library's implementation:

import scala.util.{ Failure, Success, Try }

val lifted: List[Future[Try[Int]]] = List(f1, f2).map(
  _.map(Success(_)).recover { case t => Failure(t) }
)

Now Future.sequence(lifted) will be completed when every future is completed, and will represent successes and failures using Try.

And so, a generic solution for waiting on all original futures of a sequence of futures may look as follows, assuming an execution context is of course implicitly available.

  import scala.util.{ Failure, Success, Try }

  private def lift[T](futures: Seq[Future[T]]) = 
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

  def waitAll[T](futures: Seq[Future[T]]) =
    Future.sequence(lift(futures)) // having neutralized exception completions through the lifting, .sequence can now be used

  waitAll(SeqOfFutures).map { 
    // do whatever with the completed futures
  }