BlockManagerMaster — BlockManager for Driver

BlockManagerMaster runs on the driver and executors.

BlockManagerMaster uses BlockManagerMasterEndpoint registered under BlockManagerMaster RPC endpoint name on the driver (with the endpoint references on executors) to allow executors for sending block status updates to it and hence keep track of block statuses.

Note
An instance of BlockManagerMaster is created in SparkEnv (for the driver and executors), and immediately used to create their BlockManagers.
Tip

Enable INFO or DEBUG logging level for org.apache.spark.storage.BlockManagerMaster logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockManagerMaster=INFO

Refer to Logging.

Creating BlockManagerMaster Instance

An instance of BlockManagerMaster requires a BlockManagerMaster RPC endpoint reference, SparkConf, and the isDriver flag to control whether it is created for the driver or executors.

Note
An instance of BlockManagerMaster is created as part of creating an instance of SparkEnv for the driver and executors.

Removing Executor (removeExecutor method)

removeExecutor(execId: String): Unit

removeExecutor posts RemoveExecutor(execId) to BlockManagerMaster RPC endpoint and waits for a response.

If false in response comes in, a SparkException is thrown with the following message:

BlockManagerMasterEndpoint returned false, expected true.

If all goes fine, you should see the following INFO message in the logs:

INFO BlockManagerMaster: Removed executor [execId]

Removing Block — removeBlock Method

removeBlock(blockId: BlockId): Unit

removeBlock simply posts a RemoveBlock blocking message to BlockManagerMaster RPC endpoint (and ultimately disregards the reponse).

Removing RDD Blocks (removeRdd method)

removeRdd(rddId: Int, blocking: Boolean)

removeRdd removes all the blocks of rddId RDD, possibly in a blocking fashion.

It posts a RemoveRdd(rddId) message to BlockManagerMaster RPC endpoint on a separate thread.

If there is an issue, you should see the following WARN message in the logs and the entire exception:

WARN Failed to remove RDD [rddId] - [exception]

If it is a blocking operation, it waits for a result for spark.rpc.askTimeout, spark.network.timeout or 120 secs.

Removing Shuffle Blocks (removeShuffle method)

removeShuffle(shuffleId: Int, blocking: Boolean)

removeShuffle removes all the blocks of shuffleId shuffle, possibly in a blocking fashion.

It posts a RemoveShuffle(shuffleId) message to BlockManagerMaster RPC endpoint on a separate thread.

If there is an issue, you should see the following WARN message in the logs and the entire exception:

WARN Failed to remove shuffle [shuffleId] - [exception]

If it is a blocking operation, it waits for the result for spark.rpc.askTimeout, spark.network.timeout or 120 secs.

Removing Broadcast Blocks (removeBroadcast method)

removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean)

removeBroadcast removes all the blocks of broadcastId broadcast, possibly in a blocking fashion.

It posts a RemoveBroadcast(broadcastId, removeFromMaster) message to BlockManagerMaster RPC endpoint on a separate thread.

If there is an issue, you should see the following WARN message in the logs and the entire exception:

WARN Failed to remove broadcast [broadcastId] with removeFromMaster = [removeFromMaster] - [exception]

If it is a blocking operation, it waits for the result for spark.rpc.askTimeout, spark.network.timeout or 120 secs.

Stopping BlockManagerMaster (stop method)

stop(): Unit

stop sends a StopBlockManagerMaster message to BlockManagerMaster RPC endpoint and waits for a response.

Note
It is only executed for the driver.

If all goes fine, you should see the following INFO message in the logs:

INFO BlockManagerMaster: BlockManagerMaster stopped

Otherwise, a SparkException is thrown.

BlockManagerMasterEndpoint returned false, expected true.

Registering BlockManager to Driver (registerBlockManager method)

registerBlockManager(
  blockManagerId: BlockManagerId,
  maxMemSize: Long,
  slaveEndpoint: RpcEndpointRef): Unit

When registerBlockManager runs, you should see the following INFO message in the logs:

INFO BlockManagerMaster: Trying to register BlockManager
spark BlockManagerMaster RegisterBlockManager.png
Figure 1. Registering BlockManager with the Driver

It then informs the driver about the new BlockManager by sending RegisterBlockManager to BlockManagerMaster RPC endpoint and waiting for a response.

If all goes fine, you should see the following INFO message in the logs:

INFO BlockManagerMaster: Registered BlockManager

Otherwise, a SparkException is thrown.

BlockManagerMasterEndpoint returned false, expected true.
Note
registerBlockManager is called while BlockManager is being initialized (on the driver and executors) and while re-registering blocks to the driver.

Sending UpdateBlockInfo to Driver (updateBlockInfo method)

updateBlockInfo(
  blockManagerId: BlockManagerId,
  blockId: BlockId,
  storageLevel: StorageLevel,
  memSize: Long,
  diskSize: Long): Boolean

updateBlockInfo sends a UpdateBlockInfo message to BlockManagerMaster RPC endpoint and waits for a response.

You should see the following DEBUG message in the logs:

DEBUG BlockManagerMaster: Updated info of block [blockId]

The response from the BlockManagerMaster RPC endpoint is returned.

Get Block Locations of One Block (getLocations method)

getLocations(blockId: BlockId): Seq[BlockManagerId]

getLocations posts GetLocations(blockId) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.

Get Block Locations for Multiple Blocks (getLocations method)

getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]]

getLocations posts GetLocationsMultipleBlockIds(blockIds) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.

getPeers

getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId]

getPeers posts GetPeers(blockManagerId) message BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.

getExecutorEndpointRef

