How can a Java Hadoop Mapper send multiple values?

Mike B · Mar 31, 2013

My mapper needs to send the following tuples:

<custID,prodID,rate>

I want to send custID to the reducer as the key, with prodID and rate together as the value, since both are needed in the reduce phase. What is the best way to do this?

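// custID and data are assumed to be Text fields declared on the mapper class.
public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {

    // Input line format: custID,prodID,rate
    String[] col = value.toString().split(",");
    custID.set(col[0]);
    // Pack prodID and rate into a single comma-separated Text value
    data.set(col[1] + "," + col[2]);
    context.write(custID, data);
}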

// result is assumed to be a Text field declared on the reducer class.
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

    for (Text val : values) {
        // Each value is "prodID,rate": temp[0] is prodID, temp[1] is rate
        String[] temp = val.toString().split(",");
        result.set(temp[1]);
        context.write(key, result);
    }
}

Answer

USB · Aug 23, 2014

The best way is to write a custom Writable.

The class below holds two double values. You can change the field types to Text or String if you need to.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;


/**
 * @author Unmesha SreeVeni U.B
 *
 */
public class TwovalueWritable implements Writable {
    private double first;
    private double second;

    // No-arg constructor: Hadoop creates instances reflectively
    // and then fills them in through readFields().
    public TwovalueWritable() {
    }
    public TwovalueWritable(double first, double second) {
        set(first, second);
    }
    public void set(double first, double second) {
        this.first = first;
        this.second = second;
    }
    public double getFirst() {
        return first;
    }
    public double getSecond() {
        return second;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeDouble(first);
        out.writeDouble(second);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readDouble();
        second = in.readDouble();
    }

    /* (non-Javadoc)
     * @see java.lang.Object#hashCode()
     */
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        long temp;
        temp = Double.doubleToLongBits(first);
        result = prime * result + (int) (temp ^ (temp >>> 32));
        temp = Double.doubleToLongBits(second);
        result = prime * result + (int) (temp ^ (temp >>> 32));
        return result;
    }
    /* (non-Javadoc)
     * @see java.lang.Object#equals(java.lang.Object)
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (!(obj instanceof TwovalueWritable)) {
            return false;
        }
        TwovalueWritable other = (TwovalueWritable) obj;
        if (Double.doubleToLongBits(first) != Double
                .doubleToLongBits(other.first)) {
            return false;
        }
        if (Double.doubleToLongBits(second) != Double
                .doubleToLongBits(other.second)) {
            return false;
        }
        return true;
    }
    @Override
    public String toString() {
        return first + "," + second;
    }
}
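
Since the question's prodID may well be a string rather than a number, the same pattern with a String field and a double field might look like the sketch below. This is only an illustration under assumptions: the class name ProdRatePair and the roundTrip helper are hypothetical, and the class uses only java.io so it runs without Hadoop on the classpath. In a real job it would also declare "implements Writable", as the class above does. The round trip shows why readFields() has to read the fields in the same order that write() emits them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical value type: a String prodID plus a double rate.
// In a real job this class would also declare "implements Writable".
class ProdRatePair {
    private String prodID = "";
    private double rate;

    // No-arg constructor, so an empty instance can be created before readFields()
    ProdRatePair() {
    }

    ProdRatePair(String prodID, double rate) {
        this.prodID = prodID;
        this.rate = rate;
    }

    String getProdID() {
        return prodID;
    }

    double getRate() {
        return rate;
    }

    // Same signature as Writable.write
    public void write(DataOutput out) throws IOException {
        out.writeUTF(prodID);
        out.writeDouble(rate);
    }

    // Same signature as Writable.readFields; reads in the order write() wrote
    public void readFields(DataInput in) throws IOException {
        prodID = in.readUTF();
        rate = in.readDouble();
    }

    // Serializes the pair to bytes and reads it back into a fresh instance
    static ProdRatePair roundTrip(ProdRatePair original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            ProdRatePair copy = new ProdRatePair();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ProdRatePair copy = roundTrip(new ProdRatePair("p42", 4.5));
        System.out.println(copy.getProdID() + "," + copy.getRate()); // prints p42,4.5
    }
}
```

Swapping the two read calls in readFields() would make the round trip fail, which is the usual mistake when adding fields to a custom Writable.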

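From the mapper you can then emit the pair as shown below. Here prodID and rate must be double values, and the driver must declare the value type, for example with job.setMapOutputValueClass(TwovalueWritable.class).

context.write(key, new TwovalueWritable(prodID, rate));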
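
The TwovalueWritable constructor takes doubles, so the mapper first has to parse the text columns. A minimal sketch of that step, assuming input lines of the form custID,prodID,rate where prodID and rate are both numeric. The class and method names (RecordParser, parseLine) are hypothetical, and the code is plain Java so it runs without Hadoop:

```java
// Hypothetical helper: splits one "custID,prodID,rate" line into the
// mapper's key and the two doubles that TwovalueWritable expects.
class RecordParser {
    final String custID;
    final double prodID;
    final double rate;

    private RecordParser(String custID, double prodID, double rate) {
        this.custID = custID;
        this.prodID = prodID;
        this.rate = rate;
    }

    // Returns null for malformed lines so the caller can skip them
    static RecordParser parseLine(String line) {
        String[] col = line.split(",");
        if (col.length != 3) {
            return null;
        }
        try {
            return new RecordParser(
                    col[0].trim(),
                    Double.parseDouble(col[1].trim()),
                    Double.parseDouble(col[2].trim()));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        RecordParser r = parseLine("c1,17,4.5");
        System.out.println(r.custID + " -> " + r.prodID + "," + r.rate); // prints c1 -> 17.0,4.5
    }
}
```

In the mapper, a non-null result would be written with the custID wrapped in a Text key and the two numbers passed to new TwovalueWritable(r.prodID, r.rate). If prodID is not numeric in your data, a String field in the Writable is the safer choice.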

Hope this helps.