SparkEnv — Spark Runtime Environment

Spark Runtime Environment (SparkEnv) is the runtime environment of a Spark application: the set of Spark's public services that interact with one another to form the computing platform the application runs on.

It is represented by a SparkEnv object that holds all the required runtime services for a running Spark application, with separate environments for the driver and the executors.

Table 1. SparkEnv Services

Property                   Service
serializer                 Serializer
closureSerializer          Serializer
serializerManager          SerializerManager
mapOutputTracker           MapOutputTracker
shuffleManager             ShuffleManager
broadcastManager           BroadcastManager
blockManager               BlockManager
securityManager            SecurityManager
metricsSystem              MetricsSystem
memoryManager              MemoryManager
outputCommitCoordinator    OutputCommitCoordinator

Tip

Enable INFO or DEBUG logging level for org.apache.spark.SparkEnv logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.SparkEnv=DEBUG

Refer to Logging.

SparkEnv Factory Object

The SparkEnv factory object creates the runtime environment that holds the public services of a running Spark instance, using SparkEnv.createDriverEnv() for a driver and SparkEnv.createExecutorEnv() for an executor.

You can access the Spark environment using SparkEnv.get.

scala> import org.apache.spark._
import org.apache.spark._

scala> SparkEnv.get
res0: org.apache.spark.SparkEnv = org.apache.spark.SparkEnv@2220c5f7

Creating "Base" SparkEnv — create Method

create(
  conf: SparkConf,
  executorId: String,
  hostname: String,
  port: Int,
  isDriver: Boolean,
  isLocal: Boolean,
  numUsableCores: Int,
  listenerBus: LiveListenerBus = null,
  mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

create is an internal helper method that creates a "base" SparkEnv regardless of the target environment, be it a driver or an executor.

When executed, create creates a Serializer (based on spark.serializer setting). You should see the following DEBUG message in the logs:

DEBUG SparkEnv: Using serializer: [serializer]
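A minimal sketch of that instantiation, assuming the usual reflection pattern: the real code lives in a private helper inside SparkEnv.create and also falls back to a no-arg constructor, so treat this as a simplified paraphrase rather than the actual implementation.

import org.apache.spark.SparkConf
import org.apache.spark.serializer.Serializer

// Simplified paraphrase: load the class named by spark.serializer and
// instantiate it with its SparkConf-taking constructor.
def instantiateSerializer(conf: SparkConf): Serializer = {
  val className = conf.get(
    "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
  Class.forName(className)
    .getConstructor(classOf[SparkConf])
    .newInstance(conf)
    .asInstanceOf[Serializer]
}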

It creates another Serializer (based on spark.closure.serializer).

It creates a ShuffleManager based on spark.shuffle.manager setting.

It creates a MemoryManager based on spark.memory.useLegacyMode setting (with UnifiedMemoryManager being the default).
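Paraphrased from SparkEnv.create (constructor details may differ between Spark versions), the selection is simply:

// Hedged paraphrase: spark.memory.useLegacyMode selects the implementation.
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
  if (useLegacyMemoryManager)
    new StaticMemoryManager(conf, numUsableCores)  // pre-Spark 1.6 behaviour
  else
    UnifiedMemoryManager(conf, numUsableCores)     // the default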

It creates a BlockManagerMaster object with the BlockManagerMaster RPC endpoint reference (by registering or looking it up by name and BlockManagerMasterEndpoint), the input SparkConf, and the input isDriver flag.
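In code, the wiring looks roughly as follows; this is a paraphrase of the internal call, and the argument lists vary between Spark versions:

val blockManagerMaster = new BlockManagerMaster(
  registerOrLookupEndpoint(
    BlockManagerMaster.DRIVER_ENDPOINT_NAME,  // "BlockManagerMaster"
    new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
  conf,
  isDriver)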

sparkenv driver blockmanager.png
Figure 1. Creating BlockManager for the Driver
Note
create registers the BlockManagerMaster RPC endpoint for the driver and looks it up for executors.
sparkenv executor blockmanager.png
Figure 2. Creating BlockManager for Executor

It creates a BlockManager (using the above BlockManagerMaster object and other services).

It creates a BroadcastManager.

It creates a CacheManager.

It creates a MetricsSystem, separately for the driver and for executors.

It initializes the userFiles directory used for downloading dependencies: a temporary directory for the driver, and the current working directory for an executor.

An OutputCommitCoordinator is created.

Note
create is called by createDriverEnv and createExecutorEnv.

Registering or Looking up RPC Endpoint by Name — registerOrLookupEndpoint Method

registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint)

registerOrLookupEndpoint registers or looks up an RPC endpoint by name.

If called from the driver, you should see the following INFO message in the logs:

INFO SparkEnv: Registering [name]

And the RPC endpoint is registered in the RPC environment.

Otherwise, it obtains an RPC endpoint reference by name.
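Put together, the method is roughly the following; a paraphrase of the internal implementation, in which logInfo comes from Spark's Logging trait:

def registerOrLookupEndpoint(
    name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef =
  if (isDriver) {
    logInfo("Registering " + name)
    rpcEnv.setupEndpoint(name, endpointCreator)  // register on the driver
  } else {
    RpcUtils.makeDriverRef(name, conf, rpcEnv)   // look it up from an executor
  }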

Creating SparkEnv for Driver — createDriverEnv Method

createDriverEnv(
  conf: SparkConf,
  isLocal: Boolean,
  listenerBus: LiveListenerBus,
  numCores: Int,
  mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv

createDriverEnv creates a SparkEnv execution environment for the driver.

sparkenv driver.png
Figure 3. Spark Environment for driver

The method accepts an instance of SparkConf, a flag saying whether it runs in local mode, a LiveListenerBus, the number of the driver's cores to use for execution in local mode (or 0 otherwise), and an optional OutputCommitCoordinator (default: none).

createDriverEnv ensures that the spark.driver.host and spark.driver.port settings are set in the conf SparkConf.

It then passes the call straight on to the create helper method (with the driver executor id, the isDriver flag enabled, and the input parameters).
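For illustration, a hedged example of the two settings on a SparkConf; the values below are placeholders, and SparkContext normally sets them for you:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.host", "192.168.1.1")  // placeholder driver address
  .set("spark.driver.port", "7077")         // placeholder; 0 lets Spark pick a free port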

Note
createDriverEnv is exclusively used by SparkContext to create a SparkEnv (while a SparkContext is being created for the driver).

Creating SparkEnv for Executor — createExecutorEnv Method

createExecutorEnv(
  conf: SparkConf,
  executorId: String,
  hostname: String,
  port: Int,
  numCores: Int,
  isLocal: Boolean): SparkEnv

createExecutorEnv creates the Spark execution environment for an executor.

sparkenv executor.png
Figure 4. Spark Environment for executor

It uses SparkConf, the executor’s identifier, hostname, port, the number of cores, and whether or not it runs in local mode.

It creates a MapOutputTrackerWorker object and looks up the MapOutputTracker RPC endpoint. See MapOutputTracker.
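A paraphrased sketch of that step, assuming the wiring in recent sources; note that the by-name endpointCreator argument is only evaluated on the driver, so an executor merely looks the endpoint up and the cast below never runs there:

val mapOutputTracker = new MapOutputTrackerWorker(conf)
mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(
  MapOutputTracker.ENDPOINT_NAME,  // "MapOutputTracker"
  new MapOutputTrackerMasterEndpoint(
    rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))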

It creates a MetricsSystem for the executor and starts it.

An OutputCommitCoordinator is created and the OutputCommitCoordinator RPC endpoint is looked up.

serializer

Caution
FIXME

closureSerializer

Caution
FIXME

Getting Current SparkEnv — get Method

Caution
FIXME
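Until the section is filled in, a minimal usage example: SparkEnv.get can be called from task code to access the executor-side environment, e.g. to read the executor id (sc is an existing SparkContext).

import org.apache.spark.SparkEnv

// Runs on executors; in local mode every task reports "driver".
val executorIds = sc.parallelize(1 to 4, 2)
  .mapPartitions(_ => Iterator(SparkEnv.get.executorId))
  .collect()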

stop Method

Caution
FIXME
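Until the section is filled in, a hedged paraphrase of what stop does in recent sources: it marks the environment as stopped, stops the registered services in turn, and finally shuts down the RpcEnv.

// Hedged paraphrase of SparkEnv.stop; the order and exact set of calls
// may differ between Spark versions.
mapOutputTracker.stop()
shuffleManager.stop()
broadcastManager.stop()
blockManager.stop()
blockManager.master.stop()
metricsSystem.stop()
outputCommitCoordinator.stop()
rpcEnv.shutdown()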

Settings

Table 2. Spark Properties

spark.driver.host
Default: (undefined)
The name of the machine the driver runs on. It is set when SparkContext is created.

spark.driver.port
Default: 0
The port the driver listens on. It is first set to 0 in the driver when SparkContext is initialized, and later set to the port of the driver's RpcEnv (in SparkEnv.create).

spark.serializer
Default: org.apache.spark.serializer.JavaSerializer
The Serializer to use.
Tip: Enable DEBUG logging level for the org.apache.spark.SparkEnv logger to see the current value:
DEBUG SparkEnv: Using serializer: [serializer]

spark.closure.serializer
Default: org.apache.spark.serializer.JavaSerializer
The Serializer used for closures.

spark.memory.useLegacyMode
Default: false
Controls which MemoryManager is used: StaticMemoryManager when enabled (true), UnifiedMemoryManager otherwise (false).
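For illustration, these properties can be set on a SparkConf before the SparkContext is created; the app name and master below are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("sparkenv-demo")  // placeholder app name
  .setMaster("local[2]")        // placeholder master URL
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.memory.useLegacyMode", "false")
val sc = new SparkContext(conf)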
