Queries with streaming sources must be executed with writeStream.start();

shivali · Nov 15, 2016

I'm trying to read messages from Kafka (version 10) in Spark and print them.

    import spark.implicits._

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .config("spark.master", "local")
      .getOrCreate()

    val ds1 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topicA")
      .load()

    ds1.collect.foreach(println)
    ds1.writeStream
      .format("console")
      .start()

    ds1.printSchema()

I get the following error:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

Answer

ssice · Jan 5, 2017

You are branching the query plan: from the same ds1 you are trying to:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

But you are only calling .start() on the second branch. The first branch never reaches a streaming sink: collect is a batch action, and running it on a streaming Dataset throws the exception you are seeing.
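To see the failing branch in isolation, here is a minimal sketch. It assumes Spark 2.2 or later and swaps Kafka for the built-in "rate" source so that it runs without a broker; the object and method names are made up for illustration.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import scala.util.{Failure, Try}

object StreamingCollectDemo {
  // Hypothetical helper: returns true when collect() on a streaming
  // Dataset is rejected with an AnalysisException.
  def collectIsRejected(spark: SparkSession): Boolean = {
    // Assumption: the "rate" source stands in for the Kafka source here.
    val stream = spark.readStream.format("rate").load()
    // Batch actions such as collect() are not allowed on a streaming plan.
    Try(stream.collect()) match {
      case Failure(_: AnalysisException) => true
      case _                             => false
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StreamingCollectDemo")
      .master("local[2]")
      .getOrCreate()
    println(s"collect() rejected on streaming Dataset: ${collectIsRejected(spark)}")
    spark.stop()
  }
}
```

The same check applies to other batch actions on a streaming Dataset, while calls that only inspect the plan, such as printSchema(), are fine.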

The solution is to give each branch its own writeStream sink, start both, and await termination.

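    val ds1 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topicA")
      .load()

    // printSchema() only inspects the plan, so it is safe on a streaming Dataset.
    ds1.printSchema()

    // collect() cannot run on a streaming Dataset, so the first branch
    // is written to the console sink instead, with key and value cast to strings.
    val query1 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("console")
      .start()
    val query2 = ds1.writeStream
      .format("console")
      .start()

    query1.awaitTermination()
    query2.awaitTermination()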
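
If the goal is really to run println on every row, as collect.foreach(println) would in a batch job, one option is the foreach sink with a ForeachWriter. The following is only a sketch: the object name and printWriter are hypothetical, the broker address and topic are taken from the question, and it assumes the spark-sql-kafka-0-10 package is on the classpath.

```scala
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

object PrintEachRow {
  // Hypothetical writer: prints each row on the executor that processes it.
  val printWriter: ForeachWriter[Row] = new ForeachWriter[Row] {
    // Returning true tells Spark to process the rows of this partition.
    override def open(partitionId: Long, version: Long): Boolean = true
    override def process(row: Row): Unit = println(row)
    override def close(errorOrNull: Throwable): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("PrintEachRow")
      .master("local[2]")
      .getOrCreate()

    val ds1 = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topicA")
      .load()

    // The Kafka value column is binary, so cast it before printing.
    val query = ds1.selectExpr("CAST(value AS STRING)")
      .writeStream
      .foreach(printWriter)
      .start()

    query.awaitTermination()
  }
}
```

In local mode the output appears in the driver console; on a cluster, println runs on the executors, so the lines end up in the executor logs rather than on the driver.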