I have a custom Kryo serializer: a class that extends com.esotericsoftware.kryo.Serializer and overrides its read() and write() methods (see example below). How can I register this serializer with Spark? Here is a pseudo-code example of what I have:
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

class A

class CustomASerializer extends Serializer[A] {
  override def write(kryo: Kryo, output: Output, a: A): Unit = ???
  override def read(kryo: Kryo, input: Input, t: Class[A]): A = ???
}

val kryo: Kryo = new Kryo() // a plain Kryo instance
kryo.register(classOf[A], new CustomASerializer()) // here I can register my serializer
Now in Spark:
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
sparkConf.registerKryoClasses(Array(classOf[A]))
Unfortunately, registerKryoClasses only registers the classes themselves; it doesn't give me a way to attach my custom serializer. Is there a way to do this?
Create your own KryoRegistrator that registers this custom serializer:
package com.acme

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[A], new CustomASerializer())
  }
}
Then, set spark.kryo.registrator to your registrator's fully-qualified class name, e.g. com.acme.MyRegistrator:
val conf = new SparkConf()
conf.set("spark.kryo.registrator", "com.acme.MyRegistrator")