I am new to Pig and I want to convert a bag of tuples to a map with specific value in each tuple as key. Basically I want to change:
{(id1, value1),(id2, value2), ...}
into [id1#value1, id2#value2]
I've been looking around online for a while, but I can't seem to find a solution. I've tried:
bigQMap = FOREACH bigQFields GENERATE TOMAP(queryId, queryStart);
but I end up with a bag of maps (e.g. {[id1#value1], [id2#value2], ...}
), which is not what I want. How can I build up a map out of a bag of key-value tuple?
Below is the specific script I'm trying to run, in case it's relevant
rawlines = LOAD '...' USING PigStorage('`');
bigQFields = FOREACH bigQLogs GENERATE GFV(*,'queryId')
as queryId, GFV(*, 'queryStart')
as queryStart;
bigQMap = ?? how to make a map with queryId as key and queryStart as value ?? ;
TOMAP
takes a series of pairs and converts them into the map, so it is meant to be used like:
-- Schema: A:{foo:chararray, bar:int, bing:chararray, bang:int}
-- Data: (John, 27, Joe, 30)
B = FOREACH A GENERATE TOMAP(foo, bar, bing, bang) AS m ;
-- Schema: B:{m: map[]}
-- Data: (John#27,Joe#30)
So as you can see the syntax does not support converting a bag to a map. As far as I know there is no way to convert a bag in the format you have to map in pure pig. However, you can definitively write a java UDF to do this.
NOTE: I'm not too experienced with java, so this UDF can easily be improved on (adding exception handling, what happens if a key added twice etc.). However, it does accomplish what you need it to.
package myudfs;
import java.io.IOException;
import org.apache.pig.EvalFunc;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.DataBag;
public class ConvertToMap extends EvalFunc<Map>
{
public Map exec(Tuple input) throws IOException {
DataBag values = (DataBag)input.get(0);
Map<Object, Object> m = new HashMap<Object, Object>();
for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
Tuple t = it.next();
m.put(t.get(0), t.get(1));
}
return m;
}
}
Once you compile the script into a jar, it can be used like:
REGISTER myudfs.jar ;
-- A is loading some sample data I made
A = LOAD 'foo.in' AS (foo:{T:(id:chararray, value:chararray)}) ;
B = FOREACH A GENERATE myudfs.ConvertToMap(foo) AS bar;
Contents of foo.in
:
{(open,apache),(apache,hadoop)}
{(foo,bar),(bar,foo),(open,what)}
Output from B
:
([open#apache,apache#hadoop])
([bar#foo,open#what,foo#bar])
Another approach is to use python to create the UDF:
#!/usr/bin/python
@outputSchema("foo:map[]")
def BagtoMap(bag):
d = {}
for key, value in bag:
d[key] = value
return d
Which is used like this:
Register 'myudfs.py' using jython as myfuncs;
-- A is still just loading some of my test data
A = LOAD 'foo.in' AS (foo:{T:(key:chararray, value:chararray)}) ;
B = FOREACH A GENERATE myfuncs.BagtoMap(foo) ;
And produces the same output as the Java UDF.
BONUS:
Since I don't like maps very much, here is a link explaining how the functionality of a map can be replicated with just key value pairs. Since your key value pairs are in a bag, you'll need to do the map-like operations in a nested FOREACH
:
-- A is a schema that contains kv_pairs, a bag in the form {(id, value)}
B = FOREACH A {
temp = FOREACH kv_pairs GENERATE (key=='foo'?value:NULL) ;
-- Output is like: ({(),(thevalue),(),()})
-- MAX will pull the maximum value from the filtered bag, which is
-- value (the chararray) if the key matched. Otherwise it will return NULL.
GENERATE MAX(temp) as kv_pairs_filtered ;
}