spark-streaming and connection pool implementation

botkop · May 26, 2015 · Viewed 9.7k times

The Spark Streaming programming guide at https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams shows the following code:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

I have tried to implement this using org.apache.commons.pool2 but running the application fails with the expected java.io.NotSerializableException:

15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
 ...
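
For reference, my attempt looked roughly like this (a simplified sketch - Connection is my connection type and makePool() a hypothetical helper that builds the GenericObjectPool). The pool is created on the driver and then referenced inside foreachPartition, so Spark tries to ship it to the executors as part of the task closure:

import org.apache.commons.pool2.impl.GenericObjectPool

// built once, on the driver
val pool: GenericObjectPool[Connection] = makePool()

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // this closure captures 'pool', so Spark serializes GenericObjectPool
    // along with the task -> java.io.NotSerializableException
    val connection = pool.borrowObject()
    partitionOfRecords.foreach(record => connection.send(record))
    pool.returnObject(connection)
  }
}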

I am wondering how realistic it is to implement a connection pool that is serializable. Has anyone succeeded in doing this?

Thank you.

Answer

maasg · Jun 2, 2015

To address this "local resource" problem what's needed is a singleton object - i.e. an object that's guaranteed to be instantiated once and only once per JVM. Luckily, Scala's object construct provides this functionality out of the box.
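
As a quick illustration of that guarantee (a minimal sketch, nothing Spark-specific) - the body of an object runs at most once per JVM, the first time the object is referenced:

object OncePerJvm {
  println("initializing")                   // printed at most once per JVM (i.e. per executor)
  val startedAt = System.currentTimeMillis  // one shared value for all tasks in this JVM
}

Every task that touches OncePerJvm on the same executor sees the same instance; nothing gets serialized and shipped from the driver.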

The second thing to consider is that this singleton will provide a service to all tasks running on the same JVM where it's hosted, so it MUST take care of concurrency and resource management.

Let's try to sketch(*) such a service:

import java.net.Socket
import org.apache.commons.pool2.ObjectPool

class ManagedSocket(private val pool: ObjectPool[Socket], val socket: Socket) {
   def release() = pool.returnObject(socket)
}

// singleton object 
object SocketPool {
    // one pool per (host, port) target
    var hostPortPool: Map[(String, Int), ObjectPool[Socket]] = Map()
    sys.addShutdownHook {
        hostPortPool.values.foreach(_.close()) // terminate each pool on JVM shutdown
    }

    // factory method
    def apply(host: String, port: Int): ManagedSocket = {
        val pool = hostPortPool.getOrElse((host, port), {
            val p = ??? // create new pool for (host, port)
            hostPortPool += ((host, port) -> p)
            p
        })
        new ManagedSocket(pool, pool.borrowObject())
    }
}

Then usage becomes:

val host = ???
val port = ???
stream.foreachRDD { rdd =>
    rdd.foreachPartition { partition => 
        val mSocket = SocketPool(host, port)
        partition.foreach{elem => 
            val os = mSocket.socket.getOutputStream()
            // do stuff with os + elem
        }
        mSocket.release()
    }
}
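
For completeness, the ??? placeholder in SocketPool.apply could be backed by a commons-pool2 factory along these lines (again just a sketch - SocketFactory is a name I'm introducing here, and the pool configuration is up to you):

import java.net.Socket
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}

// creates and wraps sockets for one (host, port) so commons-pool2 can manage them
class SocketFactory(host: String, port: Int) extends BasePooledObjectFactory[Socket] {
  override def create(): Socket = new Socket(host, port)
  override def wrap(socket: Socket): PooledObject[Socket] = new DefaultPooledObject(socket)
  override def destroyObject(p: PooledObject[Socket]): Unit = p.getObject.close()
  override def validateObject(p: PooledObject[Socket]): Boolean = p.getObject.isConnected
}

// e.g. inside SocketPool.apply:
// val p = new GenericObjectPool[Socket](new SocketFactory(host, port))

GenericObjectPool also accepts a GenericObjectPoolConfig if you want to bound the pool size or tune eviction.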

I'm assuming that the GenericObjectPool used in the question takes care of concurrency. Otherwise, access to each pool instance needs to be guarded with some form of synchronization.
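
Likewise, the hostPortPool map in the sketch is updated without any guard; one simple option is to wrap the lookup-or-create step in a synchronized block (again just an illustration, building on the hypothetical SocketFactory above):

// inside SocketPool
def apply(host: String, port: Int): ManagedSocket = {
    val pool = synchronized {        // guard the shared map against concurrent tasks
        hostPortPool.getOrElse((host, port), {
            val p = new GenericObjectPool[Socket](new SocketFactory(host, port))
            hostPortPool += ((host, port) -> p)
            p
        })
    }
    new ManagedSocket(pool, pool.borrowObject())   // borrow outside the lock
}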

(*) code provided to illustrate the idea of how to design such an object - it needs additional effort to be converted into a working version.