I need to consume a REST service using Akka's HTTP client (v2.0.2). The logical approach is to do this via a host connection pool because we expect a large number of simultaneous connections. The Flow for this consumes a (HttpRequest, T) and returns a (Try[HttpResponse], T). The documentation indicates that some arbitrary type T is needed to manage potential out-of-order responses to requests, but it does not say what the caller is supposed to do with the returned T.
My first attempt is the function below, using an Int as T. It is called from many places to ensure that the connections use a single pool.
    val pool = Http().cachedHostConnectionPool[Int]("127.0.0.1", 8888, ConnectionPoolSettings(system))

    def pooledRequest(req: HttpRequest): Future[HttpResponse] = {
      val unique = Random.nextInt
      Source.single(req → unique).via(pool).runWith(Sink.head).flatMap {
        case (Success(r: HttpResponse), `unique`) ⇒ Future.successful(r)
        case (Failure(f), `unique`) ⇒ Future.failed(f)
        case (_, i) ⇒ Future.failed(new Exception("Return does not match the request"))
      }
    }
The question is: how should the client use this T? Is there a cleaner, more efficient solution? And finally, is my worry that something may arrive out of order actually justified, or is it just paranoia?
I was a little confused by this myself until I read through the docs a few times. If you only ever send single requests into the pool, then no matter how many different places share that pool, the T that you supply (an Int in your case) doesn't matter. So if you are using Source.single all the time, that key can always be 1 if you really want.
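As a rough sketch of that single-request case, the function from the question could drop the random key entirely and use Unit as T. This assumes the same host and port as the question and the default pool settings; the object name SingleRequestClient and the helper unwrap are made up for illustration, and I have not run this against 2.0.2 specifically.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Future
import scala.util.Try

// Hypothetical wrapper object; the name is not part of Akka.
object SingleRequestClient {
  // Lazy so that nothing is started until the first request is made.
  implicit lazy val system: ActorSystem = ActorSystem("client")
  implicit lazy val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher // ExecutionContext used by flatMap below

  // One pool shared by every caller. T is Unit because a stream that
  // carries exactly one request has nothing to correlate.
  lazy val pool = Http().cachedHostConnectionPool[Unit]("127.0.0.1", 8888)

  // Hypothetical helper: discards the Unit tag and lifts the Try into a Future.
  def unwrap(result: (Try[HttpResponse], Unit)): Future[HttpResponse] =
    Future.fromTry(result._1)

  def pooledRequest(req: HttpRequest): Future[HttpResponse] =
    Source.single((req, ())).via(pool).runWith(Sink.head).flatMap(unwrap)
}
```

Each call materializes its own one-element stream, so the only response that stream can emit is the one for its own request, which is why the pattern match on the key is not needed here.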
Where it does come into play, though, is when a piece of code submits multiple requests into the pool at once and wants the responses to all of them. The reason is that responses come back in the order they were received from the service that was called, not the order in which the requests were supplied to the pool. Each request can take a different amount of time, so the responses flow downstream to the Sink in the order they arrive back from the pool.
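To make the ordering point concrete without needing a running service, here is a small plain-Scala simulation. It is not Akka code: Request, Response, and simulatedPool are hypothetical stand-ins, and the "pool" simply emits results sorted by a made-up latency to mimic completion order.

```scala
object CorrelationSketch {
  // Hypothetical stand-ins for HttpRequest and HttpResponse.
  final case class Request(path: String, latencyMs: Int)
  final case class Response(body: String)

  // Simulated pool: emits (response, tag) pairs in completion order
  // (shortest latency first) rather than in submission order.
  def simulatedPool[T](in: Seq[(Request, T)]): Seq[(Response, T)] =
    in.sortBy { case (req, _) => req.latencyMs }
      .map { case (req, tag) => (Response(s"body of ${req.path}"), tag) }

  // Submitted in the order 1, 2, 3, each tagged with its product id.
  val requests: Seq[(Request, Int)] = Seq(
    (Request("/product/1", 300), 1),
    (Request("/product/2", 100), 2),
    (Request("/product/3", 200), 3)
  )

  val responses: Seq[(Response, Int)] = simulatedPool(requests)

  // The tags reveal the arrival order: 2, 3, 1.
  val arrivalOrder: Seq[Int] = responses.map(_._2)

  // Because every response carries its tag, it can still be matched
  // to the request that produced it.
  val byId: Map[Int, Response] =
    responses.map { case (resp, id) => id -> resp }.toMap
}
```

The responses arrive in a different order than the requests went in, but the tag that travels with each one is enough to rebuild the mapping.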
Say we had a service out there that accepted GET requests with a URL of the form /product/123, where the 123 part is the id of the product that you want to look up. If I wanted to look up products 1-10 all at once, with a separate request for each, this is where the identifier becomes important, so that I can correlate each HttpResponse with the product id that it is for. A simplified code example for this scenario would be as follows:
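    // One (request, id) pair per product; `to` is inclusive, so this covers ids 1 through 10
    val requests = for (id <- 1 to 10) yield (HttpRequest(HttpMethods.GET, s"/product/$id"), id)

    val responsesMapFut: Future[Map[Int, HttpResponse]] =
      Source(requests).
        via(pool).
        runFold(Map.empty[Int, HttpResponse]) {
          case (m, (util.Success(resp), id)) =>
            // Key each response by the id that travelled with its request
            m + (id -> resp)
          case (m, (util.Failure(ex), id)) =>
            // Log a failure here probably; id tells you which request failed
            m
        }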
When I get my responses in the fold, I also conveniently have the id that each one is associated with, so I can add them to my Map, which is keyed by id. Without this functionality, I would probably have to do something like parse the body (if it was JSON) to try to figure out which response was which. That is not ideal, and it doesn't cover the failure case. In this solution, I know which requests failed because I still get the identifier back.
I hope that clarifies things a bit for you.