I wanted to change a column name of a Databricks Delta table.
So I did the following:
// Read old table data
val old_data_DF = spark.read.format("delta")
.load("dbfs:/mnt/main/sales")
// Created a new DF with a renamed column
val new_data_DF = old_data_DF
.withColumnRenamed("column_a", "metric1")
.select("*")
// Dropped and recereated the Delta files location
dbutils.fs.rm("dbfs:/mnt/main/sales", true)
dbutils.fs.mkdirs("dbfs:/mnt/main/sales")
// Trying to write the new DF to the location
new_data_DF.write
.format("delta")
.partitionBy("sale_date_partition")
.save("dbfs:/mnt/main/sales")
Here I'm getting an Error at the last step when writing to Delta:
java.io.FileNotFoundException: dbfs:/mnt/main/sales/sale_date_partition=2019-04-29/part-00000-769.c000.snappy.parquet
A file referenced in the transaction log cannot be found. This occurs when data has been manually deleted from the file system rather than using the table `DELETE` statement
Obviously the data was deleted and most likely I've missed something in the above logic. Now the only place that contains the data is the new_data_DF
.
Writing to a location like dbfs:/mnt/main/sales_tmp
also fails
What should I do to write data from new_data_DF
to a Delta location?
In general, it is a good idea to avoid using rm
on Delta tables. Delta's transaction log can prevent eventual consistency issues in most cases, however, when you delete and recreate a table in a very short time, different versions of the transaction log can flicker in and out of existence.
Instead, I'd recommend using the transactional primitives provided by Delta. For example, to overwrite the data in a table you can:
df.write.format("delta").mode("overwrite").save("/delta/events")
If you have a table that has already been corrupted, you can fix it using FSCK.