Encoders — Internal Row Converters

An encoder is the fundamental concept in the serialization and deserialization (SerDe) framework of Spark SQL 2.0. Spark SQL uses the SerDe framework for I/O to make it efficient in both time and space.

Tip
Spark has borrowed the idea from the Hive SerDe library so it might be worthwhile to get familiar with Hive a little bit, too.

Encoders are modelled in Spark SQL 2.0 as the Encoder[T] trait.

trait Encoder[T] extends Serializable {
  def schema: StructType
  def clsTag: ClassTag[T]
}

The type T stands for the type of records an Encoder[T] can deal with. An encoder of type T, i.e. Encoder[T], converts (encodes and decodes) any JVM object or primitive of type T (which could be your domain object) to and from Spark SQL’s InternalRow, the internal binary row format (using Catalyst expressions and code generation).

Note
Encoder is also called "a container of serde expressions in Dataset".

Note
The one and only implementation of the Encoder trait in Spark SQL 2.0 is ExpressionEncoder.

Encoders are an integral (and internal) part of any Dataset[T] (of records of type T): every Dataset[T] has an Encoder[T] that is used to serialize and deserialize the records of this dataset.

Note
The Dataset[T] type is a Scala type constructor with the type parameter T. So is Encoder[T], which handles serialization and deserialization of T to and from the internal representation.
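
As a minimal sketch of how a Dataset[T] and its Encoder[T] belong together, the snippet below passes an encoder explicitly when a dataset is created. The Event case class and the local master setting are assumptions made for illustration only; in spark-shell the spark value already exists.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

// Hypothetical record type, used for illustration only
case class Event(id: Long, label: String)

// Assumption: a local session; in spark-shell `spark` is already defined
val spark = SparkSession.builder.master("local[*]").appName("encoder-sketch").getOrCreate()

// Build the encoder by hand instead of relying on implicits
val eventEncoder: Encoder[Event] = Encoders.product[Event]

// createDataset accepts the encoder in its second parameter list
val events: Dataset[Event] =
  spark.createDataset(Seq(Event(1L, "start"), Event(2L, "stop")))(eventEncoder)

// The column names of the dataset come from the encoder's schema
val sameColumns = events.schema.fieldNames.toSeq == eventEncoder.schema.fieldNames.toSeq

// collect() decodes the internal rows back into Event objects
val decoded = events.collect().toSeq
```

The encoder is normally supplied implicitly; passing it by hand only makes the dependency visible.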

Encoders know the schema of the records. This is how they offer significantly faster serialization and deserialization (compared to the default Java or Kryo serializers).

// The domain object for your records in a large dataset
case class Person(id: Long, name: String)

import org.apache.spark.sql.Encoders

scala> val personEncoder = Encoders.product[Person]
personEncoder: org.apache.spark.sql.Encoder[Person] = class[id[0]: bigint, name[0]: string]

scala> personEncoder.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(id,LongType,false), StructField(name,StringType,true))

scala> personEncoder.clsTag
res1: scala.reflect.ClassTag[Person] = Person

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

scala> val personExprEncoder = personEncoder.asInstanceOf[ExpressionEncoder[Person]]
personExprEncoder: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[Person] = class[id[0]: bigint, name[0]: string]

// ExpressionEncoders may or may not be flat
scala> personExprEncoder.flat
res2: Boolean = false

// The Serializer part of the encoder
scala> personExprEncoder.serializer
res3: Seq[org.apache.spark.sql.catalyst.expressions.Expression] = List(assertnotnull(input[0, Person, true], top level non-flat input object).id AS id#0L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, Person, true], top level non-flat input object).name, true) AS name#1)

// The Deserializer part of the encoder
scala> personExprEncoder.deserializer
res4: org.apache.spark.sql.catalyst.expressions.Expression = newInstance(class Person)

scala> personExprEncoder.namedExpressions
res5: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression] = List(assertnotnull(input[0, Person, true], top level non-flat input object).id AS id#2L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, Person, true], top level non-flat input object).name, true) AS name#3)

// A record in a Dataset[Person]
// A mere instance of Person case class
// There could be thousands of Person instances in a large dataset
val jacek = Person(0, "Jacek")

// Serialize a record to the internal representation, i.e. InternalRow
scala> val row = personExprEncoder.toRow(jacek)
row: org.apache.spark.sql.catalyst.InternalRow = [0,0,1800000005,6b6563614a]

// Spark uses InternalRows internally for IO
// Let's deserialize it to a JVM object, i.e. a Scala object
import org.apache.spark.sql.catalyst.dsl.expressions._

// in spark-shell there are competing implicits
// That's why DslSymbol is used explicitly in the following line
scala> val attrs = Seq(DslSymbol('id).long, DslSymbol('name).string)
attrs: Seq[org.apache.spark.sql.catalyst.expressions.AttributeReference] = List(id#8L, name#9)

scala> val jacekReborn = personExprEncoder.resolveAndBind(attrs).fromRow(row)
jacekReborn: Person = Person(0,Jacek)

// Are the two Person instances equal?
scala> jacek == jacekReborn
res6: Boolean = true

You can create custom encoders using the static methods of the Encoders object. Note, however, that encoders for common Scala types and their product types are already available in the implicits object of a SparkSession.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

Tip
The default encoders are already imported in spark-shell.
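
The following sketch shows the implicit encoders at work. It is written as a standalone script with an assumed local master; in spark-shell the session and the implicits are already in scope, so the session setup and the import can be dropped. The sample records are made up.

```scala
import org.apache.spark.sql.{Encoder, SparkSession}

case class Person(id: Long, name: String)

// Assumption: a local session for a standalone run
val spark = SparkSession.builder.master("local[*]").appName("implicit-encoders").getOrCreate()
import spark.implicits._

// The implicits object supplies encoders for common types and case classes
val longEncoder = implicitly[Encoder[Long]]
val personEncoder = implicitly[Encoder[Person]]

// toDS() picks up the implicit Encoder[Person] without naming it
val people = Seq(Person(0L, "Jacek"), Person(1L, "Ada")).toDS()

// map needs an Encoder[String] for its result, also found implicitly
val names = people.map(_.name).collect().toSeq
```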

Encoders map columns (of your dataset) to fields (of your JVM object) by name. It is through encoders that you can bridge JVM objects to data sources (CSV, JDBC, Parquet, Avro, JSON, Cassandra, Elasticsearch, MemSQL) and vice versa.
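
To illustrate the mapping by name, here is a small sketch in which the columns of a DataFrame are deliberately in the opposite order to the fields of the case class. The local master setting and the sample row are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

case class Person(id: Long, name: String)

// Assumption: a local session; in spark-shell `spark` is already defined
val spark = SparkSession.builder.master("local[*]").appName("by-name-mapping").getOrCreate()
import spark.implicits._

// Columns are (name, id) while the fields of Person are (id, name)
val df = Seq(("Jacek", 0L)).toDF("name", "id")

// as[Person] resolves id and name by column name, not by position
val person = df.as[Person].head()
```

Had the mapping been positional, the conversion would have tried to read the string column as id.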

Note
In Spark SQL 2.0, the DataFrame type is a mere type alias for Dataset[Row], with RowEncoder being the encoder.
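
A short sketch of what the type alias means in practice, assuming a local session and a made-up sample row. It does not construct a RowEncoder directly, since that is internal API; it only shows that a DataFrame can be used wherever a Dataset[Row] is expected.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Assumption: a local session; in spark-shell `spark` is already defined
val spark = SparkSession.builder.master("local[*]").appName("dataframe-alias").getOrCreate()
import spark.implicits._

val df: DataFrame = Seq((0L, "Jacek")).toDF("id", "name")

// Compiles without a cast because DataFrame is an alias for Dataset[Row]
val rows: Dataset[Row] = df

// Records come back as generic Row objects; fields are looked up at runtime
val first: Row = rows.head()
val name = first.getAs[String]("name")
val id = first.getLong(0)
```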

ExpressionEncoder

case class ExpressionEncoder[T](
  schema: StructType,
  flat: Boolean,
  serializer: Seq[Expression],
  deserializer: Expression,
  clsTag: ClassTag[T])
extends Encoder[T]

ExpressionEncoder is the one and only implementation of the Encoder trait in Spark 2.0. It adds further properties: flat, one or more serializer expressions, and a deserializer expression.

An ExpressionEncoder can be flat, in which case there is only one Catalyst expression for the serializer.

Serializer expressions are used to encode an object of type T to an InternalRow. It is assumed that all serializer expressions contain at least one BoundReference, and that it is the same one across all of them.

Caution
FIXME What’s BoundReference?

The deserializer expression is used to decode an InternalRow to an object of type T.

Internally, an ExpressionEncoder creates an UnsafeProjection (for the input serializer), an InternalRow (of size 1), and a safe Projection (for the input deserializer). They are all internal lazy attributes of the encoder.

Creating Custom Encoders (Encoders object)

The Encoders factory object defines methods to create Encoder instances.

Import org.apache.spark.sql.Encoders to have access to the Encoders factory object.

import org.apache.spark.sql.Encoders

scala> Encoders.LONG
res1: org.apache.spark.sql.Encoder[Long] = class[value[0]: bigint]

You can find methods to create encoders for Java’s object types, e.g. Boolean, Integer, Long, Double, String, java.sql.Timestamp or byte array, which can be composed to create more advanced encoders for Java bean classes (using the bean method).

import org.apache.spark.sql.Encoders

scala> Encoders.STRING
res2: org.apache.spark.sql.Encoder[String] = class[value[0]: string]

You can also create encoders based on Kryo or Java serializers.

import org.apache.spark.sql.Encoders

case class Person(id: Int, name: String, speaksPolish: Boolean)

scala> Encoders.kryo[Person]
res3: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]

scala> Encoders.javaSerialization[Person]
res5: org.apache.spark.sql.Encoder[Person] = class[value[0]: binary]
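
The difference between these serializer-based encoders and a product encoder is visible in their schemas. The sketch below only compares schemas, so it needs no SparkSession; the value names are made up for illustration.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.BinaryType

case class Person(id: Int, name: String, speaksPolish: Boolean)

val productSchema = Encoders.product[Person].schema
val kryoSchema = Encoders.kryo[Person].schema
val javaSchema = Encoders.javaSerialization[Person].schema

// The product encoder exposes one column per field of the case class
val productFields = productSchema.fieldNames.toSeq

// Kryo and Java serialization store the whole object in one binary column
val kryoFields = kryoSchema.fieldNames.toSeq
val javaFields = javaSchema.fieldNames.toSeq
val kryoIsBinary = kryoSchema("value").dataType == BinaryType
```

With a single opaque binary column, Spark SQL cannot refer to individual fields such as name in queries, which is the trade-off of these encoders.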

You can create encoders for Scala’s tuples and case classes, Int, Long, Double, etc.

import org.apache.spark.sql.Encoders

scala> Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)
res9: org.apache.spark.sql.Encoder[(Long, String, Boolean)] = class[_1[0]: bigint, _2[0]: string, _3[0]: boolean]
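
As a sketch of how such a composed encoder could be used, the tuple encoder above is passed explicitly to createDataset. The local master setting and the sample tuple are assumptions for illustration.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

// Assumption: a local session; in spark-shell `spark` is already defined
val spark = SparkSession.builder.master("local[*]").appName("tuple-encoder").getOrCreate()

// Compose an encoder for (Long, String, Boolean) from three simpler encoders
val tupleEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING, Encoders.scalaBoolean)

// Pass the composed encoder explicitly when creating the dataset
val ds = spark.createDataset(Seq((0L, "Jacek", true)))(tupleEncoder)

// Tuple fields become the columns _1, _2 and _3
val columns = ds.columns.toSeq
val firstTuple = ds.head()
```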
