Spark throws java.util.NoSuchElementException: key not found: 67

Mnemosyne picture Mnemosyne · Oct 28, 2016 · Viewed 16.4k times · Source

Running the Spark bisecting kmmeans algorithm in Zeppelin.

//I transform my data using the TF-IDF algorithm 

val idf = new IDF(minFreq).fit(data)
val hashIDF_features = idf.transform(dbTF)    

//and parse the transformed data to the clustering algorithm.

val bkm = new BisectingKMeans().setK(100).setMaxIterations(2)
val model = bkm.run(hashIDF_features)
val cluster_rdd = model.predict(hashIDF_features)

I always get this error though:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 270.0 failed 4 times, most recent failure: Lost task 0.3 in stage 270.0 (TID 126885, IP): java.util.NoSuchElementException: key not found: 67
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:58)
    at scala.collection.MapLike$class.apply(MapLike.scala:141)
    at scala.collection.AbstractMap.apply(Map.scala:58)
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply$mcDJ$sp(BisectingKMeans.scala:338)
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337)
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1$$anonfun$2.apply(BisectingKMeans.scala:337)
    at scala.collection.TraversableOnce$$anonfun$minBy$1.apply(TraversableOnce.scala:231)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at scala.collection.LinearSeqOptimized$class.reduceLeft(LinearSeqOptimized.scala:125)
    at scala.collection.immutable.List.reduceLeft(List.scala:84)
    at scala.collection.TraversableOnce$class.minBy(TraversableOnce.scala:231)
    at scala.collection.AbstractTraversable.minBy(Traversable.scala:105)
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:337)
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$org$apache$spark$mllib$clustering$BisectingKMeans$$updateAssignments$1.apply(BisectingKMeans.scala:334)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:189)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:64)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1433)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:801)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:801)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1601)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1590)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:622)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1882)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1953)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:934)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:933)
    at org.apache.spark.mllib.clustering.BisectingKMeans$.org$apache$spark$mllib$clustering$BisectingKMeans$$summarize(BisectingKMeans.scala:261)
    at org.apache.spark.mllib.clustering.BisectingKMeans$$anonfun$run$1.apply$mcVI$sp(BisectingKMeans.scala:194)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.mllib.clustering.BisectingKMeans.run(BisectingKMeans.scala:189)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:95)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:97)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:99)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$$$93297bcd59dca476dd569cf51abed168$$$$$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:101)

Im using Spark 1.6.1. Interestingly, if I run this algorithm on a standalone application, it hits no errors, but I get this in Zeppelin. In addition to that, the input has been calculated by an external algorithm so I dont think it is a formatting problem. Any ideas?

Edit:
I tested the system again by using smaller amounts of clusters, and the error does not happen. Why would the algorithm break for large cluster values?

Answer

Balaji Reddy picture Balaji Reddy · Oct 28, 2016

I believe the issue is because of closure. When you run your application locally, everything might be running in same memory/process.So make sure you are not trying to access local variable from a clousre which might be running in some other memory/process. This will be helpful to resolve your issue.