What is the best way to perform a flatMap on a DataFrame in Spark?
From searching around and doing some testing, I have come up with two different approaches. Both of these have some drawbacks so I'm thinking that there should be some better/easier way to do it.
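Both snippets below assume a SparkSession named spark with its implicits in scope; a minimal setup, with placeholder app name and master, could look like this:
import org.apache.spark.sql.SparkSession

// Assumed setup; the app name and master are placeholders.
val spark = SparkSession.builder()
  .appName("flatMapExample")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._ // enables .toDF, .as[...] and the $ column syntax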
The first way I have found is to convert the DataFrame into an RDD and then back again:
import org.apache.spark.sql.Row

val map = Map("a" -> List("c", "d", "e"), "b" -> List("f", "g", "h"))
val df = List(("a", 1.0), ("b", 2.0)).toDF("x", "y")

// Drop to the RDD API, emit one Row per mapped value, then rebuild the DataFrame.
val rdd = df.rdd.flatMap { row =>
  val x = row.getAs[String]("x")
  val y = row.getAs[Double]("y")
  for (v <- map(x)) yield Row(v, y)
}
val df2 = spark.createDataFrame(rdd, df.schema)
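For reference, df2.show() should print something like this (row order may vary across partitions):
df2.show()
// +---+---+
// |  x|  y|
// +---+---+
// |  c|1.0|
// |  d|1.0|
// |  e|1.0|
// |  f|2.0|
// |  g|2.0|
// |  h|2.0|
// +---+---+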
The second approach is to create a Dataset before using the flatMap (using the same variables as above) and then convert back:
// View the DataFrame as a typed Dataset of tuples, flatMap, then name the columns again.
val ds = df.as[(String, Double)].flatMap {
  case (x, y) => for (v <- map(x)) yield (v, y)
}.toDF("x", "y")
Both of these approaches work quite well when the number of columns is small, but I have a lot more than 2 columns. Is there any better way to solve this problem? Preferably one where no conversion is necessary.
You can create a second DataFrame from your map:
val mapDF = Map("a" -> List("c","d","e"), "b" -> List("f","g","h")).toList.toDF("key", "value")
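For reference, mapDF now stores each list as an array column, which is what explode will unpack in the next step; mapDF.show() should print something like:
mapDF.show()
// +---+---------+
// |key|    value|
// +---+---------+
// |  a|[c, d, e]|
// |  b|[f, g, h]|
// +---+---------+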
Then do the join and apply the explode function:
import org.apache.spark.sql.functions.explode

val joinedDF = df.join(mapDF, df("x") === mapDF("key"), "inner")
  .select("value", "y")
  .withColumn("value", explode($"value"))
And you get the result:
joinedDF.show()
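Regarding the many-columns concern from the question: because this stays in the DataFrame API, the remaining columns can be carried along without listing each one by hand. A rough sketch (wideResult is a hypothetical name; this assumes x is the lookup column and explode is imported as above):
// Replace "x" with the exploded values and carry every other
// original column along, so nothing needs to be listed by hand.
val wideResult = df
  .join(mapDF, df("x") === mapDF("key"), "inner")
  .withColumn("value", explode($"value"))
  .select($"value".as("x") +: df.columns.filter(_ != "x").map(df(_)): _*)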