I just started working with MapReduce, and I'm running into a weird bug that I haven't been able to answer through Google. I'm making a basic program using ArrayWritable, but when I run it, I get the following error during Reduce:
java.lang.RuntimeException:
java.lang.NoSuchMethodException:org.apache.hadoop.io.ArrayWritable.<init>()
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:62)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1276)
at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1214)
at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:250)
at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:246)
at PageRank$Reduce.reduce(Unknown Source)
at PageRank$Reduce.reduce(Unknown Source)
at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:522)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:421)
at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
I'm using Hadoop 1.2.1. Here's my code:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.join.*;
import java.io.IOException;
import java.util.Iterator;
public class TempClass {
public static class MapClass extends MapReduceBase
implements Mapper<LongWritable, Text, Text, ArrayWritable> {
public void map(LongWritable key, Text value,
OutputCollector<Text, ArrayWritable> output,
Reporter reporter) throws IOException {
String[] arr_str = new String[]{"a","b","c"};
for(int i=0; i<3; i++)
output.collect(new Text("my_key"), new ArrayWritable(arr_str));
}
}
public static class Reduce extends MapReduceBase
implements Reducer<Text, ArrayWritable, Text, ArrayWritable> {
public void reduce(Text key, Iterator<ArrayWritable> values,
OutputCollector<Text, ArrayWritable> output,
Reporter reporter) throws IOException {
ArrayWritable tmp;
while(values.hasNext()){
tmp = values.next();
output.collect(key, tmp);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
JobConf job = new JobConf(conf, TempClass.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(ArrayWritable.class);
job.setOutputFormat(TextOutputFormat.class);
job.setInputFormat(TextInputFormat.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
FileInputFormat.setInputPaths( job, new Path( args[0] ) );
FileOutputFormat.setOutputPath( job, new Path( args[1] ) );
job.setJobName( "TempClass" );
JobClient.runJob(job);
}
}
If I comment below lines (Reduce Class):
//while(values.hasNext()){
// tmp = values.next();
output.collect(key, tmp);
//}
everything will be ok. Do you have any ideas?
A Writable for arrays containing instances of a class. The elements of this writable must all be instances of the same class. If this writable will be the input for a Reducer, you will need to create a subclass that sets the value to be of the proper type. For example: public class IntArrayWritable extends ArrayWritable { public IntArrayWritable() { super(IntWritable.class); } }
Here is from the docs of ArrayWritable. Generally, Writable
should have an constructor with no parameter.
I just modified your code to:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class TempClass {
public static class TextArrayWritable extends ArrayWritable {
public TextArrayWritable() {
super(Text.class);
}
public TextArrayWritable(String[] strings) {
super(Text.class);
Text[] texts = new Text[strings.length];
for (int i = 0; i < strings.length; i++) {
texts[i] = new Text(strings[i]);
}
set(texts);
}
}
public static class MapClass extends MapReduceBase implements
Mapper<LongWritable, Text, Text, ArrayWritable> {
public void map(LongWritable key, Text value,
OutputCollector<Text, ArrayWritable> output, Reporter reporter)
throws IOException {
String[] arr_str = new String[] {
"a", "b", "c" };
for (int i = 0; i < 3; i++)
output.collect(new Text("my_key"), new TextArrayWritable(
arr_str));
}
}
public static class Reduce extends MapReduceBase implements
Reducer<Text, TextArrayWritable, Text, TextArrayWritable> {
public void reduce(Text key, Iterator<TextArrayWritable> values,
OutputCollector<Text, TextArrayWritable> output,
Reporter reporter) throws IOException {
TextArrayWritable tmp;
while (values.hasNext()) {
tmp = values.next();
output.collect(key, tmp);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
JobConf job = new JobConf(conf, TempClass.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TextArrayWritable.class);
job.setOutputFormat(TextOutputFormat.class);
job.setInputFormat(TextInputFormat.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setJobName("TempClass");
JobClient.runJob(job);
}
}