DataFrameReader — Reading from External Data Sources

DataFrameReader is an interface to read data from external data sources, e.g. files, Hive tables or JDBC (including Spark Thrift Server), into a DataFrame.

You use SparkSession.read to access an instance of DataFrameReader.

val spark: SparkSession = SparkSession.builder.getOrCreate

import org.apache.spark.sql.DataFrameReader
val reader: DataFrameReader = spark.read

DataFrameReader supports many file formats and interface for new ones.

Note
DataFrameReader assumes parquet file format by default that you can change using spark.sql.sources.default setting.

As of Spark 2.0, DataFrameReader can read text files using textFile methods that return Dataset[String] (not DataFrames which are Dataset[Row] and therefore untyped).

After you describe the external data source using DataFrameReader's methods you can trigger the loading using one of load methods.

spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("*.csv")

Specifying Data Format — format method

format(source: String): DataFrameReader

You use format to configure DataFrameReader to use appropriate source format.

Supported data formats:

  • json

  • csv (since 2.0.0)

  • parquet (see Parquet)

  • orc

  • text

  • jdbc

  • libsvm — only when used in format("libsvm")

Note
You can improve your understanding of format("jdbc") with the exercise Creating DataFrames from Tables using JDBC and PostgreSQL.

Specifying Input Schema — schema method

schema(schema: StructType): DataFrameReader

You can specify a schema of the input data source.

Tip
Refer to Schema.

Adding Extra Configuration Options — option and options methods

option(key: String, value: String): DataFrameReader
option(key: String, value: Boolean): DataFrameReader  (1)
option(key: String, value: Long): DataFrameReader     (1)
option(key: String, value: Double): DataFrameReader   (1)
  1. Available since Spark 2.0.0

You can also use options method to describe different options in a single Map.

options(options: scala.collection.Map[String, String]): DataFrameReader

Loading Datasets (into DataFrame) — load methods

load(): DataFrame
load(path: String): DataFrame
load(paths: String*): DataFrame

load loads input data as a DataFrame.

Internally, load creates a DataSource (for the current SparkSession, a user-specified schema, a source format and options). It then immediately resolves it and converts BaseRelation into a DataFrame.

Creating DataFrames from Files

DataFrameReader supports the following file formats:

json method

json(path: String): DataFrame
json(paths: String*): DataFrame
json(jsonRDD: RDD[String]): DataFrame

New in 2.0.0: prefersDecimal

csv method

csv(path: String): DataFrame
csv(paths: String*): DataFrame

parquet method

parquet(path: String): DataFrame
parquet(paths: String*): DataFrame

The supported options:

New in 2.0.0: snappy is the default Parquet codec. See [SPARK-14482][SQL] Change default Parquet codec from gzip to snappy.

The compressions supported:

  • none or uncompressed

  • snappy - the default codec in Spark 2.0.0.

  • gzip - the default codec in Spark before 2.0.0

  • lzo

val tokens = Seq("hello", "henry", "and", "harry")
  .zipWithIndex
  .map(_.swap)
  .toDF("id", "token")

val parquetWriter = tokens.write
parquetWriter.option("compression", "none").save("hello-none")

// The exception is mostly for my learning purposes
// so I know where and how to find the trace to the compressions
// Sorry...
scala> parquetWriter.option("compression", "unsupported").save("hello-unsupported")
java.lang.IllegalArgumentException: Codec [unsupported] is not available. Available codecs are uncompressed, gzip, lzo, snappy, none.
  at org.apache.spark.sql.execution.datasources.parquet.ParquetOptions.<init>(ParquetOptions.scala:43)
  at org.apache.spark.sql.execution.datasources.parquet.DefaultSource.prepareWrite(ParquetRelation.scala:77)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$4.apply(InsertIntoHadoopFsRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.BaseWriterContainer.driverSideSetup(WriterContainer.scala:103)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:141)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:116)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:61)
  at org.apache.spark.sql.execution.command.ExecutedCommand.sideEffectResult(commands.scala:59)
  at org.apache.spark.sql.execution.command.ExecutedCommand.doExecute(commands.scala:73)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:137)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:134)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:117)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:65)
  at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:390)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
  ... 48 elided

orc method

orc(path: String): DataFrame
orc(paths: String*): DataFrame

Optimized Row Columnar (ORC) file format is a highly efficient columnar format to store Hive data with more than 1,000 columns and improve performance. ORC format was introduced in Hive version 0.11 to use and retain the type information from the table definition.

Tip
Read ORC Files document to learn about the ORC file format.

text method

text method loads a text file.

text(path: String): DataFrame
text(paths: String*): DataFrame
Example
val lines: Dataset[String] = spark.read.text("README.md").as[String]

scala> lines.show
+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|                    |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
|                    |
|<http://spark.apa...|
|                    |
|                    |
|## Online Documen...|
|                    |
|You can find the ...|
|guide, on the [pr...|
|and [project wiki...|
|This README file ...|
|                    |
|   ## Building Spark|
+--------------------+
only showing top 20 rows

Creating DataFrames from Tables

table method

table(tableName: String): DataFrame

table method returns the tableName table as a DataFrame.

scala> spark.sql("SHOW TABLES").show(false)
+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|dafa     |false      |
+---------+-----------+

scala> spark.read.table("dafa").show(false)
+---+-------+
|id |text   |
+---+-------+
|1  |swiecie|
|0  |hello  |
+---+-------+
Caution
FIXME The method uses spark.sessionState.sqlParser.parseTableIdentifier(tableName) and spark.sessionState.catalog.lookupRelation. Would be nice to learn a bit more on their internals, huh?

Accessing JDBC Data Sources — jdbc method

Note
jdbc method uses java.util.Properties (and appears so Java-centric). Use format("jdbc") instead.
jdbc(url: String, table: String, properties: Properties): DataFrame
jdbc(url: String, table: String,
  parts: Array[Partition],
  connectionProperties: Properties): DataFrame
jdbc(url: String, table: String,
  predicates: Array[String],
  connectionProperties: Properties): DataFrame
jdbc(url: String, table: String,
  columnName: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  connectionProperties: Properties): DataFrame

jdbc allows you to create DataFrame that represents table in the database available as url.

Reading Text Files — textFile methods

textFile(path: String): Dataset[String]
textFile(paths: String*): Dataset[String]

textFile methods query text files as a Dataset[String].

spark.read.textFile("README.md")
Note
textFile are similar to text family of methods in that they both read text files but text methods return untyped DataFrame while textFile return typed Dataset[String].

Internally, textFile passes calls on to text method and selects the only value column before it applies Encoders.STRING encoder.

results matching ""

    No results matching ""