import org.apache.spark.sql.{DataFrameReader, SparkSession}

// In spark-shell the session is already available as spark
val spark: SparkSession = SparkSession.builder.getOrCreate()
val reader: DataFrameReader = spark.read
DataFrameReader — Reading from External Data Sources
DataFrameReader is an interface to read data from external data sources, e.g. files, Hive tables or JDBC (including Spark Thrift Server), into a DataFrame.
Note: You can define your own custom file formats.
You use SparkSession.read to access an instance of DataFrameReader.

DataFrameReader supports many file formats and an interface to plug in new ones.
Note: DataFrameReader assumes the parquet file format by default, which you can change using the spark.sql.sources.default setting.
As of Spark 2.0, DataFrameReader can read text files using textFile methods that return Dataset[String] (not DataFrames, which are Dataset[Row] and therefore untyped).
After you describe the external data source using DataFrameReader's methods, you can trigger the loading using one of the load methods.
spark.read
.format("csv")
.option("header", true)
.option("inferSchema", true)
.load("*.csv")
Specifying Data Format — format method
format(source: String): DataFrameReader
You use format to configure DataFrameReader to use the appropriate source format.
Supported data formats:
- json
- csv (since 2.0.0)
- parquet (see Parquet)
- orc
- text
- jdbc
- libsvm — only when used in format("libsvm")
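For example, you could use the generic format and load pair to read JSON (a minimal sketch; the people.json path is made up for illustration):

val people = spark.read
  .format("json")
  .load("people.json")   // hypothetical input file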
Note: You can improve your understanding of format("jdbc") with the exercise Creating DataFrames from Tables using JDBC and PostgreSQL.
Specifying Input Schema — schema method
schema(schema: StructType): DataFrameReader
You can specify a schema of the input data source.
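A minimal sketch of providing the schema up front instead of relying on schema inference (the column names and the cities.csv path are made up for illustration):

import org.apache.spark.sql.types.{LongType, StringType, StructType}

val citySchema = new StructType()
  .add("id", LongType, nullable = false)
  .add("city", StringType, nullable = true)

val cities = spark.read
  .schema(citySchema)           // no schema inference pass needed
  .format("csv")
  .option("header", true)
  .load("cities.csv")           // hypothetical input file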
Tip: Refer to Schema.
Adding Extra Configuration Options — option and options methods
option(key: String, value: String): DataFrameReader
option(key: String, value: Boolean): DataFrameReader (1)
option(key: String, value: Long): DataFrameReader (1)
option(key: String, value: Double): DataFrameReader (1)
(1) Available since Spark 2.0.0
You can also use the options method to describe different options in a single Map.
options(options: scala.collection.Map[String, String]): DataFrameReader
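For example, the CSV options from the earlier example could be collected in one Map (a sketch; the people.csv path is illustrative):

val csvOptions = Map(
  "header" -> "true",
  "inferSchema" -> "true")

val df = spark.read
  .format("csv")
  .options(csvOptions)
  .load("people.csv")   // hypothetical input file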
Loading Datasets (into DataFrame) — load methods
load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame
load loads input data as a DataFrame.
Internally, load creates a DataSource (for the current SparkSession, a user-specified schema, a source format and options). It then immediately resolves it and converts the BaseRelation into a DataFrame.
Creating DataFrames from Files
DataFrameReader supports the following file formats:
json method
json(path: String): DataFrame
json(paths: String*): DataFrame
json(jsonRDD: RDD[String]): DataFrame
New in 2.0.0: prefersDecimal
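A minimal sketch of reading a JSON file, including the prefersDecimal option mentioned above (the people.json path is made up for illustration):

val people = spark.read
  .option("prefersDecimal", true)   // prefer DecimalType over DoubleType when inferring floating-point values
  .json("people.json")              // hypothetical input file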
parquet method
parquet(path: String): DataFrame
parquet(paths: String*): DataFrame
The supported options:
- compression (default: snappy), one of:
  - none or uncompressed
  - snappy - the default codec in Spark 2.0.0
  - gzip - the default codec in Spark before 2.0.0
  - lzo

New in 2.0.0: snappy is the default Parquet codec. See [SPARK-14482][SQL] Change default Parquet codec from gzip to snappy.
val tokens = Seq("hello", "henry", "and", "harry")
.zipWithIndex
.map(_.swap)
.toDF("id", "token")
val parquetWriter = tokens.write
parquetWriter.option("compression", "none").save("hello-none")
// The exception is mostly for my learning purposes
// so I know where and how to find the trace to the compressions
// Sorry...
scala> parquetWriter.option("compression", "unsupported").save("hello-unsupported")
java.lang.IllegalArgumentException: Codec [unsupported] is not available. Available codecs are uncompressed, gzip, lzo, snappy, none.
at org.apache.spark.sql.execution.datasources.parquet.ParquetOptions.<init>(ParquetOptions.scala:43)
at org.apache.spark.sql.execution.datasources.parquet.DefaultSource.prepareWrite(ParquetRelation.scala:77)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
at org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:103)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:141)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:116)
at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:61)
at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult(commands.scala:59)
at org.apache.spark.sql.execution.command.ExecutedCommand.doExecute(commands.scala:73)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:137)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:134)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:117)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:65)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:390)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
... 48 elided
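Reading Parquet files back is symmetric. A minimal sketch, assuming the hello-none directory written above:

val tokensRead = spark.read.parquet("hello-none")
tokensRead.show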
orc method
orc(path: String): DataFrame
orc(paths: String*): DataFrame
Optimized Row Columnar (ORC) is a highly efficient columnar file format for storing Hive data, designed to handle tables with more than 1,000 columns and to improve performance. The ORC format was introduced in Hive version 0.11 to use and retain the type information from the table definition.
Tip: Read the ORC Files document to learn about the ORC file format.
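A minimal sketch of reading ORC files (the people.orc path is made up for illustration; in this Spark version the ORC data source generally requires a session with Hive support):

val people = spark.read.orc("people.orc")   // hypothetical input path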
text method

The text method loads a text file.
text(path: String): DataFrame
text(paths: String*): DataFrame
Example
val lines: Dataset[String] = spark.read.text("README.md").as[String]
scala> lines.show
+--------------------+
| value|
+--------------------+
| # Apache Spark|
| |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
| |
|<http://spark.apa...|
| |
| |
|## Online Documen...|
| |
|You can find the ...|
|guide, on the [pr...|
|and [project wiki...|
|This README file ...|
| |
| ## Building Spark|
+--------------------+
only showing top 20 rows
Creating DataFrames from Tables
table method
table(tableName: String): DataFrame
The table method returns the tableName table as a DataFrame.
scala> spark.sql("SHOW TABLES").show(false)
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|dafa |false |
+---------+-----------+
scala> spark.read.table("dafa").show(false)
+---+-------+
|id |text |
+---+-------+
|1 |swiecie|
|0 |hello |
+---+-------+
Caution: FIXME The method uses spark.sessionState.sqlParser.parseTableIdentifier(tableName) and spark.sessionState.catalog.lookupRelation. Would be nice to learn a bit more on their internals, huh?
Accessing JDBC Data Sources — jdbc method
Note: The jdbc method uses java.util.Properties (and appears so Java-centric). Use format("jdbc") instead.
jdbc(url: String, table: String, properties: Properties): DataFrame
jdbc(url: String, table: String,
parts: Array[Partition],
connectionProperties: Properties): DataFrame
jdbc(url: String, table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame
jdbc(url: String, table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame
jdbc allows you to create a DataFrame that represents the table table in the database available at url.
Tip: Review the exercise Creating DataFrames from Tables using JDBC and PostgreSQL.
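A minimal sketch of both styles (the JDBC URL, table name and credentials are placeholders):

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.setProperty("user", "dbuser")       // placeholder
connectionProperties.setProperty("password", "dbpass")   // placeholder

val projects = spark.read.jdbc(
  "jdbc:postgresql://localhost/sparkdb",   // placeholder JDBC URL
  "projects",                              // placeholder table name
  connectionProperties)

// the same read expressed with the generic format("jdbc"), as the note above suggests
val projectsViaFormat = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost/sparkdb")
  .option("dbtable", "projects")
  .option("user", "dbuser")
  .option("password", "dbpass")
  .load()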
Reading Text Files — textFile methods
textFile(path: String): Dataset[String]
textFile(paths: String*): Dataset[String]
textFile methods query text files as a Dataset[String].
spark.read.textFile("README.md")
Note: The textFile methods are similar to the text family of methods in that they both read text files, but the text methods return an untyped DataFrame while the textFile methods return a typed Dataset[String].
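A short sketch contrasting the two, using README.md as in the example above:

import org.apache.spark.sql.{DataFrame, Dataset}

val typed: Dataset[String] = spark.read.textFile("README.md")   // typed
val untyped: DataFrame = spark.read.text("README.md")           // untyped, i.e. Dataset[Row]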