I have the following Reducer class:
public static class TokenCounterReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        JSONObject jsn = new JSONObject();
        for (Text value : values) {
            String[] vals = value.toString().split("\t");
            String[] targetNodes = vals[0].toString().split(",", -1);
            jsn.put("source", vals[1]);
            jsn.put("target", targetNodes);
        }
        // context.write(key, new Text(sum));
    }
}
Going through the examples (disclaimer: newbie here), I can see that the output is generally written as key/value pairs.
But what if I don't have a key in the output? Or what if I want my output in some other format (JSON in my case)?
Anyway, given the code above:
How do I write the JSON object to HDFS?
It was trivial in Hadoop streaming, but how do I do it in Hadoop Java?
You can extend one of Hadoop's OutputFormat classes to create a custom format that writes the data however you wish. For instance, if you need the data written as a JSON object, you could do something like this:
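import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JsonOutputFormat extends TextOutputFormat<Text, IntWritable> {
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext context)
            throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Path path = getOutputPath(context);
        FileSystem fs = path.getFileSystem(conf);
        // Every task creates the same file name here, so this assumes a single
        // reducer. For several reducers, getDefaultWorkFile(context, ".json")
        // gives each task its own file.
        FSDataOutputStream out = fs.create(new Path(path, context.getJobName()));
        return new JsonRecordWriter(out);
    }

    private static class JsonRecordWriter extends LineRecordWriter<Text, IntWritable> {
        private boolean firstRecord = true;

        public JsonRecordWriter(DataOutputStream out) throws IOException {
            super(out);
            out.write('{'); // open the JSON object before the first record
        }

        @Override
        public synchronized void write(Text key, IntWritable value) throws IOException {
            if (!firstRecord) {
                out.write(",\r\n".getBytes(StandardCharsets.UTF_8));
            }
            firstRecord = false;
            // write(byte[]) emits UTF-8 text; writeChars() would emit two bytes
            // per character. Keys are not JSON-escaped here.
            out.write(("\"" + key + "\":\"" + value + "\"").getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public synchronized void close(TaskAttemptContext context) throws IOException {
            out.write('}'); // close the JSON object after the last record
            super.close(context);
        }
    }
}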
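The framing is easy to get wrong (opening brace first, a comma only between records, closing brace last), so it may help to try it without a cluster. Below is a minimal sketch in plain Java that mirrors the same logic against an in-memory stream. The class JsonFramingSketch and its render helper are hypothetical names made up for this illustration and are not part of Hadoop:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for the record writer's framing logic; uses no Hadoop types. */
class JsonFramingSketch {
    private final DataOutputStream out;
    private boolean firstRecord = true;

    JsonFramingSketch(DataOutputStream out) throws IOException {
        this.out = out;
        out.write('{'); // opening brace goes out before any record
    }

    void write(String key, int value) throws IOException {
        if (!firstRecord) {
            // separator only between records, never before the first one
            out.write(",\r\n".getBytes(StandardCharsets.UTF_8));
        }
        firstRecord = false;
        // keys are not JSON-escaped, same simplification as in the answer's code
        String pair = "\"" + key + "\":\"" + value + "\"";
        out.write(pair.getBytes(StandardCharsets.UTF_8));
    }

    void close() throws IOException {
        out.write('}'); // closing brace goes out after the last record
        out.close();
    }

    /** Writes the given pairs through the sketch and returns the resulting text. */
    static String render(String[] keys, int[] values) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            JsonFramingSketch writer = new JsonFramingSketch(new DataOutputStream(buffer));
            for (int i = 0; i < keys.length; i++) {
                writer.write(keys[i], values[i]);
            }
            writer.close();
            return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(render(new String[] {"a", "b"}, new int[] {1, 2}));
    }
}
```

With no records at all, this produces an empty object, which is still valid JSON.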
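And if you do not want the key in your output, emit NullWritable as the key (and declare NullWritable as the reducer's output key type). TextOutputFormat then writes only the value, without the key or the separator:
context.write(NullWritable.get(), new IntWritable(sum));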
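For the reducer in the question, a simpler route may be to skip the custom format and emit one JSON string per record with a NullWritable key. Here is a rough sketch of building that string in plain Java, assuming each value looks like "target1,target2<TAB>source" as in the question's split logic. ReducerJsonSketch, quote, and toJsonLine are hypothetical names, and the escaping is deliberately minimal (a real JSON library would be safer):

```java
import java.util.StringJoiner;

/** Hypothetical helper that turns one reducer value into a line of JSON. */
class ReducerJsonSketch {

    /** Minimal quoting: escapes backslashes and double quotes only, not control characters. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Assumes the value is "comma-separated targets", a tab, then the source. */
    static String toJsonLine(String value) {
        String[] vals = value.split("\t", -1);
        if (vals.length < 2) {
            throw new IllegalArgumentException("expected targets<TAB>source, got: " + value);
        }
        StringJoiner targets = new StringJoiner(",", "[", "]");
        for (String target : vals[0].split(",", -1)) {
            targets.add(quote(target));
        }
        return "{\"source\":" + quote(vals[1]) + ",\"target\":" + targets + "}";
    }

    public static void main(String[] args) {
        System.out.println(toJsonLine("b,c\ta"));
    }
}
```

Inside reduce you would then call something like context.write(NullWritable.get(), new Text(ReducerJsonSketch.toJsonLine(value.toString()))), with the reducer declared as Reducer<Text, Text, NullWritable, Text>. That gives one JSON object per line in the part files rather than a single large object.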
HTH