Hadoop - Produce multiple values for a single key

Ramesh · Jun 20, 2013 · Viewed 10.9k times

I was able to successfully change the wordcount program in Hadoop to suit my requirement. However, I now have another situation wherein I use the same key for 3 values. Let's say my input file is as below.

A Uppercase 1 firstnumber  I  romannumber a lowercase
B Uppercase 2 secondnumber II romannumber b lowercase

Currently in my map/reduce program, I am doing something like below. Here A is the key and 1 is the value.

A 1

I need my MapReduce program to produce something like below.

A 1 I a 

I could do this in 3 different programs, as below, and produce the output.

A 1
A I
A a

However, I want to do this in a single program. Basically, from my map function I want to do this:

context.write(key,value1);
context.write(key,value2);
context.write(key,value3);

Is there any way I can do it in the same program rather than writing three different programs?

EDIT:

Let me provide a much clearer example. I need to do something like below.

A uppercase 1 firstnumber  1.0 floatnumber str stringchecking
A uppercase 2 secondnumber 2.0 floatnumber ing stringchecking

My final output would be,

A 3 3.0 string

3 is the sum of the two integers, 3.0 is the sum of the two floats, and string is the concatenation of the two strings (str + ing).

Answer

Mike Park · Jun 20, 2013

First you'll need a composite writable for all three of your values.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;

public class CompositeWritable implements Writable {
    int val1 = 0;
    float val2 = 0;
    String val3 = "";

    // Hadoop needs a no-arg constructor to instantiate the Writable via reflection.
    public CompositeWritable() {}

    public CompositeWritable(int val1, float val2, String val3) {
        this.val1 = val1;
        this.val2 = val2;
        this.val3 = val3;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        val1 = in.readInt();
        val2 = in.readFloat();
        val3 = WritableUtils.readString(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(val1);
        out.writeFloat(val2);
        WritableUtils.writeString(out, val3);
    }

    // Fold another value into this one: sum the numbers, concatenate the strings.
    public void merge(CompositeWritable other) {
        this.val1 += other.val1;
        this.val2 += other.val2;
        this.val3 += other.val3;
    }

    @Override
    public String toString() {
        return this.val1 + "\t" + this.val2 + "\t" + this.val3;
    }
}

Then in your reduce you'll do something like this...

public void reduce(Text key, Iterable<CompositeWritable> values, Context ctx) throws IOException, InterruptedException {

    // Start from an empty composite and fold every value for this key into it.
    CompositeWritable out = new CompositeWritable();

    for (CompositeWritable next : values)
    {
        out.merge(next);
    }

    ctx.write(key, out);
}

Your mapper will simply output one CompositeWritable per map() call.
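For completeness, here is a minimal mapper sketch (not part of the original answer). It assumes the whitespace-separated layout from the edited question, with the key in field 0, the int in field 2, the float in field 4, and the string in field 6; the field positions and the class name CompositeMapper are assumptions.

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Sketch only: field positions follow the sample line
// "A uppercase 1 firstnumber 1.0 floatnumber str stringchecking".
public class CompositeMapper extends Mapper<LongWritable, Text, Text, CompositeWritable> {

    @Override
    protected void map(LongWritable offset, Text line, Context ctx)
            throws IOException, InterruptedException {
        String[] fields = line.toString().trim().split("\\s+");
        if (fields.length < 7) {
            return; // skip malformed lines
        }

        int intVal = Integer.parseInt(fields[2]);
        float floatVal = Float.parseFloat(fields[4]);
        String strVal = fields[6];

        // One composite value per input record; the reducer merges them per key.
        ctx.write(new Text(fields[0]), new CompositeWritable(intVal, floatVal, strVal));
    }
}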

I haven't tried to compile this, but the general idea is there.
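If it helps, a driver along these lines would wire the pieces together. This is a hedged sketch, assuming Hadoop 2's Job.getInstance and placeholder class names: CompositeMapper from the sketch above, and CompositeReducer standing in for a Reducer subclass that wraps the reduce method shown.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CompositeDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "composite values");
        job.setJarByClass(CompositeDriver.class);

        // CompositeMapper / CompositeReducer are placeholder names for the
        // mapper sketch above and a Reducer wrapping the reduce method shown.
        job.setMapperClass(CompositeMapper.class);
        job.setReducerClass(CompositeReducer.class);

        // Map output and final output share the same key/value types here, so
        // setOutputKeyClass/setOutputValueClass cover both; if they differed,
        // setMapOutputValueClass(CompositeWritable.class) would be needed too.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CompositeWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}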