Dataset Operators

You can group the set of all operators to use with Datasets per their target, i.e. the part of a Dataset they can be applied to.

Beside the above operators, there are the following ones.

Table 1. Dataset Operators
Operator Description

as

Converting a Dataset to a Dataset

explain

filter

flatMap

foreachPartition

isLocal

isStreaming

mapPartition

randomSplit

rdd

repartition

schema

select

selectExpr

show

take

toDF

Converts a Dataset to a DataFrame

toJSON

transform

Transforms a Dataset

where

write

writeStream

Transforming Datasets — transform method

transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]

transform applies t function to the source Dataset[T] and produces a Dataset[U]. It is for chaining custom transformations.

val dataset = spark.range(5)

// Transformation t
import org.apache.spark.sql.Dataset
def withDoubled(longs: Dataset[java.lang.Long]) = longs.withColumn("doubled", 'id * 2)

scala> dataset.transform(withDoubled).show
+---+-------+
| id|doubled|
+---+-------+
|  0|      0|
|  1|      2|
|  2|      4|
|  3|      6|
|  4|      8|
+---+-------+

Internally, transform executes t function on the current Dataset[T].

Converting to DataFrame — toDF Methods

toDF(): DataFrame
toDF(colNames: String*): DataFrame

toDF converts a Dataset into a DataFrame.

Internally, the empty-argument toDF creates a Dataset[Row] using the Dataset's SparkSession and QueryExecution with the encoder being RowEncoder.

Caution
FIXME Describe toDF(colNames: String*)

Converting to Dataset — as Method

Caution
FIXME

Accessing DataFrameWriter — write Method

write: DataFrameWriter[T]

write method returns DataFrameWriter for records of type T.

import org.apache.spark.sql.{DataFrameWriter, Dataset}
val ints: Dataset[Int] = (0 to 5).toDS

val writer: DataFrameWriter[Int] = ints.write

Accessing DataStreamWriter — writeStream Method

writeStream: DataStreamWriter[T]

writeStream method returns DataStreamWriter for records of type T.

val papers = spark.readStream.text("papers").as[String]

import org.apache.spark.sql.streaming.DataStreamWriter
val writer: DataStreamWriter[String] = papers.writeStream

Display Records — show Methods

show(): Unit
show(numRows: Int): Unit
show(truncate: Boolean): Unit
show(numRows: Int, truncate: Boolean): Unit
show(numRows: Int, truncate: Int): Unit
Caution
FIXME

Internally, show relays to a private showString to do the formatting. It turns the Dataset into a DataFrame (by calling toDF()) and takes first n records.

Taking First n Records — take Method

take(n: Int): Array[T]

take is an action on a Dataset that returns a collection of n records.

Warning
take loads all the data into the memory of the Spark application’s driver process and for a large n could result in OutOfMemoryError.

Internally, take creates a new Dataset with Limit logical plan for Literal expression and the current LogicalPlan. It then runs the SparkPlan that produces a Array[InternalRow] that is in turn decoded to Array[T] using a bounded encoder.

foreachPartition Method

foreachPartition(f: Iterator[T] => Unit): Unit

foreachPartition applies the f function to each partition of the Dataset.

case class Record(id: Int, city: String)
val ds = Seq(Record(0, "Warsaw"), Record(1, "London")).toDS

ds.foreachPartition { iter: Iterator[Record] => iter.foreach(println) }
Note
foreachPartition is used to save a DataFrame to a JDBC table (indirectly through JdbcUtils.saveTable) and ForeachSink.

mapPartitions Method

mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]

mapPartitions returns a new Dataset (of type U) with the function func applied to each partition.

Caution
FIXME Example

Creating Zero or More Records — flatMap Method

flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]

flatMap returns a new Dataset (of type U) with all records (of type T) mapped over using the function func and then flattening the results.

Note
flatMap can create new records. It deprecated explode.
final case class Sentence(id: Long, text: String)
val sentences = Seq(Sentence(0, "hello world"), Sentence(1, "witaj swiecie")).toDS

scala> sentences.flatMap(s => s.text.split("\\s+")).show
+-------+
|  value|
+-------+
|  hello|
|  world|
|  witaj|
|swiecie|
+-------+

Internally, flatMap calls mapPartitions with the partitions flatMap(ped).

Repartitioning Dataset — repartition Method

repartition(numPartitions: Int): Dataset[T]

repartition repartition the Dataset to exactly numPartitions partitions.

Projecting Columns — select Methods

select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1]
select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)]
select[U1, U2, U3](
  c1: TypedColumn[T, U1],
  c2: TypedColumn[T, U2],
  c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)]
select[U1, U2, U3, U4](
  c1: TypedColumn[T, U1],
  c2: TypedColumn[T, U2],
  c3: TypedColumn[T, U3],
  c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)]
