I've been playing around with converting RDDs to DataFrames and back again. First, I had an RDD of type (Int, Int) called dataPair. Then I created a DataFrame object with column headers using:
val dataFrame = dataPair.toDF(header(0), header(1))
Then I converted it from a DataFrame back to an RDD using:
val testRDD = dataFrame.rdd
which returns an RDD of type org.apache.spark.sql.Row (not (Int, Int)). Then I'd like to convert it back to a DataFrame using .toDF but I get an error:
error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
I've tried defining a schema of type Data(Int, Int) for testRDD, but I get a type mismatch compile error:
error: type mismatch;
found : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
val testRDD: RDD[Data] = dataFrame.rdd
^
I've already imported
import sqlContext.implicits._
To create a DataFrame from an RDD of Rows, you usually have two main options:
1) You can use toDF(), which is brought into scope by import sqlContext.implicits._. However, this approach only works for the following types of RDDs:
RDD[Int]
RDD[Long]
RDD[String]
RDD[T <: scala.Product]
(source: Scaladoc of the SQLContext.implicits object)
The last signature actually means that it can work for an RDD of tuples or an RDD of case classes (because tuples and case classes are subclasses of scala.Product).
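As a quick sanity check of that claim, here is a minimal plain-Scala sketch (no Spark needed). The Data case class mirrors the Data(Int, Int) type from the question; its field names a and b are my assumption.

```scala
object ProductCheck {
  // Mirrors the question's Data(Int, Int); the field names are assumed.
  case class Data(a: Int, b: Int)

  // True when the value is a subtype of scala.Product.
  def isProduct(x: Any): Boolean = x.isInstanceOf[Product]

  def main(args: Array[String]): Unit = {
    println(isProduct((1, 2)))     // tuple: true
    println(isProduct(Data(1, 2))) // case class: true
    println(isProduct(42))         // plain Int: false
  }
}
```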
So, to use this approach for an RDD[Row], you have to map it to an RDD[T <: scala.Product]. This can be done by mapping each row to a custom case class or to a tuple, as in the following code snippets:
import org.apache.spark.sql.Row // needed for the Row(...) pattern below
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
or
case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({
case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
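Applied to the (Int, Int) case from the question, the tuple variant might look like the sketch below. It assumes Spark 2.x or later (SparkSession) running in local mode; on Spark 1.x you would use your existing sqlContext instead. The header values and the sample pairs are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object RowsToTuples {
  // Hypothetical stand-in for the question's header array.
  val header = Array("x", "y")

  def roundTrip(spark: SparkSession): DataFrame = {
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._

    // Hypothetical sample data standing in for dataPair.
    val dataPair = spark.sparkContext.parallelize(Seq((1, 2), (3, 4)))
    val dataFrame = dataPair.toDF(header(0), header(1))
    val testRDD = dataFrame.rdd // RDD[Row], so toDF is not available here

    // Map each Row to a tuple (a scala.Product) so that toDF works again.
    testRDD.map { case Row(a: Int, b: Int) => (a, b) }.toDF(header(0), header(1))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rows-to-tuples").getOrCreate()
    roundTrip(spark).show()
    spark.stop()
  }
}
```

Note that the pattern match throws a scala.MatchError at runtime if a row does not have exactly two Int values, so it is only safe when you know the row layout.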
The main drawback of this approach (in my opinion) is that you have to explicitly set the schema of the resulting DataFrame in the map function, column by column. Maybe this can be done programmatically if you don't know the schema in advance, but things can get a little messy there. So, alternatively, there is another option:
2) You can use createDataFrame(rowRDD: RDD[Row], schema: StructType), which is available in the SQLContext object. Example:
val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)
Note that there is no need to explicitly set any schema column. We reuse the old DF's schema, which is a StructType and can easily be extended. However, this approach is sometimes not possible, and in some cases it can be less efficient than the first one.
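Here is a rough sketch of the second option, including one way the schema could be extended. It assumes Spark 2.x or later in local mode; the column names x, y and sum and the sample data are hypothetical, and withSum assumes the first two columns are non-null Ints.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.IntegerType

object RowsWithSchema {
  // Rebuild a DataFrame from its own RDD[Row], reusing the schema unchanged.
  def rebuild(oldDF: DataFrame): DataFrame =
    oldDF.sqlContext.createDataFrame(oldDF.rdd, oldDF.schema)

  // Extend the schema with a hypothetical "sum" column and append the
  // matching value to every Row.
  def withSum(oldDF: DataFrame): DataFrame = {
    val extendedSchema = oldDF.schema.add("sum", IntegerType)
    val extendedRows = oldDF.rdd.map { r =>
      Row.fromSeq(r.toSeq :+ (r.getInt(0) + r.getInt(1)))
    }
    oldDF.sqlContext.createDataFrame(extendedRows, extendedSchema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rows-with-schema").getOrCreate()
    import spark.implicits._
    val oldDF = spark.sparkContext.parallelize(Seq((1, 2), (3, 4))).toDF("x", "y")
    rebuild(oldDF).show()
    withSum(oldDF).show()
    spark.stop()
  }
}
```

The values in each Row have to line up with the schema you pass in; createDataFrame does not check this up front, so a mismatch typically only surfaces when the data is actually evaluated.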
I hope it's clearer than before. Cheers.