Running Spark driver program in Docker container - no connection back from executor to the driver?

tashoyan picture tashoyan · Aug 3, 2017 · Viewed 8.1k times · Source

UPDATE: The problem is resolved. The Docker image is here: docker-spark-submit

I run spark-submit with a fat jar inside a Docker container. My standalone Spark cluster runs on 3 virtual machines - one master and two workers. From an executor log on a worker machine, I see that the executor has the following driver URL:

"--driver-url" "spark://[email protected]:5001"

172.17.0.2 is actually the address of the container with the driver program, not the host machine where the container is running. This IP is not accessible from the worker machine, therefore the worker is not able to communicate to the driver program. As I see from the source code of StandaloneSchedulerBackend, it builds driverUrl using spark.driver.host setting:

val driverUrl = RpcEndpointAddress(
  sc.conf.get("spark.driver.host"),
  sc.conf.get("spark.driver.port").toInt,
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

It does not take into account SPARK_PUBLIC_DNS environment variable - is this correct? In the container, I cannot set spark.driver.host to anything else except the container "internal" IP address (172.17.0.2 in this example). When trying to set spark.driver.host to the IP address of the host machine, I get errors like this:

WARN Utils: Service 'sparkDriver' could not bind on port 5001. Attempting port 5002.

I tried to set spark.driver.bindAddress to the IP address of the host machine, but got same errors. So, how can I configure Spark to communicate with the driver program using the host machine IP address rather than Docker container address?

UPD: Stack trace from the executor:

ERROR RpcOutboxMessage: Ask timeout before connecting successfully
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Failure.recover(Try.scala:216)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
    at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
    ... 8 more

Answer

tashoyan picture tashoyan · Aug 21, 2017

So the working configuration is:

  • set spark.driver.host to the IP address of the host machine
  • set spark.driver.bindAddress to the IP address of the container

The working Docker image is here: docker-spark-submit.