DStreamGraph

DStreamGraph is a final helper class that manages the input and output dstreams of a Spark Streaming application. It also holds the zero time for the other components, which marks the time when DStreamGraph was started.
DStreamGraph maintains the collections of InputDStream instances (as inputStreams) and output DStream instances (as outputStreams), but, more importantly, it generates streaming jobs for output streams for a batch (time).
DStreamGraph holds the batch interval for the other parts of a Streaming application.
Tip: Enable DEBUG logging level for the org.apache.spark.streaming.DStreamGraph logger to see what happens inside DStreamGraph. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.DStreamGraph=DEBUG

Refer to Logging.
Zero Time (aka zeroTime)
Zero time (internally zeroTime) is the time when DStreamGraph has been started.

It is passed down the output dstream graph so that output dstreams can initialize themselves.
Start Time (aka startTime)
Start time (internally startTime) is the time when DStreamGraph has been started or restarted.
Note: At a regular start, start time is exactly zero time.
Batch Interval (aka batchDuration)
DStreamGraph holds the batch interval (as batchDuration) for the other parts of a Streaming application.
setBatchDuration(duration: Duration) is the method to set the batch interval.

It appears that DStreamGraph is the place to keep the value, since the batch interval must be set before JobGenerator can be instantiated.
It is set while StreamingContext is being instantiated and is validated (using the validate() method of StreamingContext and DStreamGraph) before StreamingContext is started.
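For illustration, a minimal sketch of how the batch interval typically reaches DStreamGraph: it is given to the StreamingContext constructor (the master, application name and 5-second interval below are arbitrary), which passes it on through setBatchDuration while the context is being instantiated.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// The batch interval (here 5 seconds) is handed to StreamingContext,
// which hands it over to DStreamGraph.setBatchDuration while the context
// is being instantiated.
val conf = new SparkConf().setMaster("local[2]").setAppName("BatchIntervalDemo")
val ssc = new StreamingContext(conf, Seconds(5))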
Maximum Remember Interval (getMaxInputStreamRememberDuration method)
getMaxInputStreamRememberDuration(): Duration
Maximum remember interval is the maximum remember interval across all the input dstreams. It is calculated using the getMaxInputStreamRememberDuration method.
Note: It is called when JobGenerator is requested to clear metadata and checkpoint data.
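A minimal sketch of how such a maximum could be computed over the registered input dstreams; the inputStreams registry is described above, while the rememberDuration property and the exact implementation details are assumptions here.

// Sketch: pick the largest remember interval among all registered input dstreams.
def getMaxInputStreamRememberDuration(): Duration =
  inputStreams.map(_.rememberDuration).maxBy(_.milliseconds)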
Input DStreams Registry
Caution: FIXME
Output DStreams Registry
DStream by design has no notion of being an output dstream. To mark a dstream as an output dstream you need to register it (using the DStream.register method), which happens for…FIXME
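For context, a minimal sketch of how a dstream usually ends up as an output dstream in practice, assuming that applying an output operation such as print is what triggers the registration mentioned above; the socket source and port are illustrative only.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("OutputRegistryDemo")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)

// Applying an output operation (here print) is what makes lines an output
// dstream, i.e. registers it with DStreamGraph (assumption, see above).
lines.print()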
Starting DStreamGraph
start(time: Time): Unit
When DStreamGraph is started (using the start method), it sets zero time and start time.
Note: start is called when JobGenerator starts for the first time (not from a checkpoint).
Note: You can start DStreamGraph as many times as you want, but only as long as zero time has not been set yet (i.e. it has not already been started).
(output dstreams) start then walks over the collection of output dstreams and, for each output dstream, one at a time, calls its initialize(zeroTime), remember (with the current remember interval), and validateAtStart methods.

(input dstreams) When all the output streams are processed, start starts the input dstreams (in parallel) using their start method.
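A condensed sketch of the flow just described; the field and method names follow the text above, while details such as synchronization and precondition checks are assumptions.

// Sketch of DStreamGraph.start as described above.
def start(time: Time): Unit = this.synchronized {
  zeroTime = time
  startTime = time
  // Output dstreams, one at a time.
  outputStreams.foreach(_.initialize(zeroTime))
  outputStreams.foreach(_.remember(rememberDuration))
  outputStreams.foreach(_.validateAtStart())
  // Input dstreams, in parallel.
  inputStreams.par.foreach(_.start())
}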
Restarting DStreamGraph
restart(time: Time): Unit
restart sets start time to the given time input parameter.
Note: This is the only moment when zero time can be different from start time.
Caution: restart doesn’t seem to be called ever.
Generating Streaming Jobs for Output Streams for Batch Time
generateJobs(time: Time): Seq[Job]
generateJobs generates a collection of streaming jobs for output streams for a given batch time.
Note: generateJobs is called by JobGenerator to generate jobs for a given batch time or when restarted from checkpoint.
When generateJobs
method executes, you should see the following DEBUG message in the logs:
DEBUG DStreamGraph: Generating jobs for time [time] ms
generateJobs then walks over each registered output stream (in the outputStreams internal registry) and requests each stream for a streaming job.
Right before the method finishes, you should see the following DEBUG message with the number of streaming jobs generated (as jobs.length):
DEBUG DStreamGraph: Generated [jobs.length] jobs for time [time] ms
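A minimal sketch of this flow, assuming each output dstream can be requested for an optional streaming job for the batch time (the generateJob call and the logDebug helper are assumptions; call-site handling and other details are omitted).

// Sketch of generateJobs as described above: ask every registered output
// dstream for a job for the given batch time and collect the results.
def generateJobs(time: Time): Seq[Job] = {
  logDebug(s"Generating jobs for time $time")
  val jobs = outputStreams.flatMap(_.generateJob(time))
  logDebug(s"Generated ${jobs.length} jobs for time $time")
  jobs
}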
Validation Check
The validate() method checks whether the batch duration and at least one output stream have been set. It throws java.lang.IllegalArgumentException when either has not.
Note: It is called when StreamingContext starts.
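A minimal sketch of such a check, assuming both conditions are enforced with require (which throws java.lang.IllegalArgumentException when a requirement fails); the error messages are illustrative.

// Sketch of validate() as described above.
def validate(): Unit = this.synchronized {
  require(batchDuration != null, "Batch duration has not been set")
  require(outputStreams.nonEmpty, "No output operations registered, so nothing to execute")
}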
Metadata Cleanup
Note: It is called when JobGenerator clears metadata.
When clearMetadata(time: Time) is called, you should see the following DEBUG message in the logs:
DEBUG DStreamGraph: Clearing metadata for time [time] ms
It merely walks over the collection of output streams and (synchronously, one by one) asks each of them to do its own metadata cleaning.

When it finishes, you should see the following DEBUG message in the logs:
DEBUG DStreamGraph: Cleared old metadata for time [time] ms
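A minimal sketch of this walk-over (the per-dstream clearMetadata call and the logDebug helper are assumptions); the checkpoint-related operations described below follow the same pattern of delegating to every output dstream.

// Sketch of clearMetadata as described above.
def clearMetadata(time: Time): Unit = {
  logDebug(s"Clearing metadata for time $time")
  outputStreams.foreach(_.clearMetadata(time))
  logDebug(s"Cleared old metadata for time $time")
}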
Restoring State for Output DStreams (restoreCheckpointData method)
restoreCheckpointData(): Unit
When restoreCheckpointData() is executed, you should see the following INFO message in the logs:
INFO DStreamGraph: Restoring checkpoint data
Then, every output dstream is requested to restoreCheckpointData.
At the end, you should see the following INFO message in the logs:
INFO DStreamGraph: Restored checkpoint data
Note: restoreCheckpointData is executed when StreamingContext is recreated from checkpoint.
Updating Checkpoint Data
updateCheckpointData(time: Time): Unit
Note: It is called when JobGenerator processes DoCheckpoint events.
When updateCheckpointData is called, you should see the following INFO message in the logs:
INFO DStreamGraph: Updating checkpoint data for time [time] ms
It then walks over every output dstream and calls its updateCheckpointData(time).
When updateCheckpointData finishes, it prints out the following INFO message to the logs:
INFO DStreamGraph: Updated checkpoint data for time [time] ms
Checkpoint Cleanup
clearCheckpointData(time: Time)
Note: clearCheckpointData is called when JobGenerator clears checkpoint data.
When clearCheckpointData is called, you should see the following INFO message in the logs:
INFO DStreamGraph: Clearing checkpoint data for time [time] ms
It merely walks through the collection of output streams and (synchronously, one by one) asks each of them to do its own checkpoint data cleaning.
When finished, you should see the following INFO message in the logs:
INFO DStreamGraph: Cleared checkpoint data for time [time] ms
Remember Interval
Remember interval is the time to remember (aka cache) the RDDs that have been generated by (output) dstreams in the context (before they are released and garbage collected).
It can be set using the remember method.
remember method
remember(duration: Duration): Unit
The remember method simply sets the remember interval and exits.
Note: It is called by the StreamingContext.remember method.
It first checks whether or not the remember interval has been set already and, if so, throws java.lang.IllegalArgumentException as follows:

java.lang.IllegalArgumentException: requirement failed: Remember duration already set as [rememberDuration] ms. Cannot set it again.
  at scala.Predef$.require(Predef.scala:219)
  at org.apache.spark.streaming.DStreamGraph.remember(DStreamGraph.scala:79)
  at org.apache.spark.streaming.StreamingContext.remember(StreamingContext.scala:222)
  ... 43 elided
Note: It only makes sense to call the remember method before DStreamGraph is started, i.e. before StreamingContext is started, since the output dstreams are only given the remember interval when DStreamGraph starts.
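A minimal usage sketch of setting the remember interval through StreamingContext.remember (the master, application name and the chosen intervals are arbitrary):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("RememberDemo")
val ssc = new StreamingContext(conf, Seconds(5))

// Ask the dstreams to keep their generated RDDs around for at least a minute.
// Must be called before ssc.start() (see the note above) and only once.
ssc.remember(Minutes(1))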