I'm doing some data preparation using a single-node Hadoop job. The mapper/combiner in my job outputs many keys (more than 5M or 6M), and the job proceeds slowly or even fails. The map phase runs up to 120 mappers and there is just one reducer (these are determined automatically; I have not set any values for them). I want to optimize the job so that the shuffle/sort phase runs more efficiently. I increased mapreduce.task.io.sort.mb to 300m, but the job failed because the value was larger than the mapper heap. I then set mapred.child.java.opts to -Xmx1024m, but it failed again because it couldn't initialize an output collector. What are the best practices for these scenarios?
To begin with, since you are using a single-node cluster, there is not much optimization you can do. You will have a limited number of containers/slots on a single-node cluster, and for the amount of data you are processing (5 to 6 million keys), your jobs will always run slowly and may also fail.
I am going to answer this question for a fully distributed Hadoop setup. There is a section ("Shuffle and Sort") in the book "Hadoop: The Definitive Guide" that you should read for tuning the shuffle and sort phase. My answer is mainly influenced by the contents of that section and by my own experience with tuning MapReduce jobs.
You can do the following to make the shuffle and sort phase more efficient:
Tune mapreduce.job.reduce.slowstart.completedmaps (in YARN). Reducers will not start until this fraction of the mappers has completed. The default is "0.05" (reducers start after 5% of the mappers are completed). If the reducers start early, most of them sit idle until all the mappers are completed. They also occupy slots that could otherwise be used by mappers for processing. By controlling this value, you can use the mapper/reducer slots optimally and reduce the time spent in the shuffle.
Compress the map output (mapreduce.map.output.compress), so that less data gets written to disk and transferred to the reducers.
Following are other configuration parameters that can be tuned to improve shuffle and sort performance (see the description of these configurations here: https://hadoop.apache.org/docs/r2.4.1/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml):
mapreduce.map.sort.spill.percent: Threshold for the in-memory buffer used by the mapper. When this threshold is reached, the contents of the buffer are spilled to disk, so this value determines the number of spills to disk.
mapreduce.task.io.sort.factor: Maximum number of streams to be merged at once during sorting. So, on the reducer side, if there are 50 mapper outputs and this value is set to 10, there will be 5 rounds of merging (on average 10 files per merge round).
mapreduce.shuffle.max.threads: Number of worker threads for copying the map outputs to reducers.
mapreduce.reduce.shuffle.input.buffer.percent: Fraction of the heap used for storing map output during the shuffle phase in the reducer. This setting determines the amount of mapper output that can be held in memory before it is spilled to disk.
mapreduce.reduce.shuffle.merge.percent: Threshold for starting the process of merging and spilling to disk.
mapreduce.reduce.merge.inmem.threshold: Number of map outputs needed for starting the merge process. When either mapreduce.reduce.shuffle.merge.percent or mapreduce.reduce.merge.inmem.threshold is reached, the map outputs are merged and spilled to disk.
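To get a feel for how these settings interact, here is a rough back-of-the-envelope sketch in plain Java (no Hadoop dependency). The class name ShuffleSizing and its methods are hypothetical and not part of any Hadoop API. The numbers are example values, not Hadoop defaults. The formulas are simplified models of the behaviour described above: they ignore per-record metadata in the sort buffer and any merge optimizations Hadoop applies, so treat the results as estimates only.

```java
// Hypothetical helper for rough shuffle/sort sizing. Not a Hadoop API.
final class ShuffleSizing {

    // Bytes the map-side sort buffer can hold before a spill starts:
    // mapreduce.task.io.sort.mb * mapreduce.map.sort.spill.percent.
    static long spillThresholdBytes(int sortMb, double spillPercent) {
        return (long) (sortMb * 1024L * 1024L * spillPercent);
    }

    // Simplified estimate of spills for one mapper: output size divided by
    // the spill threshold, rounded up.
    static long estimatedSpills(long mapOutputBytes, int sortMb, double spillPercent) {
        long threshold = spillThresholdBytes(sortMb, spillPercent);
        return (mapOutputBytes + threshold - 1) / threshold;
    }

    // Simplified model from the example above: each round merges up to
    // mapreduce.task.io.sort.factor files, so rounds = ceil(files / factor).
    static int mergeRounds(int files, int sortFactor) {
        return (files + sortFactor - 1) / sortFactor;
    }

    // Bytes of in-memory map output on the reducer at which a merge starts:
    // heap * mapreduce.reduce.shuffle.input.buffer.percent
    //      * mapreduce.reduce.shuffle.merge.percent.
    static long reduceMergeTriggerBytes(long heapBytes, double inputBufferPercent,
                                        double mergePercent) {
        return (long) (heapBytes * inputBufferPercent * mergePercent);
    }

    public static void main(String[] args) {
        long oneGiB = 1L << 30; // example size, chosen only for illustration

        System.out.println("Spill threshold (bytes): " + spillThresholdBytes(200, 0.75));
        System.out.println("Estimated spills for 1 GiB of map output: "
                + estimatedSpills(oneGiB, 200, 0.75));
        System.out.println("Merge rounds for 50 files, factor 10: " + mergeRounds(50, 10));
        System.out.println("Reducer merge trigger (bytes): "
                + reduceMergeTriggerBytes(oneGiB, 0.75, 0.5));
    }
}
```

Playing with the inputs shows the trade-offs: a larger sort buffer or spill percentage means fewer spills per mapper, and a larger sort factor means fewer merge rounds, but both cost memory or open file handles, so the buffer still has to fit inside the task heap.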