I have the following Spark job, which tries to keep everything in memory:
val myOutRDD = myInRDD.flatMap { fp =>
  val tuple2List: ListBuffer[(String, myClass)] = ListBuffer()
  :
  tuple2List
}.persist(StorageLevel.MEMORY_ONLY).reduceByKey { (p1, p2) =>
  myMergeFunction(p1, p2)
}.persist(StorageLevel.MEMORY_ONLY)
However, when I looked into the job tracker, I still saw a lot of Shuffle Write and Shuffle spill to disk ...
Total task time across all tasks: 49.1 h
Input Size / Records: 21.6 GB / 102123058
Shuffle write: 532.9 GB / 182440290
Shuffle spill (memory): 370.7 GB
Shuffle spill (disk): 15.4 GB
Then the job failed with "no space left on device".
... I am wondering about the 532.9 GB of Shuffle write here: is it written to disk or to memory?
Also, why is 15.4 GB of data still spilled to disk when I specifically asked to keep it in memory?
Thanks!
The persist calls in your code are entirely wasted if you don't access the RDD multiple times. What's the point of storing something if you never access it? Caching has no bearing on shuffle behavior, other than that you can avoid redoing shuffles by keeping their output cached.
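To illustrate the point about persist, here is a minimal sketch (not the asker's actual job) in which the intermediate RDD is left uncached because it is consumed once, and only the reduced RDD is persisted because two actions read it. The object name PersistSketch, the merge function, and the sample input are hypothetical stand-ins, and the sketch assumes Spark 1.3 or later so that reduceByKey is available without extra implicit imports.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  // Hypothetical stand-in for the question's myMergeFunction.
  def merge(a: Int, b: Int): Int = a + b

  // Returns (number of distinct keys, sum of all merged values).
  def run(sc: SparkContext): (Long, Int) = {
    // Hypothetical sample input standing in for myInRDD.
    val in = sc.parallelize(Seq("a b", "b c", "c c"))

    val out = in
      // No persist here: this RDD feeds reduceByKey exactly once.
      .flatMap(line => line.split(" ").map(word => (word, 1)))
      .reduceByKey(merge)
      // Persisting pays off only because two actions below reuse `out`.
      .persist(StorageLevel.MEMORY_ONLY)

    val distinctKeys = out.count()           // first action: computes and caches
    val total = out.map(_._2).reduce(_ + _)  // second action: reads the cached data

    out.unpersist()
    (distinctKeys, total)
  }
}
```

Either way, the shuffle performed by reduceByKey still takes place. Persisting its output only avoids repeating that shuffle for later actions.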
Shuffle spill is controlled by the spark.shuffle.spill and spark.shuffle.memoryFraction configuration parameters. If spill is enabled (it is by default), in-memory shuffle data will spill to disk once it starts using more than the fraction of memory given by memoryFraction (20% by default).
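As a rough sketch of how these two parameters could be set programmatically, assuming a Spark version that still honors them (they are legacy settings in later releases): the object name ShuffleConfSketch, the app name, and the value 0.4 are illustrative choices, not recommendations.

```scala
import org.apache.spark.SparkConf

object ShuffleConfSketch {
  // loadDefaults = false so only the values set here are present.
  def build(): SparkConf =
    new SparkConf(false)
      .setAppName("shuffle-conf-sketch")          // hypothetical app name
      .set("spark.shuffle.spill", "true")         // the default described above
      .set("spark.shuffle.memoryFraction", "0.4") // illustrative; the default is 0.2
}
```

Raising memoryFraction leaves less memory for everything else, so a larger value may reduce spilling but does not guarantee that nothing is written to disk.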
The metrics are very confusing. My reading of the code is that "Shuffle spill (memory)" is the amount of memory that was freed up as things were spilled to disk. The code for "Shuffle spill (disk)" looks like it's the amount actually written to disk by those spills. From the code for "Shuffle write", I think it's the amount written to disk directly, not as a spill from a sorter.