I am tuning the memory settings of a Spark job running on YARN, and I have noticed that different settings change whether and how the job completes. I do not fully understand why this happens, and would appreciate some guidance and explanation.
I will give some background first, then post my questions, and then describe the cases I have tried.
My environment settings were as follows:
My code recursively filters an RDD to make it smaller (removing examples as part of an algorithm), then does mapToPair and collect to gather the results and save them within a list.
Why does the second case throw a different error and run longer than the first case, when the only change is a higher executor memory? Are the two errors linked in some way?
Both the third and fourth cases succeed, and I understand that this is because I am giving the job more memory, which solves the memory problems. However, in the third case,
spark.driver.memory + spark.yarn.driver.memoryOverhead = the memory YARN allocates for the container that runs the driver JVM
= 11g + max(driverMemory * 0.07, 384m) = 11g + 0.77g = 11.77g
So, from the formula, my job seems to need a MEMORY_TOTAL of around 11.77g to run successfully, which would explain why I need more than 10g for the driver memory setting.
But for the fourth case,
spark.driver.memory + spark.yarn.driver.memoryOverhead = the memory YARN allocates for the container that runs the driver JVM
= 2g + 1g (the explicit 1024m setting replaces the default of max(driverMemory * 0.07, 384m) = 384m) = 3g
It seems that just raising the memory overhead to 1024m (1g) lets the job run successfully with a driver memory of only 2g and a MEMORY_TOTAL of only 3g! Without the overhead configuration, any driver memory below 11g fails, which does not make sense from the formula, and that is why I am confused.
Why does increasing the memory overhead (for both driver and executor) allow my job to complete successfully with a lower MEMORY_TOTAL (11.77g vs 3g)? Is something else going on internally that I am missing?
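For anyone who wants to check the arithmetic, here is a minimal sketch of the formula as quoted above. It assumes that "with minimum of 384m" means a floor (the larger of the two values, not their sum) and that an explicit spark.yarn.driver.memoryOverhead replaces the computed default. The function name and its parameters are hypothetical, and YARN may additionally round the request up to its own allocation increment, which this sketch ignores.

```python
def yarn_container_mb(heap_mb, overhead_mb=None, factor=0.07, floor_mb=384):
    """Estimate the container size requested from YARN, in MB.

    heap_mb     -- value of --driver-memory (or --executor-memory) in MB
    overhead_mb -- explicit memoryOverhead in MB, or None to use the default
    factor/floor_mb -- the 0.07 factor and 384m floor quoted in the question
    """
    if overhead_mb is None:
        # Default overhead: a fraction of the heap, but never below the floor.
        overhead_mb = max(int(heap_mb * factor), floor_mb)
    return heap_mb + overhead_mb


# Third case: --driver-memory 11g, default overhead.
case3 = yarn_container_mb(11 * 1024)

# Fourth case: --driver-memory 2g, overhead explicitly set to 1024.
case4 = yarn_container_mb(2 * 1024, overhead_mb=1024)

print(case3, case4)
```

Under these assumptions the third case requests about 11.77g and the fourth case 3g, so the gap between the two totals remains large whichever way the minimum is read.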
First case:
/bin/spark-submit --class <class name> --master yarn-cluster --driver-memory 7g --executor-memory 1g --num-executors 3 --executor-cores 1 --jars <jar file>
If I run my program with any driver memory less than 11g, I get an error saying that the SparkContext was stopped, or a similar error about a method being called on a stopped SparkContext. From what I have gathered, this is related to insufficient memory.
Second case:
/bin/spark-submit --class <class name> --master yarn-cluster --driver-memory 7g --executor-memory 3g --num-executors 3 --executor-cores 1 --jars <jar file>
If I run the program with the same driver memory but higher executor memory, the job runs longer than in the first case (about 3-4 minutes) and then fails with a different error: a container is requesting or using more memory than allowed and is killed because of that. I find this odd, since the executor memory was increased and yet this error occurs instead of the one from the first case.
Third case:
/bin/spark-submit --class <class name> --master yarn-cluster --driver-memory 11g --executor-memory 1g --num-executors 3 --executor-cores 1 --jars <jar file>
Any setting with driver memory greater than 10g lets the job run successfully.
Fourth case:
/bin/spark-submit --class <class name> --master yarn-cluster --driver-memory 2g --executor-memory 1g --conf spark.yarn.executor.memoryOverhead=1024 --conf spark.yarn.driver.memoryOverhead=1024 --num-executors 3 --executor-cores 1 --jars <jar file>
The job runs successfully with this setting (driver memory 2g and executor memory 1g, but with the driver memory overhead and the executor memory overhead each raised to 1g).
Any help will be appreciated and would really help with my understanding of Spark. Thanks in advance.
All your cases use
--executor-cores 1
It is best practice to go above 1, but not above 5. This comes from our own experience and from the Spark developers' recommendations.
E.g. http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ :
A rough guess is that at most five tasks per executor
can achieve full write throughput, so it’s good to keep
the number of cores per executor below that number
I can't find the reference right now where it was recommended to go above 1 core per executor. But the idea is that running multiple tasks in the same executor lets them share some common memory regions, so it actually saves memory.
Start with --executor-cores 2, double --executor-memory (because --executor-cores also determines how many tasks one executor runs concurrently), and see what it does for you. Your environment is tight on available memory, so going to 3 or 4 cores should give you even better memory utilization.
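As a rough illustration of the "double the memory when you double the cores" rule, here is a small sketch that keeps the heap per concurrent task constant while changing the core count. The helper names are hypothetical, and the starting point of 1 core with 1g is taken from the cases in the question. It only builds the two flags discussed here; it does not run spark-submit.

```python
def scale_executor(cores_before, memory_mb_before, cores_after):
    """Scale executor memory with the core count so that the heap
    available per concurrently running task stays the same."""
    per_task_mb = memory_mb_before // cores_before
    return cores_after, per_task_mb * cores_after


def submit_flags(cores, memory_mb):
    """Render the two spark-submit flags, using whole megabytes."""
    return ["--executor-cores", str(cores),
            "--executor-memory", f"{memory_mb}m"]


# From the question's 1 core / 1g executors to 2 cores per executor.
cores, memory_mb = scale_executor(1, 1024, 2)
print(" ".join(submit_flags(cores, memory_mb)))
```

The same helper can be called with 3 or 4 cores to get the matching memory setting for those experiments.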
We use Spark 1.5 and stopped using --executor-cores 1 quite some time ago because it was giving us GC problems. It also looks like a Spark bug, because just giving more memory did not help as much as switching to more tasks per container. My guess is that tasks in the same executor hit their peak memory consumption at different times, so you don't waste memory or have to overprovision it just to make the job work.
Another benefit is that Spark's shared variables (accumulators and broadcast variables) will have just one copy per executor, not per task - so switching to multiple tasks per executor is a direct memory saving right there. Even if you don't use Spark shared variables explicitly, Spark very likely creates them internally anyway. For example, if you join two tables through Spark SQL, Spark's CBO may decide to broadcast the smaller table (or smaller dataframe) to make the join run faster.
http://spark.apache.org/docs/latest/programming-guide.html#shared-variables
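To put a number on the shared-variable saving, here is a back-of-the-envelope sketch. It assumes exactly one copy of a broadcast variable per executor, as described above; the 6 task slots and the 100 MB broadcast size are made-up figures for illustration, and the function names are hypothetical.

```python
import math


def broadcast_copies(task_slots, cores_per_executor):
    """Number of executors (and so broadcast copies) needed to provide
    the requested number of concurrent task slots."""
    return math.ceil(task_slots / cores_per_executor)


def broadcast_footprint_mb(task_slots, cores_per_executor, broadcast_mb):
    """Total cluster memory spent on copies of one broadcast variable."""
    return broadcast_copies(task_slots, cores_per_executor) * broadcast_mb


# Same 6 concurrent tasks, hypothetical 100 MB broadcast variable.
one_core = broadcast_footprint_mb(6, 1, 100)   # 6 executors, 6 copies
two_cores = broadcast_footprint_mb(6, 2, 100)  # 3 executors, 3 copies
print(one_core, two_cores)
```

With the same number of concurrent tasks, going from 1 to 2 cores per executor halves the memory spent on that broadcast variable in this simplified model.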