Spark and SparkSQL: How to imitate a window function?

Martin Senne · Sep 5, 2015

Description

Given a DataFrame df

id |       date
---------------
 1 | 2015-09-01
 2 | 2015-09-01
 1 | 2015-09-03
 1 | 2015-09-04
 2 | 2015-09-04

I want to create a running counter or index,

  • grouped by the same id and
  • sorted by date in that group,

thus

id |       date |  counter
--------------------------
 1 | 2015-09-01 |        1
 1 | 2015-09-03 |        2
 1 | 2015-09-04 |        3
 2 | 2015-09-01 |        1
 2 | 2015-09-04 |        2

This is something I can achieve with a window function, e.g.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber
val w = Window.partitionBy("id").orderBy("date")
val resultDF = df.select(df("id"), df("date"), rowNumber().over(w).alias("counter"))

Unfortunately, Spark 1.4.1 does not support window functions for regular DataFrames (i.e., with a plain SQLContext):

org.apache.spark.sql.AnalysisException: Could not resolve window function 'row_number'. Note that, using window functions currently requires a HiveContext;

Questions

  • How can I achieve the above computation with current Spark 1.4.1 without using window functions?
  • When will window functions for regular DataFrames be supported in Spark?

Thanks!

Answer

zero323 · Sep 5, 2015

You can use HiveContext for local DataFrames as well and, unless you have a very good reason not to, it is probably a good idea anyway. It is the default SQLContext available in spark-shell and the pyspark shell (as of now, sparkR seems to use a plain SQLContext), and its parser is the one recommended by the Spark SQL and DataFrame Guide.
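
For instance, the running counter from the question can be written in raw HiveQL through that parser. A minimal sketch, assuming sqlContext is a HiveContext and df is the question's DataFrame (date is backquoted because it is a keyword in HiveQL):

// Sketch only: same result via the HiveQL parser of a HiveContext.
df.registerTempTable("df")
sqlContext.sql("""
  SELECT id, `date`,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY `date`) AS counter
  FROM df
""").show()

The same with the DataFrame API: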

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rowNumber

object HiveContextTest {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Hive Context")
    val sc = new SparkContext(conf)
    // a HiveContext (rather than a plain SQLContext) is what enables window functions in 1.4.x
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(
        ("foo", 1) :: ("foo", 2) :: ("bar", 1) :: ("bar", 2) :: Nil
    ).toDF("k", "v")

    // number the rows within each k-partition in ascending order of v
    val w = Window.partitionBy($"k").orderBy($"v")
    df.select($"k", $"v", rowNumber().over(w).alias("rn")).show
  }
}
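
As for the first question, if you want to avoid window functions (and the HiveContext) entirely, you can fall back to the RDD API: group the rows by id, sort each group's dates in memory, and zip them with an index. A minimal sketch, assuming the DataFrame from the question (id: Int, date: String), import sqlContext.implicits._ in scope for toDF, and an illustrative name counted:

import org.apache.spark.sql.Row

// Sketch only: emulate row_number() without window functions.
// Caveat: groupByKey materialises all rows of one id on a single
// executor, so every group has to fit in memory.
val counted = df.rdd
  .map { case Row(id: Int, date: String) => (id, date) }
  .groupByKey()
  .flatMap { case (id, dates) =>
    dates.toSeq.sorted.zipWithIndex.map {
      case (date, i) => (id, date, i + 1)  // counter starts at 1
    }
  }
  .toDF("id", "date", "counter")

counted.orderBy("id", "date").show()

This scales only as long as no single id has too many rows, but it stays entirely within a plain SQLContext.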