Transform bag of key-value tuples to map in Apache Pig

Penguinator · Jul 25, 2013

I am new to Pig and want to convert a bag of tuples into a map, using a specific value in each tuple as the key. Basically, I want to change:

{(id1, value1),(id2, value2), ...} into [id1#value1, id2#value2]

I've been looking around online for a while, but I can't seem to find a solution. I've tried:

bigQMap = FOREACH bigQFields GENERATE TOMAP(queryId, queryStart);

but I end up with a bag of maps (e.g. {[id1#value1], [id2#value2], ...}), which is not what I want. How can I build a map out of a bag of key-value tuples?

Below is the specific script I'm trying to run, in case it's relevant:

rawlines = LOAD '...' USING PigStorage('`');
bigQFields = FOREACH bigQLogs GENERATE GFV(*,'queryId')
   as queryId, GFV(*, 'queryStart')
   as queryStart;
bigQMap = ?? how to make a map with queryId as key and queryStart as value ?? ;

Answer

mr2ert · Jul 25, 2013

TOMAP takes a series of key-value pairs and converts them into a map, so it is meant to be used like this:

-- Schema: A:{foo:chararray, bar:int, bing:chararray, bang:int}
-- Data:     (John,          27,      Joe,            30)
B = FOREACH A GENERATE TOMAP(foo, bar, bing, bang) AS m ;
-- Schema: B:{m: map[]}
-- Data:     ([John#27,Joe#30])
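
To make the pairing behaviour concrete, here is a rough Python emulation (not Pig code). The function name tomap and the sample rows are made up for illustration, and the even-argument check is my assumption about how TOMAP treats a dangling key. It also shows why calling TOMAP inside a FOREACH over (id, value) rows gives one small map per row rather than a single combined map:

```python
def tomap(*args):
    """Illustrative stand-in for TOMAP: pair up key, value, key, value, ..."""
    # Assumption: an odd number of arguments is an error, since the last key
    # would have no value to go with it.
    if len(args) % 2 != 0:
        raise ValueError("expected an even number of arguments")
    # Even positions are keys, odd positions are the matching values.
    return dict(zip(args[0::2], args[1::2]))


# One row with four fields, like relation A above.
row = ("John", 27, "Joe", 30)
one_map = tomap(*row)

# FOREACH evaluates the expression once per row, so a relation of
# (id, value) rows produces one single-entry map per row.
rows = [("id1", "value1"), ("id2", "value2")]
maps_per_row = [tomap(key, value) for key, value in rows]
```

Here one_map holds both pairs from the single row, while maps_per_row is a list of separate one-entry maps, which mirrors the result described in the question.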

So as you can see, the syntax does not support converting a bag to a map. As far as I know, there is no way to convert a bag in the format you have to a map in pure Pig. However, you can definitely write a Java UDF to do this.

NOTE: I'm not too experienced with Java, so this UDF can easily be improved on (adding exception handling, deciding what happens if a key is added twice, etc.). However, it does accomplish what you need it to.

package myudfs;
import java.io.IOException;
import org.apache.pig.EvalFunc;

import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;

public class ConvertToMap extends EvalFunc<Map>
{
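    // Expects a single argument: a bag of (key, value) tuples.
    public Map exec(Tuple input) throws IOException {
        DataBag values = (DataBag)input.get(0);
        if (values == null) {
            return null;  // null bag in, null map out
        }
        Map<Object, Object> m = new HashMap<Object, Object>();
        for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
            Tuple t = it.next();
            // A repeated key silently overwrites the earlier value.
            m.put(t.get(0), t.get(1));
        }
        return m;
    }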
}

Once you compile the class and package it into a jar, it can be used like this:

REGISTER myudfs.jar ;
-- A is loading some sample data I made
A = LOAD 'foo.in' AS (foo:{T:(id:chararray, value:chararray)}) ;
B = FOREACH A GENERATE myudfs.ConvertToMap(foo) AS bar;

Contents of foo.in:

{(open,apache),(apache,hadoop)}
{(foo,bar),(bar,foo),(open,what)}

Output from B:

([open#apache,apache#hadoop])
([bar#foo,open#what,foo#bar])

Another approach is to write the UDF in Python:

myudfs.py

#!/usr/bin/python

@outputSchema("foo:map[]")
def BagtoMap(bag):
    d = {}
    for key, value in bag:
        d[key] = value
    return d

Which is used like this:

Register 'myudfs.py' using jython as myfuncs;
-- A is still just loading some of my test data
A = LOAD 'foo.in' AS (foo:{T:(key:chararray, value:chararray)}) ;
B = FOREACH A GENERATE myfuncs.BagtoMap(foo) ;

And produces the same output as the Java UDF.
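
If you care about the edge cases mentioned in the note above (bad input, a key added twice), a stricter variant could look roughly like the sketch below. The name bag_to_map_strict and the choice to raise on duplicates are my own assumptions, not anything Pig requires. It is written as plain Python so it can be tried outside Pig, with the bag modelled as a list of 2-tuples; to register it as a UDF you would add the @outputSchema decorator as in BagtoMap.

```python
def bag_to_map_strict(bag):
    """Convert a bag (modelled as a list of 2-tuples) to a dict.

    Hypothetical stricter variant of BagtoMap: a null bag yields None,
    and malformed tuples or repeated keys raise an error.
    """
    if bag is None:
        return None
    result = {}
    for item in bag:
        # Every element must be a (key, value) pair.
        if item is None or len(item) != 2:
            raise ValueError("expected (key, value) tuples, got %r" % (item,))
        key, value = item
        # Design choice (an assumption): fail loudly rather than overwrite.
        if key in result:
            raise ValueError("duplicate key: %r" % (key,))
        result[key] = value
    return result
```

Whether a duplicate key should raise, keep the first value, or keep the last one depends on your data; the original BagtoMap keeps the last one.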


BONUS: Since I don't like maps very much, here is how the functionality of a map can be replicated with just key-value pairs. Since your key-value pairs are in a bag, you'll need to do the map-like operations in a nested FOREACH:

-- A has a field kv_pairs, a bag in the form {(key, value)}
B = FOREACH A {
    temp = FOREACH kv_pairs GENERATE (key=='foo'?value:NULL) ;
    -- Output is like: ({(),(thevalue),(),()})

    -- MAX will pull the maximum value from the filtered bag, which is 
    -- value (the chararray) if the key matched. Otherwise it will return NULL.
    GENERATE MAX(temp) as kv_pairs_filtered ;
}
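
For anyone who finds the nested FOREACH hard to read, the same lookup can be sketched in plain Python (again, not Pig code). The function name lookup and the sample pairs are made up for illustration, and the sketch assumes MAX skips nulls, which it emulates by dropping None values first:

```python
def lookup(kv_pairs, wanted_key):
    """Emulate the nested FOREACH + MAX lookup on a bag of (key, value) pairs."""
    # Inner FOREACH: keep the value where the key matches, None otherwise.
    temp = [value if key == wanted_key else None for key, value in kv_pairs]
    # Drop the None entries (assumed to be ignored by MAX), then take the max.
    non_null = [value for value in temp if value is not None]
    return max(non_null) if non_null else None


kv_pairs = [("a", "1"), ("foo", "thevalue"), ("b", "2"), ("c", "3")]
found = lookup(kv_pairs, "foo")
missing = lookup(kv_pairs, "nope")
```

Note that if the same key appears more than once in the bag, this returns the largest of its values, not the first or last one.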