Question: in pandas, when dropping duplicates you can specify which of the duplicate rows to keep (the `keep` argument). Is there an equivalent in Spark DataFrames?
Pandas:
df.sort_values('actual_datetime', ascending=False).drop_duplicates(subset=['scheduled_datetime', 'flt_flightnumber'], keep='first')
Spark dataframe (I use Spark 1.6.0) doesn't have the keep option
df.orderBy(['actual_datetime']).dropDuplicates(subset=['scheduled_datetime', 'flt_flightnumber'])
Imagine scheduled_datetime and flt_flightnumber are columns 6 and 17. By creating keys based on the values of these columns we can also deduplicate:
def get_key(x):
    """Build the de-duplication key for one row.

    Concatenates the values at positions 6 and 17 of ``x`` (described in the
    surrounding text as scheduled_datetime and flt_flightnumber) into a
    single string.

    NOTE(review): the two values are joined without a separator, so two
    different pairs can yield the same key (e.g. "1" + "23" and "12" + "3").
    Confirm this cannot happen for the real column values.
    """
    return "{0}{1}".format(x[6], x[17])
# Pair every row with its key, then merge rows that share a key by always
# keeping the left-hand value passed to the reducer.
# NOTE(review): which row arrives as the left-hand value is presumably not
# tied to any sort order - confirm before relying on "first" semantics.
df= df.map(lambda x: (get_key(x),x)).reduceByKey(lambda x,y: (x))
But how can I specify that the first row should be kept and the other duplicates dropped? And what about keeping the last row instead?
To everyone saying that dropDuplicates keeps the first occurrence - this is not strictly correct.
dropDuplicates keeps the 'first occurrence' of a sort operation only if there is 1 partition. See below for some examples.
However, forcing everything into a single partition is not practical for most Spark datasets. So I'm also including an example of a 'first occurrence' drop-duplicates operation using a Window function + sort + rank + filter.
See the bottom of the post for that example.
This is tested in Spark 2.4.0 using pyspark.
import pandas as pd
# Example data built with pandas (converted to a Spark DataFrame further down):
# three frames with the same col1 values 0..4, each with its own date string.
df1 = pd.DataFrame({'col1': range(0, 5)}).assign(datestr='2018-01-01')
df2 = pd.DataFrame({'col1': range(0, 5)}).assign(datestr='2018-02-01')
df3 = pd.DataFrame({'col1': range(0, 5)}).assign(datestr='2018-03-01')
# Stack the frames; the original 0..4 index of each frame is kept.
dfall = pd.concat([df1, df2, df3])
print(dfall)
col1 datestr
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-02-01
1 1 2018-02-01
2 2 2018-02-01
3 3 2018-02-01
4 4 2018-02-01
0 0 2018-03-01
1 1 2018-03-01
2 2 2018-03-01
3 3 2018-03-01
4 4 2018-03-01
# first example
# Sort by datestr, then drop duplicates on col1.
# This does not give the first row per col1 (based on datestr);
# see the output below.
(spark.createDataFrame(dfall)
.orderBy('datestr')
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates NOT based on occurrence of sorted datestr
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-03-01|
| 1|2018-02-01|
| 3|2018-02-01|
| 2|2018-02-01|
| 4|2018-01-01|
+----+----------+
# second example
# testing what happens with repartition: same chain as the first example,
# but the sorted frame is repartitioned by datestr before dropping duplicates
(spark.createDataFrame(dfall)
.orderBy('datestr')
.repartition('datestr')
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates NOT based on occurrence of sorted datestr
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-02-01|
| 1|2018-01-01|
| 3|2018-02-01|
| 2|2018-02-01|
| 4|2018-02-01|
+----+----------+
# third example
# testing with coalesce(1): the sorted frame is collapsed into a single
# partition before dropping duplicates
(spark
.createDataFrame(dfall)
.orderBy('datestr')
.coalesce(1)
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates based on occurrence of sorted datestr
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 2|2018-01-01|
| 3|2018-01-01|
| 4|2018-01-01|
+----+----------+
# fourth example
# testing with reverse sort then coalesce(1): sort by datestr descending,
# collapse into a single partition, then drop duplicates
(spark
.createDataFrame(dfall)
.orderBy('datestr', ascending = False)
.coalesce(1)
.dropDuplicates(subset = ['col1'])
.show()
)
# dropDuplicates based on occurrence of sorted datestr
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-03-01|
| 1|2018-03-01|
| 2|2018-03-01|
| 3|2018-03-01|
| 4|2018-03-01|
+----+----------+
# Example data built with pandas: three frames with the same col1 values
# 0..4, each with its own date string.
df1 = pd.DataFrame({'col1': range(0, 5)}).assign(datestr='2018-01-01')
df2 = pd.DataFrame({'col1': range(0, 5)}).assign(datestr='2018-02-01')
df3 = pd.DataFrame({'col1': range(0, 5)}).assign(datestr='2018-03-01')
dfall = pd.concat([df1, df2, df3])
# Convert the stacked pandas frame into a Spark DataFrame.
df_s = spark.createDataFrame(dfall)
from pyspark.sql import Window
# col is used by the filter below, so it must be imported along with rank.
from pyspark.sql.functions import col, rank

# Rank the rows of each col1 group by datestr (ascending) and keep only the
# rows ranked 1, i.e. the earliest datestr per col1; then drop the helper
# column before showing the result.
window = Window.partitionBy("col1").orderBy("datestr")
(df_s.withColumn('rank', rank().over(window))
    .filter(col('rank') == 1)
    .drop('rank')
    .show()
)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 3|2018-01-01|
| 2|2018-01-01|
| 4|2018-01-01|
+----+----------+
# The window + rank approach breaks down when ties/duplicates exist within a
# window partition, so a tie breaker for the 'rank' function must be added.
# Example data built with pandas (converted to a Spark DataFrame below);
# df1 and df2 deliberately share the same date string to create ties.
df1 = pd.DataFrame({'col1': range(0, 5)}).assign(datestr='2018-01-01')
df2 = pd.DataFrame({'col1': range(0, 5)}).assign(datestr='2018-01-01')  # note duplicates in this dataset
df3 = pd.DataFrame({'col1': range(0, 5)}).assign(datestr='2018-03-01')
dfall = pd.concat([df1, df2, df3])
print(dfall)
col1 datestr
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-01-01
1 1 2018-01-01
2 2 2018-01-01
3 3 2018-01-01
4 4 2018-01-01
0 0 2018-03-01
1 1 2018-03-01
2 2 2018-03-01
3 3 2018-03-01
4 4 2018-03-01
# Rebuild the Spark DataFrame from the new pandas frame; otherwise df_s still
# holds the earlier data, which contains no ties.
df_s = spark.createDataFrame(dfall)
# This does not de-duplicate: rank() gives tied rows the same rank, so every
# row tied for the earliest datestr within a col1 partition gets rank 1 and
# passes the filter (see the output below).
window = Window.partitionBy("col1").orderBy("datestr")
(df_s.withColumn('rank', rank().over(window))
    .filter(col('rank') == 1)
    .drop('rank')
    .show()
)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 0|2018-01-01|
| 1|2018-01-01|
| 1|2018-01-01|
| 3|2018-01-01|
| 3|2018-01-01|
| 2|2018-01-01|
| 2|2018-01-01|
| 4|2018-01-01|
| 4|2018-01-01|
+----+----------+
# Break ties inside each window partition with an extra ordering column:
# a generated id is added per row and used as the second sort key, so only
# one row per col1 ends up with rank 1.
from pyspark.sql import Window
from pyspark.sql.functions import rank, col, monotonically_increasing_id

window = Window.partitionBy("col1").orderBy("datestr", 'tiebreak')
(
    df_s
    .withColumn('tiebreak', monotonically_increasing_id())
    .withColumn('rank', rank().over(window))
    .where(col('rank') == 1)
    .drop('rank', 'tiebreak')
    .show()
)
+----+----------+
|col1| datestr|
+----+----------+
| 0|2018-01-01|
| 1|2018-01-01|
| 3|2018-01-01|
| 2|2018-01-01|
| 4|2018-01-01|
+----+----------+