Aggregation — Typed and Untyped Grouping
You can group data in a Dataset to compute aggregates over a collection of (grouped) records.
You can use the agg method for computing aggregations per column on the entire dataset (without creating groups):
scala> spark.range(10).agg(sum('id) as "sum").show
+---+
|sum|
+---+
| 45|
+---+
The following aggregate operators are available for Datasets:
- groupBy for untyped aggregations with Column- or String-based column names.
- groupByKey for strongly-typed aggregations where the data is grouped by a given key function.
- rollup
- cube
The untyped aggregations, i.e. groupBy, rollup, and cube, return a RelationalGroupedDataset, while groupByKey returns a KeyValueGroupedDataset.
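For a concrete feel for the difference, here is a minimal sketch that only inspects the resulting types (it reuses the Token dataset from the Test Setup section below):
// Untyped: columns are resolved by name at runtime
val byName = dataset.groupBy('name)    // RelationalGroupedDataset
// Typed: records are grouped by a Scala function, so the key keeps its type
val byKey = dataset.groupByKey(_.name) // KeyValueGroupedDataset[String, Token]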
Aggregates on Entire Dataset (Without Groups) — agg Operator
agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
agg computes aggregate expressions on all the records in a Dataset.
Note: agg is simply a shortcut for groupBy().agg(…).
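All three variants compute the same kind of result. A minimal sketch in spark-shell (which auto-imports org.apache.spark.sql.functions._, so sum and max are in scope):
// Column-based variant
spark.range(10).agg(sum('id), max('id)).show
// Map-based variant: the aggregation function is given by name
spark.range(10).agg(Map("id" -> "sum")).show
// Pair-based variant
spark.range(10).agg("id" -> "sum", "id" -> "max").show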
Grouping by Columns — groupBy Untyped Operators
groupBy(cols: Column*): RelationalGroupedDataset
groupBy(col1: String, cols: String*): RelationalGroupedDataset
The groupBy methods group the records in a Dataset using the specified columns (as Columns or their text representation). They return a RelationalGroupedDataset to apply aggregations to.
// 10^6 records
val intsMM = 1 to math.pow(10, 6).toInt
// m is n modulo 2, i.e. 0 for even and 1 for odd numbers
val df = intsMM.toDF("n").withColumn("m", 'n % 2)
df.groupBy('m).agg(sum('n))
Internally, it first resolves columns and then builds a RelationalGroupedDataset.
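The String-based variant works the same way. A quick sketch (reusing the Token dataset from the Test Setup section below):
// Column names as plain Strings instead of Columns
dataset.groupBy("name").avg("score").show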
Note: The following session uses the data setup described in the Test Setup section below.
scala> dataset.show
+----+---------+-----+
|name|productId|score|
+----+---------+-----+
| aaa| 100| 0.12|
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
+----+---------+-----+
scala> dataset.groupBy('name).avg().show
+----+--------------+----------+
|name|avg(productId)|avg(score)|
+----+--------------+----------+
| aaa| 150.0| 0.205|
| bbb| 250.0| 0.475|
+----+--------------+----------+
scala> dataset.groupBy('name, 'productId).agg(Map("score" -> "avg")).show
+----+---------+----------+
|name|productId|avg(score)|
+----+---------+----------+
| aaa| 200| 0.29|
| bbb| 200| 0.53|
| bbb| 300| 0.42|
| aaa| 100| 0.12|
+----+---------+----------+
scala> dataset.groupBy('name).count.show
+----+-----+
|name|count|
+----+-----+
| aaa| 2|
| bbb| 2|
+----+-----+
scala> dataset.groupBy('name).max("score").show
+----+----------+
|name|max(score)|
+----+----------+
| aaa| 0.29|
| bbb| 0.53|
+----+----------+
scala> dataset.groupBy('name).sum("score").show
+----+----------+
|name|sum(score)|
+----+----------+
| aaa| 0.41|
| bbb| 0.95|
+----+----------+
scala> dataset.groupBy('productId).sum("score").show
+---------+------------------+
|productId| sum(score)|
+---------+------------------+
| 300| 0.42|
| 100| 0.12|
| 200|0.8200000000000001|
+---------+------------------+
groupByKey Typed Operator
groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T]
groupByKey groups records (of type T) by the input func. It returns a KeyValueGroupedDataset to apply aggregations to.
Note: groupByKey is Dataset's experimental API.
scala> dataset.groupByKey(_.productId).count.show
+-----+--------+
|value|count(1)|
+-----+--------+
| 300| 1|
| 100| 1|
| 200| 2|
+-----+--------+
scala> import org.apache.spark.sql.expressions.scalalang._
import org.apache.spark.sql.expressions.scalalang._
scala> dataset.groupByKey(_.productId).agg(typed.sum[Token](_.score)).toDF("productId", "sum").orderBy('productId).show
+---------+------------------+
|productId| sum|
+---------+------------------+
| 100| 0.12|
| 200|0.8200000000000001|
| 300| 0.42|
+---------+------------------+
RelationalGroupedDataset
RelationalGroupedDataset is the result of executing the untyped grouping operators (groupBy, rollup and cube). It is also the result of executing the pivot operator on already-grouped records, i.e. on a RelationalGroupedDataset.
It offers the following operators to work on a grouped collection of records:
- agg
- count
- mean
- max
- avg
- min
- sum
- pivot (see the sketch after this list)
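Of these, pivot is the only operator not exercised in the sessions above. A minimal sketch (reusing the Test Setup dataset) that turns the distinct productId values into columns:
// One column per distinct productId, each holding sum(score) per name
dataset.groupBy('name).pivot("productId").sum("score").show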
KeyValueGroupedDataset
KeyValueGroupedDataset is an experimental interface that represents the result of executing the strongly-typed grouping operator groupByKey.
scala> val tokensByName = dataset.groupByKey(_.name)
tokensByName: org.apache.spark.sql.KeyValueGroupedDataset[String,Token] = org.apache.spark.sql.KeyValueGroupedDataset@1e3aad46
It holds the keys that the records were grouped by.
scala> tokensByName.keys.show
+-----+
|value|
+-----+
| aaa|
| bbb|
+-----+
The following methods are available for any KeyValueGroupedDataset to work on groups of records:
- agg (of 1 to 4 types)
- mapGroups (see the sketch after this list)
- flatMapGroups
- reduceGroups
- count, a special case of agg with the count function applied
- cogroup
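mapGroups is the most general of them: it maps every group through an arbitrary Scala function. A minimal sketch (reusing the Test Setup dataset):
// Reduce each group of Tokens to the name and its highest score
dataset.groupByKey(_.name)
  .mapGroups { (name, tokens) => (name, tokens.map(_.score).max) }
  .toDF("name", "maxScore")
  .show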
Test Setup
This is a setup for learning RelationalGroupedDataset and KeyValueGroupedDataset. Paste it into Spark Shell using :paste.
import spark.implicits._
case class Token(name: String, productId: Int, score: Double)
val data = Token("aaa", 100, 0.12) ::
  Token("aaa", 200, 0.29) ::
  Token("bbb", 200, 0.53) ::
  Token("bbb", 300, 0.42) :: Nil
val dataset = data.toDS.cache  // (1)

1. Cache the dataset so the following queries won't load/recompute data over and over again.