Can anyone please explain the difference between map and mapAsync with respect to Akka Streams? The documentation says:
Stream transformations and side effects involving external non-stream based services can be performed with mapAsync or mapAsyncUnordered
Why can't we simply use map here? I assume that Flow, Source, and Sink are all monadic in nature, so map should work fine regardless of any delay inherent in them.
Signature
The difference is best highlighted in the signatures: Flow.map takes in a function that returns a type T, while Flow.mapAsync takes in a function that returns a type Future[T].
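To make the signature difference concrete, here is a minimal sketch. The object name SignatureSketch and the Int => String conversion are made up for illustration, and the global ExecutionContext is an assumption. Note that both flows end up emitting plain String elements; with mapAsync the stream unwraps the Future for you.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object SignatureSketch {
  // map: the supplied function returns a plain value (Int => String)
  val viaMap: Flow[Int, String, NotUsed] =
    Flow[Int].map(i => i.toString)

  // mapAsync: the supplied function returns a Future (Int => Future[String]);
  // the first argument (4 here, an arbitrary choice) caps how many Futures
  // may be in flight at once
  val viaMapAsync: Flow[Int, String, NotUsed] =
    Flow[Int].mapAsync(4)(i => Future(i.toString))
}
```

Either flow can be plugged into a stream with via, exactly like the Flow.map examples in the rest of this answer.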
Practical Example
As an example, suppose that we have a function which queries a database for a user's full name based on a user id:
type UserID = String
type FullName = String
val databaseLookup : UserID => FullName = ??? //implementation unimportant
Given an akka stream Source of UserID values, we could use Flow.map within a stream to query the database and print the full names to the console:
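import akka.stream.scaladsl.{Flow, Sink, Source}
// run() assumes an implicit ActorSystem (or Materializer) is in scope
val userIDSource : Source[UserID, _] = ???
val stream =
  userIDSource.via(Flow[UserID].map(databaseLookup))
              .to(Sink.foreach[FullName](println))
              .run()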
One limitation of this approach is that the stream will only make one database query at a time. This serial querying will be a bottleneck and will likely prevent maximum throughput in our stream.
We could try to improve performance through concurrent queries using a Future:
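import scala.concurrent.Future
// Future { ... } and Future.foreach need an implicit ExecutionContext in scope,
// e.g. import scala.concurrent.ExecutionContext.Implicits.global
def concurrentDBLookup(userID : UserID) : Future[FullName] =
  Future { databaseLookup(userID) }
val concurrentStream =
  userIDSource.via(Flow[UserID].map(concurrentDBLookup))
              .to(Sink.foreach[Future[FullName]](_ foreach println))
              .run()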
The problem with this simplistic addendum is that we have effectively eliminated backpressure.
The Sink is just pulling in the Future and adding a foreach println, which is relatively fast compared to database queries. The stream will continuously propagate demand to the Source and spawn off more Futures inside of the Flow.map. Therefore, there is no limit to the number of databaseLookup calls running concurrently. Unfettered parallel querying could eventually overload the database.
Flow.mapAsync comes to the rescue; we can have concurrent database access while at the same time capping the number of simultaneous lookups:
val maxLookupCount = 10
val maxLookupConcurrentStream =
  userIDSource.via(Flow[UserID].mapAsync(maxLookupCount)(concurrentDBLookup))
              .to(Sink.foreach[FullName](println))
              .run()
Also notice that the Sink.foreach got simpler: it no longer takes in a Future[FullName] but just a FullName instead.
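To see the cap in action, here is a small self-contained sketch. Everything in it is a stand-in rather than real database code: fakeLookup sleeps for 20 ms instead of querying anything, the counters inFlight and peak exist only to observe concurrency, and the parallelism of 4, the 50 user ids, and the global ExecutionContext are arbitrary assumptions.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapAsyncCapDemo {
  // Counters used only to observe how many lookups overlap in time
  val inFlight = new AtomicInteger(0)
  val peak     = new AtomicInteger(0)

  // Hypothetical stand-in for a database query: sleeps instead of querying
  def fakeLookup(userID: String): Future[String] = Future {
    val now = inFlight.incrementAndGet()
    peak.updateAndGet(p => math.max(p, now)) // remember the highest overlap seen
    Thread.sleep(20)
    inFlight.decrementAndGet()
    s"name-of-$userID"
  }

  // Runs 50 lookups through mapAsync(4) and returns (names, peak concurrency)
  def run(): (Seq[String], Int) = {
    implicit val system: ActorSystem = ActorSystem("map-async-cap-demo")
    try {
      val names = Await.result(
        Source((1 to 50).map(i => s"user$i"))
          .mapAsync(4)(id => fakeLookup(id))
          .runWith(Sink.seq),
        30.seconds)
      (names, peak.get())
    } finally system.terminate()
  }
}
```

Calling MapAsyncCapDemo.run() should return the names in the same order as the input ids, with a peak that never exceeds 4, because mapAsync only starts a new Future when fewer than that many results are still outstanding.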
Unordered Async Map
If maintaining a sequential ordering of the UserIDs to FullNames is unnecessary, then you can use Flow.mapAsyncUnordered. For example, you might just need to print all of the names to the console and not care about the order in which they are printed.
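The following sketch contrasts the two operators. The slowLookup helper, its per-user delays, and the parallelism of 3 are invented for illustration, and the global ExecutionContext is an assumption. mapAsync emits results in input order, while mapAsyncUnordered emits each result as soon as its Future completes, so its order depends on timing and is not guaranteed.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object UnorderedDemo {
  // Hypothetical lookup whose latency differs per user (delays are made up)
  def slowLookup(userID: String, delayMs: Long): Future[String] = Future {
    Thread.sleep(delayMs)
    s"name-of-$userID"
  }

  // Returns (results via mapAsync, results via mapAsyncUnordered)
  def run(): (Seq[String], Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("unordered-demo")
    val requests = List("a" -> 300L, "b" -> 100L, "c" -> 10L)
    try {
      // Input order is preserved even though "a" is the slowest lookup
      val ordered = Await.result(
        Source(requests)
          .mapAsync(3) { case (id, delay) => slowLookup(id, delay) }
          .runWith(Sink.seq),
        10.seconds)
      // Results are emitted as they complete, so the order may differ
      val unordered = Await.result(
        Source(requests)
          .mapAsyncUnordered(3) { case (id, delay) => slowLookup(id, delay) }
          .runWith(Sink.seq),
        10.seconds)
      (ordered, unordered)
    } finally system.terminate()
  }
}
```

Both runs produce the same set of names; only the ordering guarantee differs, which is why the unordered variant can deliver fast results downstream without waiting on a slow element ahead of them.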