Why does Yarn on EMR not allocate all nodes to running Spark jobs?

retnuH picture retnuH · Nov 26, 2015 · Viewed 9.1k times · Source

I'm running a job on Apache Spark on Amazon Elastic Map Reduce (EMR). Currently I'm running on emr-4.1.0 which includes Amazon Hadoop 2.6.0 and Spark 1.5.0.

When I start the job, YARN correctly has allocated all the worker nodes to the spark job (with one for the driver, of course).

I have the magic "maximizeResourceAllocation" property set to "true", and the spark property "spark.dynamicAllocation.enabled" also set to "true".

However, if I resize the emr cluster by adding nodes to the CORE pool of worker machines, YARN only adds some of the new nodes to the spark job.

For example, this morning I had a job that was using 26 nodes (m3.2xlarge, if that matters) - 1 for the driver, 25 executors. I wanted to speed up the job so I tried adding 8 more nodes. YARN has picked up all of the new nodes, but only allocated 1 of them to the Spark job. Spark did successfully pick up the new node and is using it as an executor, but my question is why is YARN letting the other 7 nodes just sit idle?

It's annoying for obvious reasons - I have to pay for the resources even though they're not being used, and my job hasn't sped up at all!

Anybody know how YARN decides when to add nodes to running spark jobs? What variables come into play? Memory? V-Cores? Anything?

Thanks in advance!

Answer

retnuH picture retnuH · Nov 30, 2015

Okay, with the help of @sean_r_owen, I was able to track this down.

The problem was this: when setting spark.dynamicAllocation.enabled to true, spark.executor.instances shouldn't be set - an explicit value for that will override dynamic allocation and turn it off. It turns out that EMR sets it in the background if you do not set it yourself. To get the desired behaviour, you need to explicitly set spark.executor.instances to 0.

For the records, here is the contents of one of the files we pass to the --configurations flag when creating an EMR cluster:

[
    {
        "Classification": "capacity-scheduler",
        "Properties": {
            "yarn.scheduler.capacity.resource-calculator": "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"
        }
    },

    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        }
    },

    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.dynamicAllocation.enabled": "true",
            "spark.executor.instances": "0"
        }
    } 
]

This gives us an EMR cluster where Spark uses all the nodes, including added nodes, when running jobs. It also appears to use all/most of the memory and all (?) the cores.

(I'm not entirely sure that it's using all the actual cores; but it is definitely using more than 1 VCore, which it wasn't before, but following Glennie Helles's advice it is now behaving better and using half of the listed VCores, which seems to equal the actual number of cores...)