Spark DataFrames - Reducing By Key

d80tb7 · Dec 20, 2016 · Viewed 14.5k times

Let's say I have a data structure like this, where ts is some timestamp:

case class Record(ts: Long, id: Int, value: Int)

Given a large number of these records, I want to end up with the record with the highest timestamp for each id. Using the RDD API, I think the following code gets the job done:

def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
  records.keyBy(_.id).reduceByKey{
    (x, y) => if(x.ts > y.ts) x else y
  }.values
}

Likewise, this is my attempt with Datasets:

def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
  records.groupByKey(_.id).mapGroups{
    case(id, records) => {
      records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
    }
  }
}
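
An equivalent Dataset variant uses reduceGroups, which folds each group pairwise instead of handing me the whole group as an iterator; a minimal sketch, assuming the same Record case class and that the usual encoders are available:

def findLatestReduced(records: Dataset[Record])(implicit spark: SparkSession): Dataset[Record] = {
  import spark.implicits._  // encoders for the Int key and for Record
  records.groupByKey(_.id)
    .reduceGroups((x, y) => if (x.ts > y.ts) x else y)  // keep the record with the larger ts per id
    .map(_._2)                                          // drop the key, keep only the winning record
}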

I've been trying to work out how to achieve something similar with DataFrames, but to no avail. I realise I can do the grouping with:

records.groupBy($"id")

But that gives me a RelationalGroupedDataset, and it's not clear to me what aggregation function I need to write to achieve what I want: all the example aggregations I've seen appear to return only a single aggregated column rather than the whole row.
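
For instance, the obvious aggregation (assuming import org.apache.spark.sql.functions._) only keeps the key and the aggregated column, which is exactly the problem:

records.groupBy($"id").agg(max($"ts") as "ts")  // returns only id and max(ts); the value column is lost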

Is it possible to achieve this using dataframes?

Answer

Assaf Mendelson · Dec 20, 2016

You can use the argmax logic (see the Databricks example).

For example, let's say your DataFrame is called df and has the columns id, val, and ts. You would do something like this:

import org.apache.spark.sql.functions._

val newDF = df.groupBy('id).agg(max(struct('ts, 'val)) as 'tmp).select($"id", $"tmp.*")
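
Putting it together as a self-contained sketch (using the question's Record schema with value instead of val; the LatestPerId object name, sample rows, and local session are just for illustration):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object LatestPerId {
  case class Record(ts: Long, id: Int, value: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("latest-per-id").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq(
      Record(1L, 1, 10), Record(3L, 1, 30), Record(2L, 1, 20),
      Record(5L, 2, 50), Record(4L, 2, 40)
    ).toDF()

    // max over a struct compares its fields in order, so ts must come first;
    // the maximal struct per id is therefore the one with the highest timestamp.
    val latest = df
      .groupBy($"id")
      .agg(max(struct($"ts", $"value")) as "tmp")
      .select($"id", $"tmp.*")

    latest.show()
    // id=1 -> (ts=3, value=30), id=2 -> (ts=5, value=50)

    spark.stop()
  }
}

The key detail is that ordering on a struct is field by field, so the timestamp column has to be the first field of the struct; the remaining columns just come along for the ride and are recovered with the $"tmp.*" expansion.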