How to convert a Spark SchemaRDD into an RDD of my case class?

samthebest · Oct 3, 2014 · Viewed 12.9k times

In the Spark docs it's clear how to create Parquet files from an RDD of your own case classes (from the docs):

val people: RDD[Person] = ??? // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a SchemaRDD by createSchemaRDD, allowing it to be stored using Parquet.
people.saveAsParquetFile("people.parquet")

But it's not clear how to convert back. Really we want a method readParquetFile where we can do:

val people: RDD[Person] = sc.readParquetFile[Person](path)

where the values of the case class are those which are read by the method.

Answer

marios · Jun 26, 2015

An easy way is to provide your own converter of type (Row) => CaseClass. This is a bit more manual, but if you know what you are reading, it should be quite straightforward.

Here is an example:

import org.apache.spark.sql.{Row, SchemaRDD}

case class User(data: String, name: String, id: Long)

// Convert a generic Row into a User; return None for rows that don't match the expected schema
def sparkSqlToUser(r: Row): Option[User] = {
    r match {
      case Row(data: String, name: String, id: Long) => Some(User(data, name, id))
      case _ => None
    }
}

// Read the Parquet file as a SchemaRDD, which is an RDD[Row]
val parquetData: SchemaRDD = sqlContext.parquetFile("hdfs://localhost/user/data.parquet")

// flatMap applies the converter and drops any rows that didn't match (the None cases)
val caseClassRdd: org.apache.spark.rdd.RDD[User] = parquetData.flatMap(sparkSqlToUser)
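
If you want something closer to the readParquetFile method from the question, the same converter pattern can be wrapped in a small helper. Here is a minimal sketch, assuming Spark 1.x where sqlContext.parquetFile returns a SchemaRDD; ParquetReader and readParquetFile are hypothetical names, not part of the Spark API:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}

// Hypothetical helper: enriches SQLContext with a readParquetFile method
// built on a user-supplied (Row) => Option[T] converter
implicit class ParquetReader(sqlContext: SQLContext) {
  def readParquetFile[T: ClassTag](path: String)(convert: Row => Option[T]): RDD[T] =
    // Rows the converter rejects (None) are silently dropped
    sqlContext.parquetFile(path).flatMap(row => convert(row))
}

// Usage, reusing sparkSqlToUser from above:
val users: RDD[User] = sqlContext.readParquetFile("hdfs://localhost/user/data.parquet")(sparkSqlToUser)

In the spark-shell this can be pasted as-is; in a compiled application the implicit class needs to live inside an object so it can be imported where needed.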