My cluster: 1 master, 11 slaves, each node has 6 GB memory.
My settings:
spark.executor.memory=4g, -Dspark.akka.frameSize=512
Here is the problem:
First, I read some data (2.19 GB) from HDFS into an RDD:
val imageBundleRDD = sc.newAPIHadoopFile(...)
Second, I transform this RDD:
val res = imageBundleRDD.map(data => {
  val desPoints = threeDReconstruction(data._2, bg)
  (data._1, desPoints)
})
Last, I write the result to HDFS:
res.saveAsNewAPIHadoopFile(...)
When I run my program it shows:
.....
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:24 as TID 33 on executor 9: Salve7.Hadoop (NODE_LOCAL)
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:24 as 30618515 bytes in 210 ms
14/01/15 21:42:27 INFO cluster.ClusterTaskSetManager: Starting task 1.0:36 as TID 34 on executor 2: Salve11.Hadoop (NODE_LOCAL)
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Serialized task 1.0:36 as 30618515 bytes in 449 ms
14/01/15 21:42:28 INFO cluster.ClusterTaskSetManager: Starting task 1.0:32 as TID 35 on executor 7: Salve4.Hadoop (NODE_LOCAL)
Uncaught error from thread [spark-akka.actor.default-dispatcher-3] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[spark]
java.lang.OutOfMemoryError: Java heap space
Are there too many tasks?
PS: Everything is fine when the input data is about 225 MB.
How can I solve this problem?
I have a few suggestions:
- Use more executor memory if the nodes allow it, e.g. spark.executor.memory=6g. Make sure you're using as much memory as possible by checking the UI (it will say how much memory you're using).
- Lower spark.storage.memoryFraction. If you don't use cache() or persist in your code, this might as well be 0. Its default is 0.6, which means you only get 0.4 * 4g (about 1.6g) of memory for your heap. In my experience, reducing the memory fraction often makes OOMs go away. UPDATE: From Spark 1.6, apparently we will no longer need to play with these values; Spark will determine them automatically.
- Avoid String and heavily nested structures (like Map and nested case classes). If possible, try to only use primitive types and index all non-primitives, especially if you expect a lot of duplicates. Choose WrappedArray over nested structures whenever possible. Or even roll out your own serialisation: YOU will have the most information regarding how to efficiently pack your data into bytes, USE IT!
- Consider using a Dataset to cache your structure, as it will use more efficient serialisation. This should be regarded as a hack when compared to the previous bullet point. Building your domain knowledge into your algo/serialisation can minimise memory/cache-space by 100x or 1000x, whereas all a Dataset will likely give is 2x - 5x in memory and 10x compressed (parquet) on disk.
http://spark.apache.org/docs/1.2.1/configuration.html
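As a rough illustration of the two settings mentioned above, here is a minimal sketch that sets them through SparkConf. The object name, the app name and the value 0.1 are assumptions made for the example, not recommendations from the answer, and spark.storage.memoryFraction only applies to the pre-1.6 memory model the answer describes.

```scala
import org.apache.spark.SparkConf

// Hypothetical object name; a sketch of where the two settings could go.
object MemorySettingsSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("memory-settings-sketch")        // hypothetical app name
      .set("spark.executor.memory", "6g")          // value suggested in the answer
      .set("spark.storage.memoryFraction", "0.1")  // illustrative value; the default is 0.6

    // Print the values back to confirm what the application will request.
    println(conf.get("spark.executor.memory"))
    println(conf.get("spark.storage.memoryFraction"))
  }
}
```

The same conf object would then be passed to new SparkContext(conf). The fraction is kept above 0 here only as an example; a job that never caches could go lower, as the answer notes.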
EDIT: (So I can google myself easier) The following is also indicative of this problem:
java.lang.OutOfMemoryError : GC overhead limit exceeded
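Errors like this can show up when the heap holds a very large number of small objects, which is what the advice above about indexing non-primitives is aimed at. The following is a minimal sketch of one way to do that indexing in plain Scala: repeated String values are replaced by Int codes held in a primitive array, plus a small dictionary of the distinct values. The names LabelIndexSketch, encode and decode are hypothetical and only serve the example.

```scala
import scala.collection.mutable

// Hypothetical helper: dictionary-encode repeated strings as Int codes.
object LabelIndexSketch {

  /** Returns the distinct values in first-seen order and one Int code per input element. */
  def encode(values: Seq[String]): (Array[String], Array[Int]) = {
    val index = mutable.LinkedHashMap.empty[String, Int]
    // A value seen for the first time gets the next free code (the current map size).
    val codes = values.map(v => index.getOrElseUpdate(v, index.size)).toArray
    (index.keys.toArray, codes)
  }

  /** Rebuilds the original sequence from the dictionary and the codes. */
  def decode(dictionary: Array[String], codes: Array[Int]): Seq[String] =
    codes.map(dictionary(_)).toSeq

  def main(args: Array[String]): Unit = {
    val labels = Seq("cat", "dog", "cat", "cat", "dog")
    val (dictionary, codes) = encode(labels)
    println(dictionary.mkString(","))
    println(codes.mkString(","))
    // Round trip: decoding gives back the original labels.
    assert(decode(dictionary, codes) == labels)
  }
}
```

With many duplicates, the Array[Int] stores one primitive per element instead of one object reference per element, and each distinct String is kept only once. How much this saves depends entirely on the data.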