Spark code organization and best practices

Glennie Helles Sindholt picture Glennie Helles Sindholt · Sep 25, 2015 · Viewed 9.9k times · Source

So, having spend many years in an object oriented world with code reuse, design patterns and best practices always taken into account, I find myself struggling somewhat with code organization and code reuse in world of Spark.

If I try to write code in a reusable way, it nearly always comes with a performance cost and I end up rewriting it to whatever is optimal for my particular use case. This constant "write what is optimal for this particular use case" also affects code organization, because splitting code into different objects or modules is difficult when "it all really belongs together" and I thus end up with very few "God" object containing long chains of complex transformations. In fact, I frequently think that if I had taken a look at most of the Spark code I'm writing now back when I was working in the object oriented world, I would have winced and dismissed it as "spaghetti code".

I have surfed the internet trying to find some sort of equivalent to the best practices of the object oriented world, but without much luck. I can find some "best practices" for functional programming but Spark just adds an extra layer, because performance is such a major factor here.

So my question to you is, have any of you Spark gurus found some best practices for writing Spark code that you can recommend?

EDIT

As written in a comment, I did not actually expect anyone to post an answer on how to solve this problem, but rather I was hoping that someone in this community had come across some Martin Fowler type, who had written som articles or blog posts somewhere on how to address problems with code organization in the world of Spark.

@DanielDarabos suggested that I might put in an example of a situation where code organization and performance are conflicting. While I find that I frequently have issues with this in my everyday work, I find it a bit hard to boil it down to a good minimal example ;) but I will try.

In the object oriented world, I'm a big fan of the Single Responsibility Principle, so I would make sure that my methods were only responsible for one thing. It makes them reusable and easily testable. So if I had to, say, calculate the sum of some numbers in a list (matching some criteria) and I had to calculate the average of the same number, I would most definitely create two methods - one that calculated the sum and one that calculated the average. Like this:

def main(implicit args: Array[String]): Unit = {
  val list = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5))

  println("Summed weights for DK = " + summedWeights(list, "DK")
  println("Averaged weights for DK = " + averagedWeights(list, "DK")
}

def summedWeights(list: List, country: String): Double = {
  list.filter(_._1 == country).map(_._2).sum
}

def averagedWeights(list: List, country: String): Double = {
  val filteredByCountry = list.filter(_._1 == country) 
  filteredByCountry.map(_._2).sum/ filteredByCountry.length
}

I can of course continue to honor SRP in Spark:

def main(implicit args: Array[String]): Unit = {
  val df = List(("DK", 1.2), ("DK", 1.4), ("SE", 1.5)).toDF("country", "weight")

  println("Summed weights for DK = " + summedWeights(df, "DK")
  println("Averaged weights for DK = " + averagedWeights(df, "DK")
}


def avgWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(avg('weight))

  summedWeight.first().getDouble(0)
}

def summedWeights(df: DataFrame, country: String, sqlContext: SQLContext): Double = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country)
  val summedWeight = countrySpecific.agg(sum('weight))

  summedWeight.first().getDouble(0)
}

But because my df may contain billions of rows I would rather not have to perform the filter twice. In fact, performance is directly coupled to EMR cost, so I REALLY don't want that. To overcome it, I thus decide to violate SRP and simply put the two functions in one and make sure I call persist on the country-filtered DataFrame, like this:

def summedAndAveragedWeights(df: DataFrame, country: String, sqlContext: SQLContext): (Double, Double) = {
  import org.apache.spark.sql.functions._
  import sqlContext.implicits._

  val countrySpecific = df.filter('country === country).persist(StorageLevel.MEMORY_AND_DISK_SER)
  val summedWeights = countrySpecific.agg(sum('weight)).first().getDouble(0)
  val averagedWeights = summedWeights / countrySpecific.count()

  (summedWeights, averagedWeights)
}

Now, this example if of course a huge simplification of what's encountered in real life. Here I could simply solve it by filtering and persisting df before handing it to the sum and avg functions (which would also be more SRP), but in real life there may be a number of intermediate calculations going on that are needed again and again. In other words, the filter function here is merely an attempt to make a simple example of something that will benefit from being persisted. In fact, I think calls to persist is a keyword here. Calling persist will vastly speed up my job, but the cost is that I have to tightly couple all code that depends on the persisted DataFrame - even if they are logically separate.