I'm running an EMR cluster (version emr-4.2.0) for Spark using the Amazon-specific maximizeResourceAllocation flag as documented here.
According to those docs, "this option calculates the maximum compute and memory resources available for an executor on a node in the core node group and sets the corresponding spark-defaults settings with this information".
I'm running the cluster using m3.2xlarge instances for the worker nodes. I'm using a single m3.xlarge for the YARN master - the smallest m3 instance I can get it to run on, since it doesn't do much.
The situation is this: when I run a Spark job, the number of requested cores for each executor is 8. (I only got this after configuring "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator", which isn't actually in the documentation, but I digress.) This seems to make sense, because according to these docs an m3.2xlarge has 8 "vCPUs". However, on the actual instances themselves, in /etc/hadoop/conf/yarn-site.xml, each node is configured to have yarn.nodemanager.resource.cpu-vcores set to 16. I would guess that this is due to hyperthreading or perhaps some other hardware fanciness.
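For reference, the setup described above could be written out as an EMR configuration list along the following lines. This is only a sketch: the "spark" and "capacity-scheduler" classification names are my assumption of where these two properties belong and should be checked against the documentation for your EMR release, and the helper name to_configurations_json is made up for illustration.

```python
import json

# Sketch of an EMR configuration list. The classification names are
# assumptions to verify against the docs for your EMR release.
emr_configurations = [
    {
        # Turns on the Amazon-specific resource maximization for Spark.
        "Classification": "spark",
        "Properties": {"maximizeResourceAllocation": "true"},
    },
    {
        # Makes the YARN capacity scheduler account for vcores as well as memory.
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator":
                "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        },
    },
]


def to_configurations_json(configurations):
    """Serialize the list so it can be saved to a file and supplied at cluster creation."""
    return json.dumps(configurations, indent=2)


print(to_configurations_json(emr_configurations))
```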
So the problem is this: when I use maximizeResourceAllocation, I get the number of "vCPUs" that the Amazon instance type has, which seems to be only half of the number of "VCores" that YARN is configured with on the node; as a result, the executor is using only half of the actual compute resources on the instance.
Is this a bug in Amazon EMR? Are other people experiencing the same problem? Is there some other magic undocumented configuration that I am missing?
Okay, after a lot of experimentation, I was able to track down the problem. I'm going to report my findings here to help people avoid frustration in the future.
When maximizeResourceAllocation is set, running a Spark program sets the property spark.default.parallelism to the number of instance cores (or "vCPUs") across all the non-master instances that were in the cluster at the time of creation. This is probably too small even in normal cases; I've heard that it is recommended to set this to 4x the number of cores you will have available to run your jobs. This helps make sure that there are enough tasks available during any given stage to keep the CPUs busy on all executors.
Since you can read the spark.default.parallelism setting at runtime, this can be a convenient number to repartition to.
TL;DR
maximizeResourceAllocation will do almost everything for you correctly except...
Set spark.default.parallelism to 4x the number of instance cores you want the job to run on, on a per "step" (in EMR speak)/"application" (in YARN speak) basis, i.e. set it every time and...
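To make the 4x rule of thumb concrete, here is a minimal sketch of the arithmetic and of the spark-submit arguments that would set the value for a single step. The function names and the 10-worker cluster size are hypothetical and chosen only for illustration; the 8 cores per instance is the m3.2xlarge vCPU count mentioned in the question.

```python
def recommended_parallelism(worker_instances, cores_per_instance, tasks_per_core=4):
    """Return tasks_per_core times the total worker cores (the 4x rule of thumb)."""
    if worker_instances < 1 or cores_per_instance < 1:
        raise ValueError("need at least one worker instance and one core")
    return worker_instances * cores_per_instance * tasks_per_core


def spark_submit_conf_args(parallelism):
    """Build the spark-submit arguments that set spark.default.parallelism for one step."""
    return ["--conf", "spark.default.parallelism={}".format(parallelism)]


# Hypothetical cluster: 10 m3.2xlarge workers with 8 vCPUs each.
parallelism = recommended_parallelism(worker_instances=10, cores_per_instance=8)
print(parallelism)  # 320
print(" ".join(spark_submit_conf_args(parallelism)))
```

Inside a PySpark program the effective value should be readable as sc.defaultParallelism, so something like rdd.repartition(sc.defaultParallelism) is one way to repartition to it.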