Structured Streaming — Streaming Datasets

Structured Streaming is a new computation model introduced in Spark 2.0.0 for building end-to-end streaming applications termed as continuous applications. Structured streaming offers a high-level declarative streaming API built on top of Datasets (inside Spark SQL’s engine) for continuous incremental execution of structured queries.

The semantics of the Structured Streaming model is as follows (see the article Structured Streaming In Apache Spark):

At any time, the output of a continuous application is equivalent to executing a batch job on a prefix of the data.

Structured streaming is an attempt to unify streaming, interactive, and batch queries that paves the way for continuous applications like continuous aggregations using groupBy operator or continuous windowed aggregations using groupBy operator with window function.

// business object
case class Person(id: Long, name: String, city: String)

// you could build your schema manually
import org.apache.spark.sql.types._
val schema = StructType(
  StructField("id", LongType, false) ::
  StructField("name", StringType, false) ::
  StructField("city", StringType, false) :: Nil)

// ...but is error-prone and time-consuming, isn't it?
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Person].schema

val people = spark.readStream
  .schema(schema)
  .csv("in/*.csv")
  .as[Person]

// people has this additional capability of being streaming
scala> people.isStreaming
res0: Boolean = true

// ...but it is still a Dataset.
// (Almost) any Dataset operation is available
val population = people.groupBy('city).agg(count('city) as "population")

// Start the streaming query
// print the result out to the console
// Only Complete output mode supported for groupBy
import org.apache.spark.sql.streaming.OutputMode.Complete
val populationStream = population.writeStream
  .format("console")
  .outputMode(Complete)
  .queryName("Population")
  .start

