Hadoop streaming - remove trailing tab from reducer output

Eddified picture Eddified · Aug 8, 2013 · Viewed 7.4k times · Source

I have a hadoop streaming job whose output does not contain key/value pairs. You can think of it as value-only pairs or key-only pairs.

My streaming reducer (a php script) is outputting records separated by newlines. Hadoop streaming treats this as a key with no value, and inserts a tab before the newline. This extra tab is unwanted.

How do I remove it?

I am using hadoop 1.0.3 with AWS EMR. I downloaded the source of hadoop 1.0.3 and found this code in hadoop-1.0.3/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java :

reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");

So I tried passing -D stream.reduce.output.field.separator= as an argument to the job with no luck. I also tried -D mapred.textoutputformat.separator= and -D mapreduce.output.textoutputformat.separator= with no luck.

I've searched google of course and nothing I found worked. One search result even stated there was no argument that could be passed to achieve the desired result (though, the hadoop version in that case was really really old).

Here is my code (with added line breaks for readability):

hadoop jar streaming.jar -files s3n://path/to/a/file.json#file.json
    -D mapred.output.compress=true -D stream.reduce.output.field.separator=
    -input s3n://path/to/some/input/*/* -output hdfs:///path/to/output/dir
    -mapper 'php my_mapper.php' -reducer 'php my_reducer.php'

Answer

Matt S. picture Matt S. · Aug 15, 2013

As helpful for others, using the tips above, I was able to do an implementation:

CustomOutputFormat<K, V> extends org.apache.hadoop.mapred.TextOutputFormat<K, V> {....}

with exactly one line of the built-in implementation of 'getRecordWriter' changed to:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); 

instead of:

String keyValueSeparator = job.get("mapred.textoutputformat.separator", "\t"); 

after compiling that into a Jar, and including it into my hadoop streaming call (via the instructions on hadoop streaming), the call looked like:

hadoop   jar  /usr/lib/hadoop/contrib/streaming/hadoop-streaming-1.0.3.jar     \
-archives 'hdfs:///user/the/path/to/your/jar/onHDFS/theNameOfTheJar.jar' \
-libjars theNameOfTheJar.jar \
-outputformat com.yourcompanyHere.package.path.tojavafile.CustomOutputFormat  \
-file yourMapper.py    -mapper  yourMapper.py     \
-file yourReducer.py   -reducer yourReducer.py    \
-input $yourInputFile    \
-output $yourOutputDirectoryOnHDFS

I also included the jar in the folder I issued that call from.

It was working great for my needs (and it created no tabs at the end of the line after the reducer).


update: based on a comment implying this is indeed helpful for others, here's the full source of my CustomOutputFormat.java file:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;

public class CustomOutputFormat<K, V> extends TextOutputFormat<K, V> {

    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
        Progressable progress) throws IOException {
    boolean isCompressed = getCompressOutput(job);

    //Channging the default from '\t' to blank
    String keyValueSeparator = job.get("mapred.textoutputformat.separator", ""); // '\t'
    if (!isCompressed) {
        Path file = FileOutputFormat.getTaskOutputPath(job, name);
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
        Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
            GzipCodec.class);
        // create the named codec
        CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
        // build the filename including the extension
        Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
        FileSystem fs = file.getFileSystem(job);
        FSDataOutputStream fileOut = fs.create(file, progress);
        return new LineRecordWriter<K, V>(new DataOutputStream(
            codec.createOutputStream(fileOut)), keyValueSeparator);
    }
    }
}

FYI: For your usage context, be sure to check this does not adversely affect hadoop-streaming managed interactions (in terms of separating key vs. value) between your mapper and reducer. To clarify:

  • From my testing -- if you have a 'tab' in every line of your data (with something on each side of it), you can leave the built in defaults as they are: streaming will interpret the first thing before the first tab as your 'key', and all on that row after it as your 'value.' As such, it does not see a 'null value,' and won't append a tab that shows up after your reducer. (You'll see your final outputs sorted on the value of the 'key' that streaming interprets in each row as what it sees as occuring before each tab.)

  • Conversely, if you have no tabs in your data, and you don't override the defaults using the above trick(s), then you'll see the tabs after the run completes, for which the above override becomes a fix.