HeartbeatReceiver RPC Endpoint
HeartbeatReceiver RPC endpoint is a ThreadSafeRpcEndpoint and a SparkListener.
It keeps track of executors (through messages) and informs TaskScheduler and SparkContext about lost executors.
When created, it requires a SparkContext and a Clock. Later, it uses the SparkContext to register itself as a SparkListener and to look up the TaskScheduler (as scheduler).
Note: HeartbeatReceiver RPC endpoint is registered while SparkContext is being created.
Tip: Enable TRACE logging level for the org.apache.spark.HeartbeatReceiver logger to see what happens inside. Add the following line to the log4j configuration file:
log4j.logger.org.apache.spark.HeartbeatReceiver=TRACE
Refer to Logging.
Creating HeartbeatReceiver Instance
HeartbeatReceiver(
  sc: SparkContext,
  clock: Clock)
  extends SparkListener with ThreadSafeRpcEndpoint
HeartbeatReceiver requires a SparkContext and a Clock.
When created, HeartbeatReceiver registers itself as a SparkListener.
Internal Registries and Counters
Name | Description |
---|---|
executorLastSeen | A registry of executor ids and the timestamps of when the last heartbeat was received. |
Starting - onStart Method
Note: onStart is part of the RpcEndpoint Contract.
When called, HeartbeatReceiver schedules a checking task on eventLoopThread (Heartbeat Receiver Event Loop Thread) that sends a blocking ExpireDeadHosts message every spark.network.timeoutInterval.
Stopping - onStop Method
Note: onStop is part of the RpcEndpoint Contract.
When called, HeartbeatReceiver cancels the checking task (the one that sends a blocking ExpireDeadHosts every spark.network.timeoutInterval on eventLoopThread, see Starting (onStart method)) and shuts down the eventLoopThread and killExecutorThread executors.
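The start and stop behaviour described above can be sketched with plain java.util.concurrent classes. This is only an illustration under stated assumptions, not Spark's code: the class name CheckerLifecycle, the onTick callback (standing in for sending ExpireDeadHosts) and the intervalMs parameter (standing in for spark.network.timeoutInterval) are all hypothetical.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}

// Hypothetical stand-in for the endpoint lifecycle; not Spark's implementation.
class CheckerLifecycle(intervalMs: Long, onTick: () => Unit) {
  // Single-threaded scheduler playing the role of eventLoopThread.
  private val eventLoop: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  // Handle to the periodic checking task; null until start() is called.
  private var checker: ScheduledFuture[_] = null

  // Mirrors onStart: schedule the periodic check at a fixed rate.
  def start(): Unit = {
    val task = new Runnable { override def run(): Unit = onTick() }
    checker = eventLoop.scheduleAtFixedRate(task, 0L, intervalMs, TimeUnit.MILLISECONDS)
  }

  // Mirrors onStop: cancel the checking task, then shut the executor down.
  def stop(): Unit = {
    if (checker != null) checker.cancel(true)
    eventLoop.shutdownNow()
  }

  def isShutdown: Boolean = eventLoop.isShutdown
}
```

Keeping the ScheduledFuture returned by scheduleAtFixedRate is what makes a later cancel possible; shutting the executor down afterwards releases its worker thread.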
killExecutorThread - Kill Executor Thread
killExecutorThread is a daemon ScheduledThreadPoolExecutor with a single thread.
The name of the thread pool is kill-executor-thread.
Note: It is used to request SparkContext to kill the executor.
eventLoopThread - Heartbeat Receiver Event Loop Thread
eventLoopThread is a daemon ScheduledThreadPoolExecutor with a single thread.
The name of the thread pool is heartbeat-receiver-event-loop-thread.
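A daemon ScheduledThreadPoolExecutor with a single named thread, as described for killExecutorThread and eventLoopThread, can be built with the JDK alone. The following is a minimal sketch; the object DaemonPools and its helper are hypothetical names for this example and are not claimed to match how Spark creates these pools.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}

// Hypothetical helper for illustration only.
object DaemonPools {
  // Builds a one-thread scheduled executor whose worker is a named daemon thread.
  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledThreadPoolExecutor = {
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, threadName)
        t.setDaemon(true) // daemon threads do not keep the JVM alive
        t
      }
    }
    new ScheduledThreadPoolExecutor(1, factory)
  }
}
```

For example, DaemonPools.newDaemonSingleThreadScheduledExecutor("kill-executor-thread") yields a pool whose only worker thread carries that name.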
Messages
ExecutorRegistered
ExecutorRegistered(executorId: String)
When ExecutorRegistered arrives, executorId is simply added to the executorLastSeen internal registry.
Note: HeartbeatReceiver sends an ExecutorRegistered message to itself (from the addExecutor internal method). It is a follow-up to SparkListener.onExecutorAdded when a driver announces a new executor registration.
Note: It is an internal message.
ExecutorRemoved
ExecutorRemoved(executorId: String)
When ExecutorRemoved arrives, executorId is simply removed from the executorLastSeen internal registry.
Note: HeartbeatReceiver sends an ExecutorRemoved message to itself (from the removeExecutor internal method). It is a follow-up to SparkListener.onExecutorRemoved when a driver removes an executor.
Note: It is an internal message.
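The effect of ExecutorRegistered and ExecutorRemoved on the registry can be sketched as two operations on a mutable map. This is an illustrative model, not Spark's code: the class ExecutorRegistry and the now function (standing in for the Clock) are hypothetical, and the sketch assumes the registration time is stored as the initial timestamp.

```scala
import scala.collection.mutable

// Illustrative registry: executor id -> timestamp (ms) of the last heartbeat.
class ExecutorRegistry(now: () => Long) {
  private val executorLastSeen = mutable.HashMap.empty[String, Long]

  // ExecutorRegistered: start tracking the executor
  // (assumption: the current time becomes its first timestamp).
  def executorRegistered(executorId: String): Unit =
    executorLastSeen(executorId) = now()

  // ExecutorRemoved: stop tracking the executor.
  def executorRemoved(executorId: String): Unit =
    executorLastSeen.remove(executorId)

  // Read-only lookup, used here only to observe the registry.
  def lastSeen(executorId: String): Option[Long] = executorLastSeen.get(executorId)
}
```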
ExpireDeadHosts
ExpireDeadHosts
When ExpireDeadHosts arrives, the following TRACE message is printed out to the logs:
TRACE HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.
Each executor (in the executorLastSeen registry) is checked for whether the time since it was last seen exceeds spark.network.timeout.
For any such executor, the following WARN message is printed out to the logs:
WARN HeartbeatReceiver: Removing executor [executorId] with no recent heartbeats: [time] ms exceeds timeout [timeout] ms
TaskScheduler.executorLost is called (with SlaveLost("Executor heartbeat timed out after [timeout] ms")).
SparkContext.killAndReplaceExecutor is called asynchronously for the executor (i.e. on killExecutorThread).
The executor is removed from executorLastSeen.
Note: It is an internal message.
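The expiry check can be sketched as a pure function over a snapshot of the registry. This is a simplified illustration, not Spark's code: the object ExpireCheck and its parameter names are hypothetical, and the side effects (TaskScheduler.executorLost, SparkContext.killAndReplaceExecutor) are left out.

```scala
// Hypothetical helper for illustration only.
object ExpireCheck {
  // Returns the executors whose last heartbeat is older than timeoutMs,
  // mapped to how long (in ms) they have been silent.
  def expired(lastSeen: Map[String, Long], nowMs: Long, timeoutMs: Long): Map[String, Long] =
    lastSeen.collect {
      case (executorId, lastSeenMs) if nowMs - lastSeenMs > timeoutMs =>
        executorId -> (nowMs - lastSeenMs)
    }

  // Formats the WARN message quoted above for one expired executor.
  def warnMessage(executorId: String, silentForMs: Long, timeoutMs: Long): String =
    s"Removing executor $executorId with no recent heartbeats: $silentForMs ms exceeds timeout $timeoutMs ms"
}
```

With lastSeen = Map("a" -> 1000L, "b" -> 9000L), nowMs = 10000L and timeoutMs = 5000L, only executor a is returned, with a silence of 9000 ms.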
Heartbeat
Heartbeat(executorId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId)
When Heartbeat arrives and the internal scheduler is not set yet (no TaskSchedulerIsSet was received earlier), the following WARN message is printed out to the logs:
WARN HeartbeatReceiver: Dropping [heartbeat] because TaskScheduler is not ready yet
The response is HeartbeatResponse(reregisterBlockManager = true).
Note: Heartbeat messages are the mechanism by which executors report that they are alive and send updates about the state of active tasks.
If, however, the internal scheduler was already set, HeartbeatReceiver checks whether the executor executorId is known (in executorLastSeen).
If the executor is not recognized, the following DEBUG message is printed out to the logs:
DEBUG HeartbeatReceiver: Received heartbeat from unknown executor [executorId]
The response is HeartbeatResponse(reregisterBlockManager = true).
If the internal scheduler is set and the executor is recognized (in executorLastSeen), the current time is recorded in executorLastSeen and TaskScheduler.executorHeartbeatReceived is called asynchronously (i.e. on a separate thread) on eventLoopThread.
The response is HeartbeatResponse(reregisterBlockManager = unknownExecutor), where unknownExecutor corresponds to the result of calling TaskScheduler.executorHeartbeatReceived.
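The three Heartbeat cases above can be summarised in a small, synchronous model. It is a sketch under stated assumptions, not Spark's code: the class HeartbeatFlow is hypothetical, HeartbeatResponse is redefined locally, the scheduler is reduced to a function that reports whether an executor should be treated as unknown, and the asynchronous hand-off to eventLoopThread is omitted.

```scala
import scala.collection.mutable

// Local, simplified copy of the response type for this sketch.
final case class HeartbeatResponse(reregisterBlockManager: Boolean)

// Hypothetical, synchronous model of the decision flow.
class HeartbeatFlow(now: () => Long) {
  private val executorLastSeen = mutable.HashMap.empty[String, Long]

  // Stand-in for the scheduler: reports whether an executor should be
  // treated as unknown. None until taskSchedulerIsSet is called.
  private var scheduler: Option[String => Boolean] = None

  def taskSchedulerIsSet(reportsUnknown: String => Boolean): Unit =
    scheduler = Some(reportsUnknown)

  def executorRegistered(executorId: String): Unit =
    executorLastSeen(executorId) = now()

  def lastSeen(executorId: String): Option[Long] = executorLastSeen.get(executorId)

  def heartbeat(executorId: String): HeartbeatResponse = scheduler match {
    // Case 1: scheduler not ready yet, ask the executor to re-register.
    case None =>
      HeartbeatResponse(reregisterBlockManager = true)
    // Case 2: scheduler ready but executor not tracked, ask to re-register.
    case Some(_) if !executorLastSeen.contains(executorId) =>
      HeartbeatResponse(reregisterBlockManager = true)
    // Case 3: known executor, record the time and consult the scheduler.
    case Some(reportsUnknown) =>
      executorLastSeen(executorId) = now()
      HeartbeatResponse(reregisterBlockManager = reportsUnknown(executorId))
  }
}
```

Modelling the scheduler as an Option makes the "not set yet" case explicit, so a heartbeat that arrives before TaskSchedulerIsSet cannot reach the scheduler by accident.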
Caution: FIXME Figure
TaskSchedulerIsSet
When TaskSchedulerIsSet arrives, HeartbeatReceiver sets the scheduler internal attribute (using SparkContext.taskScheduler).
Note: TaskSchedulerIsSet is sent by SparkContext (while it is being created) to inform HeartbeatReceiver that the TaskScheduler is now available.
Note: It is an internal message.