Spark Scala Get Data Back from rdd.foreachPartition

codeaperature · Apr 30, 2016 · Viewed 13.3k times

I have some code like this:

      println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
      val lastRevs = distinctFileGidsRDD.
        foreachPartition(iter => {
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          while(iter.hasNext) {
            val item = iter.next()
            //println(item(0))
            println("String: "+item(0).toString())
            val jsonStr = DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
                map { resultSet => resultSet.string(1) }.single.apply()
            }
            println("\nJSON: "+jsonStr)
          }
        })
      println("\nEND Last Revs Class: "+ lastRevs.getClass)

The code outputs (with heavy edits) something like:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",... )
...
JSON: None()
END Last Revs Class: void

QUESTION 1: How can I get the lastRevs value to be in a useful format like the JSON string/null or an option like Some / None?

QUESTION 2: My preference: is there another way to get at a partition's data in an RDD-like format (rather than the iterator format)?

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  } 
}

from http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

QUESTION 3: Is my method of getting the data sane, given that I am following the link above? (Put aside the fact that this currently uses JDBC through scalikejdbc. The final system will use some kind of key-value store rather than this prototype.)

Answer

maasg · Apr 30, 2016

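To create a transformation that uses resources local to the executor (such as a DB or network connection), you should use rdd.mapPartitions. It lets you initialize resources locally on the executor and use them to process the data in the partition. Unlike foreachPartition, which is an action that returns Unit (hence the void class in the output above), mapPartitions returns a new RDD containing the results.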

The code should look like:

      val lastRevs = distinctFileGidsRDD.
        mapPartitions { iter =>
          // Runs once per partition, on the executor
          SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
          // Lazily map each element to its query result (an Option[String])
          iter.map { element =>
            DB.readOnly { implicit session =>
              sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar"
                .map { resultSet => resultSet.string(1) }.single.apply()
            }
          }
        }
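
The shape of the function handed to mapPartitions can be tried out in plain Scala, without Spark or a database. The following is only a minimal sketch under stated assumptions: `FakeConnection`, `lookupPartition`, and the in-memory `table` are hypothetical stand-ins for the JDBC setup and the `lasttail` query, and the elements are assumed to be plain strings rather than the indexed items used in the question.

```scala
object PartitionLookupSketch {

  // Hypothetical stand-in for a per-partition resource such as a JDBC connection.
  final class FakeConnection(table: Map[String, String]) {
    var lookups = 0 // counts queries, only to make the behaviour observable

    def query(fileGid: String): Option[String] = {
      lookups += 1
      table.get(fileGid) // Some(json) when the key exists, None otherwise
    }
  }

  // Once `connect` is supplied, this has the shape
  // Iterator[T] => Iterator[U] that mapPartitions expects.
  // The connection is obtained once per call (that is, once per partition).
  def lookupPartition(
      connect: () => FakeConnection
  )(iter: Iterator[String]): Iterator[(String, Option[String])] = {
    val conn = connect()
    // Iterator.map is lazy: queries run only as the result is consumed.
    iter.map(gid => gid -> conn.query(gid))
  }

  def main(args: Array[String]): Unit = {
    val table = Map("a" -> """{"fileGid":"a"}""")
    val partition = Iterator("a", "b") // pretend this is one partition's data
    val results = lookupPartition(() => new FakeConnection(table))(partition).toList
    results.foreach(println)
  }
}
```

In a Spark job, a function of this shape would be passed as `rdd.mapPartitions(lookupPartition(connect))`, assuming `connect` is serializable, and `collect()` or `take(n)` on the resulting RDD would bring the Option values back to the driver. Because the returned iterator is lazy, closing the connection inside the function before returning would close it before any query has run, so cleanup needs to happen after the iterator is exhausted.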