Saving JSON data in HDFS in Hadoop

frazman · Jun 4, 2013 · Viewed 8.6k times

I have the following Reducer class:

public static class TokenCounterReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        JSONObject jsn = new JSONObject();

        for (Text value : values) {
            String[] vals = value.toString().split("\t");
            String[] targetNodes = vals[0].toString().split(",",-1);
            jsn.put("source",vals[1] );
            jsn.put("target",targetNodes);

        }
        // context.write(key, new Text(sum));
    }
}

Going through examples (disclaimer: newbie here), I can see that the output is generally written as key/value pairs.

But what if I don't have a key in the output? Or what if I want the output in some other format (JSON in my case)?

In the code above, how do I write the JSON object to HDFS?

It was trivial in Hadoop streaming, but how do I do it in Hadoop Java?

Answer

Tariq · Jun 4, 2013

You can use Hadoop's OutputFormat API to create a custom format that writes the data however you like. For instance, if you need the data written as a JSON object, you could do something like this:

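import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class JsonOutputFormat extends TextOutputFormat<Text, IntWritable> {
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(
            TaskAttemptContext context) throws IOException,
                  InterruptedException {
        // One file per task (e.g. part-r-00000.json), so that several
        // reducers do not all try to create the same path.
        Path file = getDefaultWorkFile(context, ".json");
        FileSystem fs = file.getFileSystem(context.getConfiguration());
        FSDataOutputStream out = fs.create(file, false);
        return new JsonRecordWriter(out);
    }

    private static class JsonRecordWriter extends
          LineRecordWriter<Text, IntWritable> {
        boolean firstRecord = true;

        public JsonRecordWriter(DataOutputStream out)
                throws IOException {
            super(out);
            // Open the JSON object before the first record.
            out.write("{".getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public synchronized void write(Text key, IntWritable value)
                throws IOException {
            // Separate records with a comma, but not before the first one.
            if (!firstRecord) {
                out.write(",\r\n".getBytes(StandardCharsets.UTF_8));
            }
            firstRecord = false;
            // Written as UTF-8 bytes; writeChars would emit two bytes per
            // character. Note that the key is not JSON-escaped here.
            out.write(("\"" + key.toString() + "\":\""
                    + value.toString() + "\"").getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public synchronized void close(TaskAttemptContext context)
                throws IOException {
            // Close the JSON object, then the underlying stream.
            out.write("}".getBytes(StandardCharsets.UTF_8));
            super.close(context);
        }
    }
}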
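
The framing such a writer is meant to produce (an opening brace, comma-separated "key":"value" pairs, a closing brace) can be tried outside a cluster. The following is only a minimal sketch using the plain JDK: the class JsonFraming and its render and escape helpers are hypothetical names, not part of Hadoop, and the escaping covers backslashes and double quotes only.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-alone class: mirrors the framing of a JSON record
// writer without depending on Hadoop, so it runs with a plain JDK.
public class JsonFraming {
    private final DataOutputStream out;
    private boolean firstRecord = true;

    public JsonFraming(DataOutputStream out) throws IOException {
        this.out = out;
        put("{");                 // open the object once, up front
    }

    public void write(String key, String value) throws IOException {
        if (!firstRecord) {
            put(",\r\n");         // separator before every record except the first
        }
        firstRecord = false;
        put("\"" + escape(key) + "\":\"" + escape(value) + "\"");
    }

    public void close() throws IOException {
        put("}");                 // close the object, then the stream
        out.close();
    }

    // Write UTF-8 bytes; DataOutputStream.writeChars would emit two bytes per char.
    private void put(String s) throws IOException {
        out.write(s.getBytes(StandardCharsets.UTF_8));
    }

    // Minimal escaping: backslash and double quote only (no control characters).
    public static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Convenience for experiments: frame the given key/value pairs in memory.
    public static String render(String[][] pairs) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            JsonFraming writer = new JsonFraming(new DataOutputStream(buf));
            for (String[] pair : pairs) {
                writer.write(pair[0], pair[1]);
            }
            writer.close();
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the separator logic in write and the braces in the constructor and close means an empty task output still yields a valid, empty JSON object.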

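And if you do not want the key in your output, emit a NullWritable as the key. With the stock TextOutputFormat, a NullWritable key is skipped together with the separator, so only the value is written: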

context.write(NullWritable.get(), new IntWritable(sum));
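
For the reducer in the question, one possible route is to skip the custom format altogether: build one JSON string per record and emit it as a Text value with a NullWritable key, so that the stock TextOutputFormat writes one JSON document per line. The sketch below covers only the string-building part and uses the plain JDK. The class JsonLine and its methods are hypothetical names, the "targets<TAB>source" layout is taken from the question's code, and the escaping handles backslashes and double quotes only.

```java
import java.util.StringJoiner;

// Hypothetical helper: turns one reducer value of the form "t1,t2<TAB>source"
// (the layout used in the question) into a single-line JSON object.
public class JsonLine {
    public static String toJsonLine(String value) {
        // Assumes the value contains exactly one tab, as in the question.
        String[] vals = value.split("\t");
        String[] targetNodes = vals[0].split(",", -1);
        StringJoiner targets = new StringJoiner(",", "[", "]");
        for (String target : targetNodes) {
            targets.add(quote(target));
        }
        return "{\"source\":" + quote(vals[1]) + ",\"target\":" + targets + "}";
    }

    // Quote a string, escaping backslashes and double quotes only.
    public static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(toJsonLine("b,c\ta"));
        // In a reducer this would be emitted as:
        //   context.write(NullWritable.get(), new Text(toJsonLine(value.toString())));
    }
}
```

For this to work in a job, the reducer's output key type would need to be NullWritable (for example Reducer<Text, Text, NullWritable, Text>), with job.setOutputKeyClass(NullWritable.class) set in the driver.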

HTH