I could not find any discussion on below topic in any forum I searched in internet. It may be because I am new to Spark and Scala and I am not asking a valid question. If there are any existing threads discussing the same or similar topic, the links will be very helpful. :)
I am working on a process which uses Spark and Scala and creates a file by reading a lot of tables and deriving a lot of fields by applying logic to the data fetched from tables. So, the structure of my code is like this:
val driver_sql = "SELECT ...";
var df_res = spark.sql(driver_sql)
var df_res = df_res.withColumn("Col1", <logic>)
var df_res = df_res.withColumn("Col2", <logic>)
var df_res = df_res.withColumn("Col3", <logic>)
.
.
.
var df_res = df_res.withColumn("Col20", <logic>)
Basically, there is a driver query which creates the "driver" dataframe. After that, separate logic (functions) is executed based on a key or keys in the driver dataframe to add new columns/fields. The "logic" part is not always a one-line code, sometimes, it is a separate function which runs another query and does some kind of join on df_res and adds a new column. Record count also changes since I use “inner” join with other tables/dataframes in some cases.
So, here are my questions:
df_res
at any point in time?df_res
again and again after columns are added? I mean, does it add value?df_res
(disk only) every time a new column is added, is the data in the disk replaced? Or does it create a new copy/version of df_res
in the disk?The first thing is persisting a dataframe helps when you are going to apply iterative operations on dataframe.
What you are doing here is applying transformation operation on your dataframes. There is no need to persist these dataframes here.
For eg:- Persisting would be helpful if you are doing something like this.
val df = spark.sql("select * from ...").persist
df.count
val df1 = df.select("..").withColumn("xyz",udf(..))
df1.count
val df2 = df.select("..").withColumn("abc",udf2(..))
df2.count
Now, if you persist df here then it would be beneficial in calculating df1 and df2. One more thing to notice here is, the reason why I did df.count is because dataframe is persisted only when an action is applied on it. From Spark docs: "The first time it is computed in an action, it will be kept in memory on the nodes". And this answers your second question as well.
Every time you persist a new copy will be created but you should unpersist the prev one first.