I am new to Hadoop framework. I was trying to write a program which reads XML file from hdfs, parses it using JDOM and sends it to a database. The following is the Java file
package JDOMprs;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.jdom2.Document;
import org.jdom2.Element;
import org.jdom2.JDOMException;
import org.jdom2.input.SAXBuilder;
import com.vertica.hadoop.VerticaOutputFormat;
import com.vertica.hadoop.VerticaRecord;
public class ExampleParser extends Configured implements Tool {
public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable> {
private final static DoubleWritable one = new DoubleWritable(1);
private Text word = new Text();
private List mylist;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(value, one);
}
}
public static class Reduce extends Reducer<Text, DoubleWritable, Text, VerticaRecord> {
VerticaRecord record = null;
String src_name;
String comment;
String rev_by;
String rev_dt;
String com_title;
public void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
try {
record = new VerticaRecord(context.getConfiguration());
} catch (Exception e) {
throw new IOException(e);
}
}
public void reduce(Text key, Iterable<DoubleWritable> values,
Context context) throws IOException, InterruptedException {
if (record == null) {
throw new IOException("No output record found");
}
/******************** JDOM PARSER ***************************/
SAXBuilder builder = new SAXBuilder();
// File xmlFile = new
// File("C:/Users/Administrator/workspace/VerticaHadoop/src/JDOMprs/HadoopXML.xml");
try {
Document document = (Document) builder.build(key.toString());
Element rootNode = document.getRootElement();
List list = rootNode.getChildren("source");
// List ls= new ArrayList();
// Jdomparse jp= new Jdomparse();
// ls=jp.getParse(key);
//
for (int i = 0; i < list.size(); i++) {
Element node = (Element) list.get(i);
// System.out.println("Source Name : " +
// node.getChildText("source-name"));
// System.out.println("comment : " +
// node.getChildText("comment"));
// System.out.println("review by : " +
// node.getChildText("review-by"));
// System.out.println("review date : " +
// node.getChildText("review-date"));
// System.out.println("comment-title : " +
// node.getChildText("comment-title"));
record.set(0, node.getChildText("source-name").toString());
record.set(0, node.getChildText("comment").toString());
record.set(0, node.getChildText("review-by").toString());
record.set(0, node.getChildText("review-date").toString());
record.set(0, node.getChildText("comment-title").toString());
}
} catch (IOException io) {
System.out.println(io.getMessage());
} catch (JDOMException jdomex) {
System.out.println(jdomex.getMessage());
}
/****************** END OF PARSER *****************************/
context.write(new Text("reviewtbl"), record);
}
}
@Override
public int run(String[] args) throws Exception {
// Set up the configuration and job objects
Configuration conf = getConf();
Job job = new Job(conf);
conf = job.getConfiguration();
conf.set("mapreduce.job.tracker", "local");
job.setJobName("vertica test");
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.class);
FileInputFormat.addInputPath(job, new Path("/user/cloudera/input"));
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(VerticaRecord.class);
job.setOutputFormatClass(VerticaOutputFormat.class);
job.setJarByClass(ExampleParser.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
VerticaOutputFormat.setOutput(job, "reviewtbl", true, "source varchar",
"comment varchar", "rev_by varchar", "rev_dt varchar",
"com_title varchar");
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new ExampleParser(), args);
System.exit(res);
}
}
but I am getting the following exceptions.
12/12/20 02:41:34 INFO mapred.JobClient: Cleaning up the staging area hdfs://0.0.0.0/var/lib/hadoop-0.20/cache/mapred/mapred/staging/root/.staging/job_201212191356_0006
Exception in thread "main" java.lang.RuntimeException: java.lang.InstantiationException
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:115)
at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:947)
at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:967)
at org.apache.hadoop.mapred.JobClient.access$500(JobClient.java:170)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:880)
at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:833)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:833)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:476)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:506)
at ExampleParser.run(ExampleParser.java:148)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
at ExampleParser.main(ExampleParser.java:153)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:197)
Caused by: java.lang.InstantiationException
at sun.reflect.InstantiationExceptionConstructorAccessorImpl.newInstance(InstantiationExceptionConstructorAccessorImpl.java:30)
at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:113)
... 19 more
job.setInputFormatClass(
org.apache.hadoop.mapreduce.lib.input.FileInputFormat.class);
You can't use / instantiate the FileInputFormat
class: it's an abstract class.
If you want to parse the XML yourself then you'll need to write your own InputFormat
that extends FileInputFormat
, and the record reader can pass the entire contents to the mapper as the value. I think the Hadoop - The Definitive Guide has an example for WholeFileInputFormat
, or something like that, or Google will probably have something: