How does Distinct() function work in Spark?

preetham madeti · Jun 21, 2015 · Viewed 79k times

I'm a newbie to Apache Spark and am learning the basic functionality. I have a small doubt. Suppose I have an RDD of (key, value) tuples and want to obtain the unique ones. I use the distinct() function. I'm wondering: on what basis does the function decide that two tuples are distinct? Is it based on the keys, the values, or both?

Answer

Glenn Strycker · Jun 30, 2015

.distinct() is definitely doing a shuffle across partitions. To see more of what's happening, call .toDebugString on your RDD.
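To answer the original question directly: distinct() deduplicates on the whole element, so for an RDD of (key, value) tuples, two tuples collapse into one only when both the key and the value match. Internally Spark implements it roughly as map(x => (x, null)).reduceByKey((a, _) => a).map(_._1), which is where the shuffle comes from. A plain-Scala sketch of the same idea (no cluster needed; the groupBy stands in for the hash shuffle):

```scala
// RDD.distinct treats the WHOLE element as the identity: for
// (key, value) tuples, both key and value must match for two
// elements to be considered duplicates.
val pairs = Seq(("a", 1), ("a", 1), ("a", 2), ("b", 1))

// Spark's distinct is roughly:
//   rdd.map(x => (x, null)).reduceByKey((a, _) => a).map(_._1)
// The collections analogue below uses groupBy where Spark would
// shuffle duplicate elements onto the same partition.
val unique = pairs
  .map(x => (x, null))   // whole tuple becomes the key
  .groupBy(_._1)         // stands in for the hash shuffle
  .keys
  .toList

// ("a", 1) survives once; ("a", 2) and ("b", 1) are both kept
// because they differ in value or in key.
```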

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

val hashPart = new HashPartitioner(<number of partitions>)

val myRDDPreStep = <load some RDD>

val myRDD = myRDDPreStep.distinct
  .partitionBy(hashPart)
  .setName("myRDD")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint
println(myRDD.toDebugString)

For one of my RDDs (where myRDDPreStep is already hash-partitioned by key, persisted with StorageLevel.MEMORY_AND_DISK_SER, and checkpointed), this returns:

(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    |    ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
    +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
        |    myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
        |        CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
        |    myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]

Note that there may be more efficient ways to get distinct values that involve fewer shuffles, especially if your RDD is already partitioned in a smart way and the partitions are not overly skewed.
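For instance, if the RDD is already hash-partitioned on the full element, every copy of a duplicate already lives in the same partition, so a per-partition dedup (e.g. via mapPartitions) avoids the extra shuffle. A minimal sketch with partitions modelled as plain sequences (hypothetical data, not the poster's RDD):

```scala
// Two "partitions" as produced by hash-partitioning on the whole
// tuple: all copies of a given tuple land in the same partition.
val partitions = Seq(
  Seq(("a", 1), ("a", 1), ("a", 2)),
  Seq(("b", 1), ("b", 1))
)

// Per-partition dedup, analogous to
//   rdd.mapPartitions(it => it.toSet.iterator, preservesPartitioning = true)
// No cross-partition shuffle is needed, because no duplicate can
// span two partitions.
val result = partitions.flatMap(_.distinct)
```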

See "Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct?" and "Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?"