How do I split an RDD into two or more RDDs?

Carlos Bribiescas · Oct 6, 2015 · Viewed 51k times

I'm looking for a way to split an RDD into two or more RDDs. The closest I've seen is "Scala Spark: Split collection into several RDD?", which still yields a single RDD.

If you're familiar with SAS, something like this:

data work.split1 work.split2;
    set work.preSplit;

    if (condition1) then
        output work.split1;
    else if (condition2) then
        output work.split2;
run;

which results in two distinct data sets. It would have to be immediately persisted to get the results I intend...

Answer

zero323 · Oct 6, 2015

It is not possible to yield multiple RDDs from a single transformation*. If you want to split an RDD, you have to apply a filter for each split condition. For example:

def even(x): return x % 2 == 0
def odd(x): return not even(x)
# sc is an existing SparkContext
rdd = sc.parallelize(range(20))

# One filter per split condition; both results are lazy RDDs
rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))

If you have only a binary condition and the computation is expensive, you may prefer something like this:

kv_rdd = rdd.map(lambda x: (x, odd(x)))
kv_rdd.cache()

rdd_odd = kv_rdd.filter(lambda kv: kv[1]).keys()
rdd_even = kv_rdd.filter(lambda kv: not kv[1]).keys()

This evaluates the predicate only once per element but requires an additional pass over all the data.
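The same tag-then-filter idea extends beyond a binary condition. The following is a minimal sketch, assuming the conditions are checked in order like an if / else-if chain. `first_match` and `split_by_conditions` are hypothetical helper names, not part of the Spark API, and `rdd` is assumed to be a PySpark RDD:

```python
def first_match(x, predicates):
    """Return the index of the first predicate that x satisfies, or None."""
    for i, pred in enumerate(predicates):
        if pred(x):
            return i
    return None


def split_by_conditions(rdd, predicates):
    """Tag each element once, cache the tagged RDD, then filter once per tag.

    Hypothetical helper: relies only on the RDD methods map, cache,
    filter and values.
    """
    tagged = rdd.map(lambda x: (first_match(x, predicates), x))
    tagged.cache()
    # Bind i as a default argument so each lambda keeps its own index
    # even though the filters are evaluated later.
    return [
        tagged.filter(lambda kv, i=i: kv[0] == i).values()
        for i in range(len(predicates))
    ]
```

Elements that match no condition are tagged with None and end up in none of the returned RDDs, which mirrors an if / else-if chain without a final else.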

It is important to note that as long as the input RDD is properly cached and there are no additional assumptions regarding data distribution, there is no significant difference in time complexity between repeated filter and a for-loop with nested if-else.

With N elements and M conditions, the number of operations you have to perform is clearly proportional to N times M. For the for-loop it should be closer to (N + MN) / 2, while repeated filter is exactly NM, but at the end of the day both are nothing else than O(NM). You can see my discussion** with Jason Lenderman to read about some pros and cons.
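These counts can be checked with a small plain-Python sketch (no Spark involved). It assumes M mutually exclusive conditions that the data hits evenly, which is the case where the if-else chain averages (N + MN) / 2 predicate evaluations. The function names are illustrative only:

```python
def count_repeated_filter(data, predicates):
    """One full pass over the data per condition: exactly N * M evaluations."""
    calls = 0
    splits = []
    for pred in predicates:
        bucket = []
        for x in data:
            calls += 1
            if pred(x):
                bucket.append(x)
        splits.append(bucket)
    return splits, calls


def count_if_else_loop(data, predicates):
    """Single pass with an if / else-if chain that stops at the first match."""
    calls = 0
    splits = [[] for _ in predicates]
    for x in data:
        for i, pred in enumerate(predicates):
            calls += 1
            if pred(x):
                splits[i].append(x)
                break
    return splits, calls


N, M = 1000, 4
data = list(range(N))
# Condition r matches elements whose remainder modulo M is r.
predicates = [lambda x, r=r: x % M == r for r in range(M)]

filter_splits, filter_calls = count_repeated_filter(data, predicates)
loop_splits, loop_calls = count_if_else_loop(data, predicates)

print(filter_calls)  # N * M = 4000
print(loop_calls)    # (N + M * N) / 2 = 2500
```

Both approaches produce the same splits; the loop does fewer evaluations by a constant factor, not by an order of magnitude.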

At the very high level you should consider two things:

  1. Spark transformations are lazy: until you execute an action, your RDD is not materialized.

    Why does it matter? Going back to my example:

    rdd_odd, rdd_even = (rdd.filter(f) for f in (odd, even))
    

    If later I decide that I need only rdd_odd then there is no reason to materialize rdd_even.

    If you take a look at your SAS example, to compute work.split2 you need to materialize both the input data and work.split1.

  2. RDDs provide a declarative API. When you use filter or map, it is completely up to the Spark engine how this operation is performed. As long as the functions passed to transformations are free of side effects, this creates multiple possibilities to optimize the whole pipeline.
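The first point can be illustrated without a cluster. The sketch below uses Python's built-in lazy `filter` as a stand-in for an RDD transformation and `list(...)` as a stand-in for an action. It is an analogy, not Spark code, and the call counters exist only to make evaluation visible:

```python
calls = {"odd": 0, "even": 0}


def even(x):
    calls["even"] += 1  # instrumentation only; real predicates should be side-effect free
    return x % 2 == 0


def odd(x):
    calls["odd"] += 1  # instrumentation only
    return x % 2 != 0


data = range(20)

# Defining both splits evaluates nothing yet.
lazy_odd, lazy_even = (filter(f, data) for f in (odd, even))
calls_before_action = dict(calls)

# Only the split that is actually consumed gets computed.
odd_values = list(lazy_odd)

print(calls_before_action)  # {'odd': 0, 'even': 0}
print(calls)                # {'odd': 20, 'even': 0}
```

The even split is never consumed, so its predicate is never called.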

At the end of the day this case is not special enough to justify its own transformation.

This map-with-filter pattern is actually used in core Spark. See my answer to "How does Sparks RDD.randomSplit actually split the RDD" and the relevant part of the randomSplit method.

If the only goal is to achieve a split on input, it is possible to use the partitionBy clause of DataFrameWriter with the text output format:

def makePairs(row: T): (String, String) = ???

data
  .map(makePairs).toDF("key", "value")
  // partitionBy takes column names as strings, not Column objects
  .write.partitionBy("key").format("text").save(...)

* There are only 3 basic types of transformations in Spark:

  • RDD[T] => RDD[T]
  • RDD[T] => RDD[U]
  • (RDD[T], RDD[U]) => RDD[W]

where T, U, W can be either atomic types or products / tuples (K, V). Any other operation has to be expressed using some combination of the above. You can check the original RDD paper for more details.

** http://chat.stackoverflow.com/rooms/91928/discussion-between-zero323-and-jason-lenderman

*** See also Scala Spark: Split collection into several RDD?