How to partition RDD by key in Spark?

BAR · Sep 13, 2015 · Viewed 34.9k times

Given that the HashPartitioner docs say:

[HashPartitioner] implements hash-based partitioning using Java's Object.hashCode.

Say I want to partition DeviceData by its kind.

case class DeviceData(kind: String, time: Long, data: String)

Would it be correct to partition an RDD[DeviceData] by overriding the deviceData.hashCode() method so that it uses only the hash code of kind?

But given that HashPartitioner takes a number-of-partitions parameter, I am confused: do I need to know the number of kinds in advance, and what happens if there are more kinds than partitions?
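For reference, a hash partitioner assigns a key to a partition roughly as `key.hashCode` modulo the partition count (kept non-negative), so the number of kinds does not need to be known in advance; if there are more kinds than partitions, several kinds simply land in the same partition. A minimal sketch of that assignment (the `nonNegativeMod` helper below is written from scratch to mirror the behavior, not Spark's actual internal code):

```scala
// Sketch of hash-based partition assignment: partition = hashCode mod numPartitions,
// adjusted so the result is never negative.
def nonNegativeMod(x: Int, mod: Int): Int = {
  val raw = x % mod
  if (raw < 0) raw + mod else raw
}

val numPartitions = 2
val kinds = Seq("sensor", "gps", "camera", "audio")

// With more kinds than partitions, some partitions necessarily hold several kinds.
val assignment = kinds.map(k => k -> nonNegativeMod(k.hashCode, numPartitions))
assignment.foreach { case (kind, p) => println(s"$kind -> partition $p") }
```

The practical consequence: hash partitioning guarantees that all records with the same kind end up in the same partition, but not that a partition holds only one kind.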

Is it correct that if I write partitioned data to disk it will stay partitioned when read back?

My goal is to call

  deviceDataRdd.foreachPartition((d: Iterator[DeviceData]) => ...)

and have only DeviceData values with the same kind in the iterator.

Answer

Justin Pihony · Sep 13, 2015

How about just doing a groupByKey using kind, or another PairRDDFunctions method?

You make it sound as though you don't really care about the partitioning, just that you get all of a specific kind in one processing flow?

The pair functions allow this:

rdd.keyBy(_.kind).partitionBy(new HashPartitioner(PARTITIONS))
   .foreachPartition(...)

However, you can probably be a little safer with something more like:

rdd.keyBy(_.kind).reduceByKey(....)

or mapValues, or a number of the other pair functions that guarantee you get the pieces as a whole.
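Putting the groupByKey suggestion together, a sketch might look like the following. This assumes a SparkContext `sc` is already available, uses a few made-up sample records, and keeps `PARTITIONS` as a placeholder partition count as in the snippet above:

```scala
import org.apache.spark.HashPartitioner

case class DeviceData(kind: String, time: Long, data: String)

// Hypothetical sample data for illustration.
val deviceDataRdd = sc.parallelize(Seq(
  DeviceData("gps", 1L, "a"),
  DeviceData("gps", 2L, "b"),
  DeviceData("audio", 3L, "c")
))

// groupByKey yields one (kind, Iterable[DeviceData]) pair per kind, so each
// kind is processed as a whole regardless of how partitions are laid out.
deviceDataRdd
  .keyBy(_.kind)
  .groupByKey(new HashPartitioner(PARTITIONS))
  .foreach { case (kind, records) =>
    // every record in `records` shares the same kind
    println(s"$kind: ${records.size} records")
  }
```

Compared with foreachPartition, this trades some shuffle cost for the guarantee that each kind arrives as a single Iterable, rather than possibly sharing a partition with other kinds.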