External Hive Table Refresh table vs MSCK Repair

Ajith Kannan · Aug 6, 2018 · Viewed 19.8k times

I have an external Hive table stored as Parquet, partitioned on a column (say as_of_dt), and data gets inserted via Spark Streaming. Every day a new partition gets added. I am running msck repair table so that the Hive metastore picks up the newly added partition info. Is this the only way, or is there a better way? I am also concerned about downstream users querying the table: will msck repair cause any issues such as data being unavailable or stale? I was going through the HiveContext API and saw the refreshTable option. Does it make sense to use refreshTable instead?

Answer

afeldman · Aug 7, 2018

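To directly answer your question: msck repair table compares the partition directories under the table location with the partitions recorded in the Hive metastore and adds any that are missing. By default it does not drop partitions whose directories have been deleted; for that you need alter table ... drop partition or, on Hive 3.0 and later, msck repair table with the drop partitions or sync partitions option. Because it scans the whole table location, msck repair can take much longer than an invalidate or refresh statement. Those statements solve a different problem, though: invalidate metadata is an Impala statement that reloads Impala's cached copy of the metastore, and refresh table in Spark SQL invalidates Spark's cached metadata and file listing for the table. Neither one registers new partitions in the Hive metastore.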
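As a rough sketch of the statements being compared here, the helpers below only build the SQL text so the differences are easy to see side by side. The object and method names are hypothetical, and the table name in the usage note is a placeholder; only the SQL itself is Hive / Spark SQL syntax.

```scala
// Hypothetical helper names; only the generated SQL text is Hive / Spark SQL syntax.
object MetadataStatements {
  // Hive: scans the table location and registers partition directories
  // that are missing from the metastore.
  def msckRepair(table: String): String =
    s"MSCK REPAIR TABLE $table"

  // Hive: explicitly removes the metastore entry for a single partition.
  def dropPartition(table: String, key: String, value: String): String =
    s"ALTER TABLE $table DROP IF EXISTS PARTITION ($key='$value')"

  // Spark SQL: invalidates Spark's cached metadata and file listing for the table.
  def refreshTable(table: String): String =
    s"REFRESH TABLE $table"
}
```

With a SparkSession named spark in scope, something like spark.sql(MetadataStatements.refreshTable("db.events")) would run the refresh; db.events is a made-up table name.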

The Hive metastore should be fine as long as you complete the add partition step somewhere in your processing. However, if you ever want to access the Hive table through Spark SQL (or Impala, or any other engine that caches table metadata), you will also need to refresh the metadata in that engine.

Any time you update or change the contents of a Hive table, the metadata that Spark has cached for it can fall out of sync, so queries through spark.sql may fail or miss the new data. If you want to query that data, you need to keep Spark's cached metadata in sync.

If you have a Spark version that allows for it, you should add partitions to Hive tables and refresh them from within Spark, so that the Hive metastore and Spark's cached metadata stay in sync. Below is how I do it:

// Non-partitioned table: overwrite the files, then refresh Spark's cached metadata
outputDF.write.format("parquet").mode("overwrite").save(fileLocation)
spark.sql("refresh table " + tableName)

// Partitioned table: write the partition directory, register it in the metastore, then refresh
outputDF.write.format("parquet").mode("overwrite").save(fileLocation + "/" + partition)
val addPartitionsStatement = "alter table " + tableName + " add if not exists partition(partitionKey='" + partition + "') location '" + fileLocation + "/" + partition + "'"
spark.sql(addPartitionsStatement)
spark.sql("refresh table " + tableName)
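For the daily as_of_dt partitions described in the question, the same add-partition-then-refresh pattern could be wrapped in a small helper. This is only a sketch under a few assumptions: the object and method names are hypothetical, the partition directories are assumed to follow the Hive-style as_of_dt=<date> layout, and the date is assumed to be formatted as yyyy-MM-dd. The SQL executor is passed in as a function so the statement building can be checked without a running cluster.

```scala
import java.time.LocalDate

// Hypothetical helper; assumes Hive-style partition directories (as_of_dt=yyyy-MM-dd).
object DailyPartition {
  // Builds the statements that register one day's partition and then
  // refresh Spark's cached metadata. LocalDate.toString yields yyyy-MM-dd.
  def statementsFor(table: String, baseLocation: String, day: LocalDate): Seq[String] = {
    val dt = day.toString
    Seq(
      s"ALTER TABLE $table ADD IF NOT EXISTS PARTITION (as_of_dt='$dt') " +
        s"LOCATION '$baseLocation/as_of_dt=$dt'",
      s"REFRESH TABLE $table"
    )
  }

  // Runs the statements in order through any SQL executor, e.g. s => spark.sql(s).
  def register(table: String, baseLocation: String, day: LocalDate)(runSql: String => Unit): Unit =
    statementsFor(table, baseLocation, day).foreach(runSql)
}
```

In a streaming job this might be called once per new day, for example DailyPartition.register(tableName, fileLocation, LocalDate.now()) { s => spark.sql(s); () }. Because the statement uses if not exists, calling it again for the same day should be harmless.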