Calling a REST service from Spark

Yash · Jul 18, 2018

I'm trying to figure out the best approach to call a REST endpoint from Spark.

My current approach (solution [1]) looks something like this -

val df = ... // some dataframe

val repartitionedDf = df.repartition(numberPartitions)

lazy val restEndPoint = new restEndPointCaller() // lazy evaluation of the object which creates the connection to REST. lazy vals are also initialized once per JVM (executor)

val enrichedDf = repartitionedDf
  .map(rec => restEndPoint.getResponse(rec)) // calls the REST endpoint for every record
  .toDF

I know I could have used .mapPartitions() instead of .map(), but looking at the DAG, it seems Spark optimizes the repartition -> map into a mapPartitions anyway.

In this second approach (solution [2]), a connection is created once for every partition and reused for all records within the partition.

    val newDs = myDs.mapPartitions(partition => {
      val restEndPoint = new restEndPointCaller() // creates a connection per partition

      val newPartition = partition.map(record => {
        restEndPoint.getResponse(record)
      }).toList // consumes the iterator before the connection is closed below

      restEndPoint.close() // close the connection here
      newPartition.iterator // return a new iterator over the materialized results
    })

In this third approach (solution [3]), a connection is created once per JVM (executor) and reused across all partitions processed by the executor.

    lazy val connection = new DbConnection /* intended: one connection per JVM (executor) */
    val newDs = myDs.mapPartitions(partition => {

      val newPartition = partition.map(record => {
        readMatchingFromDB(record, connection)
      }).toList // consumes the iterator, thus calls readMatchingFromDB

      newPartition.iterator // create a new iterator
    })
    connection.close() // close the connection here (note: this line runs on the driver)

[a] With Solutions [1] and [3], which are very similar, is my understanding of how lazy vals work correct? The intention is to restrict the number of connections to 1 per executor/JVM and reuse the open connections for processing subsequent requests. Will I be creating 1 connection per JVM or 1 connection per partition?

[b] Are there any other ways by which I can control the number of requests (RPS) we make to the REST endpoint?

[c] Please let me know if there are better and more efficient ways to do this.

Thanks!

Answer

Bartosz Konieczny · Jul 18, 2018

IMO the second solution with mapPartitions is better. First, you explicitly state what you're trying to achieve: the name of the transformation and the implemented logic make it pretty clear. With the first option you need to be aware of how Apache Spark optimizes the processing. It may be obvious to you right now, but you should also think about the people who will work on your code, or simply about yourself in 6 months, 1 year, 2 years and so forth. mapPartitions will be easier for them to understand than repartition + map.

Moreover, the internal optimization of repartition followed by map may change one day (I don't expect it to, but it's still a valid consideration), and at that point your job would perform worse.

Finally, with the 2nd solution you avoid a lot of the problems you can encounter with serialization. In the code you wrote, the driver will create one instance of the endpoint object, serialize it and send it to the executors. So yes, it may be a single instance, but only if it's serializable.
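To make that concrete, here is a minimal sketch of what a serializable caller could look like; the class name, constructor and Apache HttpClient usage are illustrative assumptions, not your actual implementation:

    import org.apache.http.client.methods.HttpGet
    import org.apache.http.impl.client.HttpClients
    import org.apache.http.util.EntityUtils

    // Hypothetical caller: Serializable so the closure can ship it to executors.
    class RestEndPointCaller(baseUrl: String) extends Serializable {
      // @transient: the HTTP client is not serialized with the object;
      // the lazy val rebuilds it on first use in each deserialized copy.
      @transient private lazy val httpClient = HttpClients.createDefault()

      def getResponse(id: String): String = {
        val response = httpClient.execute(new HttpGet(s"$baseUrl/$id"))
        try EntityUtils.toString(response.getEntity) finally response.close()
      }
    }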


[edit] Thanks for the clarification. You can achieve what you're looking for in different ways. To have exactly 1 connection per JVM you can use the singleton design pattern. In Scala it's expressed pretty easily as an object (the first link I found on Google: https://alvinalexander.com/scala/how-to-implement-singleton-pattern-in-scala-with-object).

And that's pretty good, because you don't need to serialize anything. Singletons are read directly from the classpath on the executor side, which guarantees exactly one instance of a given object per JVM.
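A minimal sketch of that idea, reusing the hypothetical RestEndPointCaller from above (the URL is a placeholder):

    object RestEndPoint {
      // Initialized lazily on first access, once per executor JVM,
      // when the object is loaded from the classpath; nothing is serialized.
      lazy val client = new RestEndPointCaller("http://localhost:8080/api")
    }

    // Usage inside a transformation: each executor resolves the singleton locally.
    val enriched = myDs.mapPartitions(partition =>
      partition.map(record => RestEndPoint.client.getResponse(record)))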

[a] With Solutions [1] and [3], which are very similar, is my understanding of how lazy vals work correct? The intention is to restrict the number of connections to 1 per executor/JVM and reuse the open connections for processing subsequent requests. Will I be creating 1 connection per JVM or 1 connection per partition? It'll create 1 connection per partition. You can execute this small test to see that:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.scalatest.FlatSpec

    class SerializationProblemsTest extends FlatSpec {
      val conf = new SparkConf().setAppName("Spark serialization problems test").setMaster("local")
      val sparkContext = SparkContext.getOrCreate(conf)

      "lazy object" should "be created once per partition" in {
        lazy val restEndpoint = new NotSerializableRest()
        sparkContext.parallelize(0 to 120).repartition(12)
          .mapPartitions(numbers => {
            //val restEndpoint = new NotSerializableRest()
            numbers.map(nr => restEndpoint.enrich(nr))
          })
          .collect()
      }
    }

    class NotSerializableRest() {
      println("Creating REST instance")
      def enrich(id: Int): String = s"${id}"
    }

It should print "Creating REST instance" 12 times (once per partition).
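For contrast, a small sketch of the singleton variant (an illustrative assumption, not part of the original test): moving the instance into an object should print the message once per executor JVM instead of once per partition:

    object RestHolder {
      lazy val restEndpoint = new NotSerializableRest()
    }

    // In the test body, reference the singleton instead of the local lazy val:
    //   numbers.map(nr => RestHolder.restEndpoint.enrich(nr))
    // With the single local executor above, "Creating REST instance" prints exactly once.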

[b] Are there ways by which I can control the number of requests (RPS) we make to the REST endpoint?

To control the number of requests you can use an approach similar to database connection pools: an HTTP connection pool (one quickly found link: HTTP connection pooling using HttpClient).
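A sketch of that idea with Apache HttpClient's pooling connection manager; the pool sizes are illustrative, and capping them caps the number of concurrent requests per executor JVM:

    import org.apache.http.impl.client.HttpClients
    import org.apache.http.impl.conn.PoolingHttpClientConnectionManager

    object PooledHttp {
      lazy val client = {
        val cm = new PoolingHttpClientConnectionManager()
        cm.setMaxTotal(20)           // at most 20 open connections in this JVM
        cm.setDefaultMaxPerRoute(10) // at most 10 connections to any single host
        HttpClients.custom().setConnectionManager(cm).build()
      }
    }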

But maybe another valid approach would be to process smaller subsets of the data? So instead of taking 30000 rows to process at once, you can split them into smaller micro-batches (if it's a streaming job). It should give your web service a little more "rest".
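For instance, if the source is Kafka and you use Structured Streaming, the maxOffsetsPerTrigger option caps how many records each micro-batch reads (the broker address and topic below are placeholders):

    // Assumes an existing SparkSession named `spark`.
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "records")
      .option("maxOffsetsPerTrigger", 1000) // at most 1000 records per micro-batch
      .load()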

Otherwise you can also try to send bulk requests (Elasticsearch does this to index/delete multiple documents at once: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html), as sketched below. But it's up to the web service whether it allows you to do so.
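A sketch of client-side batching inside a partition; bulkEndPoint and postBatch are hypothetical stand-ins for whatever bulk-capable client and endpoint the service exposes:

    // `bulkEndPoint` and `postBatch` are hypothetical, for illustration only.
    val newDs = myDs.mapPartitions { partition =>
      partition.grouped(100).flatMap { batch =>  // 100 records per bulk request
        bulkEndPoint.postBatch(batch.toList)     // one HTTP call for many records
      }
    }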