Databricks - failing to write from a DataFrame to a Delta location

samba · May 6, 2019 · Viewed 10.5k times

I wanted to change a column name of a Databricks Delta table.

So I did the following:

// Read old table data
val old_data_DF = spark.read.format("delta")
.load("dbfs:/mnt/main/sales")

// Created a new DF with a renamed column
val new_data_DF = old_data_DF
      .withColumnRenamed("column_a", "metric1")
      .select("*")

// Dropped and recreated the Delta files location
dbutils.fs.rm("dbfs:/mnt/main/sales", true)
dbutils.fs.mkdirs("dbfs:/mnt/main/sales")

// Trying to write the new DF to the location
new_data_DF.write
.format("delta")
.partitionBy("sale_date_partition")
.save("dbfs:/mnt/main/sales")

Here I'm getting an Error at the last step when writing to Delta:

java.io.FileNotFoundException: dbfs:/mnt/main/sales/sale_date_partition=2019-04-29/part-00000-769.c000.snappy.parquet
A file referenced in the transaction log cannot be found. This occurs when data has been manually deleted from the file system rather than using the table `DELETE` statement

Obviously the data was deleted, and most likely I've missed something in the above logic. Now the only place that contains the data is new_data_DF. Writing to a different location such as dbfs:/mnt/main/sales_tmp also fails.

What should I do to write data from new_data_DF to a Delta location?

Answer

Michael Armbrust · May 7, 2019

In general, it is a good idea to avoid using rm on Delta tables. Delta's transaction log can prevent eventual consistency issues in most cases; however, when you delete and recreate a table in a very short time, different versions of the transaction log can flicker in and out of existence.

Instead, I'd recommend using the transactional primitives provided by Delta. For example, to overwrite the data in a table, you can:

df.write.format("delta").mode("overwrite").save("/delta/events")

If you have a table that has already been corrupted, you can fix it using FSCK.
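
As a hedged sketch, assuming the table is registered in the metastore under the hypothetical name sales, FSCK could be run from a notebook like this (FSCK REPAIR TABLE removes transaction-log entries that reference files no longer present on storage; DRY RUN only lists them):

// Hypothetical table name; adjust to however the table is registered
spark.sql("FSCK REPAIR TABLE sales DRY RUN").show() // list the dangling file references
spark.sql("FSCK REPAIR TABLE sales")                // remove them from the transaction log

Note that FSCK only cleans up the references; the data in the deleted files is not recovered, so the surviving DataFrame still has to be written back afterwards.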