Timeout Exception in Apache-Spark during program Execution

Yasir picture Yasir · Nov 22, 2016 · Viewed 34.7k times · Source

I am running a Bash Script in MAC. This script calls a spark method written in Scala language for a large number of times. I am currently trying to call this spark method for 100,000 times using a for loop.

The code exits with the following exception after running a small number of iterations, around 3000 iterations.

org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)

Exception in thread "dag-scheduler-event-loop" 16/11/22 13:37:32 WARN NioEventLoop: Unexpected exception in the selector loop.
java.lang.OutOfMemoryError: Java heap space
    at io.netty.util.internal.MpscLinkedQueue.offer(MpscLinkedQueue.java:126)
    at io.netty.util.internal.MpscLinkedQueue.add(MpscLinkedQueue.java:221)
    at io.netty.util.concurrent.SingleThreadEventExecutor.fetchFromScheduledTaskQueue(SingleThreadEventExecutor.java:259)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:346)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
java.lang.OutOfMemoryError: Java heap space
    at java.util.regex.Pattern.compile(Pattern.java:1047)
    at java.lang.String.replace(String.java:2180)
    at org.apache.spark.util.Utils$.getFormattedClassName(Utils.scala:1728)
    at org.apache.spark.storage.RDDInfo$$anonfun$1.apply(RDDInfo.scala:57)
    at org.apache.spark.storage.RDDInfo$$anonfun$1.apply(RDDInfo.scala:57)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.storage.RDDInfo$.fromRdd(RDDInfo.scala:57)
    at org.apache.spark.scheduler.StageInfo$$anonfun$1.apply(StageInfo.scala:87)

Can someone help please, is this error being caused because of a large number of calls to spark method?

Answer

Ram Ghadiyaram picture Ram Ghadiyaram · Nov 22, 2016

Its RpcTimeoutException .. so spark.network.timeout (spark.rpc.askTimeout) could be tuned with larger-than-default values in order to handle complex workload. You can start with these values and adjust accordingly to your workloads. Please see latest

spark.network.timeout 120s Default timeout for all network interactions. This config will be used in place of spark.core.connection.ack.wait.timeout, spark.storage.blockManagerSlaveTimeoutMs, spark.shuffle.io.connectionTimeout, spark.rpc.askTimeout or spark.rpc.lookupTimeout if they are not configured.

Also consider increasing executor memory i.e spark.executor.memory and most imp thing is review your code, to check whether that is candidate for further optimization.

Solution : value 600 is based on requirement

set by SparkConf: conf.set("spark.network.timeout", "600s")
set by spark-defaults.conf: spark.network.timeout 600s
set when calling spark-submit: --conf spark.network.timeout=600s