Aggregation — Typed and Untyped Grouping

You can group data in a Dataset to compute aggregates over a collection of (grouped) records.

You can use agg method for computing aggregations per column on the entire dataset (without creating groups).

scala> spark.range(10).agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+

The following aggregate operators are available for Datasets:

  1. groupBy for untyped aggregations with Column- or String-based column names.

  2. groupByKey for strongly-typed aggregations where the data is grouped by a given key function.

  3. rollup

  4. cube

The untyped aggregations, e.g. groupBy, rollup, and cube, return RelationalGroupedDatasets while groupByKey returns a KeyValueGroupedDataset.

Aggregates on Entire Dataset (Without Groups) — agg Operator

agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame

agg computes aggregate expressions on all the records in a Dataset.

Note
agg is simply a shortcut for groupBy().agg(…​).

Grouping by Columns — groupBy Untyped Operators

groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset

groupBy methods group the Dataset using the specified columns as Columns or their text representation. It returns a RelationalGroupedDataset to apply aggregation to.

// 10^6 records
val intsMM = 1 to math.pow(10, 6).toInt
val df = intsMM.toDF("n").withColumn("m", 'n % 2)

df.groupBy('m).agg(sum('n))

Internally, it first resolves columns and then builds a RelationalGroupedDataset.

Note
The following session uses the data setup as described in Test Setup section below.
scala> dataset.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa|      100| 0.12|
| aaa|      200| 0.29|
| bbb|      200| 0.53|
| bbb|      300| 0.42|
+----+---------+-----+

scala> dataset.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa|         150.0|     0.205|
| bbb|         250.0|     0.475|
+----+--------------+----------+

scala> dataset.groupBy('name, 'productId).agg(Map("score" -> "avg")).show
+----+---------+----------+
|name|productId|avg(score)|
+----+---------+----------+
| aaa|      200|      0.29|
| bbb|      200|      0.53|
| bbb|      300|      0.42|
| aaa|      100|      0.12|
+----+---------+----------+

scala> dataset.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa|    2|
| bbb|    2|
+----+-----+

scala> dataset.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa|      0.29|
| bbb|      0.53|
+----+----------+

scala> dataset.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa|      0.41|
| bbb|      0.95|
+----+----------+

scala> dataset.groupBy('productId).sum("score").show
+---------+------------------+
|productId|        sum(score)|
+---------+------------------+
|      300|              0.42|
|      100|              0.12|
|      200|0.8200000000000001|
+---------+------------------+

groupByKey Typed Operator

groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]

groupByKey groups records (of type T) by the input func. It returns a KeyValueGroupedDataset to apply aggregation to.

Note
groupByKey is Dataset's experimental API.
scala> dataset.groupByKey(_.productId).count.show
+-----+--------+
|value|count(1)|
+-----+--------+
|  300|       1|
|  100|       1|
|  200|       2|
+-----+--------+

import org.apache.spark.sql.expressions.scalalang._
scala> dataset.groupByKey(_.productId).agg(typed.sum[Token](_.score)).toDF("productId", "sum").orderBy('productId).show
+---------+------------------+
|productId|               sum|
+---------+------------------+
|      100|              0.12|
|      200|0.8200000000000001|
|      300|              0.42|
+---------+------------------+

RelationalGroupedDataset

RelationalGroupedDataset is a result of executing the untyped operators groupBy, rollup and cube.

RelationalGroupedDataset is also a result of executing pivot operator on a grouped records as RelationalGroupedDataset.

It offers the following operators to work on a grouped collection of records:

  • agg

  • count

  • mean

  • max

  • avg

  • min

  • sum

  • pivot

KeyValueGroupedDataset

KeyValueGroupedDataset is an experimental interface to a result of executing the strongly-typed operator groupByKey.

scala> val tokensByName = dataset.groupByKey(_.name)
tokensByName: org.apache.spark.sql.KeyValueGroupedDataset[String,Token] = org.apache.spark.sql.KeyValueGroupedDataset@1e3aad46

It holds keys that were used for the object.

scala> tokensByName.keys.show
+-----+
|value|
+-----+
|  aaa|
|  bbb|
+-----+

The following methods are available for any KeyValueGroupedDataset to work on groups of records:

  1. agg (of 1 to 4 types)

  2. mapGroups

  3. flatMapGroups

  4. reduceGroups

  5. count that is a special case of agg with count function applied.

  6. cogroup

Test Setup

This is a setup for learning GroupedData. Paste it into Spark Shell using :paste.

import spark.implicits._

case class Token(name: String, productId: Int, score: Double)
val data = Token("aaa", 100, 0.12) ::
  Token("aaa", 200, 0.29) ::
  Token("bbb", 200, 0.53) ::
  Token("bbb", 300, 0.42) :: Nil
val dataset = data.toDS.cache  (1)
  1. Cache the dataset so the following queries won’t load/recompute data over and over again.

results matching ""

    No results matching ""