// Batch reading
val people: DataFrame = spark.read
.format("csv")
.load("people.csv")
// Streamed reading
val messages: DataFrame = spark.readStream
.format("kafka")
.option("subscribe", "topic")
.option("kafka.bootstrap.servers", "localhost:9092")
.load
DataSource — Pluggable Data Source
DataSource belongs to the Data Source API (along with DataFrameReader for loading datasets, DataFrameWriter for saving datasets, and StreamSourceProvider for creating streaming sources).
DataSource is an internal class that represents a pluggable data source in Spark SQL with a few extension points to further enrich the capabilities of Spark SQL.
Extension Point | Description |
---|---|
StreamSourceProvider | Used in: 1. sourceSchema and createSource for streamed reading, 2. createSink for streamed writing, 3. resolveRelation for resolved BaseRelation. |
FileFormat | Used in: 1. sourceSchema for streamed reading, 2. write for writing a DataFrame. |
CreatableRelationProvider | Used in write for writing a DataFrame. |
As a user, you interact with DataSource through DataFrameReader (when you execute spark.read or spark.readStream) or CREATE TABLE USING DDL.
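For instance, a data source can be used straight from SQL with CREATE TABLE ... USING DDL. A minimal sketch (the table name, path and options are made up):

```scala
// Register a table backed by the csv data source (illustrative names)
spark.sql(
  """CREATE TABLE people
    |USING csv
    |OPTIONS (path 'people.csv', header 'true')
    |""".stripMargin)
```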
DataSource uses a SparkSession, a class name, a collection of paths, an optional user-specified schema, a collection of partition columns, a bucket specification, and configuration options.
createSink Method
Caution: FIXME
Creating DataSource Instance
class DataSource(
sparkSession: SparkSession,
className: String,
paths: Seq[String] = Nil,
userSpecifiedSchema: Option[StructType] = None,
partitionColumns: Seq[String] = Seq.empty,
bucketSpec: Option[BucketSpec] = None,
options: Map[String, String] = Map.empty,
catalogTable: Option[CatalogTable] = None)
When created, DataSource first looks up the providing class given className (considering it an alias or a fully-qualified class name) and computes the name and schema of the data source.
Note: DataSource does the initialization lazily on demand and only once.
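Both forms of className can be seen from DataFrameReader, e.g. (a sketch; both calls are assumed to resolve to the same built-in parquet data source):

```scala
// Short name (alias) registered through DataSourceRegister
spark.read.format("parquet").load("people.parquet")

// Fully-qualified class name of (what is assumed to be) the same data source
spark.read
  .format("org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat")
  .load("people.parquet")
```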
sourceSchema Internal Method
sourceSchema(): SourceInfo
sourceSchema returns the name and schema of the data source for streamed reading.
Caution: FIXME Why is the method called? Why does this bother with streamed reading and data sources?!
It supports two class hierarchies, i.e. StreamSourceProvider and FileFormat data sources.
Internally, sourceSchema first creates an instance of the data source and…
Caution: FIXME Finish…
For StreamSourceProvider data sources, sourceSchema relays calls to StreamSourceProvider.sourceSchema.
For FileFormat data sources, sourceSchema makes sure that the path option was specified.
Tip: path is looked up in a case-insensitive way, so paTh, PATH and pAtH are all acceptable. Use the lower-case path, though.
Note: path can use a glob pattern (not regex syntax), i.e. contain any of the {}[]*?\ characters.
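For example, with a glob pattern in the load path (the directory layout is made up):

```scala
// Read all JSON files matching a glob (not regex) pattern
val logs = spark.read
  .format("json")
  .load("logs/year=2017/month=*/part-*.json")
```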
sourceSchema checks whether the path exists if a glob pattern is not used. If it does not exist, you will see the following AnalysisException in the logs:
scala> spark.read.load("the.file.does.not.exist.parquet")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/Users/jacek/dev/oss/spark/the.file.does.not.exist.parquet;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:375)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:364)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:364)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:149)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:132)
... 48 elided
If spark.sql.streaming.schemaInference is disabled, the data source is other than TextFileFormat, and the input userSpecifiedSchema is not specified, the following IllegalArgumentException is thrown:
Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.
Caution: FIXME I don't think the exception will ever happen for non-streaming sources since the schema is going to be defined earlier. When?
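In other words, for a file-based streaming source you either pass the schema explicitly or enable spark.sql.streaming.schemaInference. A sketch (schema and directory are made up):

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Option 1: specify the schema up front
val schema = new StructType().add("id", LongType).add("name", StringType)
val people = spark.readStream
  .schema(schema)
  .format("csv")
  .load("people-dir")

// Option 2: let Spark infer the schema from files already in the directory
spark.conf.set("spark.sql.streaming.schemaInference", "true")
val inferred = spark.readStream.format("csv").load("people-dir")
```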
Eventually, it returns a SourceInfo with FileSource[path] and the schema (as calculated using the inferFileFormatSchema internal method).
For any other data source, it throws an UnsupportedOperationException:
Data source [className] does not support streamed reading
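The jdbc data source, for example, is assumed to be neither a StreamSourceProvider nor a FileFormat, so streamed reading fails with the message above (connection options are made up):

```scala
// jdbc resolves to a RelationProvider, so sourceSchema rejects streamed reading
spark.readStream
  .format("jdbc")
  .option("url", "jdbc:postgresql:demo")
  .option("dbtable", "people")
  .load
```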
inferFileFormatSchema Internal Method
inferFileFormatSchema(format: FileFormat): StructType
inferFileFormatSchema is a private method that computes (aka infers) the schema (as a StructType). It returns userSpecifiedSchema if specified or uses FileFormat.inferSchema otherwise. It throws an AnalysisException when it is unable to infer the schema.
It uses the path option for the list of directory paths.
Note: It is used by DataSource.sourceSchema and DataSource.createSource when FileFormat is processed.
write Method
write(
mode: SaveMode,
data: DataFrame): BaseRelation
write does…
Internally, write makes sure that CalendarIntervalType is not used in the schema of the data DataFrame and throws an AnalysisException when there is one.
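A column of CalendarIntervalType (e.g. an interval literal) is enough to trigger the error. A sketch:

```scala
import org.apache.spark.sql.functions.expr

// The interval column has CalendarIntervalType, which write rejects
val withInterval = spark.range(1).select(expr("interval 1 day") as "i")
withInterval.write.format("parquet").save("intervals.parquet")
// throws org.apache.spark.sql.AnalysisException
```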
write then looks up the data source implementation (using the constructor's className).
Note: The DataSource implementation can be of type CreatableRelationProvider or FileFormat.
For FileFormat data sources, write takes all paths and the path option and makes sure that there is only one.
Note: write uses Hadoop's Path to access the FileSystem and calculate the qualified output path.
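For example, a typical save through a FileFormat data source with a single output path (the directory name is made up):

```scala
// save(path) becomes the single (qualified) output path of the FileFormat sink
people.write
  .format("csv")
  .mode("overwrite")
  .save("people-out")
```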
write does PartitioningUtils.validatePartitionColumn.
Caution: FIXME What is PartitioningUtils.validatePartitionColumn for?
When appending to a table, …FIXME
In the end, write (for a FileFormat data source) prepares an InsertIntoHadoopFsRelationCommand logical plan and executes it.
Caution: FIXME Is toRdd a job execution?
For CreatableRelationProvider data sources, CreatableRelationProvider.createRelation is executed.
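The jdbc data source is assumed to be such a CreatableRelationProvider, so a save like the following goes through createRelation (connection options are made up):

```scala
// Writing through a CreatableRelationProvider (here: the jdbc data source)
people.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:demo")
  .option("dbtable", "people_copy")
  .save()
```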
Note: write is executed when…
lookupDataSource Internal Method
lookupDataSource(provider0: String): Class[_]
Internally, lookupDataSource first searches the classpath for available DataSourceRegister providers (using Java's ServiceLoader.load method) to find the requested data source by its short name (alias), e.g. parquet or kafka.
If a DataSource could not be found by short name, lookupDataSource tries to load the class given the input provider0 or its variant provider0.DefaultSource (with a .DefaultSource suffix).
Note: You can reference your own custom DataSource in your code by the DataFrameWriter.format method, passing either the alias or the fully-qualified class name.
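A minimal sketch of such a custom data source (all names are made up). The shortName alias becomes discoverable once the class is listed in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath:

```scala
package com.example.datasource

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A bare-bones data source usable as format("myformat") or format("com.example.datasource")
class DefaultSource extends RelationProvider with DataSourceRegister {

  // The alias that lookupDataSource finds via ServiceLoader
  override def shortName(): String = "myformat"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val ctx = sqlContext
    new BaseRelation with TableScan {
      override val sqlContext: SQLContext = ctx
      override val schema: StructType = StructType(StructField("value", StringType) :: Nil)
      // Empty scan just to keep the sketch self-contained and queryable
      override def buildScan(): RDD[Row] = ctx.sparkContext.emptyRDD[Row]
    }
  }
}
```

With that on the classpath, spark.read.format("myformat").load() resolves by alias, while format("com.example.datasource") resolves through the .DefaultSource variant described above.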
Only one data source can be registered under a given alias or you will see the following RuntimeException:
Multiple sources found for [provider] ([comma-separated class names]), please specify the fully qualified class name.
Creating BaseRelation for Reading or Writing — resolveRelation Method
resolveRelation(checkFilesExist: Boolean = true): BaseRelation
resolveRelation resolves (i.e. creates) a BaseRelation to read from or write to a DataSource.
Internally, resolveRelation creates an instance of providingClass (for a DataSource) and acts according to its type, i.e. SchemaRelationProvider, RelationProvider or FileFormat.
Provider | Behaviour |
---|---|
SchemaRelationProvider | Executes SchemaRelationProvider.createRelation with the user-specified schema. |
RelationProvider | Executes RelationProvider.createRelation. |
FileFormat | Creates a HadoopFsRelation. |
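Although DataSource is internal, the resolution can be sketched directly in spark-shell (reusing the constructor shown earlier and the made-up people.csv path; this is illustrative only):

```scala
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.BaseRelation

// Resolve a BaseRelation for the csv FileFormat (ends up as a HadoopFsRelation)
val dataSource = DataSource(
  sparkSession = spark,
  className = "csv",
  paths = Seq("people.csv"))
val relation: BaseRelation = dataSource.resolveRelation()
```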