DataSource — Pluggable Data Source

DataSource belongs to the Data Source API (along with DataFrameReader for loading datasets, DataFrameWriter for saving datasets and StreamSourceProvider for creating streaming sources).

DataSource is an internal class that represents a pluggable data source in Spark SQL, with a few extension points to further enrich the capabilities of Spark SQL.

Table 1. DataSource's Extension Points

| Extension Point | Description |
|---|---|
| StreamSourceProvider | Used in: (1) sourceSchema and createSource for streamed reading; (2) createSink for streamed writing; (3) resolveRelation for resolved BaseRelation. |
| FileFormat | Used in: (1) sourceSchema for streamed reading; (2) write for writing a DataFrame to a DataSource (as part of creating a table as select). |
| CreatableRelationProvider | Used in write for writing a DataFrame to a DataSource (as part of creating a table as select). |

As a user, you interact with DataSource through DataFrameReader (when you execute spark.read), DataStreamReader (when you execute spark.readStream) or the CREATE TABLE USING DDL.

// Assumes a SparkSession named spark, as in spark-shell
import org.apache.spark.sql.DataFrame

// Batch reading
val people: DataFrame = spark.read
  .format("csv")
  .load("people.csv")

// Streamed reading
val messages: DataFrame = spark.readStream
  .format("kafka")
  .option("subscribe", "topic")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .load()

DataSource is created with a SparkSession, a class name, a collection of paths, an optional user-specified schema, a collection of partition columns, an optional bucket specification, configuration options, and an optional catalog table.

createSource Method

createSource(metadataPath: String): Source
Caution
FIXME

createSink Method

Caution
FIXME

Creating DataSource Instance

class DataSource(
  sparkSession: SparkSession,
  className: String,
  paths: Seq[String] = Nil,
  userSpecifiedSchema: Option[StructType] = None,
  partitionColumns: Seq[String] = Seq.empty,
  bucketSpec: Option[BucketSpec] = None,
  options: Map[String, String] = Map.empty,
  catalogTable: Option[CatalogTable] = None)

When being created, DataSource first looks up the providing class given className (considering it an alias or a fully-qualified class name) and computes the name and schema of the data source.

Note
DataSource performs this initialization lazily, on demand, and only once.

sourceSchema Internal Method

sourceSchema(): SourceInfo

sourceSchema returns the name and schema of the data source for streamed reading.

Caution
FIXME Why is the method called? Why does this bother with streamed reading and data sources?!

It supports two class hierarchies, i.e. StreamSourceProvider and FileFormat data sources.

Internally, sourceSchema first creates an instance of the data source and…​

Caution
FIXME Finish…​

For StreamSourceProvider data sources, sourceSchema relays calls to StreamSourceProvider.sourceSchema.

For FileFormat data sources, sourceSchema makes sure that the path option was specified.

Tip
path is looked up in a case-insensitive way so paTh and PATH and pAtH are all acceptable. Use the lower-case version of path, though.
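
The case-insensitive lookup described in the tip can be illustrated in plain Scala. This is only a sketch under stated assumptions: PathOptionLookup and findOption are hypothetical names introduced here, not Spark's own implementation.

```scala
object PathOptionLookup {
  // Hypothetical helper: returns the value of the first option whose key
  // matches the requested name, ignoring case.
  def findOption(options: Map[String, String], name: String): Option[String] =
    options.collectFirst { case (key, value) if key.equalsIgnoreCase(name) => value }

  def main(args: Array[String]): Unit = {
    // Every spelling from the tip resolves to the same value.
    Seq("paTh", "PATH", "pAtH").foreach { key =>
      println(findOption(Map(key -> "/data/in"), "path")) // Some(/data/in)
    }
  }
}
```

Sticking to the lower-case path in your own code avoids relying on this behaviour at all.
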

Note
path can use a glob pattern (not regex syntax), i.e. it can contain any of the {}[]*?\ characters.
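
A path can be classified as a glob pattern by checking for the characters listed in the note. The following is a minimal sketch of such a check; GlobPaths and isGlobPath are names assumed for this illustration rather than taken from the source.

```scala
object GlobPaths {
  // The glob metacharacters listed in the note above ("\\" is a single backslash).
  private val globChars: Set[Char] = "{}[]*?\\".toSet

  // A path counts as a glob pattern if it contains at least one metacharacter.
  def isGlobPath(path: String): Boolean = path.exists(globChars.contains)

  def main(args: Array[String]): Unit = {
    println(isGlobPath("data/2017-*/part-?.csv")) // true
    println(isGlobPath("people.csv"))             // false
  }
}
```
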

If a glob pattern is not used, it checks whether the path exists. If the path does not exist, you will see the following AnalysisException:

scala> spark.read.load("the.file.does.not.exist.parquet")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/jacek/dev/oss/spark/the.file.does.not.exist.parquet;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:375)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
  ... 48 elided

If spark.sql.streaming.schemaInference is disabled, the data source is not TextFileFormat, and the input userSpecifiedSchema is not specified, the following IllegalArgumentException is thrown:

Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
Caution
FIXME I don’t think the exception will ever happen for non-streaming sources since the schema is going to be defined earlier. When?
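
The three conditions that lead to the "Schema must be specified" error can be written as a single predicate. This is a sketch only: StreamingSchemaCheck and requireSchema are hypothetical names, the schema is modelled as a plain String, and the error message is shortened.

```scala
object StreamingSchemaCheck {
  // Throws only when all three conditions from the text hold at the same time:
  // schema inference is off, the format is not text, and no schema was given.
  def requireSchema(
      schemaInferenceEnabled: Boolean,
      isTextFormat: Boolean,
      userSpecifiedSchema: Option[String]): Unit =
    if (!schemaInferenceEnabled && !isTextFormat && userSpecifiedSchema.isEmpty) {
      throw new IllegalArgumentException(
        "Schema must be specified when creating a streaming source DataFrame.")
    }
}
```

Supplying a schema, enabling spark.sql.streaming.schemaInference, or using the text format each makes the check pass.
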

Eventually, it returns a SourceInfo with FileSource[path] and the schema (as calculated using the inferFileFormatSchema internal method).

For any other data source, it throws an UnsupportedOperationException:

Data source [className] does not support streamed reading
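
The overall shape of sourceSchema, as described in this section, is a dispatch on the kind of data source. The sketch below models it with stand-in types: Provider, StreamProvider, FileProvider, OtherProvider and this SourceInfo are hypothetical and are not Spark's classes. The schema is a plain String, and the wording of the missing-path error is an assumption.

```scala
object SourceSchemaSketch {
  // Stand-in types for illustration only; they are NOT Spark's classes.
  sealed trait Provider
  final case class StreamProvider(name: String, schema: String) extends Provider
  final case class FileProvider(formatName: String) extends Provider
  final case class OtherProvider(className: String) extends Provider

  final case class SourceInfo(name: String, schema: String)

  def sourceSchema(
      provider: Provider,
      options: Map[String, String],
      userSpecifiedSchema: Option[String]): SourceInfo = provider match {
    // Stream providers report their own name and schema.
    case StreamProvider(name, schema) => SourceInfo(name, schema)
    // File-based sources need the path option (looked up case-insensitively).
    case FileProvider(_) =>
      val path = options
        .collectFirst { case (k, v) if k.equalsIgnoreCase("path") => v }
        .getOrElse(throw new IllegalArgumentException("path option is required"))
      SourceInfo(s"FileSource[$path]", userSpecifiedSchema.getOrElse("<inferred>"))
    // Everything else cannot be read as a stream.
    case OtherProvider(className) =>
      throw new UnsupportedOperationException(
        s"Data source $className does not support streamed reading")
  }
}
```
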

inferFileFormatSchema Internal Method

inferFileFormatSchema(format: FileFormat): StructType

The inferFileFormatSchema private method computes (aka infers) the schema (as a StructType). It returns userSpecifiedSchema if specified, or otherwise uses FileFormat.inferSchema. It throws an AnalysisException when it is unable to infer the schema.

It uses the path option for the list of directory paths.
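
The "user-specified schema first, inference second, error last" order maps naturally onto Option combinators. The sketch below is an illustration under assumptions: Schema is a stand-in for StructType, infer is a hypothetical callback standing in for FileFormat.inferSchema, and IllegalStateException stands in for Spark's AnalysisException.

```scala
object InferSchemaSketch {
  // Stand-in for StructType: just a list of column names.
  type Schema = Seq[String]

  def inferFileFormatSchema(
      userSpecifiedSchema: Option[Schema],
      infer: () => Option[Schema]): Schema =
    userSpecifiedSchema
      // orElse takes its argument by name, so infer() runs only when no schema was given.
      .orElse(infer())
      .getOrElse(throw new IllegalStateException("Unable to infer schema"))
}
```

Because inference is evaluated lazily, a user-specified schema avoids scanning the files at all in this model.
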

Note
inferFileFormatSchema is used by DataSource.sourceSchema and DataSource.createSource when a FileFormat data source is processed.

write Method

write(
  mode: SaveMode,
  data: DataFrame): BaseRelation

write does…​

Internally, write makes sure that CalendarIntervalType is not used in the schema of the data DataFrame and throws an AnalysisException when there is one.

write then looks up the data source implementation (using the constructor’s className).

Note
The DataSource implementation can be of type CreatableRelationProvider or FileFormat.

For FileFormat data sources, write takes all paths and the path option and makes sure that there is exactly one path.
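
The "exactly one output path" rule can be sketched as follows. OutputPathSketch and singleOutputPath are hypothetical names, and the error message is illustrative rather than quoted from Spark.

```scala
object OutputPathSketch {
  // Combines the explicit paths with the (case-insensitive) path option
  // and accepts the result only if it contains exactly one entry.
  def singleOutputPath(paths: Seq[String], options: Map[String, String]): String = {
    val fromOption = options.collectFirst { case (k, v) if k.equalsIgnoreCase("path") => v }
    (paths ++ fromOption.toSeq) match {
      case Seq(only) => only
      case other =>
        throw new IllegalArgumentException(
          s"Expected exactly one path, but got: ${other.mkString(", ")}")
    }
  }
}
```
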

Note
write uses Hadoop’s Path to access the FileSystem and calculate the qualified output path.

write then calls PartitioningUtils.validatePartitionColumn.

Caution
FIXME What is PartitioningUtils.validatePartitionColumn for?

When appending to a table, …​FIXME

In the end, write (for a FileFormat data source) prepares an InsertIntoHadoopFsRelationCommand logical plan and executes it.

Caution
FIXME Is toRdd a job execution?

For CreatableRelationProvider data sources, CreatableRelationProvider.createRelation is executed.

Note
write is executed when…​

lookupDataSource Internal Method

lookupDataSource(provider0: String): Class[_]

Internally, lookupDataSource first searches the classpath for available DataSourceRegister providers (using Java’s ServiceLoader.load method) to find the requested data source by short name (alias), e.g. parquet or kafka.

If a DataSource could not be found by short name, lookupDataSource tries to load the class given the input provider0 or its variant provider0.DefaultSource (with .DefaultSource suffix).
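
The class-loading fallback described above (try the name as given, then the name with the .DefaultSource suffix) can be sketched with the standard library alone. ProviderClassLookup and loadProviderClass are hypothetical names, and the choice of class loader is an assumption of this sketch.

```scala
import scala.util.Try

object ProviderClassLookup {
  def loadProviderClass(provider: String): Try[Class[_]] = {
    // Assumption: prefer the thread's context class loader, falling back to our own.
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    // First try the name as given, then the variant with the .DefaultSource suffix.
    Try[Class[_]](loader.loadClass(provider))
      .orElse(Try[Class[_]](loader.loadClass(s"$provider.DefaultSource")))
  }

  def main(args: Array[String]): Unit = {
    println(loadProviderClass("java.lang.String").isSuccess) // true
    println(loadProviderClass("no.such.provider").isFailure) // true
  }
}
```
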

Note
You can reference your own custom DataSource in your code by passing its alias or fully-qualified class name to the DataFrameWriter.format method.

Only one data source may be registered under a given short name, or you will see the following RuntimeException:

Multiple sources found for [provider] ([comma-separated class names]), please specify the fully qualified class name.
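
How the three outcomes of an alias lookup (no match, one match, several matches) differ is shown in the sketch below. Registered and resolve are hypothetical names standing in for the registered DataSourceRegister providers, and the case-insensitive comparison of short names is an assumption of this sketch.

```scala
object AliasResolution {
  // Hypothetical registry entry: a short name (alias) and the implementing class name.
  final case class Registered(shortName: String, className: String)

  def resolve(alias: String, registered: Seq[Registered]): Option[String] =
    registered.filter(_.shortName.equalsIgnoreCase(alias)) match {
      // Nothing registered under the alias: the caller falls back to loading by class name.
      case Seq() => None
      // Exactly one match: use it.
      case Seq(single) => Some(single.className)
      // Ambiguous alias: refuse to guess.
      case many =>
        throw new RuntimeException(
          s"Multiple sources found for $alias (${many.map(_.className).mkString(", ")}), " +
            "please specify the fully qualified class name.")
    }
}
```
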

Creating BaseRelation for Reading or Writing — resolveRelation Method

resolveRelation(checkFilesExist: Boolean = true): BaseRelation

resolveRelation resolves (i.e. creates) a BaseRelation to read from or write to a DataSource.

Internally, resolveRelation creates an instance of providingClass (for a DataSource) and acts according to its type, i.e. SchemaRelationProvider, RelationProvider or FileFormat.

Table 2. resolveRelation and Resolving BaseRelation per (Schema) Providers

| Provider | Behaviour |
|---|---|
| SchemaRelationProvider | Executes SchemaRelationProvider.createRelation with the provided schema. |
| RelationProvider | Executes RelationProvider.createRelation. |
| FileFormat | Creates a HadoopFsRelation. |
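
The per-type behaviour in Table 2 amounts to a type-based pattern match on the provider instance. The sketch below uses empty marker traits that only borrow the names of Spark's interfaces; they are not the real ones. The fallback case for unsupported providers is an assumption, and the real method takes more inputs into account than this model does.

```scala
object ResolveRelationSketch {
  // Stand-in marker traits for illustration; NOT Spark's interfaces.
  trait SchemaRelationProvider
  trait RelationProvider
  trait FileFormat

  // Returns a description of the action taken for each provider type.
  def describe(provider: AnyRef): String = provider match {
    case _: SchemaRelationProvider => "createRelation with the provided schema"
    case _: RelationProvider       => "createRelation"
    case _: FileFormat             => "HadoopFsRelation"
    case other =>
      throw new IllegalArgumentException(
        s"${other.getClass.getName} is not a supported provider")
  }
}
```
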
