I'm experimenting with Spark, using the default pre-built distribution (0.7.0) from the website with the default configuration, in cluster mode with one worker (my localhost). I read the installation docs and everything seems fine.
I have a CSV file in various sizes (1,000 to 1 million rows). If I run my app with a small input file (for example, the 1,000-row one), everything is fine: the program finishes in seconds and produces the expected output. But when I supply a bigger file (100,000 rows, or 1 million), the execution fails. I tried to dig into the logs, but they did not help much: the whole process is retried about 9-10 times and then exits with a failure. There is also an error about fetching from some null source having failed.
The result Iterable returned by the first JavaRDD looks suspicious to me. If I return a hard-coded singleton list (like res.add("something"); return res;), everything works, even with one million rows. But if I add all the keys I need (28 strings of length 6-20 characters), the process fails, and only with the big input. The problem is that I need all these keys; this is the actual business logic.
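For context, the flatMap body in the code further down builds a List<String> of keys per input line. Here is a hypothetical standalone sketch of that kind of per-line key extraction; the delimiter and the trimming/filtering rules are my assumptions, not the actual business logic:

```java
import java.util.ArrayList;
import java.util.List;

public final class KeyExtractor
{
    // Hypothetical sketch: derive several keys from one CSV line.
    // The real parsing rules and column layout are assumptions.
    public static List<String> extractKeys(final String line)
    {
        final List<String> res = new ArrayList<String>();
        for (final String field : line.split(",")) {
            final String key = field.trim();
            if (!key.isEmpty()) {
                res.add(key);
            }
        }
        return res;
    }

    public static void main(final String[] args)
    {
        System.out.println(extractKeys("a, b ,c"));
    }
}
```

The point is only that each call allocates a fresh list of short strings, exactly like the failing case described above.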
I'm using Linux amd64, quad core, 8 GB RAM, and the latest Oracle Java 7 JDK. Spark config:
SPARK_WORKER_MEMORY=4g
SPARK_MEM=3g
SPARK_CLASSPATH=$SPARK_CLASSPATH:/my/super/application.jar
I must mention that when I start the program, it says:
13/05/30 11:41:52 WARN spark.Utils: Your hostname, *** resolves to a loopback address: 127.0.1.1; using 192.168.1.157 instead (on interface eth1)
13/05/30 11:41:52 WARN spark.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
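If the bind address ever matters, the warning above suggests pinning it explicitly; a sketch for conf/spark-env.sh, where the address is just an example for my interface:

```shell
# Example only: bind Spark to a specific interface address
# (substitute the address of the interface the workers should use).
export SPARK_LOCAL_IP=192.168.1.157
```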
Here is my program. It is based on the JavaWordCount example, minimally modified.
import java.util.List;

import scala.Tuple2;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function2;
import spark.api.java.function.PairFunction;

public final class JavaWordCount
{
    public static void main(final String[] args) throws Exception
    {
        final JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
            System.getenv("SPARK_HOME"), new String[] {"....jar" });

        final JavaRDD<String> words = ctx.textFile(args[1], 1).flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(final String s)
            {
                // parsing "s" as the line, computation, building res (it's a List<String>)
                return res;
            }
        });

        final JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(final String s)
            {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        final JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(final Integer i1, final Integer i2)
            {
                return i1 + i2;
            }
        });

        // collect() only once; a second collect() call would run the whole job again
        for (final Tuple2<?, ?> tuple : counts.collect()) {
            System.out.println(tuple._1 + ": " + tuple._2);
        }
    }
}
I managed to fix it by setting the property spark.mesos.coarse to true. More info here.
Update: I've been playing around with Spark for a couple of hours. The settings below helped me a little, but it seems it's nearly impossible to process ~10 million lines of text on a single machine.
System.setProperty("spark.serializer", "spark.KryoSerializer"); // kryo is much faster
System.setProperty("spark.kryoserializer.buffer.mb", "256"); // I serialize bigger objects
System.setProperty("spark.mesos.coarse", "true"); // link provided
System.setProperty("spark.akka.frameSize", "500"); // workers should be able to send bigger messages
System.setProperty("spark.akka.askTimeout", "30"); // high CPU/IO load
Note: Increasing the frame size seems particularly helpful at preventing: org.apache.spark.SparkException: Error communicating with MapOutputTracker
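For reference, as far as I can tell these system properties are only picked up if they are set before the JavaSparkContext is constructed, since the context reads them at creation time. A minimal sketch of the ordering:

```java
public final class SparkTuning
{
    public static void main(final String[] args)
    {
        // Set tuning properties first; the JavaSparkContext
        // reads them when it is constructed, not afterwards.
        System.setProperty("spark.serializer", "spark.KryoSerializer");
        System.setProperty("spark.kryoserializer.buffer.mb", "256");
        System.setProperty("spark.mesos.coarse", "true");
        System.setProperty("spark.akka.frameSize", "500");
        System.setProperty("spark.akka.askTimeout", "30");

        // ... only now create the JavaSparkContext and run the job.
        System.out.println(System.getProperty("spark.akka.frameSize"));
    }
}
```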