Proper way to stop Akka Streams on condition

jarandaf · Jul 12, 2016

I have been successfully using FileIO to stream the contents of a file, compute some transformations for each line and aggregate/reduce the results.

Now I have a fairly specific use case: I would like to stop the stream as soon as a condition is met, so that the whole file does not have to be read and the process finishes as early as possible. What is the recommended way to achieve this?

Answer

Konrad 'ktoso' Malawski · Jul 12, 2016

If the stop condition is "on the outside of the stream"

There is an advanced building block called KillSwitch that you can use for this: http://doc.akka.io/japi/akka/2.4.7/akka/stream/KillSwitches.html The stream shuts down once the kill switch is triggered.

It has methods such as abort(reason) and shutdown(); see its API here: http://doc.akka.io/japi/akka/2.4.7/akka/stream/SharedKillSwitch.html

Reference documentation is here: http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html#kill-switch-scala

Example usage would be:

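import akka.stream.{DelayOverflowStrategy, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Assumes an implicit ActorSystem and materializer are already in scope.
// An endless source of integers, each one delayed by a second.
val countingSrc = Source(Stream.from(1)).delay(1.second,
    DelayOverflowStrategy.backpressure)
val lastSnk = Sink.last[Int]

// Keep both the kill switch and the future holding the last element.
val (killSwitch, last) = countingSrc
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(lastSnk)(Keep.both)
  .run()

doSomethingElse()

// Completes the stream from the outside.
killSwitch.shutdown()

// ScalaTest matcher; the exact value depends on how long doSomethingElse() took.
Await.result(last, 1.second) shouldBe 2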
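
When several streams have to stop together, the SharedKillSwitch linked above may be a better fit than KillSwitches.single. The following is only a sketch, written against the 2.4.x API and meant to be run as a script or pasted into a REPL; the system name, the switch name, and the use of Source.maybe as a source that never finishes on its own are assumptions made for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

implicit val system = ActorSystem("shared-kill-switch-sketch")
implicit val materializer = ActorMaterializer()

// One shared switch can be attached to any number of streams.
val sharedSwitch = KillSwitches.shared("stop-all") // name is illustrative

// Source.maybe never emits or completes by itself here,
// so only the switch can end these two streams.
val first = Source.maybe[Int].via(sharedSwitch.flow).runWith(Sink.ignore)
val second = Source.maybe[Int].via(sharedSwitch.flow).runWith(Sink.ignore)

// abort fails every attached stream with the given exception;
// shutdown() would complete them normally instead.
sharedSwitch.abort(new RuntimeException("stop condition reached"))

val firstResult = Try(Await.result(first, 3.seconds))
val secondResult = Try(Await.result(second, 3.seconds))

system.terminate()
```

Both futures should fail with the exception passed to abort, which lets downstream code tell an aborted run apart from a normal completion.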

If the stop condition is inside the stream

You can use takeWhile to express almost any condition, though sometimes take or limit is enough (for example, "take 10 lines").
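
As a rough sketch of the two operators, assuming the 2.4.x API and runnable as a script or in a REPL: the in-memory list below stands in for lines coming out of FileIO, and the "STOP" marker is a made-up stop condition.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system = ActorSystem("take-while-sketch")
implicit val materializer = ActorMaterializer()

// Hypothetical stand-in for lines read from a file.
val lines = List("alpha", "beta", "STOP", "gamma", "delta")

// takeWhile completes the stream at the first element that fails the
// predicate and cancels upstream, so the rest of the source is not consumed.
val beforeMarker = Await.result(
  Source(lines).takeWhile(_ != "STOP").runWith(Sink.seq),
  3.seconds)

// take(n) completes after n elements, whatever they contain.
val firstTwo = Await.result(
  Source(lines).take(2).runWith(Sink.seq),
  3.seconds)

system.terminate()
```

Here both results contain "alpha" and "beta". One thing to keep in mind: limit(n) is not a drop-in replacement for take(n), because it fails the stream when more than n elements arrive rather than completing it quietly.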

If your logic is more involved, you could build a custom stage for it using statefulMapConcat, which lets you express practically anything, so you can complete the stream "from the inside" whenever you want.
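
One possible shape for such a stateful condition, sketched against the 2.4.x API and runnable as a script or in a REPL: statefulMapConcat carries a running total, and a takeWhile placed after it performs the actual completion. The budget value and the input numbers are invented for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system = ActorSystem("stateful-stop-sketch")
implicit val materializer = ActorMaterializer()

val budget = 10 // hypothetical threshold

val withinBudget = Await.result(
  Source(List(3, 4, 2, 5, 1))
    .statefulMapConcat { () =>
      var runningTotal = 0 // fresh state for every materialization
      elem => {
        runningTotal += elem
        List((elem, runningTotal)) // pair each element with the total so far
      }
    }
    // Stop as soon as the accumulated total exceeds the budget.
    .takeWhile { case (_, total) => total <= budget }
    .map { case (elem, _) => elem }
    .runWith(Sink.seq),
  3.seconds)

system.terminate()
```

The running totals are 3, 7, 9, 14 and 15, so the stream completes when it sees 14 and only 3, 4 and 2 reach the sink. Keeping the state inside the function passed to statefulMapConcat means each run of the stream starts from zero.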