Encoders — Internal Row Converters

trait Encoder[T] extends Serializable {
  def schema: StructType
  def clsTag: ClassTag[T]
}
Encoder is the fundamental concept in the serialization and deserialization (SerDe) framework in Spark SQL 2.0. Spark SQL uses the SerDe framework for IO to make it efficient in both time and space.
Tip: Spark has borrowed the idea from the Hive SerDe library, so it might be worthwhile to get familiar with Hive a little bit, too.
Encoders are modelled in Spark SQL 2.0 as the Encoder[T] trait. The type parameter T stands for the type of records an Encoder[T] can deal with. An encoder of type T, i.e. Encoder[T], is used to convert (encode and decode) any JVM object or primitive of type T (that could be your domain object) to and from Spark SQL's InternalRow, the internal binary row format representation (built using Catalyst expressions and code generation).
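For instance, here is a minimal sketch (using the Encoders factory object described later in this page) that builds an encoder for a plain pair type and inspects the schema the type is mapped to:

// A minimal sketch: build an encoder for an (Int, String) pair explicitly
// and look at the schema the type is mapped to.
import org.apache.spark.sql.{Encoder, Encoders}

val pairEncoder: Encoder[(Int, String)] =
  Encoders.tuple(Encoders.scalaInt, Encoders.STRING)

pairEncoder.schema   // a StructType with the fields _1: int and _2: string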
Note: Encoder is also called "a container of serde expressions in Dataset".
Note: The one and only implementation of the Encoder trait in Spark SQL 2.0 is ExpressionEncoder.
Encoders are an integral (and internal) part of any Dataset[T] (of records of type T): the Encoder[T] is used to serialize and deserialize the records of the dataset.
Note: The Dataset[T] type is a Scala type constructor with the type parameter T. So is Encoder[T], which handles serialization and deserialization of T to and from the internal representation.
Encoders know the schema of the records. This is how they can offer significantly faster serialization and deserialization (compared to the default Java or Kryo serializers).
// The domain object for your records in a large dataset
case class Person(id: Long, name: String)
import org.apache.spark.sql.Encoders
scala> val personEncoder = Encoders.product[Person]
personEncoder: org.apache.spark.sql.Encoder[Person] = class[id[0]: bigint, name[0]: string]
scala> personEncoder.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(name,StringType,true))
scala> personEncoder.clsTag
res1: scala.reflect.ClassTag[Person] = Person
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
scala> val personExprEncoder = personEncoder.asInstanceOf[ExpressionEncoder[Person]]
personExprEncoder: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Person] = class[id[0]: bigint, name[0]: string]
// ExpressionEncoders may or may not be flat
scala> personExprEncoder.flat
res2: Boolean = false
// The Serializer part of the encoder
scala> personExprEncoder.serializer
res3: Seq[org.apache.spark.sql.catalyst.expressions.Expression] = List(assertnotnull(input[0, Person, true], top level non-flat input object).id AS id#0L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, Person, true], top level non-flat input object).name, true) AS name#1)
// The Deserializer part of the encoder
scala> personExprEncoder.deserializer
res4: org.apache.spark.sql.catalyst.expressions.Expression = newInstance(class Person)
scala> personExprEncoder.namedExpressions
res5: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression] = List(assertnotnull(input[0, Person, true], top level non-flat input object).id AS id#2L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, Person, true], top level non-flat input object).name, true) AS name#3)
// A record in a Dataset[Person]
// A mere instance of Person case class
// There could be thousands of Person records in a large dataset
val jacek = Person(0, "Jacek")
// Serialize a record to the internal representation, i.e. InternalRow
scala> val row = personExprEncoder.toRow(jacek)
row: org.apache.spark.sql.catalyst.InternalRow = [0,0,1800000005,6b6563614a]
// Spark uses InternalRows internally for IO
// Let's deserialize it to a JVM object, i.e. a Scala object
import org.apache.spark.sql.catalyst.dsl.expressions._
// in spark-shell there are competing implicits
// That's why DslSymbol is used explicitly in the following line
scala> val attrs = Seq(DslSymbol('id).long, DslSymbol('name).string)
attrs: Seq[org.apache.spark.sql.catalyst.expressions.AttributeReference] = List(id#8L, name#9)
scala> val jacekReborn = personExprEncoder.resolveAndBind(attrs).fromRow(row)
jacekReborn: Person = Person(0,Jacek)
// Are the two jacek instances the same?
scala> jacek == jacekReborn
res6: Boolean = true
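The InternalRow you got back from toRow is, in this setup, expected to be one of Spark's binary (unsafe) rows, which is what the "Spark uses InternalRows internally for IO" comment above hints at. A hedged check:

// Hedged check: toRow is expected to produce an UnsafeRow, the binary row format
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
row.isInstanceOf[UnsafeRow]   // expected: true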
You can create custom encoders using the static methods of the Encoders object. Note, however, that encoders for common Scala types and their product types are already available in the implicits object.
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.getOrCreate()
import spark.implicits._
Tip: The default encoders are already imported in spark-shell.
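With the implicit encoders in scope, you can create a Dataset without spelling out any encoder explicitly. A minimal sketch (reusing the Person case class from above):

// A minimal sketch: the implicit Encoder[Person] is resolved behind the scenes
val people = Seq(Person(0, "Jacek"), Person(1, "Agata")).toDS()
people.show()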
Encoders map columns (of your dataset) to fields (of your JVM object) by name. It is through encoders that you can bridge JVM objects to data sources (CSV, JDBC, Parquet, Avro, JSON, Cassandra, Elasticsearch, memsql) and vice versa.
Note: In Spark SQL 2.0, the DataFrame type is a mere type alias for Dataset[Row], with RowEncoder being the encoder.
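As a sketch of that bridging (the people.json file below is made up for illustration), an encoder is also what lets you go from an untyped DataFrame read from a data source to a typed Dataset:

// Hedged sketch: people.json is a hypothetical file with id and name fields.
// as[Person] maps the columns to Person's fields by name
// using the implicit Encoder[Person].
val peopleDS = spark.read.json("people.json").as[Person]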
ExpressionEncoder
case class ExpressionEncoder[T](
    schema: StructType,
    flat: Boolean,
    serializer: Seq[Expression],
    deserializer: Expression,
    clsTag: ClassTag[T])
  extends Encoder[T]
ExpressionEncoder is the one and only implementation of the Encoder trait in Spark 2.0. It adds extra properties, i.e. flat, one or many serializer expressions and a deserializer expression.
An ExpressionEncoder can be flat, in which case there is only one Catalyst expression for the serializer.
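As a hedged check (assuming Spark 2.x), encoders for primitive types are flat, while the Person encoder used above is not:

// Hedged check: a primitive encoder is expected to be flat (a single serializer expression),
// while a case-class encoder is not.
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
ExpressionEncoder[Long]().flat     // expected: true
ExpressionEncoder[Person]().flat   // expected: false (as shown for personExprEncoder earlier)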
Serializer expressions are used to encode an object of type T as an InternalRow. It is assumed that all serializer expressions contain at least one (and the same) BoundReference.
Caution: FIXME What's BoundReference?
The deserializer expression is used to decode an InternalRow back into an object of type T.
Internally, an ExpressionEncoder creates a UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.
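This is also why, in the round trip earlier, the encoder had to be resolved and bound before fromRow could be used. Here is a shorter variant of the same round trip, as a sketch that relies on resolveAndBind defaulting to the encoder's own schema attributes:

// A sketch: resolveAndBind() with no arguments uses the encoder's own schema attributes
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
val enc = ExpressionEncoder[Person]()
val bound = enc.resolveAndBind()
bound.fromRow(enc.toRow(Person(1, "Agata")))   // Person(1,Agata)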
Creating Custom Encoders (Encoders object)
The Encoders factory object defines methods to create Encoder instances.
Import the org.apache.spark.sql package to have access to the Encoders factory object.
import org.apache.spark.sql.Encoders
scala> Encoders.LONG
res1: org.apache.spark.sql.Encoder[Long] = class[value[0]: bigint]
You can find methods to create encoders for Java's object types, e.g. Boolean, Integer, Long, Double, String, java.sql.Timestamp or Byte array, that can be composed to create more advanced encoders for Java bean classes (using the bean method).
import org.apache.spark.sql.Encoders
scala> Encoders.STRING
res2: org.apache.spark.sql.Encoder[String] = class[value[0]: string]
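For Java bean classes, the bean method mentioned above derives an encoder from the bean's getters and setters. A hedged sketch (the Token class below is made up for illustration):

// Hedged sketch: a JavaBean-style class (a no-arg constructor plus getters/setters
// generated by @BeanProperty) so that Encoders.bean can inspect it.
import scala.beans.BeanProperty

class Token(@BeanProperty var id: java.lang.Long,
            @BeanProperty var name: String) {
  def this() = this(null, null)
}

import org.apache.spark.sql.Encoders
val tokenEncoder = Encoders.bean(classOf[Token])
tokenEncoder.schema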
You can also create encoders based on Kryo or Java serializers.
import org.apache.spark.sql.Encoders
case class Person(id: Int, name: String, speaksPolish: Boolean)
scala> Encoders.kryo[Person]
res3: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]
scala> Encoders.javaSerialization[Person]
res5: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]
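Both serialized forms collapse the whole object into a single binary column (class[value[0]: binary] above), so you lose the columnar schema. A usage sketch with the Kryo-based encoder passed explicitly (to avoid clashing with the implicit product encoder from spark.implicits._):

// A sketch: pass the Kryo-based encoder explicitly to createDataset
val binaryDS = spark.createDataset(Seq(Person(0, "Jacek", true)))(Encoders.kryo[Person])
binaryDS.printSchema()   // a single `value: binary` column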
You can create encoders for Scala's tuples and case classes, Int, Long, Double, etc.
import org.apache.spark.sql.Encoders
scala> Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)
res9: org.apache.spark.sql.Encoder[(Long, String, Boolean)] = class[_1[0]: bigint, _2[0]: string, _3[0]: boolean]
Further reading or watching
- (video) Modern Spark DataFrame and Dataset (Intermediate Tutorial) by Adam Breindel from Databricks.