Accessing the underlying ActorRef of an akka stream Source created by Source.actorRef

Ramón J Romero y Vigil · Jun 11, 2015

I'm trying to use the Source.actorRef method to create an akka.stream.scaladsl.Source object. Something of the form

import akka.stream.OverflowStrategy.fail
import akka.stream.scaladsl.Source

case class Weather(zip : String, temp : Double, raining : Boolean)

val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

val sunnySource = weatherSource.filter(!_.raining)
...

My question is: how do I send data to my ActorRef based Source object?

I assumed sending messages to the Source was something of the form

//does not compile
weatherSource ! Weather("90210", 72.0, false)
weatherSource ! Weather("02139", 32.0, true)

But weatherSource doesn't have a ! operator or tell method.

The documentation isn't very descriptive about how to use Source.actorRef; it just says you can...

Thank you in advance for your review and response.

Answer

Noah · Jun 11, 2015

The ActorRef is the source's materialized value, so you only get it once the stream is run. You need a Flow:

  import akka.stream.OverflowStrategy.fail
  import akka.stream.scaladsl.{Flow, Sink, Source}

  // An implicit materializer must be in scope for runWith to compile.

  case class Weather(zip: String, temp: Double, raining: Boolean)

  val weatherSource = Source.actorRef[Weather](Int.MaxValue, fail)

  val sunnySource = weatherSource.filter(!_.raining)

  // Running the stream materializes the source and returns its ActorRef.
  val ref = Flow[Weather]
    .to(Sink.ignore)
    .runWith(sunnySource)

  // Messages sent to ref are emitted by the source.
  ref ! Weather("02139", 32.0, true)

Remember this is all experimental and may change!
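
For later releases, here is a minimal self-contained sketch of the same idea. It is not the answer author's code: it assumes Akka 2.6 or newer, where an implicit ActorSystem supplies the materializer, and it uses the two-argument Source.actorRef overload, which those releases deprecate in favour of one that takes explicit completion and failure matchers. The object name WeatherDemo, the helper collectSunny, and the buffer size of 16 are made up for illustration.

```scala
import akka.actor.{ActorSystem, Status}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

case class Weather(zip: String, temp: Double, raining: Boolean)

object WeatherDemo {

  // Hypothetical helper: sends the readings to an actor-backed source
  // and returns the ones that pass the "not raining" filter.
  def collectSunny(readings: Seq[Weather]): Seq[Weather] = {
    // Assumption: Akka 2.6+, where the implicit system acts as materializer.
    implicit val system: ActorSystem = ActorSystem("weather")
    try {
      // Buffer size 16 is arbitrary; with OverflowStrategy.fail the stream
      // fails if more elements than that pile up without downstream demand.
      val sunnySource =
        Source.actorRef[Weather](16, OverflowStrategy.fail).filter(!_.raining)

      // Keep.both returns the source's ActorRef and the sink's Future.
      val (ref, result) = sunnySource.toMat(Sink.seq)(Keep.both).run()

      readings.foreach(w => ref ! w)

      // Status.Success asks the source to complete the stream.
      ref ! Status.Success(())

      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val readings = Seq(
      Weather("90210", 72.0, false),
      Weather("02139", 32.0, true)
    )
    println(collectSunny(readings))
  }
}
```

Collecting into Sink.seq instead of Sink.ignore is only there to make the effect of the filter visible; the way the ActorRef is obtained (as the materialized value of the running stream) is the same as in the snippet above.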