How to deal with tasks running too long (comparing to others in job) in yarn-client?

tnk_peka picture tnk_peka · Aug 4, 2015 · Viewed 15k times · Source

We use a Spark cluster as yarn-client to calculate several business, but sometimes we have a task run too long time:

enter image description here

We don't set timeout but I think default timeout a spark task is not too long such here ( 1.7h ).

Anyone give me an ideal to work around this issue ???

Answer

zengr picture zengr · Sep 21, 2015

There is no way for spark to kill its tasks if its taking too long.

But I figured out a way to handle this using speculation,

This means if one or more tasks are running slowly in a stage, they will be re-launched.

spark.speculation                  true
spark.speculation.multiplier       2
spark.speculation.quantile         0

Note: spark.speculation.quantile means the "speculation" will kick in from your first task. So use it with caution. I am using it because some jobs get slowed down due to GC over time. So I think you should know when to use this - its not a silver bullet.

Some relevant links: http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-always-wait-for-stragglers-to-finish-running-td14298.html and http://mail-archives.us.apache.org/mod_mbox/spark-user/201506.mbox/%3CCAPmMX=rOVQf7JtDu0uwnp1xNYNyz4xPgXYayKex42AZ_9Pvjug@mail.gmail.com%3E

Update

I found a fix for my issue (might not work for everyone). I had a bunch of simulations running per task, so I added timeout around the run. If a simulation is taking longer (due to a data skew for that specific run), it will timeout.

ExecutorService executor = Executors.newCachedThreadPool();
Callable<SimResult> task = () -> simulator.run();

Future<SimResult> future = executor.submit(task);
try {
    result = future.get(1, TimeUnit.MINUTES);
} catch (TimeoutException ex) {
    future.cancel(true);
    SPARKLOG.info("Task timed out");
}

Make sure you handle an interrupt inside the simulator's main loop like:

if(Thread.currentThread().isInterrupted()){
    throw new InterruptedException();
}