I'm trying to open a Kafka (tried versions 0.11.0.2 and 1.0.1) stream using createDirectStream
method and getting this AbstractMethodError error:
Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.streaming.kafka010.KafkaUtils$.initializeLogIfNecessary(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.streaming.kafka010.KafkaUtils$.log(KafkaUtils.scala:39)
at org.apache.spark.internal.Logging$class.logWarning(Logging.scala:66)
at org.apache.spark.streaming.kafka010.KafkaUtils$.logWarning(KafkaUtils.scala:39)
at org.apache.spark.streaming.kafka010.KafkaUtils$.fixKafkaParams(KafkaUtils.scala:201)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.<init>(DirectKafkaInputDStream.scala:63)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:147)
at org.apache.spark.streaming.kafka010.KafkaUtils$.createDirectStream(KafkaUtils.scala:124)
This is how I'm calling it:
val preferredHosts = LocationStrategies.PreferConsistent
val kafkaParams = Map(
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[IntegerDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> groupId,
"auto.offset.reset" -> "earliest"
)
val aCreatedStream = createDirectStream[String, String](ssc, preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
I have Kafka running on 9092 and I'm able to create producers and consumers and pass messages between them so not sure why it's not working from Scala code. Any ideas appreciated.
Turns out I was using Spark 2.3 and I should've been using Spark 2.2. Apparently that method was made abstract in the later version so I was getting that error.