val queries = spark.streams
StreamingQueryManager — Streaming Query Management
Note
|
StreamingQueryManager is an experimental feature of Spark 2.0.0.
|
A StreamingQueryManager
is the Management API for continuous queries per SQLContext
.
Note
|
There is a single StreamingQueryManager instance per SQLContext session.
|
You can access StreamingQueryManager
for the current SQLContext using SQLContext.streams method. It is lazily created when a SQLContext
instance starts.
Initialization
StreamingQueryManager
manages the following instances:
-
StateStoreCoordinatorRef
(asstateStoreCoordinator
) -
StreamingQueryListenerBus (as
listenerBus
) -
activeQueries
which is a mutable mapping between query names andStreamingQuery
objects.
startQuery
startQuery(name: String,
checkpointLocation: String,
df: DataFrame,
sink: Sink,
trigger: Trigger = ProcessingTime(0)): StreamingQuery
startQuery
is a private[sql]
method to start a StreamingQuery.
Note
|
It is called exclusively by DataStreamWriter.start. |
Note
|
By default, trigger is ProcessingTime(0).
|
startQuery
makes sure that activeQueries
internal registry does not contain the query under name
. It throws an IllegalArgumentException
if it does.
It transforms the LogicalPlan of the input DataFrame df
so all StreamingRelation "nodes" become StreamingExecutionRelation. It uses DataSource.createSource(metadataPath) where metadataPath
is $checkpointLocation/sources/$nextSourceId
. Otherwise, it returns the LogicalPlan
untouched.
It finally creates StreamExecution and starts it. It also registers the StreamExecution
instance in activeQueries
internal registry.
Return All Active Continuous Queries per SQLContext
active: Array[StreamingQuery]
active
method returns a collection of StreamingQuery instances for the current SQLContext
.
Getting Active Continuous Query By Name
get(name: String): StreamingQuery
get
method returns a StreamingQuery by name
.
It may throw an IllegalArgumentException
when no StreamingQuery exists for the name
.
java.lang.IllegalArgumentException: There is no active query with name hello
at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
at org.apache.spark.sql.StreamingQueryManager$$anonfun$get$1.apply(StreamingQueryManager.scala:59)
at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
at scala.collection.AbstractMap.getOrElse(Map.scala:59)
at org.apache.spark.sql.StreamingQueryManager.get(StreamingQueryManager.scala:58)
... 49 elided
StreamingQueryListener Management - Adding or Removing Listeners
-
addListener(listener: StreamingQueryListener): Unit
addslistener
to the internallistenerBus
. -
removeListener(listener: StreamingQueryListener): Unit
removeslistener
from the internallistenerBus
.
postListenerEvent
postListenerEvent(event: StreamingQueryListener.Event): Unit
postListenerEvent
posts a StreamingQueryListener.Event
to listenerBus
.
StreamingQueryListener
Caution
|
FIXME |
StreamingQueryListener
is an interface for listening to query life cycle events, i.e. a query start, progress and termination events.
lastTerminatedQuery - internal barrier
Caution
|
FIXME Why is lastTerminatedQuery needed?
|
Used in:
-
awaitAnyTermination
-
awaitAnyTermination(timeoutMs: Long)
They all wait 10
millis before doing the check of lastTerminatedQuery
being non-null.
It is set in:
-
resetTerminated()
resetslastTerminatedQuery
, i.e. sets it tonull
. -
notifyQueryTermination(terminatedQuery: StreamingQuery)
setslastTerminatedQuery
to beterminatedQuery
and notifies all the threads that wait onawaitTerminationLock
.It is called from StreamExecution.runBatches.