HeartbeatReceiver RPC Endpoint

HeartbeatReceiver RPC endpoint is a ThreadSafeRpcEndpoint and a SparkListener.

It keeps track of executors (through messages) and informs TaskScheduler and SparkContext about lost executors.

It is created with a SparkContext and a Clock. It uses the SparkContext to register itself as a SparkListener and, later (when TaskSchedulerIsSet arrives), to access the TaskScheduler, which it keeps as its internal scheduler.

Note
HeartbeatReceiver RPC endpoint is registered while SparkContext is being created.
Tip

Enable DEBUG or TRACE logging levels for org.apache.spark.HeartbeatReceiver to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.HeartbeatReceiver=TRACE

Refer to Logging.

Creating HeartbeatReceiver Instance

class HeartbeatReceiver(
  sc: SparkContext,
  clock: Clock)
extends SparkListener with ThreadSafeRpcEndpoint

HeartbeatReceiver requires a SparkContext and a Clock.

When created, HeartbeatReceiver registers itself as a SparkListener.
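HeartbeatReceiver receives its Clock from the caller instead of reading the system time itself. One common motivation for this kind of injection is testability: a manually advanced clock makes time-dependent logic deterministic. The sketch below illustrates the pattern with hypothetical Clock, SystemClock, ManualClock, and LastSeenTracker types; they are stand-ins, not Spark's classes.

```scala
// Hypothetical clock abstraction; these are stand-ins, not Spark's classes.
trait Clock {
  def getTimeMillis(): Long
}

// Production clock: reads the wall-clock time.
class SystemClock extends Clock {
  def getTimeMillis(): Long = System.currentTimeMillis()
}

// Test clock: time moves only when advance() is called.
class ManualClock(private var now: Long = 0L) extends Clock {
  def getTimeMillis(): Long = synchronized { now }
  def advance(ms: Long): Unit = synchronized { now += ms }
}

// A component that, like HeartbeatReceiver, is handed its clock when created.
class LastSeenTracker(clock: Clock) {
  private var lastSeenMs: Long = clock.getTimeMillis()
  def touch(): Unit = { lastSeenMs = clock.getTimeMillis() }
  def elapsedMs: Long = clock.getTimeMillis() - lastSeenMs
}
```

In a test, the tracker can be paired with a ManualClock and advanced by exact amounts, so no sleeping or timing tolerance is needed.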

Internal Registries and Counters

Table 1. Internal Registries and Counters

Name             | Description
executorLastSeen | A registry of executor ids and the timestamps of when the last heartbeat was received.
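The executorLastSeen registry can be modeled as a mutable map from executor id to a timestamp. This is a minimal sketch assuming millisecond timestamps; the ExecutorRegistry class and its methods are hypothetical names used only for illustration.

```scala
import scala.collection.mutable

// Hypothetical model of the executorLastSeen registry:
// executor id -> time (ms) when that executor was last seen.
class ExecutorRegistry {
  private val executorLastSeen = mutable.HashMap.empty[String, Long]

  // Record that an executor was seen at `nowMs` (registration or heartbeat).
  def markSeen(executorId: String, nowMs: Long): Unit =
    executorLastSeen(executorId) = nowMs

  // Forget an executor entirely (e.g. after it has been removed).
  def remove(executorId: String): Unit =
    executorLastSeen -= executorId

  def isKnown(executorId: String): Boolean = executorLastSeen.contains(executorId)

  def lastSeen(executorId: String): Option[Long] = executorLastSeen.get(executorId)
}
```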

Starting — onStart Method

Note
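onStart is part of the RpcEndpoint Contract

When called, HeartbeatReceiver schedules the checking task on eventLoopThread. The task sends a blocking ExpireDeadHosts message to HeartbeatReceiver itself every spark.network.timeoutInterval.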
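HeartbeatReceiver's checking task runs periodically on eventLoopThread. A periodic check of this kind can be sketched as a fixed-rate job on a scheduled executor. The sketch uses java.util.concurrent directly; PeriodicCheck and its `check` callback are hypothetical, and it calls a function where HeartbeatReceiver sends an ExpireDeadHosts message.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical helper: runs `check` every `intervalMs` milliseconds, starting
// immediately, on the given (ideally single-threaded) scheduler.
object PeriodicCheck {
  def start(scheduler: ScheduledExecutorService, intervalMs: Long)(check: () => Unit): ScheduledFuture[_] =
    scheduler.scheduleAtFixedRate(
      new Runnable {
        def run(): Unit =
          // scheduleAtFixedRate suppresses all later runs once a run throws,
          // so non-fatal errors are caught and reported instead.
          try check()
          catch { case NonFatal(e) => Console.err.println(s"periodic check failed: $e") }
      },
      0L,
      intervalMs,
      TimeUnit.MILLISECONDS)
}
```

The returned ScheduledFuture is the handle that a later stop step would cancel.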

Stopping — onStop Method

Note
onStop is part of the RpcEndpoint Contract

When called, HeartbeatReceiver cancels the checking task (the periodic task that sends a blocking ExpireDeadHosts message every spark.network.timeoutInterval on eventLoopThread, the Heartbeat Receiver Event Loop Thread; see Starting (onStart method)) and shuts down the eventLoopThread and killExecutorThread executors.
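The shutdown sequence above (cancel the periodic task, then shut down both executors) can be sketched with java.util.concurrent. The Teardown object is hypothetical, and the choice of cancel(true) and shutdownNow(), which interrupt in-flight work, is an assumption of this sketch rather than a documented detail.

```scala
import java.util.concurrent.{ExecutorService, ScheduledFuture}

// Hypothetical teardown mirroring the onStop steps: cancel the periodic
// checking task first, then shut down every executor that was handed in.
object Teardown {
  def stop(checkingTask: Option[ScheduledFuture[_]], executors: ExecutorService*): Unit = {
    // `true` interrupts the task if it is running at this moment.
    checkingTask.foreach(_.cancel(true))
    // shutdownNow() rejects new work, drops queued tasks, and interrupts running ones.
    executors.foreach(_.shutdownNow())
  }
}
```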

killExecutorThread — Kill Executor Thread

killExecutorThread is a daemon ScheduledThreadPoolExecutor with a single thread.

The name of the thread pool is kill-executor-thread.

Note
It is used to request SparkContext to kill the executor.
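A single-threaded daemon pool with a fixed thread name can be built from a custom ThreadFactory. The sketch below is an assumption about how such a pool might be constructed; NamedDaemonPool is a hypothetical helper, and it gives the thread itself the pool's name.

```scala
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}

// Hypothetical helper: a one-thread pool whose thread is a daemon and is
// named after the pool (e.g. "kill-executor-thread").
object NamedDaemonPool {
  def singleThreaded(name: String): ScheduledThreadPoolExecutor = {
    val factory = new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true) // daemon threads never keep the JVM alive on their own
        t
      }
    }
    new ScheduledThreadPoolExecutor(1, factory)
  }
}
```

Daemon threads do not prevent the JVM from exiting, and a descriptive name makes the thread easy to spot in thread dumps.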

eventLoopThread — Heartbeat Receiver Event Loop Thread

eventLoopThread is a daemon ScheduledThreadPoolExecutor with a single thread.

The name of the thread pool is heartbeat-receiver-event-loop-thread.
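Because eventLoopThread has a single thread, work submitted to it runs one task at a time. The hypothetical EventLoopDemo below illustrates that property with a plain single-threaded executor: every task runs on the same thread, and tasks observe the order in which they were submitted.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical demo: tasks submitted to a single-threaded executor run one at
// a time, in submission order, all on the same thread.
object EventLoopDemo {
  def collect(taskCount: Int): (List[Int], Set[String]) = {
    val loop = Executors.newSingleThreadExecutor()
    // Both buffers are mutated only from the loop thread, so no locking is used.
    val order = ArrayBuffer.empty[Int]
    val threadNames = ArrayBuffer.empty[String]
    (1 to taskCount).foreach { i =>
      loop.execute(new Runnable {
        def run(): Unit = {
          order += i
          threadNames += Thread.currentThread().getName
        }
      })
    }
    // Wait for every queued task to finish before reading the buffers.
    loop.shutdown()
    loop.awaitTermination(5, TimeUnit.SECONDS)
    (order.toList, threadNames.toSet)
  }
}
```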

Messages

ExecutorRegistered

ExecutorRegistered(executorId: String)

When ExecutorRegistered arrives, executorId is simply added to the executorLastSeen internal registry, with the current time as its last-seen timestamp.

Note
HeartbeatReceiver sends an ExecutorRegistered message to itself (from the addExecutor internal method) as a follow-up to SparkListener.onExecutorAdded, when the driver announces a new executor registration.
Note
It is an internal message.

ExecutorRemoved

ExecutorRemoved(executorId: String)

When ExecutorRemoved arrives, executorId is simply removed from the executorLastSeen internal registry.

Note
HeartbeatReceiver sends an ExecutorRemoved message to itself (from the removeExecutor internal method) as a follow-up to SparkListener.onExecutorRemoved, when the driver removes an executor.
Note
It is an internal message.
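HeartbeatReceiver turns listener callbacks into ExecutorRegistered and ExecutorRemoved messages sent to itself instead of updating executorLastSeen directly. A plausible reading, given that it is a ThreadSafeRpcEndpoint, is that this keeps all registry updates on one message-processing path. The sketch below models that idea with a single-threaded executor as the mailbox; MiniEndpoint, Msg, and the callback names are hypothetical, and the case classes only mirror the message names in this section.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Hypothetical messages mirroring the names used in this section.
sealed trait Msg
final case class ExecutorRegistered(executorId: String) extends Msg
final case class ExecutorRemoved(executorId: String) extends Msg

// Hypothetical endpoint: callbacks may arrive on any thread, but they only
// enqueue messages; the registry is mutated solely on the mailbox thread.
class MiniEndpoint(nowMs: () => Long) {
  private val mailbox = Executors.newSingleThreadExecutor()
  private val executorLastSeen = mutable.HashMap.empty[String, Long]

  private def send(msg: Msg): Unit =
    mailbox.execute(new Runnable { def run(): Unit = receive(msg) })

  private def receive(msg: Msg): Unit = msg match {
    case ExecutorRegistered(id) => executorLastSeen(id) = nowMs()
    case ExecutorRemoved(id)    => executorLastSeen -= id
  }

  // Listener-style callbacks that forward to the mailbox.
  def onExecutorAdded(id: String): Unit = send(ExecutorRegistered(id))
  def onExecutorRemoved(id: String): Unit = send(ExecutorRemoved(id))

  // Drains pending messages, then returns a snapshot of the registry.
  def shutdownAndSnapshot(): Map[String, Long] = {
    mailbox.shutdown()
    mailbox.awaitTermination(5, TimeUnit.SECONDS)
    executorLastSeen.toMap
  }
}
```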

ExpireDeadHosts

ExpireDeadHosts

When ExpireDeadHosts arrives, the following TRACE message is printed out to the logs:

TRACE HeartbeatReceiver: Checking for hosts with no recent heartbeats in HeartbeatReceiver.

For each executor in the executorLastSeen registry, HeartbeatReceiver checks whether the time since its last heartbeat exceeds spark.network.timeout.

For any such executor, the following WARN message is printed out to the logs:

WARN HeartbeatReceiver: Removing executor [executorId] with no recent heartbeats: [time] ms exceeds timeout [timeout] ms

TaskScheduler.executorLost is called (with SlaveLost("Executor heartbeat timed out after [timeout] ms")).

SparkContext.killAndReplaceExecutor is asynchronously called for the executor (i.e. on killExecutorThread).

The executor is removed from executorLastSeen.

Note
It is an internal message.
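The expiry check can be sketched as a scan over executorLastSeen that compares the time since each executor's last heartbeat with the timeout. In this minimal sketch the onLost callback stands in for the follow-up actions described above (the WARN message, TaskScheduler.executorLost, and the asynchronous kill); DeadHostExpiry is a hypothetical name.

```scala
import scala.collection.mutable

// Hypothetical sketch of the ExpireDeadHosts check. Executors whose last
// heartbeat is more than `timeoutMs` old are passed to `onLost` (together with
// how long ago they were last seen) and then dropped from the registry.
object DeadHostExpiry {
  def expire(executorLastSeen: mutable.Map[String, Long], nowMs: Long, timeoutMs: Long)(
      onLost: (String, Long) => Unit): List[String] = {
    // Collect from a snapshot so that removing entries below is safe.
    val expired = executorLastSeen.toList.collect {
      case (executorId, lastSeenMs) if nowMs - lastSeenMs > timeoutMs =>
        (executorId, nowMs - lastSeenMs)
    }
    expired.foreach { case (executorId, elapsedMs) =>
      onLost(executorId, elapsedMs) // e.g. warn, notify the scheduler, kill the executor
      executorLastSeen -= executorId
    }
    expired.map(_._1)
  }
}
```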

Heartbeat

Heartbeat(executorId: String,
  accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
  blockManagerId: BlockManagerId)

When Heartbeat arrives and the internal scheduler is not set yet (i.e., no TaskSchedulerIsSet message has arrived), the following WARN message is printed out to the logs:

WARN HeartbeatReceiver: Dropping [heartbeat] because TaskScheduler is not ready yet

And the response is HeartbeatResponse(reregisterBlockManager = true).

If, however, the internal scheduler is already set, HeartbeatReceiver checks whether the executor executorId is known (i.e., present in executorLastSeen).

If the executor is not recognized, the following DEBUG message is printed out to the logs:

DEBUG HeartbeatReceiver: Received heartbeat from unknown executor [executorId]

And the response is HeartbeatResponse(reregisterBlockManager = true).

If the internal scheduler is set and the executor is recognized (i.e., present in executorLastSeen), the current time is recorded in executorLastSeen and TaskScheduler.executorHeartbeatReceived is called asynchronously on eventLoopThread.

The response is HeartbeatResponse(reregisterBlockManager = unknownExecutor), where unknownExecutor is the negation of the result of TaskScheduler.executorHeartbeatReceived (which returns true when the driver knows the executor's block manager).
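The three possible outcomes for a Heartbeat can be captured in a pure function. This sketch assumes that TaskScheduler.executorHeartbeatReceived returns true when the driver knows the executor's block manager, so reregisterBlockManager is its negation. It also answers synchronously, whereas HeartbeatReceiver replies from eventLoopThread. HeartbeatHandling and its parameters are hypothetical.

```scala
// Hypothetical model of how a Heartbeat is answered. `heartbeatReceived`
// stands in for TaskScheduler.executorHeartbeatReceived and is assumed to
// return true when the driver knows the executor's block manager.
final case class HeartbeatResponse(reregisterBlockManager: Boolean)

object HeartbeatHandling {
  def respond(
      schedulerReady: Boolean,
      executorKnown: Boolean,
      heartbeatReceived: () => Boolean): HeartbeatResponse =
    if (!schedulerReady) HeartbeatResponse(reregisterBlockManager = true)      // TaskScheduler not set yet
    else if (!executorKnown) HeartbeatResponse(reregisterBlockManager = true)  // unknown executor
    else {
      val unknownExecutor = !heartbeatReceived()
      HeartbeatResponse(reregisterBlockManager = unknownExecutor)
    }
}
```

Passing heartbeatReceived as a function keeps it from being evaluated in the first two cases, matching the description that the scheduler is only consulted for known executors.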


TaskSchedulerIsSet

When TaskSchedulerIsSet arrives, HeartbeatReceiver sets the scheduler internal attribute (using SparkContext.taskScheduler).

Note
It is an internal message.

Settings

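Table 2. Spark Properties

Spark Property                              | Default Value                               | Description
spark.storage.blockManagerTimeoutIntervalMs | 60s                                         | Default for spark.network.timeoutInterval.
spark.storage.blockManagerSlaveTimeoutMs    | 120s                                        | Default for spark.network.timeout.
spark.network.timeout                       | spark.storage.blockManagerSlaveTimeoutMs    | How long an executor may go without a heartbeat before ExpireDeadHosts removes it. See spark.network.timeout in RPC Environment (RpcEnv).
spark.network.timeoutInterval               | spark.storage.blockManagerTimeoutIntervalMs | How often the checking task sends ExpireDeadHosts.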
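Table 2 describes fallback chains: a property's default is the value of another property, which in turn has its own default. A hypothetical resolver for such a chain is sketched below; it only understands values like 60s or 500ms and is not Spark's configuration API.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical resolver for the "default is another property" chains in
// Table 2. Only "<n>ms" and "<n>s" values are handled in this sketch.
object TimeoutSettings {
  private val TimePattern = """(\d+)(ms|s)""".r

  def toMs(value: String): Long = value.trim match {
    case TimePattern(n, "ms") => n.toLong
    case TimePattern(n, "s")  => TimeUnit.SECONDS.toMillis(n.toLong)
    case other => throw new IllegalArgumentException(s"Unsupported time value: $other")
  }

  // Look up `key`; if absent, fall back to `fallbackKey`, then to `default`.
  def resolveMs(conf: Map[String, String], key: String, fallbackKey: String, default: String): Long =
    toMs(conf.get(key).orElse(conf.get(fallbackKey)).getOrElse(default))
}
```

For example, with only spark.storage.blockManagerSlaveTimeoutMs set to 90s, resolving spark.network.timeout with this sketch yields 90000 ms.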
