Dataset Operators
You can group the set of all operators to use with Datasets per their target, i.e. the part of a Dataset they can be applied to.
Besides the above operators, there are the following ones that work with a Dataset as a whole.
Operator | Description |
---|---|
as | Converting a Dataset to a Dataset of another type |
toDF | Converts a Dataset to a DataFrame |
transform | Transforms a Dataset with a custom function |
Transforming Datasets — transform Method
transform[U](t: Dataset[T] => Dataset[U]): Dataset[U]
transform applies the t function to the source Dataset[T] and produces a Dataset[U]. It is meant for chaining custom transformations.
val dataset = spark.range(5)
// Transformation t
import org.apache.spark.sql.Dataset
def withDoubled(longs: Dataset[java.lang.Long]) = longs.withColumn("doubled", 'id * 2)
scala> dataset.transform(withDoubled).show
+---+-------+
| id|doubled|
+---+-------+
| 0| 0|
| 1| 2|
| 2| 4|
| 3| 6|
| 4| 8|
+---+-------+
Internally, transform executes the t function on the current Dataset[T].
Converting to DataFrame — toDF Methods
toDF(): DataFrame
toDF(colNames: String*): DataFrame
Internally, the no-argument toDF creates a Dataset[Row] using the Dataset's SparkSession and QueryExecution with the encoder being RowEncoder.
Caution
|
FIXME Describe toDF(colNames: String*)
|
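For illustration, a minimal sketch (assuming a spark-shell session and a made-up Person case class): toDF() keeps the column names derived from the type, while toDF(colNames: String*) renames the columns positionally.
final case class Person(name: String, age: Int)
val people = Seq(Person("Agata", 30), Person("Jacek", 40)).toDS
// No-argument variant: column names come from the Person case class
val df = people.toDF
// df: org.apache.spark.sql.DataFrame = [name: string, age: int]
// Variant with column names: renames the columns positionally
val renamed = people.toDF("who", "years")
// renamed: org.apache.spark.sql.DataFrame = [who: string, years: int]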
Converting to Dataset — as Method
Caution
|
FIXME |
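As a minimal sketch (assuming a spark-shell session and a made-up City case class), as[U] turns an untyped DataFrame into a typed Dataset:
final case class City(id: Long, name: String)
// an untyped Dataset[Row], i.e. a DataFrame
val rows = Seq((0L, "Warsaw"), (1L, "Toronto")).toDF("id", "name")
// as[City] attaches an encoder so records come back as City objects
val cities = rows.as[City]
// cities: org.apache.spark.sql.Dataset[City] = [id: bigint, name: string]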
Accessing DataFrameWriter — write Method
write: DataFrameWriter[T]
write
method returns DataFrameWriter for records of type T
.
import org.apache.spark.sql.{DataFrameWriter, Dataset}
val ints: Dataset[Int] = (0 to 5).toDS
val writer: DataFrameWriter[Int] = ints.write
Accessing DataStreamWriter — writeStream Method
writeStream: DataStreamWriter[T]
writeStream
method returns DataStreamWriter for records of type T
.
val papers = spark.readStream.text("papers").as[String]
import org.apache.spark.sql.streaming.DataStreamWriter
val writer: DataStreamWriter[String] = papers.writeStream
Display Records — show Methods
show(): Unit
show(numRows: Int): Unit
show(truncate: Boolean): Unit
show(numRows: Int, truncate: Boolean): Unit
show(numRows: Int, truncate: Int): Unit
Caution
|
FIXME |
Internally, show relays to a private showString to do the formatting. It turns the Dataset into a DataFrame (by calling toDF()) and takes the first n records.
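For illustration, a minimal sketch (assuming a spark-shell session; the extra column is made up) of the numRows and truncate parameters:
import org.apache.spark.sql.functions.lit
val ds = spark.range(100).withColumn("text", lit("a fairly long value that would normally get truncated"))
// Print only the first 3 records and do not truncate long values
ds.show(3, false)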
Taking First n Records — take Method
take(n: Int): Array[T]
take is an action on a Dataset that returns an array of up to n records.
Warning
|
take loads all the data into the memory of the Spark application’s driver process and, for a large n, could result in an OutOfMemoryError.
|
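For illustration, a minimal sketch in spark-shell:
val ds = spark.range(10)
// take is an action and triggers execution of the query
scala> ds.take(3)
res0: Array[java.lang.Long] = Array(0, 1, 2)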
foreachPartition Method
foreachPartition(f: Iterator[T] => Unit): Unit
foreachPartition
applies the f
function to each partition of the Dataset
.
case class Record(id: Int, city: String)
val ds = Seq(Record(0, "Warsaw"), Record(1, "London")).toDS
ds.foreachPartition { iter: Iterator[Record] => iter.foreach(println) }
Note
|
foreachPartition is used to save a DataFrame to a JDBC table (indirectly through JdbcUtils.saveTable) and in ForeachSink.
|
mapPartitions Method
mapPartitions[U: Encoder](func: Iterator[T] => Iterator[U]): Dataset[U]
mapPartitions
returns a new Dataset
(of type U
) with the function func
applied to each partition.
Caution
|
FIXME Example |
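Until the example above is filled in, here is a minimal sketch (reusing the Record case class from the foreachPartition example above); anything set up inside the function exists once per partition rather than once per record:
val records = Seq(Record(0, "Warsaw"), Record(1, "London"), Record(2, "Tokyo")).toDS
val described = records.mapPartitions { iter =>
  // anything created here (a connection, a parser, ...) is created once per partition
  iter.map(r => s"${r.id}: ${r.city}")
}
// described: org.apache.spark.sql.Dataset[String]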
Creating Zero or More Records — flatMap Method
flatMap[U: Encoder](func: T => TraversableOnce[U]): Dataset[U]
flatMap returns a new Dataset (of type U) with all records (of type T) mapped over using the function func and the results flattened.
Note
|
flatMap can create new records. It deprecated explode.
|
final case class Sentence(id: Long, text: String)
val sentences = Seq(Sentence(0, "hello world"), Sentence(1, "witaj swiecie")).toDS
scala> sentences.flatMap(s => s.text.split("\\s+")).show
+-------+
| value|
+-------+
| hello|
| world|
| witaj|
|swiecie|
+-------+
Internally, flatMap
calls mapPartitions with the partitions flatMap(ped)
.
Repartitioning Dataset — repartition Method
repartition(numPartitions: Int): Dataset[T]
repartition repartitions the Dataset to exactly numPartitions partitions.
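For illustration, a minimal sketch in spark-shell (the initial number of partitions depends on your environment):
val ds = spark.range(10)
scala> ds.rdd.getNumPartitions
res0: Int = 8
// repartition shuffles the data into exactly 4 partitions
scala> ds.repartition(4).rdd.getNumPartitions
res1: Int = 4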
Projecting Columns — select Methods
select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1]
select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): Dataset[(U1, U2)]
select[U1, U2, U3](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)]
select[U1, U2, U3, U4](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3],
c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)]
select[U1, U2, U3, U4, U5](
c1: TypedColumn[T, U1],
c2: TypedColumn[T, U2],
c3: TypedColumn[T, U3],
c4: TypedColumn[T, U4],
c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)]
Caution
|
FIXME |
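Until the section is filled in, a minimal sketch in spark-shell (with a made-up Token case class) of the typed select: a Column becomes a TypedColumn with as[U], so the result stays a typed Dataset.
import org.apache.spark.sql.Dataset
final case class Token(name: String, productId: Int, score: Double)
val tokens = Seq(Token("aaa", 100, 0.12), Token("bbb", 200, 0.29)).toDS
val names: Dataset[String] = tokens.select($"name".as[String])
val pairs: Dataset[(String, Double)] = tokens.select($"name".as[String], $"score".as[Double])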
filter Method
Caution
|
FIXME |
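Until the section is filled in, a minimal sketch of the typed (Scala function) and untyped (Column and SQL expression) filter variants:
val ds = spark.range(10)
// typed variant: a Scala predicate applied to every record
val evens = ds.filter(n => n % 2 == 0)
// untyped variants: a Column condition or a SQL expression
val bigOnes = ds.filter($"id" > 5)
val bigOnes2 = ds.filter("id > 5")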
where Methods
where(condition: Column): Dataset[T]
where(conditionExpr: String): Dataset[T]
where is a synonym for the filter operator, i.e. it simply passes the parameters on to filter.
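For illustration, both variants below give the same result (a minimal sketch in spark-shell):
val ds = spark.range(10)
scala> ds.where($"id" > 7).show
+---+
| id|
+---+
|  8|
|  9|
+---+
scala> ds.where("id > 7").show
+---+
| id|
+---+
|  8|
|  9|
+---+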
Projecting Columns using Expressions — selectExpr Method
selectExpr(exprs: String*): DataFrame
selectExpr is like select, but accepts SQL expressions (exprs).
val ds = spark.range(5)
scala> ds.selectExpr("rand() as random").show
16/04/14 23:16:06 INFO HiveSqlParser: Parsing command: rand() as random
+-------------------+
| random|
+-------------------+
| 0.887675894185651|
|0.36766085091074086|
| 0.2700020856675186|
| 0.1489033635529543|
| 0.5862990791950973|
+-------------------+
Internally, it executes select with every expression in exprs mapped to a Column (using SparkSqlParser.parseExpression). The example above is therefore equivalent to the following select with expr:
scala> ds.select(expr("rand() as random")).show
+------------------+
| random|
+------------------+
|0.5514319279894851|
|0.2876221510433741|
|0.4599999092045741|
|0.5708558868374893|
|0.6223314406247136|
+------------------+
Note
|
A new feature in Spark 2.0.0. |
randomSplit Methods
randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
randomSplit randomly splits the Dataset as per the weights.
The weights doubles should sum up to 1 and are normalized if they do not.
You can define a seed and, if you don't, a random seed will be used.
Note
|
It is used in TrainValidationSplit to split a dataset into training and validation datasets. |
val ds = spark.range(10)
scala> ds.randomSplit(Array[Double](2, 3)).foreach(_.show)
+---+
| id|
+---+
| 0|
| 1|
| 2|
+---+
+---+
| id|
+---+
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
Note
|
A new feature in Spark 2.0.0. |
explain Method
explain(): Unit
explain(extended: Boolean): Unit
explain
prints the logical and physical plans to the console. Use it for debugging structured queries.
Tip
|
If you are serious about query debugging you could also use the Debugging Query Execution facility. |
Internally, explain executes an ExplainCommand logical command.
scala> spark.range(10).explain(extended = true)
== Parsed Logical Plan ==
Range (0, 10, step=1, splits=Some(8))
== Analyzed Logical Plan ==
id: bigint
Range (0, 10, step=1, splits=Some(8))
== Optimized Logical Plan ==
Range (0, 10, step=1, splits=Some(8))
== Physical Plan ==
*Range (0, 10, step=1, splits=Some(8))
toJSON Method
toJSON maps the content of a Dataset to a Dataset of JSON strings.
Note
|
A new feature in Spark 2.0.0. |
scala> val ds = Seq("hello", "world", "foo bar").toDS
ds: org.apache.spark.sql.Dataset[String] = [value: string]
scala> ds.toJSON.show
+-------------------+
| value|
+-------------------+
| {"value":"hello"}|
| {"value":"world"}|
|{"value":"foo bar"}|
+-------------------+
Internally, toJSON
grabs the RDD[InternalRow]
(of the QueryExecution of the Dataset
) and maps the records (per RDD partition) into JSON.
Note
|
toJSON uses Jackson’s JSON parser — jackson-module-scala.
|
Accessing Schema — schema Method
A Dataset
has a schema.
schema: StructType
Tip
|
You may also use the following methods to learn about the schema:
|
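For illustration, printSchema is one such method (a minimal sketch reusing the sentences Dataset from the flatMap example above):
scala> sentences.printSchema
root
 |-- id: long (nullable = false)
 |-- text: string (nullable = true)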
Converting Dataset into RDD — rdd Attribute
rdd: RDD[T]
Whenever you need to convert a Dataset into an RDD, executing the rdd method gives you an RDD of the proper input object type (not Row as in DataFrames) that sits behind the Dataset.
// tokens is a Dataset[Token] created earlier, e.g. as in the typed select sketch above
scala> val rdd = tokens.rdd
rdd: org.apache.spark.rdd.RDD[Token] = MapPartitionsRDD[11] at rdd at <console>:30
Internally, it looks up the ExpressionEncoder (for the Dataset) and accesses the deserializer expression. That gives the DataType of the result of evaluating the expression.
Note
|
A deserializer expression is used to decode an InternalRow to an object of type T. See ExpressionEncoder.
|
It then executes a DeserializeToObject logical plan that will produce an RDD[InternalRow] that is converted into the proper RDD[T] using the DataType and T.
Note
|
It is a lazy operation that "produces" an RDD[T].
|
isStreaming Method
isStreaming returns true when the Dataset contains StreamingRelation or StreamingExecutionRelation streaming sources.
Note
|
Streaming datasets are created using DataStreamReader methods, i.e. spark.readStream (which gives StreamingRelation), and contain StreamingExecutionRelation after DataStreamWriter.start. |
val helloStream = spark.readStream.text("hello")
scala> helloStream.isStreaming
res9: Boolean = true
Note
|
A new feature in Spark 2.0.0. |
Is Dataset Local — isLocal Method
isLocal: Boolean
isLocal
is a flag that says whether operators like collect
or take
could be run locally, i.e. without using executors.
Internally, isLocal
checks whether the logical query plan of a Dataset
is LocalRelation.
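For illustration (a minimal sketch in spark-shell), a Dataset created from a local collection is backed by a LocalRelation and is therefore local, while spark.range is not:
scala> Seq(1, 2, 3).toDS.isLocal
res0: Boolean = true
scala> spark.range(5).isLocal
res1: Boolean = false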