RowEncoder — DataFrame Encoder

RowEncoder is part of the Encoder framework and acts as the encoder for DataFrames, i.e. Dataset[Row] (Datasets of Rows).

Note
The DataFrame type is merely a type alias for Dataset[Row], which expects an Encoder[Row] available in scope; RowEncoder provides that encoder.

RowEncoder is an object in Scala with apply and other factory methods.

RowEncoder can create an ExpressionEncoder[Row] from a schema (using the apply method).

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) :: Nil)

import org.apache.spark.sql.catalyst.encoders.RowEncoder
scala> val encoder = RowEncoder(schema)
encoder: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder[org.apache.spark.sql.Row] = class[id[0]: bigint, name[0]: string]

// RowEncoder is never flat
scala> encoder.flat
res0: Boolean = false

The RowEncoder object belongs to the org.apache.spark.sql.catalyst.encoders package.

Creating ExpressionEncoder of Rows — apply method

apply(schema: StructType): ExpressionEncoder[Row]

apply builds an ExpressionEncoder of Row, i.e. ExpressionEncoder[Row], from the input StructType (as schema).

Internally, apply creates a BoundReference for the Row type and returns an ExpressionEncoder[Row] built from the input schema, a CreateNamedStruct serializer (created using the serializerFor internal method), a deserializer for the schema, and the Row type.
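To see the serializer and deserializer that apply wires together in action, the following is a minimal sketch of a round trip between an external Row and Spark's internal row format. It assumes a Spark 2.x spark-shell session, where ExpressionEncoder exposes toRow, fromRow and resolveAndBind (later Spark versions changed this API); the sample values are made up for illustration.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) :: Nil)

val encoder = RowEncoder(schema)

// Serializer side: external Row => InternalRow
val internalRow = encoder.toRow(Row(1L, "one"))

// Deserializer side: the encoder has to be resolved and bound
// to the schema's attributes before fromRow can be used
val row = encoder.resolveAndBind().fromRow(internalRow)
```

The deserialized row should carry the same values as the Row passed to toRow.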

serializerFor Internal Method

serializerFor(inputObject: Expression, inputType: DataType): Expression

serializerFor creates an Expression that is assumed to be CreateNamedStruct.

serializerFor takes the input inputType and:

  1. Returns the input inputObject as is for native types, i.e. NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, BinaryType, CalendarIntervalType.

    Caution
    FIXME What does being native type mean?
  2. For UserDefinedTypes, it takes the UDT class from the SQLUserDefinedType annotation or UDTRegistration object and returns an expression with Invoke to call serialize method on a NewInstance of the UDT class.

  3. For TimestampType, it returns an expression with a StaticInvoke to call fromJavaTimestamp on DateTimeUtils class.

  4. …​FIXME

Caution
FIXME Describe me.
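One way to explore the cases above is to build an encoder for a schema that mixes a native type with TimestampType and print the serializer expressions that serializerFor produced. This is a sketch for a spark-shell session; the field names id and ts are arbitrary, and the exact textual form of the expressions depends on the Spark version.

```scala
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

// id is a native type (LongType), ts goes through the TimestampType case
val tsSchema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("ts", TimestampType, nullable = true) :: Nil)

val tsEncoder = RowEncoder(tsSchema)

// One serializer expression per field; the expression for ts is expected
// to contain a staticinvoke of fromJavaTimestamp on DateTimeUtils
tsEncoder.serializer.foreach(println)
```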

StaticInvoke NonSQLExpression

case class StaticInvoke(
  staticObject: Class[_],
  dataType: DataType,
  functionName: String,
  arguments: Seq[Expression] = Nil,
  propagateNull: Boolean = true) extends NonSQLExpression

StaticInvoke is an Expression with no SQL representation that represents a static method call in Scala or Java. It supports generating Java code to evaluate itself.

StaticInvoke invokes the functionName static method on the staticObject object with the arguments as input parameters to produce a value of the dataType type. If propagateNull is enabled and any of the arguments is null, the result is null (without calling the functionName function).
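The null-propagation rule can be modelled outside Spark with plain Java reflection. The sketch below is not Spark code: staticInvokeSketch is a hypothetical helper that only mimics the described semantics, and it looks methods up by name and arity without disambiguating overloads.

```scala
// Hypothetical helper that mimics StaticInvoke's evaluation semantics
def staticInvokeSketch(
    staticObject: Class[_],
    functionName: String,
    arguments: Seq[AnyRef],
    propagateNull: Boolean = true): Any = {
  if (propagateNull && arguments.contains(null)) {
    // Short-circuit: a null argument yields null without calling the method
    null
  } else {
    // Find a public method with the given name and number of parameters
    val method = staticObject.getMethods
      .find(m => m.getName == functionName && m.getParameterCount == arguments.size)
      .getOrElse(throw new NoSuchMethodException(functionName))
    // A null receiver means the method is invoked as a static method
    method.invoke(null, arguments: _*)
  }
}

val parsed = staticInvokeSketch(classOf[java.lang.Integer], "parseInt", Seq("42"))  // 42
val skipped = staticInvokeSketch(classOf[java.lang.Integer], "parseInt", Seq(null)) // null, parseInt is never called
```

In Spark itself the same decision is made as part of the expression's evaluation (including the generated Java code) rather than through reflection at this level.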

StaticInvoke is used in RowEncoder and Java’s encoders.

import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, nullable = false) ::
  StructField("name", StringType, nullable = false) :: Nil)

import org.apache.spark.sql.catalyst.encoders.RowEncoder
val encoder = RowEncoder(schema)

scala> encoder.serializer
res0: Seq[org.apache.spark.sql.catalyst.expressions.Expression] = List(validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 0, id), LongType) AS id#69L, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 1, name), StringType), true) AS name#70)
