Reading a CSV files using Akka Streams

M.K. picture M.K. · Oct 24, 2016 · Viewed 11.3k times · Source

I'm reading a csv file. I am using Akka Streams to do this so that I can create a graph of actions to perform on each line. I've got the following toy example up and running.

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyAkkaSystem")
    implicit val materializer = ActorMaterializer()

        val source = akka.stream.scaladsl.Source.fromIterator(Source.fromFile("a.csv").getLines)
        val sink = Sink.foreach(println)
        source.runWith(sink)
      }

The two Source types don't sit easy with me. Is this idiomatic or is there is a better way to write this?

Answer

fcat picture fcat · Oct 28, 2016

Actually, akka-streams provides a function to directly read from a file.

FileIO.fromPath(Paths.get("a.csv"))
      .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
      .runForeach(println)

Here, runForeach method is to print the lines. If you have a proper Sink to process these lines, use it instead of this function. For example, if you want to split the lines by ' and print the total number of words in it:

val sink: Sink[String] = Sink.foreach(x => println(x.split(",").size))

FileIO.fromPath(Paths.get("a.csv"))
      .via(Framing.delimiter(ByteString("\n"), 256, true).map(_.utf8String))
      .to(sink)
      .run()