How to optimize shuffling/sorting phase in a hadoop job

HHH picture HHH · Dec 9, 2015 · Viewed 14.2k times · Source

I'm doing some data preparation using a single node hadoop job. The mapper/combiner in my job outputs many keys (more than 5M or 6M) and obviously the job proceeds slowly or even fails. The mapping phase runs up to 120 mapper and there is just one reducer (these are automatically determined and I've not set any values for them). I want to optimize the job so that shuffling/sorting phase occurs more efficiently. I increased mapreduce.task.io.sort.mb to 300m but the job faile because its value was larger than mapper heap. I then set mapred.child.java.opts to -Xmx1024m but it again failed because it couldn't initialize an Output Collector. What are the best practices for these scenarios?

Answer

Manjunath Ballur picture Manjunath Ballur · Dec 10, 2015

To begin with, since you are using a single node cluster, there is not much optimization you can do. You will have limited number of containers/slots on a single node cluster and for the amount of data you are processing (5 to 6 million keys), your jobs will always run slow and may also fail.

I am going to answer this question for a fully distributed Hadoop setup. There is a section ("Shuffle and Sort") in the book "Hadoop The Definitive Guide", which you should read for tuning the Shuffle and Sort phase. My answer is mainly influenced by the contents of this section and also my own experience with tuning the MapReduce jobs.

You can do the following to achieve the Shuffle and Sort efficiency:

  • Combiner: Using combiner will reduce the amount of data transferred to each of the the reducers, since combiner merges the output on the mapper side.
  • Number of reducers: Choose optimal number of reducers. If data size is huge, then one reducer is not a good idea. Also, setting the number of reducers to a high number, is not a good idea, since the number of reducers also determines the number of partitions on the mapper side. Look at the link here: https://github.com/paulhoule/infovore/wiki/Choosing-the-number-of-reducers
  • When to start the reducers:; You can control, when the reduce tasks are started. This is determined by configuration mapreduce.job.reduce.slowstart.completedmaps in YARN. It will not start the reducers until a certain percentage of mappers are completed. It is by default set to "0.05" (It means reducers start after 5% of mappers are completed). If the reducers are started early, then most of the reducers are idle, till all the mappers are completed. Also, the reducers may consume the slots, which could otherwise be used by the mappers for processing. By controlling this, you can use the mapper/reducers slots optimally and improve the time spent during the shuffle.
  • Compress Mapper Output: Its recommended to compress the mapper outputs (determined by configuration: mapreduce.map.output.compress), so that lesser data gets written to disk and gets transferred to reducers.
  • Tune config "mapreduce.task.io.sort.mb": Increase the buffer size used by the mappers during the sorting. This will reduce the number of spills to the disk.
  • Tune config "mapreduce.reduce.input.buffer.percent": If your reduce task has lesser memory requirements, then this value can be set to a high percentage. This means, higher amount of heap is used for retaining the map outputs during the reduce phase (after the shuffle phase), thus reducing the number of spills to disk.
  • Tune config "mapreduce.reduce.shuffle.parallelcopies": Number of threads used to copy map outputs to reducers. Check the link here: how to tune mapred.reduce.parallel.copies?

Following are the other configuration parameters which can be tuned to improve the Shuffle and Sort phase performance (see the description of these configurations here: https://hadoop.apache.org/docs/r2.4.1/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml):

  • mapreduce.map.sort.spill.percent: Determines the threshold for the in memory buffer used by the mapper. When this threshold is reached, the the contents of the buffer are spilled to disk. So this value determines the number of spills to disk
  • mapreduce.task.io.sort.factor: Minimum number of streams to be merged at once, during sorting. So, on the reducer side, if there are 50 mapper outputs and this value is set to 10, then there will be 5 rounds of merging (on an average 10 files for merge round).
  • mapreduce.shuffle.max.threads: Number of worker threads for copying the map outputs to reducers.
  • mapreduce.reduce.shuffle.input.buffer.percent: How much of heap should be used for storing the map output, during the shuffle phase in the reducer. This setting determines the amount of mapper output that can be held in memory, before it is spilled to disk.
  • mapreduce.reduce.shuffle.merge.percent: Threshold for starting the process of merge and spilling to disk
  • mapreduce.reduce.merge.inmem.threshold: Number of map outputs needed for starting the merge process. When either mapreduce.reduce.shuffle.merge.percent or mapreduce.reduce.merge.inmem.threshold is reached, then the map outputs are merged and spilled to disk.