This error has been the hardest to trace. I am not sure what is going on. I am running a Spark cluster on my location machine. so the entire spark cluster is under one host which is 127.0.0.1
and I run on a standalone mode
JavaPairRDD<byte[], Iterable<CassandraRow>> cassandraRowsRDD= javaFunctions(sc).cassandraTable("test", "hello" )
.select("rowkey", "col1", "col2", "col3", )
.spanBy(new Function<CassandraRow, byte[]>() {
@Override
public byte[] call(CassandraRow v1) {
return v1.getBytes("rowkey").array();
}
}, byte[].class);
Iterable<Tuple2<byte[], Iterable<CassandraRow>>> listOftuples = cassandraRowsRDD.collect(); //ERROR HAPPENS HERE
Tuple2<byte[], Iterable<CassandraRow>> tuple = listOftuples.iterator().next();
byte[] partitionKey = tuple._1();
for(CassandraRow cassandraRow: tuple._2()) {
System.out.println("************START************");
System.out.println(new String(partitionKey));
System.out.println("************END************");
}
This error has been the hardest to trace. It clearly happens at cassandraRowsRDD.collect()
and I dont know why?
16/10/09 23:36:21 ERROR Executor: Exception in task 2.3 in stage 0.0 (TID 21)
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Here are the versions I use
Scala code runner version 2.11.8 // when I run scala -version or even ./spark-shell
compile group: 'org.apache.spark' name: 'spark-core_2.11' version: '2.0.0'
compile group: 'org.apache.spark' name: 'spark-streaming_2.11' version: '2.0.0'
compile group: 'org.apache.spark' name: 'spark-sql_2.11' version: '2.0.0'
compile group: 'com.datastax.spark' name: 'spark-cassandra-connector_2.11' version: '2.0.0-M3':
my gradle file looks like this after introducing something called "provided" which actually doesn't seem to exist but google said to create one so my build.gradle looks like this
group 'com.company'
version '1.0-SNAPSHOT'
apply plugin: 'java'
apply plugin: 'idea'
repositories {
mavenCentral()
mavenLocal()
}
configurations {
provided
}
sourceSets {
main {
compileClasspath += configurations.provided
test.compileClasspath += configurations.provided
test.runtimeClasspath += configurations.provided
}
}
idea {
module {
scopes.PROVIDED.plus += [ configurations.provided ]
}
}
dependencies {
compile 'org.slf4j:slf4j-log4j12:1.7.12'
provided group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.0'
provided group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.0.0'
provided group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.0.0'
provided group: 'com.datastax.spark', name: 'spark-cassandra-connector_2.11', version: '2.0.0-M3'
}
jar {
from { configurations.provided.collect { it.isDirectory() ? it : zipTree(it) } }
// with jar
from sourceSets.test.output
manifest {
attributes 'Main-Class': "com.company.batchprocessing.Hello"
}
exclude 'META-INF/.RSA', 'META-INF/.SF', 'META-INF/*.DSA'
zip64 true
}
I had the same issue and could resolve it by adding my application's jar to spark's classpath with
spark = SparkSession.builder()
.appName("Foo")
.config("spark.jars", "target/scala-2.11/foo_2.11-0.1.jar")