I need to get the top N most frequent values for each column of a DataFrame.
I have this DataFrame:
val df = List(
("MA", "USA"),
("MA", "USA"),
("OH", "USA"),
("OH", "USA"),
("OH", "USA"),
("OH", "USA"),
("NY", "USA"),
("NY", "USA"),
("NY", "USA"),
("NY", "USA"),
("NY", "USA"),
("NY", "USA"),
("CT", "USA"),
("CT", "USA"),
("CT", "USA"),
("CT", "USA"),
("CT", "USA")).toDF("value", "country")
I was able to map it to an RDD[((Int, String), Long)] named colValCount, where each element reads as ((colIdx, value), count):
((0,CT),5)
((0,MA),2)
((0,OH),4)
((0,NY),6)
((1,USA),17)
Now I need to get the top 2 items for each column index. So my expected output is this:
RDD[((Int, String), Long)]
((0,CT),5)
((0,NY),6)
((1,USA),17)
I've tried the DataFrame freqItems API, but it's slow.
Any suggestions are welcome.
For example:
import org.apache.spark.sql.functions._
df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))
  .groupBy($"index", $"value")
  .count
  .orderBy($"count".desc)
  .limit(3)
  .show
// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// |    1|  USA|   17|
// |    0|   NY|    6|
// |    0|   CT|    5|
// +-----+-----+-----+
where:
df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))
creates a two-column DataFrame:
// +-----+-----+
// |index|value|
// +-----+-----+
// |    0|   MA|
// |    0|   MA|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    1|  USA|
// |    1|  USA|
// |    1|  USA|
// +-----+-----+
// only showing top 20 rows
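With more than two columns, writing one select per column by hand gets tedious. A minimal sketch of building the same (index, value) layout for every column follows. The helper name stackColumns is hypothetical, and the cast to string is an assumption so that columns of different types can share one value column:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Hypothetical helper, not part of the Spark API: turn every column of `df`
// into (index, value) rows and stack the results with union.
// Assumes `df` has at least one column (reduce fails on an empty array).
def stackColumns(df: DataFrame): DataFrame =
  df.columns.zipWithIndex
    .map { case (name, idx) =>
      // Cast to string so columns of different types fit one "value" column.
      df.select(lit(idx).alias("index"), col(name).cast("string").alias("value"))
    }
    .reduce(_ union _)
```

stackColumns(df).groupBy("index", "value").count then gives the per-column value counts in one DataFrame.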
If you specifically want the top two values for each column:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

def topN(df: DataFrame, key: String, n: Int): DataFrame = {
  df.select(
      lit(df.columns.indexOf(key)).alias("index"),
      col(key).alias("value"))
    .groupBy("index", "value")
    .count
    // Sort descending so that limit(n) keeps the most frequent values
    .orderBy(col("count").desc)
    .limit(n)
}

topN(df, "value", 2).union(topN(df, "country", 2)).show
// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// |    0|   NY|    6|
// |    0|   CT|    5|
// |    1|  USA|   17|
// +-----+-----+-----+
So, as pault said, it is just "some combination of sort() and limit()".
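As an alternative to running one sort per column, a window function can rank the counts within each index in a single pass. This is only a sketch: the name topNPerIndex is hypothetical, and it assumes the input already has the columns index, value and count, such as the result of the groupBy(...).count step above. Ties are broken arbitrarily by row_number.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Hypothetical helper: keep the n rows with the highest count per index.
// Assumes `counts` has the columns (index, value, count).
def topNPerIndex(counts: DataFrame, n: Int): DataFrame = {
  // Number the rows inside each index, highest count first.
  val byIndex = Window.partitionBy("index").orderBy(col("count").desc)
  counts
    .withColumn("rank", row_number().over(byIndex))
    .where(col("rank") <= n)
    .drop("rank")
}
```

Applied with n = 2 to the grouped counts of the example data, this should keep the NY, CT and USA rows, in no guaranteed order.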