On EMR Spark, I am writing an RDD[String] to S3 via a DataFrame:
rddString
  .toDF()
  .coalesce(16)
  .write
  .option("compression", "gzip")
  .mode(SaveMode.Overwrite)
  .json(s"s3n://my-bucket/some/new/path")
The save mode is Overwrite, and s3n://my-bucket/some/new/path does not yet exist.
I consistently get an IOException: File already exists:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 55.0 failed 4 times, most recent failure: Lost task 15.3 in stage 55.0 (TID 8441, ip-172-31-17-30.us-west-2.compute.internal, executor 3): org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:270)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:189)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(FileFormatWriter.scala:188)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: File already exists:s3n://my-bucket/some/new/path/part-00015-03a0c001-fc99-4055-9be5-68a1fb0cf6d3-c000.json.gz
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:625)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:932)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:810)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:176)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
at org.apache.spark.sql.execution.datasources.json.JsonOutputWriter.<init>(JsonFileFormat.scala:140)
at org.apache.spark.sql.execution.datasources.json.JsonFileFormat$$anon$1.newInstance(JsonFileFormat.scala:80)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:303)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:312)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:256)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:254)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1371)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:259)
... 8 more
Spark v2.2.1, EMR v5.12.0
Prior to the exception being thrown, files are written to the destination. However, I cannot tell if they are complete.
I ran into a similar issue when I ran a Glue job with EMR. In a nutshell, this exception is usually not the real root cause of the job failure. The Spark task may have failed for some other reason, and "IOException: File already exists" is what it finally throws after retrying from the original failure.
So find and fix the real root cause, and this error will go away too.
In my case, the error reported in the CloudWatch ErrorLogs looked like this:
: org.apache.spark.SparkException: Job aborted.
at ...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: File already exists:s3://personal-tests/xdqian/zappos_triplet_loss/output_cache_test/part-00003-8eaa7c78-e227-4476-b96d-4300e7350bc7-c000.csv
At first I had no clue, but when I inspected the Logs, I found the following exception:
18/12/05 06:14:15 ERROR Utils: Aborting task
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/mnt/yarn/usercache/root/appcache/application_1543990079218_0001/container_1543990079218_0001_01_000101/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/mnt/yarn/usercache/root/appcache/application_1543990079218_0001/container_1543990079218_0001_01_000101/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/mnt/yarn/usercache/root/appcache/application_1543990079218_0001/container_1543990079218_0001_01_000101/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/mnt/yarn/usercache/root/appcache/application_1543990079218_0001/container_1543990079218_0001_01_000001/GoldenGardensGluePythonScripts.zip/golden_gardens_glue_python_scripts/job.py", line 62, in <lambda>
TypeError: 'NoneType' object has no attribute '__getitem__'
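This TypeError is what Python 2 raises when code indexes a value that is None. The actual lambda at job.py line 62 is not shown, so the following is only a hypothetical sketch of the kind of guard that avoids it; the helper name `first_field` and the sample records are assumptions made for illustration.

```python
def first_field(record):
    """Return record[0], or None when the record itself is missing.

    Hypothetical helper: the real lambda in job.py is not shown in the logs.
    """
    if record is None:
        return None
    return record[0]


# Sample data with one missing record, standing in for the job's input.
records = [("a", 1), None, ("b", 2)]

# Apply the guarded accessor, then drop the missing entries.
values = [first_field(r) for r in records]
cleaned = [v for v in values if v is not None]
print(cleaned)
```

In a PySpark job the same guard could be passed to `rdd.map(...)` in place of a bare indexing lambda, followed by a `filter` that drops the None results.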
The "File already exists" exception finally went away after I fixed this NoneType error. I read elsewhere (sorry, I can no longer track down the source) that the "File already exists" error is always caused by a task failing and being retried because of some other issue (the NoneType error in my case).
My guess is that the executor task creates a file and writes the data row by row. It may fail at, say, row 34 because of the NoneType error and get aborted, while the file with the first 33 rows still exists. A failed task is reportedly retried up to 4 times. When the task is retried, it finds the file left behind by the previous attempt right at the start.
So the real root cause is recorded in the Logs, while the "File already exists" exception shows up in the ErrorLogs because it is the last exception thrown before the job is terminated. Overwrite mode does not help here: it only checks the destination at the beginning of the job and is not a control flag for this edge case.
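The mechanism guessed at above can be imitated with a toy simulation in plain Python. This is not Spark's actual code: `write_partition`, `run_with_retries`, and the file name are hypothetical, and the exclusive-create mode `"x"` merely stands in for a file system `create` call that refuses to overwrite.

```python
import os
import tempfile

MAX_ATTEMPTS = 4  # mirrors the "failed 4 times" in the stack trace


def write_partition(path, rows):
    """Toy stand-in for a write task: create the file, then write row by row."""
    # Mode "x" raises FileExistsError if the file is already there.
    with open(path, "x") as out:
        for row in rows:
            # Indexing a None row raises TypeError, like the NoneType error above.
            out.write(row["value"] + "\n")


def run_with_retries(path, rows):
    """Retry the task without cleaning up partial output between attempts."""
    errors = []
    for _ in range(MAX_ATTEMPTS):
        try:
            write_partition(path, rows)
            return errors
        except (TypeError, FileExistsError) as exc:
            errors.append(type(exc).__name__)
    return errors


# 33 good rows followed by a bad one, so the first attempt fails at row 34.
rows = [{"value": "row %d" % i} for i in range(33)] + [None]

with tempfile.TemporaryDirectory() as tmp:
    errors = run_with_retries(os.path.join(tmp, "part-00003.csv"), rows)

print(errors)
```

Only the first attempt reports the real problem (the TypeError). Every retry fails on the leftover file, so the last error seen is the misleading "file exists" one.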