Spark Launcher waiting for job completion infinitely

TomaszGuzialek · Jul 31, 2015

I am trying to submit a JAR containing a Spark job to a YARN cluster from Java code. I am using SparkLauncher to submit the SparkPi example:

Process spark = new SparkLauncher()
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
    .setMainClass("org.apache.spark.examples.SparkPi")
    .setMaster("yarn-cluster")
    .launch();
System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);

There are two problems:

  1. When submitting in "yarn-cluster" mode, the application is successfully submitted to YARN and runs to completion (it is visible in the YARN UI, reported as SUCCESS, and pi is printed in the output). However, the submitting application is never notified that processing has finished: it hangs indefinitely after printing "Waiting for finish...". The log of the container can be found here.
  2. When submitting in "yarn-client" mode, the application does not appear in the YARN UI and the submitting application hangs at "Waiting for finish...". When the hanging code is killed, the application shows up in the YARN UI and is reported as SUCCESS, but the output is empty (pi is not printed). The log of the container can be found here.

I tried running the submitting application with both Oracle Java 7 and Oracle Java 8.

Answer

TomaszGuzialek · Aug 3, 2015

I got help on the Spark mailing list. The key is to read (drain) both getInputStream() and getErrorStream() on the Process. If nobody reads them, the child process can fill up the output buffer, block on its next write, and cause a deadlock (see the Oracle docs for Process). The streams should be read in separate threads:

Process spark = new SparkLauncher()
    .setSparkHome("C:\\spark-1.4.1-bin-hadoop2.6")
    .setAppResource("C:\\spark-1.4.1-bin-hadoop2.6\\lib\\spark-examples-1.4.1-hadoop2.6.0.jar")
    .setMainClass("org.apache.spark.examples.SparkPi")
    .setMaster("yarn-cluster")
    .launch();

InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(spark.getInputStream(), "input");
Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
inputThread.start();

InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(spark.getErrorStream(), "error");
Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
errorThread.start();

System.out.println("Waiting for finish...");
int exitCode = spark.waitFor();
System.out.println("Finished! Exit code:" + exitCode);

where the InputStreamReaderRunnable class is:

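import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class InputStreamReaderRunnable implements Runnable {

    private final BufferedReader reader;
    private final String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    @Override
    public void run() {
        System.out.println("InputStream " + name + ":");
        // try-with-resources closes the reader even if readLine() throws
        try (BufferedReader r = reader) {
            String line;
            // Keep reading until the child process closes the stream (readLine() returns null)
            while ((line = r.readLine()) != null) {
                System.out.println(line);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}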
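
One detail the snippet above leaves open: waitFor() may return while the reader threads are still working through the last buffered lines, so the tail of the output can show up after "Finished!" or be lost if the JVM exits right away. Below is a minimal sketch of a variant that joins the reader threads before returning the exit code. StreamDrainer and its two methods are hypothetical names made up for this illustration; they are not part of Spark or of the mailing list advice, and the lines are collected into a list instead of being printed so the caller decides what to do with them.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.List;

// Hypothetical helper for illustration only; not part of Spark.
class StreamDrainer {

    // Starts a daemon thread that reads `in` line by line into `sink` until EOF.
    // `sink` is written only by this thread; read it after join() has returned.
    static Thread drainAsync(final InputStream in, final List<String> sink, final String threadName) {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                // Uses the platform default charset, like the answer's class does.
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        sink.add(line);
                    }
                } catch (IOException e) {
                    sink.add("[" + threadName + "] read failed: " + e);
                }
            }
        }, threadName);
        t.setDaemon(true); // do not keep the JVM alive just for a reader
        t.start();
        return t;
    }

    // Drains stdout and stderr of `process`, waits for it to exit,
    // then waits for both readers to reach EOF before returning the exit code.
    static int waitForDraining(Process process, List<String> out, List<String> err)
            throws InterruptedException {
        Thread outThread = drainAsync(process.getInputStream(), out, "stdout-drainer");
        Thread errThread = drainAsync(process.getErrorStream(), err, "stderr-drainer");
        int exitCode = process.waitFor();
        outThread.join();
        errThread.join();
        return exitCode;
    }
}
```

With this helper, the waiting part of the answer would become something like int exitCode = StreamDrainer.waitForDraining(spark, out, err); where spark is the Process returned by launch() and out and err are two separate ArrayList<String> instances owned by the caller.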