Spark structured streaming kafka convert JSON without schema (infer schema)

Arnon Rodman · Jan 20, 2018 · Viewed 14k times

I've read that Spark Structured Streaming doesn't support schema inference when reading Kafka messages as JSON. Is there a way to retrieve the schema the same way Spark Streaming does:

val dataFrame = spark.read.json(rdd.map(_.value()))
dataFrame.printSchema()
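
For context, that snippet lives inside a foreachRDD callback in the DStream API, roughly like this (a rough sketch, assuming stream is a Kafka direct stream whose records expose value()):

stream.foreachRDD { rdd =>
  // The schema is inferred per micro-batch from the JSON payloads
  if (!rdd.isEmpty()) {
    val dataFrame = spark.read.json(rdd.map(_.value()))
    dataFrame.printSchema()
  }
}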

Answer

D.M. · Aug 10, 2018

Here is one possible way to do this:

  1. Before you start streaming, get a small batch of the data from Kafka.

  2. Infer the schema from that small batch.

  3. Start streaming the data using the extracted schema.

The pseudo-code below illustrates this approach.

Step 1:

Extract a small batch (two records) from Kafka:

// Assumes an active SparkSession `spark`; `.as[String]` needs its implicits.
import spark.implicits._

val smallBatch = spark.read.format("kafka")
                           .option("kafka.bootstrap.servers", "node:9092")
                           .option("subscribe", "topicName")
                           .option("startingOffsets", "earliest")
                           // Read only up to offset 2 of partition 0, i.e. the first two records
                           .option("endingOffsets", """{"topicName":{"0":2}}""")
                           .load()
                           .selectExpr("CAST(value AS STRING) AS value")
                           .as[String]
                           .toDF()

Step 2: Write the small batch to a file:

smallBatch.write.mode("overwrite").format("text").save("/batch")
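
If you want to be sure the batch lands in a single part file, you can coalesce it to one partition first (an optional tweak, not part of the original flow):

smallBatch.coalesce(1).write.mode("overwrite").format("text").save("/batch")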

This command writes the small batch into the HDFS directory /batch. The file it creates is named part-xyz*, so you first need to rename it using Hadoop FileSystem commands (see org.apache.hadoop.fs._ and org.apache.hadoop.conf.Configuration; here's an example: https://stackoverflow.com/a/41990859), as sketched below, and then read the file as JSON.
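
A minimal sketch of the renaming step (assuming the two records landed in a single part file; batchName.txt is just the placeholder name used below):

import org.apache.hadoop.fs.{FileSystem, Path}

// Find the part-* file Spark wrote into /batch and give it a stable name.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partFile = fs.globStatus(new Path("/batch/part-*"))(0).getPath
fs.rename(partFile, new Path("/batch/batchName.txt"))

With the file renamed, read it as JSON: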

val smallBatchSchema = spark.read.json("/batch/batchName.txt").schema

Here, batchName.txt is the new name of the file and smallBatchSchema contains the schema inferred from the small batch.
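
To sanity-check what was inferred before wiring it into the stream, you can print the schema tree (optional):

smallBatchSchema.printTreeString()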

Finally, you can stream the data as follows (Step 3):

val inputDf = spark.readStream.format("kafka")
                             .option("kafka.bootstrap.servers", "node:9092")
                             .option("subscribe", "topicName")
                             .option("startingOffsets", "earliest")
                             .load()

// Needs: import org.apache.spark.sql.functions.from_json
// and import spark.implicits._ (for the $"..." column syntax)
val dataDf = inputDf.selectExpr("CAST(value AS STRING) AS json")
                    .select(from_json($"json", smallBatchSchema).as("data"))
                    .select("data.*")

Hope this helps!