Can Hadoop mapper produce multiple keys in output?

Monis Iqbal picture Monis Iqbal · May 25, 2011 · Viewed 10.3k times · Source

Can a single Mapper class produce multiple key-value pairs (of same type) in a single run?

We output the key-value pair in the mapper like this:

context.write(key, value);

Here's a trimmed down (and exemplified) version of the Key:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;


public class MyKey extends ObjectWritable implements WritableComparable<MyKey> {

    public enum KeyType {
        KeyType1,
        KeyType2
    }

    private KeyType keyTupe;
    private Long field1;
    private Integer field2 = -1;
    private String field3 = "";


    public KeyType getKeyType() {
        return keyTupe;
    }

    public void settKeyType(KeyType keyType) {
        this.keyTupe = keyType;
    }

    public Long getField1() {
        return field1;
    }

    public void setField1(Long field1) {
        this.field1 = field1;
    }

    public Integer getField2() {
        return field2;
    }

    public void setField2(Integer field2) {
        this.field2 = field2;
    }


    public String getField3() {
        return field3;
    }

    public void setField3(String field3) {
        this.field3 = field3;
    }

    @Override
    public void readFields(DataInput datainput) throws IOException {
        keyTupe = KeyType.valueOf(datainput.readUTF());
        field1 = datainput.readLong();
        field2 = datainput.readInt();
        field3 = datainput.readUTF();
    }

    @Override
    public void write(DataOutput dataoutput) throws IOException {
        dataoutput.writeUTF(keyTupe.toString());
        dataoutput.writeLong(field1);
        dataoutput.writeInt(field2);
        dataoutput.writeUTF(field3);
    }

    @Override
    public int compareTo(MyKey other) {
        if (getKeyType().compareTo(other.getKeyType()) != 0) {
            return getKeyType().compareTo(other.getKeyType());
        } else if (getField1().compareTo(other.getField1()) != 0) {
            return getField1().compareTo(other.getField1());
        } else if (getField2().compareTo(other.getField2()) != 0) {
            return getField2().compareTo(other.getField2());
        } else if (getField3().compareTo(other.getField3()) != 0) {
            return getField3().compareTo(other.getField3());
        } else {
            return 0;
        }
    }

    public static class MyKeyComparator extends WritableComparator {
        public MyKeyComparator() {
            super(MyKey.class);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return compareBytes(b1, s1, l1, b2, s2, l2);
        }
    }

    static { // register this comparator
        WritableComparator.define(MyKey.class, new MyKeyComparator());
    }
}

And this is how we tried to output both keys in the Mapper:

MyKey key1 = new MyKey();
key1.settKeyType(KeyType.KeyType1);
key1.setField1(1L);
key1.setField2(23);

MyKey key2 = new MyKey();
key2.settKeyType(KeyType.KeyType2);
key2.setField1(1L);
key2.setField3("abc");

context.write(key1, value1);
context.write(key2, value2);

Our job's output format class is: org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

I'm stating this because in other output format classes I've seen the output not appending and just committing in their implementation of write method.

Also, we are using the following classes for Mapper and Context: org.apache.hadoop.mapreduce.Mapper org.apache.hadoop.mapreduce.Context

Answer

ajduff574 picture ajduff574 · May 26, 2011

Writing to the context multiple times in one map task is perfectly fine.

However, you may have several problems with your key class. Whenever you implement WritableComparable for a key, you should also implement equals(Object) and hashCode() methods. These aren't part of the WritableComparable interface, since they are defined in Object, but you must provide implementations.

The default partitioner uses the hashCode() method to decide which reducer each key/value pair goes to. If you don't provide a sane implementation, you can get strange results.

As a rule of thumb, whenever you implement hashCode() or any sort of comparison method, you should provide an equals(Object) method as well. You will have to make sure it accepts an Object as the parameter, as this is how it is defined in the Object class (whose implementation you are probably overriding).