I'm new to Apache Spark and am learning the basic functionality, and I have a small question. Suppose I have an RDD of (key, value) tuples and want to keep only the unique ones, so I use the distinct() function. On what basis does distinct() consider two tuples to be different? Is it based on the keys, the values, or both?
.distinct() definitely does a shuffle across partitions. To see more of what is happening, call .toDebugString on your RDD:
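import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

val hashPart = new HashPartitioner(<number of partitions>)
val myRDDPreStep = <load some RDD>
// distinct shuffles once; partitionBy then shuffles again to restore hash partitioning by key
val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER)
myRDD.checkpoint() // requires a checkpoint directory set beforehand via SparkContext.setCheckpointDir
println(myRDD.toDebugString)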
For an example RDD of mine (myRDDPreStep is already hash-partitioned by key, persisted with StorageLevel.MEMORY_AND_DISK_SER, and checkpointed), this prints:
(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
+-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated]
| myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated]
| CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B
| myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated]
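To answer the keys-or-values part directly: distinct() compares whole elements by equality, so for (key, value) tuples both the key and the value have to match for two tuples to count as duplicates. The MapPartitionsRDD, ShuffledRDD, MapPartitionsRDD chain attributed to distinct in the lineage above is consistent with distinct being built as a map, a reduceByKey, and another map. Here is a minimal sketch of that behaviour; the object name, the local master setting, the sample data, and the dummy value 0 are my own assumptions for illustration, not anything from the job above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object DistinctSemantics {
  def main(args: Array[String]): Unit = {
    // Local master and app name are illustrative assumptions.
    val conf = new SparkConf().setMaster("local[2]").setAppName("distinct-semantics")
    val sc = new SparkContext(conf)
    try {
      val pairs = sc.parallelize(Seq(("a", 1), ("a", 1), ("a", 2), ("b", 1)))

      // distinct() compares whole tuples: ("a", 1) and ("a", 2) both survive,
      // only the exact duplicate ("a", 1) is dropped.
      val wholeTuple = pairs.distinct().collect().toSet
      assert(wholeTuple == Set(("a", 1), ("a", 2), ("b", 1)))

      // The same result written by hand as map -> reduceByKey -> map.
      // The whole tuple becomes the shuffle key; 0 is just a dummy value.
      val manual = pairs
        .map(tuple => (tuple, 0))
        .reduceByKey((first, _) => first)
        .keys
        .collect()
        .toSet
      assert(manual == wholeTuple)

      // If you want uniqueness by key only, reduce by key yourself.
      // Which value is kept per key is arbitrary here, so only keys are checked.
      val uniqueKeys = pairs.reduceByKey((first, _) => first).keys.collect().toSet
      assert(uniqueKeys == Set("a", "b"))
    } finally {
      sc.stop()
    }
  }
}
```

Because the whole tuple is used as the shuffle key, any partitioning by the tuple's first element is not reused by distinct in this formulation, which is why a partitionBy is needed afterwards to get back to hash partitioning by key.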
Note that there may be more efficient ways to get a distinct that involve fewer shuffles, especially if your RDD is already partitioned in a smart way and the partitions are not overly skewed.
See "Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct?" and "Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?"
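As a rough illustration of the mapPartitions idea: if the RDD is already partitioned by key, two equal (key, value) tuples necessarily have the same key and so sit in the same partition, which means duplicates can be removed partition by partition without another shuffle. The sketch below assumes exactly that precondition; distinctWithinPartitions is a hypothetical helper name of mine, and the local master, sample data, and partition count are illustrative only.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

object PartitionLocalDistinct {
  // Hypothetical helper: only correct when rdd is already partitioned by key,
  // because then all copies of a given (key, value) tuple share one partition.
  def distinctWithinPartitions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): RDD[(K, V)] = {
    require(rdd.partitioner.isDefined, "RDD must already be partitioned by key")
    // toSet materializes one partition in memory, so heavy skew can hurt here.
    rdd.mapPartitions(iter => iter.toSet.iterator, preservesPartitioning = true)
  }

  def main(args: Array[String]): Unit = {
    // Local master and app name are illustrative assumptions.
    val conf = new SparkConf().setMaster("local[2]").setAppName("partition-local-distinct")
    val sc = new SparkContext(conf)
    try {
      val pairs = sc
        .parallelize(Seq(("a", 1), ("a", 1), ("a", 2), ("b", 1), ("b", 1)))
        .partitionBy(new HashPartitioner(4))

      val deduped = distinctWithinPartitions(pairs)

      assert(deduped.collect().toSet == Set(("a", 1), ("a", 2), ("b", 1)))
      assert(deduped.count() == 3)
      // No shuffle was introduced, so the original partitioner is kept.
      assert(deduped.partitioner == pairs.partitioner)
    } finally {
      sc.stop()
    }
  }
}
```

The trade-off is memory: each partition's distinct tuples are held in a Set at once, so this only pays off when partitions are reasonably sized and not overly skewed, as noted above.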