scala> populationStream.explain(extended = true)
== Parsed Logical Plan ==
Aggregate [city#112], [city#112, count(city#112) AS population#19L]
+- Relation[id#110L,name#111,city#112] csv

== Analyzed Logical Plan ==
city: string, population: bigint
Aggregate [city#112], [city#112, count(city#112) AS population#19L]
+- Relation[id#110L,name#111,city#112] csv

== Optimized Logical Plan ==
Aggregate [city#112], [city#112, count(city#112) AS population#19L]
+- Project [city#112]
   +- Relation[id#110L,name#111,city#112] csv

== Physical Plan ==
*HashAggregate(keys=[city#112], functions=[count(city#112)], output=[city#112, population#19L])
+- Exchange hashpartitioning(city#112, 200)
   +- *HashAggregate(keys=[city#112], functions=[partial_count(city#112)], output=[city#112, count#118L])
      +- *FileScan csv [city#112] Batched: false, Format: CSV, InputPaths: file:/Users/jacek/dev/oss/spark/in/1.csv, file:/Users/jacek/dev/oss/spark/in/2.csv, file:/Users/j..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<city:string>

// Let's query for all active streams
scala> spark.streams.active.foreach(println)
Streaming Query - Population [state = ACTIVE]

// You may eventually want to stop the streaming query
// spark.streams.active.head.stop

// ...or be more explicit about the right query to stop
scala> populationStream.isActive
res1: Boolean = true

scala> populationStream.stop

scala> populationStream.isActive
res2: Boolean = false

With structured streaming, Spark 2.0 aims at simplifying streaming analytics with little to no need to reason about effective data streaming. It is that Spark 2.0 tries to hide the unnecessary complexity in your streaming analytics architectures with Spark.

Structured streaming introduces the streaming datasets that are infinite datasets with primitives like input streaming sources and output streaming sinks, event time, windowing, and sessions. You can specify output mode of a streaming dataset which is what gets written to a streaming sink when there is new data available.

Tip

A Dataset is streaming when its logical plan is streaming.

Structured streaming is defined by the following data abstractions in org.apache.spark.sql.streaming package:

With Datasets being Spark SQL’s view of structured data, structured streaming checks input sources for new data every trigger (time) and executes the (continuous) queries.

Tip
Watch SPARK-8360 Streaming DataFrames to track progress of the feature.
Tip
Read the official programming guide of Spark about Structured Streaming.
Note
The feature has also been called Streaming Spark SQL Query, Streaming DataFrames, Continuous DataFrame or Continuous Query. There have been lots of names before the Spark project settled on Structured Streaming.

Example — Streaming Query for Running Counts (over Words from Socket with Output to Console)

Note
The example is "borrowed" from the official documentation of Spark. Changes and errors are only mine.
Tip
You need to run nc -lk 9999 first before running the example.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load
  .as[String]

val words = lines.flatMap(_.split("\\W+"))

scala> words.printSchema
root
 |-- value: string (nullable = true)

val counter = words.groupBy("value").count

// nc -lk 9999 is supposed to be up at this point

import org.apache.spark.sql.streaming.OutputMode.Complete
val query = counter.writeStream
  .outputMode(Complete)
  .format("console")
  .start

query.stop

Example — Streaming Query over CSV Files with Output to Console Every 5 Seconds

Below you can find a complete example of a streaming query in a form of DataFrame of data from csv-logs files in csv format of a given schema into a ConsoleSink every 5 seconds.

Tip
Copy and paste it to Spark Shell in :paste mode to run it.
// Explicit schema with nullables false
import org.apache.spark.sql.types._
val schemaExp = StructType(
  StructField("name", StringType, false) ::
  StructField("city", StringType, true) ::
  StructField("country", StringType, true) ::
  StructField("age", IntegerType, true) ::
  StructField("alive", BooleanType, false) :: Nil
)

// Implicit inferred schema
val schemaImp = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .load("csv-logs")
  .schema

val in = spark.readStream
  .schema(schemaImp)
  .format("csv")
  .option("header", true)
  .option("maxFilesPerTrigger", 1)
  .load("csv-logs")

scala> in.printSchema
root
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- alive: boolean (nullable = true)

println("Is the query streaming" + in.isStreaming)

println("Are there any streaming queries?" + spark.streams.active.isEmpty)

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.streaming.OutputMode.Append
val out = in.writeStream
  .format("console")
  .trigger(ProcessingTime(5.seconds))
  .queryName("consoleStream")
  .outputMode(Append)
  .start()

16/07/13 12:32:11 TRACE FileStreamSource: Listed 3 file(s) in 4.274022 ms
16/07/13 12:32:11 TRACE FileStreamSource: Files are:
	file:///Users/jacek/dev/oss/spark/csv-logs/people-1.csv
	file:///Users/jacek/dev/oss/spark/csv-logs/people-2.csv
	file:///Users/jacek/dev/oss/spark/csv-logs/people-3.csv
16/07/13 12:32:11 DEBUG FileStreamSource: New file: file:///Users/jacek/dev/oss/spark/csv-logs/people-1.csv
16/07/13 12:32:11 TRACE FileStreamSource: Number of new files = 3
16/07/13 12:32:11 TRACE FileStreamSource: Number of files selected for batch = 1
16/07/13 12:32:11 TRACE FileStreamSource: Number of seen files = 1
16/07/13 12:32:11 INFO FileStreamSource: Max batch id increased to 0 with 1 new files
16/07/13 12:32:11 INFO FileStreamSource: Processing 1 files from 0:0
16/07/13 12:32:11 TRACE FileStreamSource: Files are:
	file:///Users/jacek/dev/oss/spark/csv-logs/people-1.csv
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+--------+-------+---+-----+
| name|    city|country|age|alive|
+-----+--------+-------+---+-----+
|Jacek|Warszawa| Polska| 42| true|
+-----+--------+-------+---+-----+

spark.streams
  .active
  .foreach(println)
// Streaming Query - consoleStream [state = ACTIVE]

scala> spark.streams.active(0).explain
== Physical Plan ==
*Scan csv [name#130,city#131,country#132,age#133,alive#134] Format: CSV, InputPaths: file:/Users/jacek/dev/oss/spark/csv-logs/people-3.csv, PushedFilters: [], ReadSchema: struct<name:string,city:string,country:string,age:int,alive:boolean>

Further reading or watching

results matching ""

    No results matching ""