getExecutorEndpointRef(executorId: String): Option[RpcEndpointRef]

getExecutorEndpointRef posts GetExecutorEndpointRef(executorId) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.

getMemoryStatus

getMemoryStatus: Map[BlockManagerId, (Long, Long)]

getMemoryStatus posts a GetMemoryStatus message BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.

getStorageStatus

getStorageStatus: Array[StorageStatus]

getStorageStatus posts a GetStorageStatus message to BlockManagerMaster RPC endpoint and waits for a response which becomes the return value.

getBlockStatus

getBlockStatus(
  blockId: BlockId,
  askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus]

getBlockStatus posts a GetBlockStatus(blockId, askSlaves) message to BlockManagerMaster RPC endpoint and waits for a response (of type Map[BlockManagerId, Future[Option[BlockStatus]]]).

It then builds a sequence of future results that are BlockStatus statuses and waits for a result for spark.rpc.askTimeout, spark.network.timeout or 120 secs.

No result leads to a SparkException with the following message:

BlockManager returned null for BlockStatus query: [blockId]

getMatchingBlockIds

getMatchingBlockIds(
  filter: BlockId => Boolean,
  askSlaves: Boolean): Seq[BlockId]

getMatchingBlockIds posts a GetMatchingBlockIds(filter, askSlaves) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the result for spark.rpc.askTimeout, spark.network.timeout or 120 secs.

hasCachedBlocks

hasCachedBlocks(executorId: String): Boolean

hasCachedBlocks posts a HasCachedBlocks(executorId) message to BlockManagerMaster RPC endpoint and waits for a response which becomes the result.

BlockManagerMasterEndpoint — BlockManagerMaster RPC Endpoint

BlockManagerMasterEndpoint is the RPC endpoint for BlockManagerMaster on the driver (aka master node) to track statuses of the block managers on executors.

Note
It is used to register the BlockManagerMaster RPC endpoint when creating SparkEnv.
Tip

Enable INFO logging level for org.apache.spark.storage.BlockManagerMasterEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockManagerMasterEndpoint=INFO

Refer to Logging.

Internal Registries

blockLocations

blockLocations is a collection of BlockId and its locations (as BlockManagerId).

Note
It is used in removeRdd to remove blocks for a RDD, removeBlockManager to remove blocks after a BlockManager gets removed, removeBlockFromWorkers, updateBlockInfo, and getLocations.

RemoveExecutor

RemoveExecutor(execId: String)

When RemoveExecutor is received, executor execId is removed and the response true sent back.

BlockManagerHeartbeat

Caution
FIXME

GetLocations

GetLocations(blockId: BlockId)

When GetLocations comes in, the internal getLocations method is executed and the result becomes the response sent back.

Note
GetLocations is used to get the block locations of a single block.

RegisterBlockManager

RegisterBlockManager(
  blockManagerId: BlockManagerId,
  maxMemSize: Long,
  sender: RpcEndpointRef)

When RegisterBlockManager is received, the internal register method is executed.

Note
RegisterBlockManager is used to register a BlockManager to the driver.
register
register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef): Unit

register records the current time and registers BlockManager by id if it has not been already registered (using the internal blockManagerInfo registry).

Registering a BlockManager can only happen once for an executor (identified by BlockManagerId.executorId using the internal blockManagerIdByExecutor registry).

If another BlockManager has earlier been registered for the executor, you should see the following ERROR message in the logs:

ERROR Got two different block manager registrations on same executor - will replace old one [oldId] with new one [id]

You should see the following INFO message in the logs:

INFO Registering block manager [hostPort] with [bytes] RAM, [id]

The BlockManager is recorded in the internal registries: blockManagerIdByExecutor and blockManagerInfo.

Caution
FIXME Why does blockManagerInfo require a new System.currentTimeMillis() since time was already recorded?
Note
The method can only be executed on the driver where listenerBus is available.
Caution
FIXME Describe listenerBus + omnigraffle it.

Other RPC Messages

  • UpdateBlockInfo

  • GetLocationsMultipleBlockIds

  • GetPeers

  • GetRpcHostPortForExecutor

  • GetMemoryStatus

  • GetStorageStatus

  • GetBlockStatus

  • GetMatchingBlockIds

  • RemoveRdd

  • RemoveShuffle

  • RemoveBroadcast

  • RemoveBlock

  • StopBlockManagerMaster

  • BlockManagerHeartbeat

  • HasCachedBlocks

Removing Executor (removeExecutor method)

removeExecutor(execId: String)

When executed, removeExecutor prints the following INFO message to the logs:

INFO BlockManagerMasterEndpoint: Trying to remove executor [execId] from BlockManagerMaster.

If the execId executor is found in the internal blockManagerIdByExecutor registry, the BlockManager for the executor is removed.

Removing BlockManager (removeBlockManager method)

removeBlockManager(blockManagerId: BlockManagerId)

When executed, removeBlockManager looks up blockManagerId and removes the executor it was working on from the internal blockManagerIdByExecutor as well as from blockManagerInfo.

Note
It is a private helper method that is exclusively used while removing an executor.

It then goes over all the blocks for the BlockManager, and removes the executor for each block from blockLocations registry.

You should then see the following INFO message in the logs:

INFO BlockManagerMasterEndpoint: Removing block manager [blockManagerId]

Get Block Locations (getLocations method)

getLocations(blockId: BlockId): Seq[BlockManagerId]

When executed, getLocations looks up blockId in the blockLocations internal registry and returns the locations (as a collection of BlockManagerId) or an empty collection.

results matching ""

    No results matching ""