The spark-streaming website at https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams mentions the following code:
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
I have tried to implement this using org.apache.commons.pool2 but running the application fails with the expected java.io.NotSerializableException:
15/05/26 08:06:21 ERROR OneForOneStrategy: org.apache.commons.pool2.impl.GenericObjectPool
java.io.NotSerializableException: org.apache.commons.pool2.impl.GenericObjectPool
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
...
I am wondering how realistic it is to implement a connection pool that is serializable. Has anyone succeeded in doing this ?
Thank you.
To address this "local resource" problem what's needed is a singleton object - i.e. an object that's warranted to be instantiated once and only once in the JVM. Luckily, Scala object
provides this functionality out of the box.
The second thing to consider is that this singleton will provide a service to all tasks running on the same JVM where it's hosted, so, it MUST take care of concurrency and resource management.
Let's try to sketch(*) such service:
class ManagedSocket(private val pool: ObjectPool, val socket:Socket) {
def release() = pool.returnObject(socket)
}
// singleton object
object SocketPool {
var hostPortPool:Map[(String, Int),ObjectPool] = Map()
sys.addShutdownHook{
hostPortPool.values.foreach{ // terminate each pool }
}
// factory method
def apply(host:String, port:String): ManagedSocket = {
val pool = hostPortPool.getOrElse{(host,port), {
val p = ??? // create new pool for (host, port)
hostPortPool += (host,port) -> p
p
}
new ManagedSocket(pool, pool.borrowObject)
}
}
Then usage becomes:
val host = ???
val port = ???
stream.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
val mSocket = SocketPool(host, port)
partition.foreach{elem =>
val os = mSocket.socket.getOutputStream()
// do stuff with os + elem
}
mSocket.release()
}
}
I'm assuming that the GenericObjectPool
used in the question is taking care of concurrency. Otherwise, access to each pool
instance need to be guarded with some form of synchronization.
(*) code provided to illustrate the idea on how to design such object - needs additional effort to be converted into a working version.