
CoarseGrainedExecutorBackend

CoarseGrainedExecutorBackend is an ExecutorBackend that manages a single coarse-grained executor (that lives as long as the owning executor backend). CoarseGrainedExecutorBackend registers itself as a ThreadSafeRpcEndpoint under the name Executor to communicate with the driver.
Note
|
The internal executor reference is created after a connection to the driver is established. |

When launched, CoarseGrainedExecutorBackend immediately connects to the owning CoarseGrainedSchedulerBackend to inform it that it can run tasks. It is launched as a command-line application by:

- Spark Standalone’s StandaloneSchedulerBackend
- Spark on YARN’s ExecutorRunnable
- Spark on Mesos’s MesosCoarseGrainedSchedulerBackend

When it cannot connect to driverUrl, it terminates (with the exit code 1).
Caution
|
What are SPARK_LOG_URL_ env vars? Who sets them?
|
When the driver terminates, CoarseGrainedExecutorBackend exits (with exit code 1).

ERROR Driver [remoteAddress] disassociated! Shutting down.

All task status updates are sent along to driverRef as StatusUpdate messages.
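The status-update path can be sketched in plain Scala. The message shape below mirrors the fields StatusUpdate carries (executor id, task id, task state, serialized result data), but DriverRefStub and the statusUpdate helper are simplified stand-ins, not Spark's actual classes:

```scala
import java.nio.ByteBuffer

// Simplified stand-ins for Spark's types -- for illustration only.
object TaskState extends Enumeration { val RUNNING, FINISHED, FAILED = Value }

// Models the StatusUpdate RPC message: which executor, which task, the task's
// state, and the serialized result data.
case class StatusUpdate(executorId: String, taskId: Long,
                        state: TaskState.Value, data: ByteBuffer)

// A driver reference stub that records what it receives (standing in for an
// RpcEndpointRef's send).
class DriverRefStub {
  var received: List[StatusUpdate] = Nil
  def send(msg: StatusUpdate): Unit = received = msg :: received
}

// Mirrors how the backend forwards every task status change to driverRef.
def statusUpdate(driver: DriverRefStub, executorId: String, taskId: Long,
                 state: TaskState.Value, data: ByteBuffer): Unit =
  driver.send(StatusUpdate(executorId, taskId, state, data))

val driver = new DriverRefStub
statusUpdate(driver, "exec-1", 42L, TaskState.RUNNING, ByteBuffer.allocate(0))
println(driver.received.head.taskId)
```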
Tip
|
Enable INFO logging level for org.apache.spark.executor.CoarseGrainedExecutorBackend logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.executor.CoarseGrainedExecutorBackend=INFO
|
Extracting Log URLs — extractLogUrls Method
Caution
|
FIXME |
Creating CoarseGrainedExecutorBackend Instance
CoarseGrainedExecutorBackend(
  override val rpcEnv: RpcEnv,
  driverUrl: String,
  executorId: String,
  hostname: String,
  cores: Int,
  userClassPath: Seq[URL],
  env: SparkEnv)
extends ThreadSafeRpcEndpoint with ExecutorBackend
While being created, CoarseGrainedExecutorBackend initializes the internal properties (e.g. executor and driver) and creates a SerializerInstance (using SparkEnv.closureSerializer).
Note
|
CoarseGrainedExecutorBackend is created when…FIXME
|
Starting RpcEndpoint — onStart Method
Note
|
onStart is a RpcEndpoint callback method that is executed before a RPC endpoint starts to handle messages.
|
When executed, you should see the following INFO message in the logs:

INFO CoarseGrainedExecutorBackend: Connecting to driver: [driverUrl]

onStart then asynchronously retrieves the RpcEndpointRef of the driver (using the constructor’s driverUrl), initializes the internal driver property, and sends it a blocking RegisterExecutor message.

If there is an issue while registering the executor, you should see the following ERROR message in the logs and the process exits (with the exit code 1).

ERROR Cannot register with driver: [driverUrl]
Note
|
The RegisterExecutor message contains executorId , the RpcEndpointRef to itself, cores , and log URLs of the CoarseGrainedExecutorBackend .
|
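As a rough illustration, the registration message described in the note above could be modeled as follows. The field names are simplified assumptions; the real message also carries the RpcEndpointRef of the backend itself:

```scala
// Simplified model of the RegisterExecutor message -- not Spark's actual class.
case class RegisterExecutor(
  executorId: String,
  hostname: String,
  cores: Int,
  logUrls: Map[String, String]) // log URLs extracted by extractLogUrls

val msg = RegisterExecutor("1", "192.168.1.4", 4,
  Map("stdout" -> "http://192.168.1.4:8081/logPage?logType=stdout"))
println(msg.cores)
```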
driver RpcEndpointRef

driver is an optional RpcEndpointRef for the driver.
Tip
|
See Starting RpcEndpoint — onStart Method for how driver is initialized.
|
Driver’s URL

The driver’s URL is of the format spark://[RpcEndpoint name]@[hostname]:[port], e.g. spark://[email protected]:64859.
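The format above can be parsed with a plain Scala regular expression. This is a sketch for illustration, not how Spark itself resolves the URL:

```scala
// Matches spark://[RpcEndpoint name]@[hostname]:[port]
val DriverUrlPattern = """spark://([^@]+)@(.+):(\d+)""".r

def parseDriverUrl(url: String): Option[(String, String, Int)] = url match {
  case DriverUrlPattern(name, host, port) => Some((name, host, port.toInt))
  case _                                  => None
}

println(parseDriverUrl("spark://[email protected]:64859"))
```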
Launching CoarseGrainedExecutorBackend As Standalone Application — main Method

CoarseGrainedExecutorBackend is a command-line application (it comes with a main method).
It accepts the following options:

- --driver-url (required) - the driver’s URL. See driver’s URL.
- --executor-id (required) - the executor’s id
- --hostname (required) - the name of the host
- --cores (required) - the number of cores (must be more than 0)
- --app-id (required) - the id of the application
- --worker-url - the worker’s URL, e.g. spark://[email protected]:64557
- --user-class-path - a URL/path to a resource to be added to CLASSPATH; can be specified multiple times

Unrecognized options or missing required options cause the usage help to be displayed, and the process exits.
$ ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend
Usage: CoarseGrainedExecutorBackend [options]
Options are:
--driver-url <driverUrl>
--executor-id <executorId>
--hostname <hostname>
--cores <cores>
--app-id <appid>
--worker-url <workerUrl>
--user-class-path <url>
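A minimal sketch of consuming such flag/value pairs could look like the following (the parseArgs helper and its error handling are assumptions for illustration; Spark's actual parser is hand-rolled and also accumulates repeated --user-class-path entries):

```scala
// Hypothetical recursive parser: pairs up "--flag value" arguments.
def parseArgs(args: List[String]): Map[String, String] = args match {
  case flag :: value :: rest if flag.startsWith("--") =>
    parseArgs(rest) + (flag.stripPrefix("--") -> value)
  case Nil   => Map.empty
  case other => sys.error(s"Unrecognized options: $other")
}

val opts = parseArgs(List(
  "--driver-url", "spark://[email protected]:64859",
  "--executor-id", "1", "--hostname", "192.168.1.4",
  "--cores", "4", "--app-id", "app-20161130-0001"))

// --cores must be more than 0.
require(opts("cores").toInt > 0, "cores must be more than 0")
println(opts("executor-id"))
```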
It first fetches Spark properties from CoarseGrainedSchedulerBackend (using the driverPropsFetcher RPC Environment and the endpoint reference given in the driver’s URL).

For this, it creates a SparkConf, reads the spark.executor.port setting (defaults to 0) and creates the driverPropsFetcher RPC Environment in client mode. The RPC environment is used to resolve the driver’s endpoint to post a RetrieveSparkProps message.

It sends a (blocking) RetrieveSparkProps message to the driver (using the value of the driverUrl command-line option). When the response (the driver’s SparkConf) arrives, it adds spark.app.id (using the value of the appid command-line option) and creates a brand new SparkConf.
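The extension of the fetched properties with spark.app.id can be sketched as a simple append over key-value pairs (names and the property representation here are simplified assumptions):

```scala
// Sketch: extend the driver's properties with spark.app.id before building
// the executor's SparkConf.
def extendWithAppId(driverProps: Seq[(String, String)],
                    appId: String): Seq[(String, String)] =
  driverProps :+ ("spark.app.id" -> appId)

val fetched = Seq("spark.driver.port" -> "64859") // as returned by the driver
val props   = extendWithAppId(fetched, "app-20161130-0001")
println(props.toMap("spark.app.id"))
```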
If spark.yarn.credentials.file
is set, …FIXME
A SparkEnv is created using SparkEnv.createExecutorEnv (with isLocal being false).
Caution
|
FIXME |
Setting Up Executor RPC Endpoint (and WorkerWatcher Perhaps) — run Internal Method

run(
  driverUrl: String,
  executorId: String,
  hostname: String,
  cores: Int,
  appId: String,
  workerUrl: Option[String],
  userClassPath: scala.Seq[URL]): Unit
run requests the driver for the Spark properties and sets up the Executor RPC endpoint (with CoarseGrainedExecutorBackend as the RPC endpoint) and optionally the WorkerWatcher RPC endpoint. It keeps running (yet the main thread is blocked and only the RPC endpoints process RPC messages) until the RpcEnv terminates.

When executed, you should see the following INFO message in the logs:

INFO Started daemon with process name: [processName]

run then runs in a secured environment as a Spark user.

run first creates a brand new SparkConf to get spark.executor.port from. It then creates a RpcEnv called driverPropsFetcher.
Note
|
The host name and port for the driverPropsFetcher RpcEnv are given as the input argument hostname and got from SparkConf , respectively.
|
Caution
|
FIXME What’s clientMode in RpcEnv.create ?
|
run uses the driverPropsFetcher RpcEnv to request the Spark properties from the driverUrl endpoint. The Spark properties are extended with the spark.app.id Spark property set to the value of appId.

run uses the Spark properties to create a SparkEnv for the executor (with isLocal disabled).
Note
|
executorId , hostname , and cores to create the SparkEnv are the input arguments of run .
|
Caution
|
FIXME Describe spark.yarn.credentials.file .
|
After the SparkEnv has been created, run sets up the RPC endpoint under the name Executor with CoarseGrainedExecutorBackend as the RPC endpoint.

If the optional workerUrl is specified, run sets up another RPC endpoint, a WorkerWatcher, under the name WorkerWatcher.
Caution
|
FIXME When is workerUrl specified?
|
run's thread is blocked until the RpcEnv terminates (so the other threads of the RPC endpoints can run).

Once the RpcEnv has terminated, run stops the thread for credential updates.
Caution
|
FIXME Think of the place for Utils.initDaemon , Utils.getProcessName et al.
|
Note
|
run is executed when CoarseGrainedExecutorBackend command-line application is launched.
|
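The blocking behavior of run can be sketched with a latch standing in for RpcEnv.awaitTermination (plain Scala, hypothetical names; real RPC endpoints process messages in their own threads):

```scala
import java.util.concurrent.CountDownLatch

// The latch stands in for RpcEnv.awaitTermination: the main thread blocks
// while the endpoint thread keeps working.
val terminated = new CountDownLatch(1)
@volatile var messagesProcessed = 0

val endpointThread = new Thread(new Runnable {
  def run(): Unit = {
    // ...the RPC endpoint processes messages on its own thread...
    messagesProcessed = 3
    // ...until the RpcEnv shuts down:
    terminated.countDown()
  }
})
endpointThread.start()

terminated.await() // run's thread is blocked here until the RpcEnv terminates
endpointThread.join()
println(messagesProcessed)
```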
executor Internal Property

executor is the internal reference to a coarse-grained executor…FIXME
Caution
|
FIXME |
RPC Messages
RegisteredExecutor
RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse

When a RegisteredExecutor message arrives, you should see the following INFO in the logs:

INFO CoarseGrainedExecutorBackend: Successfully registered with driver

The internal executor is created (passing in the constructor’s parameters) with isLocal disabled.
Note
|
RegisteredExecutor is sent when CoarseGrainedSchedulerBackend is notified about a new executor.
|
RegisterExecutorFailed
RegisterExecutorFailed(message)
When a RegisterExecutorFailed message arrives, the following ERROR is printed out to the logs:

ERROR CoarseGrainedExecutorBackend: Slave registration failed: [message]

CoarseGrainedExecutorBackend then exits with the exit code 1.
LaunchTask
LaunchTask(data: SerializableBuffer)
The LaunchTask handler deserializes a TaskDescription from data (using the global closure Serializer).
Note
|
LaunchTask message is sent by CoarseGrainedSchedulerBackend.launchTasks.
|
INFO CoarseGrainedExecutorBackend: Got assigned task [taskId]

It then launches the task on the executor (using the Executor.launchTask method).

If, however, the internal executor field has not been created yet, it prints out the following ERROR to the logs:

ERROR CoarseGrainedExecutorBackend: Received LaunchTask command but executor was null

It then exits.
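The deserialization step can be illustrated with a hand-rolled codec over a ByteBuffer. Both the two-field TaskDescription and the codec below are simplified assumptions; Spark's real TaskDescription carries more fields and is decoded by the closure Serializer:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical minimal TaskDescription -- the real class has more fields.
case class TaskDescription(taskId: Long, name: String)

// Encode into a ByteBuffer, like the data payload of LaunchTask.
def encode(td: TaskDescription): ByteBuffer = {
  val nameBytes = td.name.getBytes(StandardCharsets.UTF_8)
  val buf = ByteBuffer.allocate(8 + 4 + nameBytes.length)
  buf.putLong(td.taskId).putInt(nameBytes.length).put(nameBytes)
  buf.flip()
  buf
}

// Decode, as the LaunchTask handler would before Executor.launchTask.
def decode(data: ByteBuffer): TaskDescription = {
  val taskId = data.getLong()
  val nameBytes = new Array[Byte](data.getInt())
  data.get(nameBytes)
  TaskDescription(taskId, new String(nameBytes, StandardCharsets.UTF_8))
}

val td = decode(encode(TaskDescription(7L, "task 7.0 in stage 1.0")))
println(td.taskId)
```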
KillTask
A KillTask(taskId, _, interruptThread) message kills a task (calls Executor.killTask).

If the executor has not been initialized yet (FIXME: why?), the following ERROR message is printed out to the logs and CoarseGrainedExecutorBackend exits:
StopExecutor
The StopExecutor message handler is receive-reply and blocking. When received, the handler prints the following INFO message to the logs:

INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown

It then sends a Shutdown message to itself.
Shutdown
Shutdown stops the executor, the CoarseGrainedExecutorBackend itself, and the RPC Environment.