JobScheduler
The streaming scheduler (`JobScheduler`) schedules streaming jobs to be run as Spark jobs. It is created as part of creating a StreamingContext and starts with it.
It tracks the jobs submitted for execution (as JobSets via the submitJobSet method) in the jobSets internal map.
Note: JobSets are submitted by JobGenerator.
It uses a streaming scheduler queue for streaming jobs to be executed.
Tip: Enable DEBUG logging level for the org.apache.spark.streaming.scheduler.JobScheduler logger to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.scheduler.JobScheduler=DEBUG

Refer to Logging.
Starting JobScheduler (start method)
start(): Unit
When `JobScheduler` starts (i.e. when `start` is called), you should see the following DEBUG message in the logs:
DEBUG JobScheduler: Starting JobScheduler
It then goes over all the dependent services and starts them, one by one, as described below.
It first starts JobSchedulerEvent Handler.
It asks DStreamGraph for input dstreams and registers their RateControllers (if defined) as streaming listeners. It starts StreamingListenerBus afterwards.
It creates ReceiverTracker and InputInfoTracker. It then starts the ReceiverTracker.
It starts JobGenerator.
Just before `start` finishes, you should see the following INFO message in the logs:
INFO JobScheduler: Started JobScheduler
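Putting the steps together, here is a condensed sketch of the startup sequence based on the description above (field names such as `listenerBus`, `receiverTracker`, `inputInfoTracker`, and `jobGenerator` are assumptions):

```scala
// Sketch of JobScheduler.start(); names of internal fields are assumptions.
def start(): Unit = synchronized {
  if (eventLoop != null) return  // already started
  logDebug("Starting JobScheduler")

  // 1. Start the JobSchedulerEvent handler.
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // 2. Register the RateController (if defined) of every input dstream as a streaming listener.
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  // 3. Start StreamingListenerBus, create ReceiverTracker and InputInfoTracker,
  //    then start the ReceiverTracker.
  listenerBus.start()
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()

  // 4. Start JobGenerator.
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
```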
Pending Batches to Process (getPendingTimes method)
Caution: FIXME
Stopping JobScheduler (stop method)
stop(processAllReceivedData: Boolean): Unit
`stop` stops `JobScheduler`.
Note: It is called when StreamingContext is being stopped.
You should see the following DEBUG message in the logs:
DEBUG JobScheduler: Stopping JobScheduler
Note: ReceiverTracker is only assigned (and started) while JobScheduler is starting.
You should see the following DEBUG message in the logs:
DEBUG JobScheduler: Stopping job executor
jobExecutor Thread Pool is shut down (using `jobExecutor.shutdown()`).
If the stop is to wait for all received data to be processed (i.e. the input parameter `processAllReceivedData` is `true`), `stop` awaits termination of the jobExecutor Thread Pool for 1 hour (it is assumed that this is enough and it is not configurable). Otherwise, it waits for up to 2 seconds.
jobExecutor Thread Pool is forcefully shut down (using `jobExecutor.shutdownNow()`) unless it has already terminated.
You should see the following DEBUG message in the logs:
DEBUG JobScheduler: Stopped job executor
StreamingListenerBus and the eventLoop - JobSchedulerEvent Handler are then stopped.
You should see the following INFO message in the logs:
INFO JobScheduler: Stopped JobScheduler
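A condensed sketch of `stop`, per the steps above (the initial `receiverTracker` and `jobGenerator` shutdown calls are assumptions based on the note above; assumes `java.util.concurrent.TimeUnit` is imported):

```scala
// Sketch of JobScheduler.stop(processAllReceivedData); field names are assumptions.
def stop(processAllReceivedData: Boolean): Unit = synchronized {
  if (eventLoop == null) return  // never started or already stopped
  logDebug("Stopping JobScheduler")

  // receiverTracker may be null since it is only assigned while JobScheduler is starting.
  if (receiverTracker != null) receiverTracker.stop(processAllReceivedData)
  jobGenerator.stop(processAllReceivedData)

  logDebug("Stopping job executor")
  jobExecutor.shutdown()
  val terminated = if (processAllReceivedData) {
    jobExecutor.awaitTermination(1, TimeUnit.HOURS)   // hard-coded, not configurable
  } else {
    jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
  }
  if (!terminated) jobExecutor.shutdownNow()          // force shutdown unless done already
  logDebug("Stopped job executor")

  listenerBus.stop()
  eventLoop.stop()
  eventLoop = null
  logInfo("Stopped JobScheduler")
}
```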
Submitting Collection of Jobs for Execution — submitJobSet method
When `submitJobSet(jobSet: JobSet)` is called, it reacts appropriately depending on the `JobSet` given.
Note: The method is called by JobGenerator only (as part of JobGenerator.generateJobs and JobGenerator.restart).
When there are no streaming jobs inside the `jobSet`, you should see the following INFO message in the logs:
INFO JobScheduler: No jobs added for time [jobSet.time]
Otherwise, when there is at least one streaming job inside the `jobSet`, StreamingListenerBatchSubmitted (with data statistics of every registered input stream for which the streaming jobs were generated) is posted to StreamingListenerBus.
The JobSet is added to the internal jobSets registry.
It then goes over every streaming job in the `jobSet` and executes a JobHandler (on the jobExecutor Thread Pool).
At the end, you should see the following INFO message in the logs:
INFO JobScheduler: Added jobs for time [jobSet.time] ms
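A sketch of the logic, under the names used in this document (`listenerBus` and `jobExecutor` are assumed field names):

```scala
// Sketch of submitJobSet per the description above.
def submitJobSet(jobSet: JobSet): Unit = {
  if (jobSet.jobs.isEmpty) {
    logInfo(s"No jobs added for time ${jobSet.time}")
  } else {
    // Input stream statistics travel with the batch via BatchInfo.
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)  // register the batch in the internal registry
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo(s"Added jobs for time ${jobSet.time.milliseconds} ms")
  }
}
```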
JobHandler
`JobHandler` is a thread of execution for a streaming job (it simply calls `Job.run`).
Note: It is used when a new JobSet is submitted (see submitJobSet in this document).
When started, it prepares the execution environment (so the streaming job can be nicely displayed in the web UI under `/streaming/batch/?id=[milliseconds]`) and posts a `JobStarted` event to the JobSchedulerEvent event loop.
It runs the streaming job that executes the job function as defined while generating a streaming job for an output stream.
Note: This is when Spark is requested to run a Spark job.
You may see similar-looking INFO messages in the logs (it depends on the operators you use):
INFO SparkContext: Starting job: print at <console>:39
INFO DAGScheduler: Got job 0 (print at <console>:39) with 1 output partitions
...
INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (KafkaRDD[2] at createDirectStream at <console>:36)
...
INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 987 bytes result sent to driver
...
INFO DAGScheduler: Job 0 finished: print at <console>:39, took 0.178689 s
It then posts a `JobCompleted` event to the JobSchedulerEvent event loop.
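A minimal sketch of `JobHandler`, assuming the events carry the current time for the handlers described below (the local-property key used for the web UI is a hypothetical name):

```scala
// Sketch of JobHandler; the property key "streaming.batchTime" is hypothetical.
private class JobHandler(job: Job) extends Runnable {
  def run(): Unit = {
    // Prepare the environment so the job shows up under /streaming/batch/?id=[milliseconds].
    ssc.sc.setLocalProperty("streaming.batchTime", job.time.milliseconds.toString)

    eventLoop.post(JobStarted(job, System.currentTimeMillis()))
    job.run()  // executes the job function; this is when Spark runs a Spark job
    eventLoop.post(JobCompleted(job, System.currentTimeMillis()))
  }
}
```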
jobExecutor Thread Pool
While `JobScheduler` is instantiated, the daemon thread pool `streaming-job-executor-ID` with spark.streaming.concurrentJobs threads is created.
It is used to execute JobHandler for jobs in JobSet (see submitJobSet in this document).
It shuts down when StreamingContext stops.
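A sketch of how such a pool could be created (assuming Spark's internal `ThreadUtils` helper; spark.streaming.concurrentJobs defaults to 1):

```scala
// Sketch of the pool creation in the JobScheduler constructor.
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
private val jobExecutor =
  ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
```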
eventLoop - JobSchedulerEvent Handler
JobScheduler uses an `EventLoop` for `JobSchedulerEvent` events. It accepts JobStarted and JobCompleted events. It also processes `ErrorReported` events.
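A sketch of the dispatch logic (`handleError` is an assumed name for the ErrorReported handler):

```scala
// Sketch of the event dispatch inside the event loop.
private def processEvent(event: JobSchedulerEvent): Unit = {
  event match {
    case JobStarted(job, startTime)       => handleJobStart(job, startTime)
    case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
    case ErrorReported(message, e)        => handleError(message, e)
  }
}
```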
JobStarted and JobScheduler.handleJobStart
When a `JobStarted` event is received, `JobScheduler.handleJobStart` is called.
Note: It is JobHandler that posts JobStarted events.
`handleJobStart(job: Job, startTime: Long)` takes a `JobSet` (from `jobSets`) and checks whether it has already been started.
It posts `StreamingListenerBatchStarted` to StreamingListenerBus when the JobSet is about to start.
It posts `StreamingListenerOutputOperationStarted` to StreamingListenerBus.
You should see the following INFO message in the logs:
INFO JobScheduler: Starting job [job.id] from job set of time [jobSet.time] ms
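Putting it together, a sketch of `handleJobStart` (`hasStarted`, `setStartTime`, and `toOutputOperationInfo` are assumed helper names):

```scala
// Sketch of handleJobStart per the description above.
private def handleJobStart(job: Job, startTime: Long): Unit = {
  val jobSet = jobSets.get(job.time)
  val isFirstJobOfJobSet = !jobSet.hasStarted  // check whether the JobSet has already started
  jobSet.handleJobStart(job)
  if (isFirstJobOfJobSet) {
    // Post StreamingListenerBatchStarted only once, when the JobSet is about to start.
    listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
  }
  job.setStartTime(startTime)
  listenerBus.post(StreamingListenerOutputOperationStarted(job.toOutputOperationInfo))
  logInfo(s"Starting job ${job.id} from job set of time ${jobSet.time.milliseconds} ms")
}
```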
JobCompleted and JobScheduler.handleJobCompletion
When a `JobCompleted` event is received, it triggers `JobScheduler.handleJobCompletion(job: Job, completedTime: Long)`.
Note: JobHandler posts JobCompleted events when it finishes running a streaming job.
`handleJobCompletion` looks the JobSet up (in the jobSets internal registry) and calls JobSet.handleJobCompletion(job) (which marks the `JobSet` as completed when no more streaming jobs are incomplete). It also calls `Job.setEndTime(completedTime)`.
It posts `StreamingListenerOutputOperationCompleted` to StreamingListenerBus.
You should see the following INFO message in the logs:
INFO JobScheduler: Finished job [job.id] from job set of time [jobSet.time] ms
If the entire JobSet is completed, it removes it from jobSets, and calls JobGenerator.onBatchCompletion.
You should see the following INFO message in the logs:
INFO JobScheduler: Total delay: [totalDelay] s for time [time] ms (execution: [processingDelay] s)
It posts `StreamingListenerBatchCompleted` to StreamingListenerBus.
It reports an error if the job’s result is a failure.
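A condensed sketch of `handleJobCompletion`, per the steps above (`hasCompleted` and `toOutputOperationInfo` are assumed helper names):

```scala
// Sketch of handleJobCompletion per the description above.
private def handleJobCompletion(job: Job, completedTime: Long): Unit = {
  val jobSet = jobSets.get(job.time)
  jobSet.handleJobCompletion(job)  // marks the JobSet completed once no jobs are incomplete
  job.setEndTime(completedTime)
  listenerBus.post(StreamingListenerOutputOperationCompleted(job.toOutputOperationInfo))
  logInfo(s"Finished job ${job.id} from job set of time ${jobSet.time.milliseconds} ms")

  if (jobSet.hasCompleted) {
    jobSets.remove(jobSet.time)
    jobGenerator.onBatchCompletion(jobSet.time)
    logInfo("Total delay: %.3f s for time %d ms (execution: %.3f s)".format(
      jobSet.totalDelay / 1000.0, jobSet.time.milliseconds, jobSet.processingDelay / 1000.0))
    listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
  }

  // Report an error if the job's result is a failure.
  job.result match {
    case scala.util.Failure(e) => reportError(s"Error running job $job", e)
    case _ =>
  }
}
```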
StreamingListenerBus and StreamingListenerEvents
`StreamingListenerBus` is an asynchronous listener bus that posts `StreamingListenerEvent` events to streaming listeners.
Internal Registries
`JobScheduler` maintains the following information in internal registries:
- `jobSets` - a mapping between time and JobSets. See JobSet.
JobSet
A `JobSet` represents a collection of streaming jobs that were created at (batch) `time` for output streams (only those that have ultimately produced a streaming job, as they may opt out).
`JobSet` tracks which streaming jobs are in an incomplete state (in the `incompleteJobs` internal registry).
Note: At the beginning (when a JobSet is created) all streaming jobs are incomplete.
Caution: FIXME There is a duplication in how streaming jobs are tracked as completed since a Job knows about its `_endTime`. Is this an optimization? How much time does it buy us?
A `JobSet` tracks the following moments in its lifecycle:

- `submissionTime` - the time when the instance was created.
- `processingStartTime` - the time when the first streaming job in the collection was started.
- `processingEndTime` - the time when the last streaming job in the collection finished processing.
A `JobSet` changes state over time. It can be in the following states:

- Created - after a `JobSet` was created. `submissionTime` is set.
- Started - after `JobSet.handleJobStart` was called. `processingStartTime` is set.
- Completed - after `JobSet.handleJobCompletion` was called and no more jobs are incomplete (in the `incompleteJobs` internal registry). `processingEndTime` is set.
Given the states, a `JobSet` has two delays:

- Processing delay - the time spent processing all the streaming jobs in a `JobSet`, from the time the very first job was started, i.e. the time between the started and completed states.
- Total delay - the time from the batch time until the `JobSet` was completed.
Note: Total delay is always longer than processing delay.
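A sketch of how the two delays fall out of the recorded timestamps (all times in milliseconds); since total delay is measured from the batch time rather than from `processingStartTime`, it can never be shorter than processing delay:

```scala
// Inside JobSet; all timestamps are epoch milliseconds.
def processingDelay: Long = processingEndTime - processingStartTime  // started -> completed
def totalDelay: Long = processingEndTime - time.milliseconds         // batch time -> completed
```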
You can map a `JobSet` to a `BatchInfo` using the `toBatchInfo` method.
Note: BatchInfo is used to create and post StreamingListenerBatchSubmitted, StreamingListenerBatchStarted, and StreamingListenerBatchCompleted events.
`JobSet` is used (created or processed) in: