Can anyone explain the difference between reduceByKey, groupByKey, aggregateByKey and combineByKey? I have read the documentation, but couldn't understand the exact differences.
An explanation with examples would be great.
groupByKey:
Syntax:
sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .groupByKey()
  .map { case (word, counts) => (word, counts.sum) }
groupByKey can cause out-of-memory (or disk-spill) problems, because every value for a key is sent over the network and collected on the reduce-side workers before any combining happens.
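For intuition, a minimal sketch (the sample data here is made up) showing that groupByKey hands you every value for a key as an Iterable:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))
pairs.groupByKey().mapValues(_.toList).collect()
// e.g. Array((a,List(1, 1)), (b,List(1)))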
reduceByKey:
Syntax:
sparkContext.textFile("hdfs://")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey((x, y) => x + y)
Data is combined within each partition first, so only one output per key per partition is sent over the network. reduceByKey requires combining all your values into another value of the exact same type.
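That type constraint is why a per-key average can't be expressed directly with reduceByKey: you first map each value to a (sum, count) pair so input and output types match. A sketch with made-up data:

val scores = sc.parallelize(Seq(("math", 90), ("math", 80), ("english", 70)))
scores
  .mapValues(v => (v, 1))                              // (sum, count) pairs
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))   // same type in and out
  .mapValues { case (sum, count) => sum / count.toDouble }
  .collect()
// e.g. Array((math,85.0), (english,70.0))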
aggregateByKey:
similar to reduceByKey, but it additionally takes an initial (zero) value, and the accumulated type may differ from the value type.
3 parameters as input: i. initial (zero) value ii. sequence op (merges a value into the accumulator within a partition) iii. combiner op (merges accumulators across partitions)
Example:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
// Create key-value pairs
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0
val addToCounts = (n: Int, v: String) => n + 1          // seq op: count each value within a partition
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2  // comb op: merge per-partition counts
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
Output: bar -> 3, foo -> 5
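aggregateByKey pays off precisely when the accumulator type differs from the value type. For example, reusing the kv pairs from above, collecting the distinct values per key into a Set:

val distinctByKey = kv.aggregateByKey(Set.empty[String])(
  (set, v) => set + v,   // seq op: add a value within a partition
  (s1, s2) => s1 ++ s2   // comb op: merge sets across partitions
)
distinctByKey.collect()
// e.g. Array((bar,Set(C, D)), (foo,Set(A, B)))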
combineByKey:
3 parameters as input: i. createCombiner (turns the first value seen for a key into an accumulator) ii. mergeValue (folds a value into the accumulator within a partition) iii. mergeCombiners (merges accumulators across partitions)
Example:
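Here rdd is assumed to be a pair RDD of (String, Int); some made-up input for concreteness:

val rdd = sc.parallelize(Seq(("a", 3), ("a", 5), ("b", 4)))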
val result = rdd.combineByKey(
  (v: Int) => (v, 1),                                      // createCombiner: start a (sum, count) pair
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),   // mergeValue: fold a value in within a partition
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) // mergeCombiners
).map { case (k, v) => (k, v._1 / v._2.toDouble) }         // per-key average
result.collect.foreach(println)
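With the made-up input above, this prints the per-key averages, e.g. (a,4.0) and (b,4.0).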
reduceByKey, aggregateByKey and combineByKey are preferred over groupByKey
Reference: Avoid groupByKey