How can I put files into memory using the Hadoop distributed cache?

Hellen · Dec 12, 2013 · Viewed 13.3k times

As far as I know, the distributed cache copies files to every node, and the map or reduce tasks then read them from the local file system.

My question is: is there a way to put our files into memory using the Hadoop distributed cache, so that every map or reduce task can read them directly from memory?

My MapReduce program distributes a PNG picture of about 1 MB to every node; every map task then reads the picture from the distributed cache and does some image processing together with another picture taken from the map's input.
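
One pattern that gets close to this is to decode the cached picture once per task in Mapper.setup() and keep it in a field, so every map() call works on the in-memory copy instead of re-reading the local file. Below is a minimal sketch, assuming the classic DistributedCache API and that the PNG is the first file added to the cache; the class and field names are placeholders, not part of the original program.

import java.awt.image.BufferedImage;
import java.io.File;
import java.io.IOException;

import javax.imageio.ImageIO;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ImageMapper extends Mapper<LongWritable, Text, Text, Text> {

  // Decoded once per task in setup() and then reused, in memory, by every map() call.
  private BufferedImage referenceImage;

  @Override
  protected void setup(Context context) throws IOException {
    // The framework has already copied the cached PNG to this node's local disk;
    // getLocalCacheFiles returns the local paths in the order the files were added.
    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    if (cacheFiles != null && cacheFiles.length > 0) {
      referenceImage = ImageIO.read(new File(cacheFiles[0].toString()));
    }
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Image processing against the in-memory referenceImage and the picture
    // referenced by each input record would go here.
  }
}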

Answer

user. · Dec 17, 2013
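
The example below is based on WordCount: the cached file is looked up with DistributedCache.getLocalCacheFiles() and read from the node's local file system inside map(), with its contents written to the task log.
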
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {

      // The framework has already copied the cached file to this node's local
      // disk; getLocalCacheFiles returns the local paths of the cached files.
      Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context
          .getConfiguration());

      // Read the first cached file from the local file system and dump it to
      // the task log. Note that this runs on every call to map().
      try {
        BufferedReader readBuffer = new BufferedReader(
            new FileReader(cacheFiles[0].toString()));
        try {
          String line;
          while ((line = readBuffer.readLine()) != null) {
            System.out.println(line);
          }
        } finally {
          readBuffer.close();
        }
      } catch (Exception e) {
        System.out.println(e.toString());
      }

      StringTokenizer itr = new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {

    // Namenode address used to build the HDFS URI of the cached file below.
    final String NAME_NODE = "hdfs://localhost:9000";
    Configuration conf = new Configuration();

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // Register dataset1.txt (which must already exist on HDFS) with the
    // distributed cache so that every node gets a local copy before the tasks run.
    DistributedCache.addCacheFile(new URI(NAME_NODE + "/dataset1.txt"),
        job.getConfiguration());

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

}
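
A follow-up on the original question: the mapper above re-opens the cached file on every call to map(). To keep the contents genuinely in memory, the read can be moved into setup(), which runs once per task, with the data stored in a field. Here is a minimal sketch using the same DistributedCache API; the class name and the List<String> field are illustrative, not part of the posted code.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class InMemoryCacheMapper extends Mapper<Object, Text, Text, IntWritable> {

  // Loaded once per task in setup() and reused by every map() call.
  private final List<String> cachedLines = new ArrayList<String>();

  @Override
  protected void setup(Context context) throws IOException {
    Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    if (cacheFiles != null && cacheFiles.length > 0) {
      BufferedReader reader = new BufferedReader(
          new FileReader(cacheFiles[0].toString()));
      try {
        String line;
        while ((line = reader.readLine()) != null) {
          cachedLines.add(line);
        }
      } finally {
        reader.close();
      }
    }
  }

  @Override
  protected void map(Object key, Text value, Context context)
      throws IOException, InterruptedException {
    // cachedLines is now available in memory for every record processed here.
  }
}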