select[U1, U2, U3, U4, U5](
  c1: TypedColumn[T, U1],
  c2: TypedColumn[T, U2],
  c3: TypedColumn[T, U3],
  c4: TypedColumn[T, U4],
  c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)]
Caution
FIXME

filter Method

Caution
FIXME

where Methods

where(condition: Column): Dataset[T]
where(conditionExpr: String): Dataset[T]

where is a synonym for filter operator, i.e. it simply passes the parameters on to filter.

Projecting Columns using Expressions — selectExpr Method

selectExpr(exprs: String*): DataFrame

selectExpr is like select, but accepts SQL expressions exprs.

val ds = spark.range(5)

scala> ds.selectExpr("rand() as random").show
16/04/14 23:16:06 INFO HiveSqlParser: Parsing command: rand() as random
+-------------------+
|             random|
+-------------------+
|  0.887675894185651|
|0.36766085091074086|
| 0.2700020856675186|
| 0.1489033635529543|
| 0.5862990791950973|
+-------------------+

Internally, it executes select with every expression in exprs mapped to Column (using SparkSqlParser.parseExpression).

scala> ds.select(expr("rand() as random")).show
+------------------+
|            random|
+------------------+
|0.5514319279894851|
|0.2876221510433741|
|0.4599999092045741|
|0.5708558868374893|
|0.6223314406247136|
+------------------+
Note
A new feature in Spark 2.0.0.

randomSplit Methods

randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]

randomSplit randomly splits the Dataset per weights.

weights doubles should sum up to 1 and will be normalized if they do not.

You can define seed and if you don’t, a random seed will be used.

Note
It is used in TrainValidationSplit to split dataset into training and validation datasets.
val ds = spark.range(10)
scala> ds.randomSplit(Array[Double](2, 3)).foreach(_.show)
+---+
| id|
+---+
|  0|
|  1|
|  2|
+---+

+---+
| id|
+---+
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+
Note
A new feature in Spark 2.0.0.

explain Method

explain(): Unit
explain(extended: Boolean): Unit

explain prints the logical and physical plans to the console. Use it for debugging structured queries.

Tip
If you are serious about query debugging you could also use the Debugging Query Execution facility.

Internally, explain executes a ExplainCommand logical command.

scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range (0, 10, step=1, splits=Some(8))

== Analyzed Logical Plan ==
id: bigint
Range (0, 10, step=1, splits=Some(8))

== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(8))

== Physical Plan ==
*Range (0, 10, step=1, splits=Some(8))

toJSON method

toJSON maps the content of Dataset to a Dataset of JSON strings.

Note
A new feature in Spark 2.0.0.
scala> val ds = Seq("hello", "world", "foo bar").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]

scala> ds.toJSON.show
+-------------------+
|              value|
+-------------------+
|  {"value":"hello"}|
|  {"value":"world"}|
|{"value":"foo bar"}|
+-------------------+

Internally, toJSON grabs the RDD[InternalRow] (of the QueryExecution of the Dataset) and maps the records (per RDD partition) into JSON.

Note
toJSON uses Jackson’s JSON parser — jackson-module-scala.

Accessing Schema — schema Method

A Dataset has a schema.

schema: StructType
Tip

You may also use the following methods to learn about the schema:

Converting Dataset into RDD — rdd Attribute

rdd: RDD[T]

Whenever you are in need to convert a Dataset into a RDD, executing rdd method gives you the RDD of the proper input object type (not Row as in DataFrames) that sits behind the Dataset.

scala> val rdd = tokens.rdd
rdd: org.apache.spark.rdd.RDD[Token] = MapPartitionsRDD[11] at rdd at <console>:30

Internally, it looks ExpressionEncoder (for the Dataset) up and accesses the deserializer expression. That gives the DataType of the result of evaluating the expression.

Note
A deserializer expression is used to decode an InternalRow to an object of type T. See ExpressionEncoder.

It then executes a DeserializeToObject logical plan that will produce a RDD[InternalRow] that is converted into the proper RDD[T] using the DataType and T.

Note
It is a lazy operation that "produces" a RDD[T].

isStreaming Method

isStreaming returns true when Dataset contains StreamingRelation or StreamingExecutionRelation streaming sources.

Note
Streaming datasets are created using DataFrameReader.stream method (for StreamingRelation) and contain StreamingExecutionRelation after DataStreamWriter.start.
val reader = spark.read
val helloStream = reader.stream("hello")

scala> helloStream.isStreaming
res9: Boolean = true
Note
A new feature in Spark 2.0.0.

Is Dataset Local — isLocal method

isLocal: Boolean

isLocal is a flag that says whether operators like collect or take could be run locally, i.e. without using executors.

Internally, isLocal checks whether the logical query plan of a Dataset is LocalRelation.

results matching ""

    No results matching ""