I have a Spark 2.0 application that reads messages from Kafka using Spark Streaming (with spark-streaming-kafka-0-10_2.11).
Structured Streaming looks really cool, so I wanted to try migrating the code, but I can't figure out how to use it.
In regular streaming I used KafkaUtils to create the DStream, and the value deserializer was one of the parameters I passed in.
In Structured Streaming, the docs say I should deserialize using DataFrame functions, but I can't figure out exactly what that means.
I looked at examples such as this example, but my Avro object in Kafka is quite complex and cannot simply be cast like the String in the example.
So far I have tried this kind of code (which I saw here in a different question):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema)))
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
and I get "data type mismatch: cannot cast BinaryType to StructType(StructField(...."
How can I deserialize the value?
As noted above, as of Spark 2.1.0 there is support for Avro with the batch reader but not with SparkSession.readStream(). Here is how I got it to work in Scala, based on the other responses. I've simplified the schema for brevity.
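Before the full listing, the decoding step on its own can be sketched without Spark. This is only a minimal sketch under a few assumptions: the Avro library is on the classpath, the Kafka values are raw Avro binary (no container file and no schema-registry header in front of the payload), and the names `AvroRoundTrip`, `encode`, `decode` and the sample values are made up for illustration. It writes one record to bytes and reads it back with the same `GenericDatumReader` and `DecoderFactory` calls that the `map` step relies on.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Hypothetical helper object, for illustration only
object AvroRoundTrip {
  // Same simplified schema as in the answer
  val schemaString: String =
    """{
      |  "type": "record",
      |  "name": "kafkamsg",
      |  "fields": [
      |    { "name": "deviceId", "type": "int" },
      |    { "name": "deviceName", "type": "string" }
      |  ]
      |}""".stripMargin

  val schema: Schema = new Schema.Parser().parse(schemaString)

  // Serialize one record to raw Avro binary (assumed to match what the producer sends)
  def encode(deviceId: Int, deviceName: String): Array[Byte] = {
    val record = new GenericData.Record(schema)
    record.put("deviceId", Int.box(deviceId))
    record.put("deviceName", deviceName)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Deserialize raw Avro binary back into plain Scala values
  def decode(bytes: Array[Byte]): (Int, String) = {
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    val rec = reader.read(null, decoder)
    // Avro strings come back as org.apache.avro.util.Utf8, so convert explicitly
    (rec.get("deviceId").asInstanceOf[Int], rec.get("deviceName").toString)
  }

  def main(args: Array[String]): Unit = {
    val (id, name) = decode(encode(42, "router-1"))
    println(s"$id $name") // prints "42 router-1"
  }
}
```

If `decode` fails on real messages, one thing to check is whether the producer prepends extra bytes (for example a schema-registry header) that would need to be stripped before decoding.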
package com.sevone.sparkscala.mypackage
import org.apache.spark.sql._
import org.apache.avro.io.DecoderFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
object MyMain {
// Case class holding the decoded message fields
case class KafkaMessage (
deviceId: Int,
deviceName: String
)
// Create Avro schema and reader
val schemaString = """{
"fields": [
{ "name": "deviceId", "type": "int"},
{ "name": "deviceName", "type": "string"}
],
"name": "kafkamsg",
"type": "record"
}"""
val messageSchema = new Schema.Parser().parse(schemaString)
val reader = new GenericDatumReader[GenericRecord](messageSchema)
// Factory to deserialize binary avro data
val avroDecoderFactory = DecoderFactory.get()
// Register implicit encoder for map operation
implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]
def main(args: Array[String]): Unit = {
val KafkaBroker = args(0)
val InTopic = args(1)
val OutTopic = args(2)
// Get Spark session
val session = SparkSession
.builder
.master("local[*]")
.appName("myapp")
.getOrCreate()
// Load streaming data
import session.implicits._
val data = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", KafkaBroker)
.option("subscribe", InTopic)
.load()
.select($"value".as[Array[Byte]])
.map(d => {
val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
val deviceId = rec.get("deviceId").asInstanceOf[Int]
val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
new KafkaMessage(deviceId, deviceName)
})