Spark java.lang.OutOfMemoryError : Java Heap space

user3245722 picture user3245722 · Jun 28, 2018 · Viewed 14.1k times · Source

I am geting the above error when i run a model training pipeline with spark

`val inputData = spark.read
  .option("header", true)
  .option("mode","DROPMALFORMED")
  .csv(input)
  .repartition(500)
  .toDF("b", "c")
  .withColumn("b", lower(col("b")))
  .withColumn("c", lower(col("c")))
  .toDF("b", "c")
  .na.drop()`

inputData has about 25 million rows and is about 2gb in size. the model building phase happens like so

val tokenizer = new Tokenizer()
  .setInputCol("c")
  .setOutputCol("tokens")

val cvSpec = new CountVectorizer()
  .setInputCol("tokens")
  .setOutputCol("features")
  .setMinDF(minDF)
  .setVocabSize(vocabSize)

val nb = new NaiveBayes()
  .setLabelCol("bi")
  .setFeaturesCol("features")
  .setPredictionCol("prediction")
  .setSmoothing(smoothing)

new Pipeline().setStages(Array(tokenizer, cvSpec, nb)).fit(inputData)

I am running the above spark jobs locally in a machine with 16gb RAM using the following command

spark-submit --class holmes.model.building.ModelBuilder ./holmes-model-building/target/scala-2.11/holmes-model-building_2.11-1.0.0-SNAPSHOT-7d6978.jar --master local[*] --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryoserializer.buffer.max=2000m --conf spark.driver.maxResultSize=2g --conf spark.rpc.message.maxSize=1024 --conf spark.memory.offHeap.enabled=true --conf spark.memory.offHeap.size=50g --driver-memory=12g

The oom error is triggered by (at the bottow of the stack trace) by org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:706)

Logs :

Caused by: java.lang.OutOfMemoryError: Java heap space at java.lang.reflect.Array.newInstance(Array.java:75) at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1897) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529) java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:706) 

Any suggestions will be great :)

Answer

Ryan Widmaier picture Ryan Widmaier · Jun 28, 2018

Things I would try:

1) Removing spark.memory.offHeap.enabled=true and increasing driver memory to something like 90% of the available memory on the box. You probably are aware of this since you didn't set executor memory, but in local mode the driver and the executor all run in the same process which is controlled by driver-memory. I haven't tried it, but the offHeap feature sounds like it has limited value. Reference

2) An actual cluster instead of local mode. More nodes will obviously give you more RAM.

3a) If you want to stick with local mode, try using less cores. You can do this by specifying the number of cores to use in the master setting like --master local[4] instead of local[*] which uses all of them. Running with less threads simultaneously processing data will lead to less data in RAM at any given time.

3b) If you move to a cluster, you may also want to tweak the number of executors cores for the same reason as mentioned above. You can do this with the --executor-cores flag.

4) Try with more partitions. In your example code you repartitioned to 500 partitions, maybe try 1000, or 2000? More partitions means each partition is smaller and less memory pressure.