Should I persist a Spark dataframe if I keep adding columns in it?

Jaygovind Sahu · May 20, 2018 · Viewed 9.3k times

I could not find any discussion on the topic below in any forum I searched on the internet. It may be because I am new to Spark and Scala and am not asking a valid question. If there are any existing threads discussing the same or a similar topic, the links would be very helpful. :)

I am working on a process which uses Spark and Scala and creates a file by reading a lot of tables and deriving a lot of fields by applying logic to the data fetched from tables. So, the structure of my code is like this:

val driver_sql = "SELECT ...";

var df_res = spark.sql(driver_sql)

var df_res = df_res.withColumn("Col1", <logic>)

var df_res = df_res.withColumn("Col2", <logic>)

var df_res = df_res.withColumn("Col3", <logic>)
.
.
.

var df_res = df_res.withColumn("Col20", <logic>)

Basically, there is a driver query which creates the "driver" dataframe. After that, separate logic (functions) is executed based on a key or keys in the driver dataframe to add new columns/fields. The "logic" part is not always one line of code; sometimes it is a separate function which runs another query, does some kind of join on df_res, and adds a new column. The record count also changes, since I use an "inner" join with other tables/dataframes in some cases.
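For illustration, one of those derivation functions might look roughly like this (the table, key, and column names here are made up, and the real logic is more involved):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch of one derivation step: run another query and join the result back
// on a key to add a single derived column. Names are illustrative only.
def addCol3(df: DataFrame, spark: SparkSession): DataFrame = {
  val lookup = spark.sql("SELECT key_col, derived_value AS Col3 FROM other_table")
  df.join(lookup, Seq("key_col"), "inner")   // inner join, so the record count can change
}

df_res = addCol3(df_res, spark)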

So, here are my questions:

  • Should I persist df_res at any point in time?
  • Can I persist df_res again and again after columns are added? I mean, does it add value?
  • If I persist df_res (disk only) every time a new column is added, is the data in the disk replaced? Or does it create a new copy/version of df_res in the disk?
  • Is there a better technique to persist/cache data in a scenario like this (to avoid doing a lot of stuff in memory)?

Answer

wypul · Jun 26, 2018

First of all, persisting a dataframe helps when you are going to apply iterative operations on it.
What you are doing here is applying transformation operations to your dataframe; there is no need to persist it at each step.
For example, persisting would be helpful if you were doing something like this:

val df = spark.sql("select * from ...").persist

df.count

val df1 = df.select("..").withColumn("xyz",udf(..))

df1.count

val df2 = df.select("..").withColumn("abc",udf2(..))

df2.count

Now, persisting df here is beneficial, because df is reused when computing both df1 and df2. One more thing to note: the reason I call df.count is that a dataframe is only persisted when an action is applied to it. From the Spark docs: "The first time it is computed in an action, it will be kept in memory on the nodes". This answers your second question as well.

Every time you persist, a new copy will be created, but you should unpersist the previous one first.
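A minimal sketch of that pattern with disk-only storage (the lit(1) column is just a stand-in for your real logic):

import org.apache.spark.sql.functions.lit
import org.apache.spark.storage.StorageLevel

var df_res = spark.sql("SELECT ...")
df_res.persist(StorageLevel.DISK_ONLY)
df_res.count()                                    // action: the driver dataframe is materialized on disk

val df_next = df_res.withColumn("Col1", lit(1))   // placeholder for the real derivation logic

df_res.unpersist()                                // drop the previous copy first, as described above
df_res = df_next.persist(StorageLevel.DISK_ONLY)
df_res.count()                                    // action: the new copy is written to disk;
                                                  // since the old copy is gone, this recomputes the lineage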