log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG
StreamExecution
StreamExecution manages execution of a streaming query for a SQLContext and a Sink. It requires a LogicalPlan to know the Source objects from which records are periodically pulled down.
StreamExecution is a StreamingQuery with additional attributes:
- checkpointRoot
- Trigger
It starts an internal thread (microBatchThread) that periodically (every 10 milliseconds) polls the sources for new records and creates a batch.
Note: The time between batches (10 milliseconds) is fixed, i.e. not configurable.
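The polling behaviour described above can be pictured with a small, self-contained sketch. It is not Spark's implementation: CountingSource and MicroBatchLoop are hypothetical names, and the only detail taken from the text is a dedicated thread that wakes up every 10 milliseconds, checks the source for new records and forms a batch when there are any.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Hypothetical stand-in for a streaming source; not a Spark API.
final class CountingSource {
  private val latest = new AtomicLong(0L)
  // Simulates new records arriving by advancing the latest offset.
  def append(n: Long): Unit = { latest.addAndGet(n); () }
  def latestOffset: Long = latest.get()
}

// Simplified model of a micro-batch loop: poll, form a batch if there is
// new data, then sleep for a fixed interval (10 ms, as in the text).
final class MicroBatchLoop(source: CountingSource, pollIntervalMs: Long = 10L) {
  private val running = new AtomicBoolean(true)
  @volatile private var committed = 0L
  @volatile private var batches = 0

  private val thread = new Thread("microBatchThread-sketch") {
    override def run(): Unit = {
      while (running.get()) {
        val available = source.latestOffset
        if (available > committed) {
          // A real engine would process the records between the two
          // offsets here; the sketch only records that a batch happened.
          committed = available
          batches += 1
        }
        Thread.sleep(pollIntervalMs)
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def stop(): Unit = { running.set(false); thread.join() }
  def committedOffset: Long = committed
  def batchCount: Int = batches
}
```

A caller would create a CountingSource, start the loop, append some records, and later call stop(); the loop picks up whatever arrived since the previous poll as a single batch.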
StreamExecution can be in three states:
- INITIALIZED when the instance was created.
- ACTIVE when batches are pulled from the sources.
- TERMINATED when batches were successfully processed or the query stopped.
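The three states listed above can be read as a simple lifecycle. The following is a minimal sketch under that assumption; QueryLifecycle and its start and stop methods are hypothetical and only model the transitions the list implies, not how Spark tracks state internally.

```scala
// Hypothetical model of the three states named above; not Spark's own definitions.
sealed trait State
case object INITIALIZED extends State
case object ACTIVE extends State
case object TERMINATED extends State

final class QueryLifecycle {
  // A freshly created instance is INITIALIZED.
  @volatile private var current: State = INITIALIZED

  def state: State = current

  // Moves to ACTIVE, the state in which batches are pulled from the sources.
  def start(): Unit = synchronized {
    require(current == INITIALIZED, s"cannot start from $current")
    current = ACTIVE
  }

  // Moves to TERMINATED, whether processing finished or the query was stopped.
  def stop(): Unit = synchronized {
    current = TERMINATED
  }
}
```

Modelling the states as case objects of a sealed trait lets the compiler check that a pattern match over State covers all three cases.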
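Tip: Enable DEBUG logging level for the org.apache.spark.sql.execution.streaming.StreamExecution logger to see what happens inside. Add the following line to your log4j properties file:
log4j.logger.org.apache.spark.sql.execution.streaming.StreamExecution=DEBUG
Refer to Logging.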
runBatches Internal Method
runBatches(): Unit
Caution: FIXME
scala> val out = in.write
  .format("memory")
  .queryName("memStream")
  .startStream()
out: org.apache.spark.sql.StreamingQuery = Continuous Query - memStream [state = ACTIVE]
16/04/16 00:48:47 INFO StreamExecution: Starting new continuous query.
scala> 16/04/16 00:48:47 INFO StreamExecution: Committed offsets for batch 1.
16/04/16 00:48:47 DEBUG StreamExecution: Stream running from {} to {FileSource[hello]: #0}
16/04/16 00:48:47 DEBUG StreamExecution: Retrieving data from FileSource[hello]: None -> #0
16/04/16 00:48:47 DEBUG StreamExecution: Optimized batch in 163.940239ms
16/04/16 00:48:47 INFO StreamExecution: Completed up to {FileSource[hello]: #0} in 703.573981ms
toDebugString Method
You can call toDebugString on StreamExecution to learn about the internals.
scala> out.asInstanceOf[StreamExecution].toDebugString
res3: String =
"
=== Continuous Query ===
Name: memStream
Current Offsets: {FileSource[hello]: #0}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
FileSource[hello]
"
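To make the shape of that report easier to see, here is a hedged sketch that assembles a similar string from a handful of fields. QuerySnapshot is a hypothetical case class, not a Spark type, and the layout simply mirrors the output above.

```scala
// Hypothetical snapshot of the fields shown in the output above; not a Spark class.
final case class QuerySnapshot(
    name: String,
    currentOffsets: String,
    state: String,
    threadState: Thread.State,
    logicalPlan: String) {

  // Builds a multi-line report in the same layout as the REPL output.
  def toDebugString: String =
    s"""
       |=== Continuous Query ===
       |Name: $name
       |Current Offsets: $currentOffsets
       |Current State: $state
       |Thread State: $threadState
       |Logical Plan:
       |$logicalPlan
       |""".stripMargin
}
```

The thread state is typed as java.lang.Thread.State, which is where a value such as RUNNABLE comes from on the JVM.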