I have tried to use the MultipleOutputs class, as per the example at http://hadoop.apache.org/docs/mapreduce/r0.21.0/api/index.html?org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html
Driver Code
Configuration conf = new Configuration();
Job job = new Job(conf, "Wordcount");
job.setJarByClass(WordCount.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
Text.class, IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
Reducer Code
public class WordCountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();
    private MultipleOutputs<Text, IntWritable> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        //context.write(key, result);
        mos.write("text", key, result);
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}
The output of the reducer does end up in a file named text-r-00000.
But the issue is that I am also getting an empty part-r-00000 file as well. Is this how MultipleOutputs is expected to behave, or is there some problem with my code? Please advise.
Another alternative I have tried is to iterate through my output folder using the FileSystem class and manually rename all files beginning with part:
FileSystem hdfs = FileSystem.get(configuration);
FileStatus fs[] = hdfs.listStatus(new Path(outputPath));
for (FileStatus aFile : fs) {
    if (aFile.isDir()) {
        // delete all directories and sub-directories (if any) in the output directory
        hdfs.delete(aFile.getPath(), true);
    } else {
        if (aFile.getPath().getName().contains("_")) {
            // delete all log files and the _SUCCESS file in the output directory
            hdfs.delete(aFile.getPath(), true);
        } else {
            hdfs.rename(aFile.getPath(), new Path(myCustomName));
        }
    }
}
What is the best way?
Even if you are using MultipleOutputs, the default OutputFormat (I believe it is TextOutputFormat) is still being used, and so it will initialize and create these part-r-xxxxx files that you are seeing.
The fact that they are empty is because you are not doing any context.write, because you are using MultipleOutputs. (If you uncommented the context.write(key, result) line in your reducer, those records would land in part-r-00000.) But that doesn't prevent the part files from being created during initialization.
To get rid of them, you need to define your OutputFormat to say you are not expecting any output. You can do it this way:
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
job.setOutputFormatClass(NullOutputFormat.class);
With that property set, this should ensure that your part files are never initialized at all, but you still get your output in the MultipleOutputs.
You could also probably use LazyOutputFormat, which would ensure that your output files are only created when/if there is some data, and would not initialize empty files. You could do it this way:
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
Note that you are using in your Reducer the prototype MultipleOutputs.write(String namedOutput, K key, V value), which just uses a default output path that will be generated based on your namedOutput, something like: {namedOutput}-(m|r)-{part-number}. If you want to have more control over your output filenames, you should use the prototype MultipleOutputs.write(String namedOutput, K key, V value, String baseOutputPath), which allows you to get filenames generated at runtime based on your keys/values.
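For example, here is a minimal sketch of that second prototype, assuming you wanted one output file per key, named after the key itself:
// Each distinct key goes to its own file, e.g. hello-r-00000 for the key "hello".
// The framework appends the -r-{part-number} suffix to the baseOutputPath you pass.
mos.write("text", key, result, key.toString());
Just bear in mind that every distinct baseOutputPath produces a separate file, so with many distinct keys this can create a very large number of small files.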