There is not an isEmpty
method on RDD's, so what is the most efficient way of testing if an RDD is empty?
RDD.isEmpty()
will be part of Spark 1.3.0.
Based on suggestions in this apache mail-thread and later some comments to this answer, I have done some small local experiments. The best method is using take(1).length==0
.
def isEmpty[T](rdd : RDD[T]) = {
rdd.take(1).length == 0
}
It should run in O(1)
except when the RDD is empty, in which case it is linear in the number of partitions.
Thanks to Josh Rosen and Nick Chammas to point me to this.
Note: This fails if the RDD is of type RDD[Nothing]
e.g. isEmpty(sc.parallelize(Seq()))
, but this is likely not a problem in real life. isEmpty(sc.parallelize(Seq[Any]()))
works fine.
take(1)==0
method, thanks to comments.My original suggestion: Use mapPartitions
.
def isEmpty[T](rdd : RDD[T]) = {
rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_)
}
It should scale in the number of partitions and is not nearly as clean as take(1)
. It is however robust to RDD's of type RDD[Nothing]
.
I used this code for the timings.
def time(n : Long, f : (RDD[Long]) => Boolean): Unit = {
val start = System.currentTimeMillis()
val rdd = sc.parallelize(1L to n, numSlices = 100)
val result = f(rdd)
printf("Time: " + (System.currentTimeMillis() - start) + " Result: " + result)
}
time(1000000000L, rdd => rdd.take(1).length == 0L)
time(1000000000L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1000000000L, rdd => rdd.count() == 0L)
time(1000000000L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1000000000L, rdd => rdd.fold(0)(_ + _) == 0L)
time(1L, rdd => rdd.take(1).length == 0L)
time(1L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(1L, rdd => rdd.count() == 0L)
time(1L, rdd => rdd.takeSample(true, 1).isEmpty)
time(1L, rdd => rdd.fold(0)(_ + _) == 0L)
time(0L, rdd => rdd.take(1).length == 0L)
time(0L, rdd => rdd.mapPartitions(it => Iterator(!it.hasNext)).reduce(_&&_))
time(0L, rdd => rdd.count() == 0L)
time(0L, rdd => rdd.takeSample(true, 1).isEmpty)
time(0L, rdd => rdd.fold(0)(_ + _) == 0L)
On my local machine with 3 worker cores I got these results
Time: 21 Result: false
Time: 75 Result: false
Time: 8664 Result: false
Time: 18266 Result: false
Time: 23836 Result: false
Time: 113 Result: false
Time: 101 Result: false
Time: 68 Result: false
Time: 221 Result: false
Time: 46 Result: false
Time: 79 Result: true
Time: 93 Result: true
Time: 79 Result: true
Time: 100 Result: true
Time: 64 Result: true