StreamingQueryManager — Streaming Query Management

Note
StreamingQueryManager is an experimental feature of Spark 2.0.0.

A StreamingQueryManager is the Management API for continuous queries per SQLContext.

Note
There is a single StreamingQueryManager instance per SQLContext session.

You can access StreamingQueryManager for the current SQLContext using SQLContext.streams method. It is lazily created when a SQLContext instance starts.

val queries = spark.streams

Initialization

StreamingQueryManager manages the following instances:

  • StateStoreCoordinatorRef (as stateStoreCoordinator)

  • StreamingQueryListenerBus (as listenerBus)

  • activeQueries which is a mutable mapping between query names and StreamingQuery objects.

startQuery

startQuery(name: String,
  checkpointLocation: String,
  df: DataFrame,
  sink: Sink,
  trigger: Trigger = ProcessingTime(0)): StreamingQuery

startQuery is a private[sql] method to start a StreamingQuery.

Note
It is called exclusively by DataStreamWriter.start.
Note
By default, trigger is ProcessingTime(0).

startQuery makes sure that activeQueries internal registry does not contain the query under name. It throws an IllegalArgumentException if it does.

It transforms the LogicalPlan of the input DataFrame df so all StreamingRelation "nodes" become StreamingExecutionRelation. It uses DataSource.createSource(metadataPath) where metadataPath is $checkpointLocation/sources/$nextSourceId. Otherwise, it returns the LogicalPlan untouched.

It finally creates StreamExecution and starts it. It also registers the StreamExecution instance in activeQueries internal registry.

Return All Active Continuous Queries per SQLContext

active: Array[StreamingQuery]

active method returns a collection of StreamingQuery instances for the current SQLContext.

Getting Active Continuous Query By Name

get(name: String): StreamingQuery

get method returns a StreamingQuery by name.

It may throw an IllegalArgumentException when no StreamingQuery exists for the name.

java.lang.IllegalArgumentException: There is no active query with name hello
  at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
  at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
  at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
  at scala.collection.AbstractMap.getOrElse(Map.scala:59)
  at org.apache.spark.sql.StreamingQueryManager.get(StreamingQueryManager.scala:58)
  ... 49 elided

StreamingQueryListener Management - Adding or Removing Listeners

  • addListener(listener: StreamingQueryListener): Unit adds listener to the internal listenerBus.

  • removeListener(listener: StreamingQueryListener): Unit removes listener from the internal listenerBus.

postListenerEvent

postListenerEvent(event: StreamingQueryListener.Event): Unit

postListenerEvent posts a StreamingQueryListener.Event to listenerBus.

StreamingQueryListener

Caution
FIXME

StreamingQueryListener is an interface for listening to query life cycle events, i.e. a query start, progress and termination events.

lastTerminatedQuery - internal barrier

Caution
FIXME Why is lastTerminatedQuery needed?

Used in:

  • awaitAnyTermination

  • awaitAnyTermination(timeoutMs: Long)

They all wait 10 millis before doing the check of lastTerminatedQuery being non-null.

It is set in:

  • resetTerminated() resets lastTerminatedQuery, i.e. sets it to null.

  • notifyQueryTermination(terminatedQuery: StreamingQuery) sets lastTerminatedQuery to be terminatedQuery and notifies all the threads that wait on awaitTerminationLock.

    It is called from StreamExecution.runBatches.

results matching ""

    No results matching ""