I am using Yelp's mrjob library to run map-reduce jobs. I know that map-reduce has an internal sort and shuffle phase which sorts the mapper output on the basis of the keys. So if I have the following results after the map phase:
(1, 24) (4, 25) (3, 26)
I know the sort and shuffle phase will produce the following output:
(1, 24) (3, 26) (4, 25)
which is as expected.
But if I have two identical keys with different values, why does the sort and shuffle phase sort the data on the basis of the first value that appears?
For example, if I have the following list of values from the mapper:
(2, <25, 26>) (1, <24, 23>) (1, <23, 24>)
The expected output is
(1, <24, 23>) (1, <23, 24>) (2, <25, 26>)
But the output that I am getting is
(1, <23, 24>) (1, <24, 23>) (2, <25, 26>)
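To make the difference concrete, the two orderings can be reproduced in plain Python. This is only an illustration of the two outputs above, not mrjob's actual shuffle code; the tuple representation of the `<a, b>` values and the variable names are my own assumptions.

```python
# Mapper output from the example, with <a, b> written as a tuple (assumption).
mapper_output = [(2, (25, 26)), (1, (24, 23)), (1, (23, 24))]

# Stable sort on the key alone: records with equal keys keep mapper order.
by_key_only = sorted(mapper_output, key=lambda kv: kv[0])

# Sort on the whole (key, value) record: ties on the key are broken by value.
by_key_and_value = sorted(mapper_output)

print(by_key_only)       # the ordering I expected
print(by_key_and_value)  # the ordering I am getting
```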
Is this specific to the mrjob library? Is there any way to stop this sorting on the basis of values?
Code:

    from mrjob.job import MRJob
    import math


    class SortMR(MRJob):

        def steps(self):
            return [
                self.mr(mapper=self.rangemr,
                        reducer=self.rangesort)]

        def rangemr(self, key, line):
            for a in line.split():
                yield 1, a

        def rangesort(self, numid, line):
            for a in line:
                yield (1, a)


    if __name__ == '__main__':
        SortMR.run()
The only way to 'sort' the values is to use a composite key which contains some information from the value itself. Your key's compareTo method can then ensure that the keys are sorted first by the actual key component, then by the value component. Finally, you'll need a partitioner and a grouping comparator that look only at the actual key component, so that in the reducer all the keys with the same 'key' component are considered equal and the associated values are iterated over in one call to the reduce method.
This is known as a 'secondary sort'; a question similar to this one provides some links to examples.
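The composite key idea described above can be sketched in plain Python. This is a minimal simulation under stated assumptions, not mrjob or Hadoop code: `secondary_sort` is a hypothetical helper, the `<a, b>` values are written as tuples, and `itertools.groupby` stands in for the grouping step that a real job would configure in the framework.

```python
from itertools import groupby
from operator import itemgetter


def secondary_sort(records):
    """Simulate a shuffle that sorts on a composite (key, value) key.

    `records` is an iterable of (natural_key, value) pairs, as a mapper
    might emit them. Hypothetical helper for illustration only.
    """
    # Map side: fold the value into the key, so a sort that only looks
    # at keys still ends up ordering the values.
    composite = [((key, value), value) for key, value in records]

    # Shuffle: sort on the composite key (natural key first, then value).
    composite.sort(key=itemgetter(0))

    # Grouping: compare only the natural key component, so every value
    # for that key is handed to a single "reduce" call.
    for natural_key, group in groupby(composite, key=lambda kv: kv[0][0]):
        yield natural_key, [value for _, value in group]


mapper_output = [(2, (25, 26)), (1, (24, 23)), (1, (23, 24))]
result = list(secondary_sort(mapper_output))
print(result)
```

Here the values for key 1 arrive at the reducer in sorted order because the value is part of the sort key, while grouping still happens on the natural key alone.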