Get whole HttpResponse body as a String with Akka-Streams HTTP

paradigmatic · Jul 21, 2015 · Viewed 9.8k times

I'm trying to understand how to use the new akka.http library. I would like to send an HTTP request to a server and read the whole response body as a single String, in order to produce a Source[String,?].

Here is the best solution I was able to produce so far:

 def get(
   modelID: String,
   pool: Flow[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool]
 ): Source[String,Unit] = {
   val uri = reactionsURL(modelID)
   val req = HttpRequest(uri = uri)
   Source.single( (req,0) )
     .via( pool )
     .map { 
       case (Success(resp),_) =>
         resp.entity.dataBytes.map( _.decodeString("utf-8") )
     }.flatten(FlattenStrategy.concat)
     .grouped( 1024 )
     .map( _.mkString )
 }

It seems to work well (except for the missing error path), but it is a bit clunky for such a simple task. Is there a smarter solution? Can I avoid the grouped/mkString?

Answer

Zernike · Jul 21, 2015

You can use the toStrict method of HttpResponse, which takes a timeout. It collects the whole response into memory and returns it as a Future.

def toStrict(timeout: FiniteDuration)(implicit ec: ExecutionContext, fm: Materializer): Future[Strict]

Returns a sharable and serializable copy of this message with a strict entity.

Example:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpResponse, HttpRequest}
import akka.stream.{Materializer, ActorMaterializer}
import akka.stream.scaladsl.{Sink, Flow, Source}
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

import scala.util.{Try, Success}

object Main extends App {

  implicit val system = ActorSystem()

  import system.dispatcher

  implicit val materializer = ActorMaterializer()

  val host = "127.0.0.1"
  lazy val pool = Http().newHostConnectionPool[Int](host, 9000)

  FlowBuilder.get("/path", pool).to(Sink.foreach(_.foreach(println))).run()

}

object FlowBuilder {
  def get(modelID: String, pool: Flow[(HttpRequest, Int), (Try[HttpResponse], Int), Http.HostConnectionPool])
         (implicit ec: ExecutionContext, mat: Materializer): Source[Future[String], Unit] = {
    // modelID is used directly as the request URI here
    val req = HttpRequest(uri = modelID)
    Source.single((req, 0)).via(pool)
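      .map {
        case (Success(resp), _) =>
          // toStrict buffers the whole entity in memory and fails if the timeout expires first
          resp.entity.toStrict(5.seconds).map(_.data.decodeString("UTF-8"))
        case (scala.util.Failure(e), _) =>
          // propagate pool-level failures as a failed Future instead of a MatchError
          Future.failed(e)
      }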
  }
}
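
The example above yields a Source[Future[String], ?], while the question asks for a Source[String, ?]. One possible way to close that gap is to run the Future inside the stream with mapAsync. The following is only a sketch under assumptions: the names StrictBody, bodyAsString and strings are hypothetical, the 5 second default timeout is arbitrary, and the materialized value is left generic so that the sketch does not depend on a particular Akka Streams version.

```scala
import akka.http.scaladsl.model.HttpResponse
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical helper object, not part of akka-http.
object StrictBody {

  // Buffer the whole entity in memory (bounded by `timeout`) and decode it once,
  // so multi-byte characters are never split across chunk boundaries.
  def bodyAsString(resp: HttpResponse, timeout: FiniteDuration = 5.seconds)
                  (implicit ec: ExecutionContext, mat: Materializer): Future[String] =
    resp.entity.toStrict(timeout).map(_.data.decodeString("UTF-8"))

  // Turn the pool's output into a stream of complete bodies.
  // mapAsync(1) runs one Future at a time and emits its result downstream;
  // a failed Future fails the stream.
  def strings[M](responses: Source[(Try[HttpResponse], Int), M])
                (implicit ec: ExecutionContext, mat: Materializer): Source[String, M] =
    responses.mapAsync(1) {
      case (Success(resp), _) => bodyAsString(resp)
      case (Failure(e), _)    => Future.failed(e)
    }
}
```

With the pool from the example, this would be used roughly as StrictBody.strings(Source.single((req, 0)).via(pool)), which removes the need for grouped/mkString. Note that toStrict holds the whole body in memory, so this only suits responses of moderate size.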