How do I access DistributedCache in Hadoop Map/Reduce jobs?

sa125 · May 13, 2012

I'm trying to pass a small file to a job I'm running using the GenericOptionsParser's -files flag:

$ hadoop jar MyJob.jar -conf /path/to/cluster-conf.xml -files /path/to/local-file.csv data/input data/output

This is supposed to send the job to my cluster and attach the local-file.csv to be available to the Mapper/Reducer when needed. It worked great when I ran this in pseudo distributed mode, but when I launch the job on the cluster it seems the file cannot be found. I'm reading the file in my mapper's setup method like so:

public static class TheMapper extends Mapper<LongWritable, Text, Text, Text> {

  @Override
  public void setup(Context context) throws IOException, InterruptedException {

    URI[] uriList = DistributedCache.getCacheFiles( context.getConfiguration() );
    CsvReader csv = new CsvReader(uriList[0].getPath());

    // work with csv file..
  }

  // ..
}

When the job is running, I get the following exception:

java.io.FileNotFoundException: File /hdfs/tmp/mapred/staging/hduser/.staging/job_201205112311_011/files/local-file.csv does not exist.
at com.csvreader.CsvReader.<init>(Unknown Source)
at com.csvreader.CsvReader.<init>(Unknown Source)
at com.csvreader.CsvReader.<init>(Unknown Source)
at MyJob$TheMapper.setup(MyJob.java:167)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
...

Any idea what I'm doing wrong? thanks.

Answer

Chris White · May 13, 2012

This is a common problem - the -files option works separately from the DistributedCache API.

When you use -files, the GenericOptionsParser configures a job property named tmpfiles, whereas the DistributedCache uses a property named mapred.cache.files.

Also, the distributed cache expects the files to already be in HDFS and copies them down to the task nodes, whereas -files uploads the files to HDFS at job submission time and then copies them down to each task node.
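For contrast, the DistributedCache route means staging the file in HDFS yourself before submitting the job. A rough sketch (the /cache path and the -D property override here are illustrative, not from your command; the usual way to set the property is DistributedCache.addCacheFile in the driver):

```shell
# Stage the file in HDFS once, up front.
hadoop fs -mkdir -p /cache
hadoop fs -put /path/to/local-file.csv /cache/local-file.csv

# mapred.cache.files is the property the DistributedCache reads;
# GenericOptionsParser lets you set it directly with -D.
hadoop jar MyJob.jar -conf /path/to/cluster-conf.xml \
  -D mapred.cache.files=hdfs:///cache/local-file.csv \
  data/input data/output
```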

In your case, to make your code work, simply open the file by the name you passed in - it is placed in the task's current working directory (obviously this requires you to know the local file's name and to hard-code it into your mapper code):

@Override
public void setup(Context context) throws IOException, InterruptedException {
  // -files copies the file into the task's working directory,
  // so a bare relative name resolves to it. CsvReader takes the
  // file name as a String, as in your original setup code.
  CsvReader csv = new CsvReader("local-file.csv");

  // work with csv file..
}
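Hadoop aside, reading a small file out of the task's working directory is plain Java I/O. A minimal stand-in for the CsvReader usage above, using only the standard library (the class name and the simple no-quoting parser are illustrative assumptions, not part of the answer):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class WorkingDirCsv {

    // Parse a simple comma-separated file into rows of fields.
    // (Stand-in for the CsvReader library; quoting/escaping not handled.)
    static List<String[]> readCsv(Path file) throws IOException {
        List<String[]> rows = new ArrayList<>();
        for (String line : Files.readAllLines(file)) {
            if (!line.isEmpty()) {
                rows.add(line.split(",", -1));
            }
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        // In a task's setup() the file already sits in the current working
        // directory, so a bare relative name is enough; here we create one
        // first just to make the example self-contained.
        Path file = Path.of("local-file.csv");
        Files.write(file, List.of("id,name", "1,alice"));

        List<String[]> rows = readCsv(file);
        System.out.println(rows.size() + " rows, first field: " + rows.get(1)[0]);
        // prints: 2 rows, first field: 1
    }
}
```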