I have a problem when i use spark streaming to read from Cassandra.
As the link above, i use
val rdd = ssc.cassandraTable("streaming_test", "key_value").select("key", "value").where("fu = ?", 3)
to select the data from cassandra, but it seems that the spark streaming has just one query once but i want it continues to query using an interval 10 senconds.
My code is as follow, wish for your response.
Thanks!
import org.apache.spark._
import org.apache.spark.streaming._
import com.datastax.spark.connector.streaming._
import org.apache.spark.rdd._
import scala.collection.mutable.Queue
object SimpleApp {
def main(args: Array[String]){
val conf = new SparkConf().setAppName("scala_streaming_test").set("spark.cassandra.connection.host", "127.0.0.1")
val ssc = new StreamingContext(conf, Seconds(10))
val rdd = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
//rdd.collect().foreach(println)
val rddQueue = new Queue[RDD[com.datastax.spark.connector.CassandraRow]]()
val dstream = ssc.queueStream(rddQueue)
dstream.print()
ssc.start()
rdd.collect().foreach(println)
rddQueue += rdd
ssc.awaitTermination()
}
}
You can create a ConstantInputDStream with the CassandraRDD as input. ConstantInputDStream will provide the same RDD on each streaming interval, and by executing an action on that RDD you will trigger a materialization of the RDD lineage, leading to executing the query on Cassandra every time.
Make sure that the data being queried does not grow unbounded to avoid increasing query times and resulting in an unstable streaming process.
Something like this should do the trick (using your code as starting point):
import org.apache.spark.streaming.dstream.ConstantInputDStream
val ssc = new StreamingContext(conf, Seconds(10))
val cassandraRDD = ssc.cassandraTable("mykeyspace", "users").select("fname", "lname").where("lname = ?", "yu")
val dstream = new ConstantInputDStream(ssc, cassandraRDD)
dstream.foreachRDD{ rdd =>
// any action will trigger the underlying cassandra query, using collect to have a simple output
println(rdd.collect.mkString("\n"))
}
ssc.start()
ssc.awaitTermination()