Column Operators
Column type represents a column in a dataset.
With the implicit conversions imported, you can create columns using Scala’s symbols.
val spark: SparkSession = ...
import spark.implicits._
import org.apache.spark.sql.Column
scala> val nameCol: Column = 'name
nameCol: org.apache.spark.sql.Column = name
You can also create columns from $-prefixed strings.
// Note that $ alone creates a ColumnName
scala> val idCol = $"id"
idCol: org.apache.spark.sql.ColumnName = id
import org.apache.spark.sql.Column
// The target type triggers the implicit conversion to Column
scala> val idCol: Column = $"id"
idCol: org.apache.spark.sql.Column = id
Besides using the implicit conversions to create columns, you can use the col and column methods from the functions object.
import org.apache.spark.sql.functions._
scala> val nameCol = col("name")
nameCol: org.apache.spark.sql.Column = name
scala> val cityCol = column("city")
cityCol: org.apache.spark.sql.Column = city
Finally, you can create a Column using the Dataset it belongs to, with either the Dataset.apply factory method or the Dataset.col method.
scala> val textCol = dataset.col("text")
textCol: org.apache.spark.sql.Column = text
scala> val idCol = dataset.apply("id")
idCol: org.apache.spark.sql.Column = id
scala> val idCol = dataset("id")
idCol: org.apache.spark.sql.Column = id
You can reference nested columns using . (dot).
Caution: FIXME
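As a minimal sketch of dot-based access to nested columns, the following builds a struct column and then selects its fields. The sample data and the names people, address, city and country are made up for illustration; only struct, col and the $ interpolator come from Spark.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct}

val spark = SparkSession.builder().master("local[*]").appName("nested-columns").getOrCreate()
import spark.implicits._

// Hypothetical sample data: "address" is a struct assembled from two flat columns
val people = Seq((0, "Warsaw", "PL"), (1, "Toronto", "CA"))
  .toDF("id", "city", "country")
  .select($"id", struct($"city", $"country").as("address"))

// The dot navigates from the struct column into one of its fields
val cities = people.select($"address.city")
val countries = people.select(col("address.country"))

cities.show()
```

The resulting column takes the name of the nested field (city here), not the full dotted path.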
Adding Column to Dataset: withColumn Method
withColumn(colName: String, col: Column): DataFrame
withColumn method returns a new DataFrame with the column col added under the name colName.
Note: withColumn can replace an existing colName column.
scala> val df = Seq((1, "jeden"), (2, "dwa")).toDF("number", "polish")
df: org.apache.spark.sql.DataFrame = [number: int, polish: string]
scala> df.show
+------+------+
|number|polish|
+------+------+
| 1| jeden|
| 2| dwa|
+------+------+
scala> df.withColumn("polish", lit(1)).show
+------+------+
|number|polish|
+------+------+
| 1| 1|
| 2| 1|
+------+------+
You can add new columns to a Dataset using the withColumn method.
val spark: SparkSession = ...
val dataset = spark.range(5)
// Add a new column called "group"
scala> dataset.withColumn("group", 'id % 2).show
+---+-----+
| id|group|
+---+-----+
| 0| 0|
| 1| 1|
| 2| 0|
| 3| 1|
| 4| 0|
+---+-----+
Referencing Column: apply Method
val spark: SparkSession = ...
case class Word(id: Long, text: String)
val dataset = Seq(Word(0, "hello"), Word(1, "spark")).toDS
scala> val idCol = dataset.apply("id")
idCol: org.apache.spark.sql.Column = id
// or using Scala's magic a little bit
// the following is equivalent to the above explicit apply call
scala> val idCol = dataset("id")
idCol: org.apache.spark.sql.Column = id
Creating Column: col Method
val spark: SparkSession = ...
case class Word(id: Long, text: String)
val dataset = Seq(Word(0, "hello"), Word(1, "spark")).toDS
scala> val textCol = dataset.col("text")
textCol: org.apache.spark.sql.Column = text
like
Caution: FIXME
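// df is assumed to be a DataFrame with id and text columns,
// e.g. Seq((0, "hello"), (1, "world")).toDF("id", "text")
scala> df("id") like "0"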
res0: org.apache.spark.sql.Column = id LIKE 0
scala> df.filter('id like "0").show
+---+-----+
| id| text|
+---+-----+
| 0|hello|
+---+-----+
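The example above matches a literal value only. As a rough sketch of pattern matching with like, the following uses the SQL wildcards % (any sequence of characters, including none) and _ (exactly one character). The words dataset and its contents are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("like-example").getOrCreate()
import spark.implicits._

// Hypothetical sample data
val words = Seq((0, "hello"), (1, "help"), (2, "spark")).toDF("id", "text")

// % matches any sequence of characters: keeps "hello" and "help"
val startsWithHel = words.filter($"text" like "hel%")

// _ matches exactly one character: keeps the five-letter words "hello" and "spark"
val fiveLetters = words.filter($"text".like("_____"))

startsWithHel.show()
fiveLetters.show()
```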
Symbols As Column Names
scala> val df = Seq((0, "hello"), (1, "world")).toDF("id", "text")
df: org.apache.spark.sql.DataFrame = [id: int, text: string]
scala> df.select('id)
res0: org.apache.spark.sql.DataFrame = [id: int]
scala> df.select('id).show
+---+
| id|
+---+
| 0|
| 1|
+---+
over function
over(window: expressions.WindowSpec): Column
over function defines a windowing column, i.e. a column whose expression is evaluated over the window described by a WindowSpec rather than over a single row.
Tip: Read about Windows in Windows.
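A small sketch of over in use, assuming a made-up sales dataset with group and amount columns: an aggregate (sum) and a ranking function (rank) are each turned into a windowing column by passing a WindowSpec built with the Window object.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{rank, sum}

val spark = SparkSession.builder().master("local[*]").appName("over-example").getOrCreate()
import spark.implicits._

// Hypothetical sample data
val sales = Seq(("a", 1), ("a", 3), ("b", 2)).toDF("group", "amount")

// WindowSpec that puts rows with the same group into one window
val byGroup = Window.partitionBy($"group")

val ranked = sales
  // total amount per group, repeated on every row of that group
  .withColumn("total", sum($"amount").over(byGroup))
  // position of the row within its group, largest amount first
  .withColumn("rank", rank().over(byGroup.orderBy($"amount".desc)))

ranked.show()
```

Unlike groupBy, the windowed aggregate keeps every input row and attaches the per-group result to each of them.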
cast
cast method casts a column to a data type. It makes for type-safe maps with Row objects of the proper type (not Any).
cast(to: String): Column
cast(to: DataType): Column
The cast(to: String) variant uses CatalystSqlParser to parse the data type from its canonical string representation.
cast Example
scala> val df = Seq((0f, "hello")).toDF("label", "text")
df: org.apache.spark.sql.DataFrame = [label: float, text: string]
scala> df.printSchema
root
|-- label: float (nullable = false)
|-- text: string (nullable = true)
// without cast
import org.apache.spark.sql.Row
scala> df.select("label").map { case Row(label) => label.getClass.getName }.show(false)
+---------------+
|value |
+---------------+
|java.lang.Float|
+---------------+
// with cast
import org.apache.spark.sql.types.DoubleType
scala> df.select(col("label").cast(DoubleType)).map { case Row(label) => label.getClass.getName }.show(false)
+----------------+
|value |
+----------------+
|java.lang.Double|
+----------------+
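The example above uses the DataType variant. As a sketch of the String variant, the following casts the same kind of columns by type name and compares the result with the DataType-based cast. The sample row and the names byName and byType are made up for illustration; "double" and "int" are the string forms of DoubleType and IntegerType.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType}

val spark = SparkSession.builder().master("local[*]").appName("cast-example").getOrCreate()
import spark.implicits._

// Hypothetical sample data: a float label and a numeric string
val df = Seq((0f, "1")).toDF("label", "text")

// cast by type name (parsed from the string)
val byName = df.select(
  $"label".cast("double").as("label"),
  $"text".cast("int").as("text"))

// cast by DataType object
val byType = df.select(
  $"label".cast(DoubleType).as("label"),
  $"text".cast(IntegerType).as("text"))

byName.printSchema()
```

Both variants should yield the same schema, so the choice is mostly a matter of whether the type is known at compile time or arrives as text (e.g. from configuration).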