BlockManager

BlockManager is a key-value store for blocks of data in Spark. It acts as a local cache that runs on every node in a Spark cluster, i.e. the driver and executors. It provides an interface for uploading and fetching blocks both locally and remotely using various stores, i.e. memory, disk, and off-heap. See Stores in this document.

A BlockManager is a BlockDataManager, i.e. it manages the storage for blocks that can represent cached RDD partitions, intermediate shuffle outputs, broadcasts, etc. It is also a BlockEvictionHandler that drops a block from memory and stores it on disk if applicable.

Cached blocks are blocks with a non-zero sum of memory and disk sizes.

BlockManager is created as a Spark application starts.

A BlockManager must be initialized before it is fully operable.

When the External Shuffle Service is enabled, BlockManager uses ExternalShuffleClient to read other executors' shuffle files.

Tip

Enable INFO, DEBUG or TRACE logging level for org.apache.spark.storage.BlockManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockManager=TRACE

Refer to Logging.

Tip

To cut the noise, you may want to turn off the WARN messages about the current state of blocks using the following line:

log4j.logger.org.apache.spark.storage.BlockManager=OFF

initialize Method

Caution
FIXME

getRemoteBytes Method

Caution
FIXME

Using External Shuffle Service (externalShuffleServiceEnabled flag)

When the External Shuffle Service is enabled for a Spark application, BlockManager uses ExternalShuffleClient to read other executors' shuffle files.

Caution
FIXME How is shuffleClient used?

registerTask

Caution
FIXME

Stores

A Store is the place where blocks are held.

There are the following possible stores:

  • MemoryStore for memory storage level.

  • DiskStore for disk storage level.

  • ExternalBlockStore for OFF_HEAP storage level.
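
The mapping between storage levels and stores in the list above can be sketched as follows. This is only an illustration that follows the list as written: StoreFlags and StoreSelection are hypothetical names, not Spark classes, and the three flags are a trimmed-down stand-in for StorageLevel.

```scala
// Hypothetical, trimmed-down stand-in for StorageLevel: only three flags.
final case class StoreFlags(useMemory: Boolean, useDisk: Boolean, useOffHeap: Boolean)

object StoreSelection {
  // Names of the stores that may hold a block persisted with `flags`.
  def storesFor(flags: StoreFlags): List[String] =
    List(
      if (flags.useMemory) Some("MemoryStore") else None,
      if (flags.useDisk) Some("DiskStore") else None,
      if (flags.useOffHeap) Some("ExternalBlockStore") else None
    ).flatten
}
```

A level that uses both memory and disk maps to two stores here, which is why a block can have a memory size and a disk size at the same time.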

Storing Block (putBytes method)

putBytes(
  blockId: BlockId,
  bytes: ChunkedByteBuffer,
  level: StorageLevel,
  tellMaster: Boolean = true): Boolean

putBytes stores the blockId block, given as the bytes buffer, in BlockManager using the level storage level.

It simply passes the call on to the internal doPutBytes.

doPutBytes

def doPutBytes[T](
  blockId: BlockId,
  bytes: ChunkedByteBuffer,
  level: StorageLevel,
  classTag: ClassTag[T],
  tellMaster: Boolean = true,
  keepReadLock: Boolean = false): Boolean

doPutBytes is an internal method that calls the internal helper doPut with putBody being a function that accepts a BlockInfo and does the uploading.

If the replication storage level is greater than 1, replication starts in a separate thread (using the internal replicate method).

Caution
FIXME When is replication storage level greater than 1?

For a memory storage level, depending on whether it is a deserialized one or not, putIteratorAsValues or putBytes of MemoryStore are used, respectively. If the put did not succeed and the storage level is also a disk one, you should see the following WARN message in the logs:

WARN BlockManager: Persisting block [blockId] to disk instead.

DiskStore.putBytes is then called to store the block on disk.

Note
For storage levels that use both memory and disk, DiskStore is used only when storing the block in MemoryStore has failed.

If the storage level is a disk one only, DiskStore.putBytes is called.

doPutBytes requests the current block status. If the block was successfully stored and the driver should know about it (tellMaster), it reports the current storage status of the block to the driver. The current TaskContext metrics are updated with the updated block status.

Regardless of the block being successfully stored or not, you should see the following DEBUG message in the logs:

DEBUG BlockManager: Put block [blockId] locally took [time] ms

For replication level greater than 1, doPutBytes waits for the earlier asynchronous replication to finish.

The final result of doPutBytes is whether the block was stored successfully or not (as computed earlier).
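
The memory-then-disk fallback described above can be sketched with toy stores. This is a simplified model and not Spark's code: PutLevel, ToyStore and PutSketch are hypothetical names, block ids are plain strings, and replication, locking and reporting to the driver are left out.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for StorageLevel, MemoryStore and DiskStore.
final case class PutLevel(useMemory: Boolean, useDisk: Boolean)

final class ToyStore(capacity: Long) {
  private val blocks = mutable.Map.empty[String, Array[Byte]]
  private def used: Long = blocks.values.map(_.length.toLong).sum

  // Returns false when the block does not fit, like a failed memory put.
  def putBytes(id: String, bytes: Array[Byte]): Boolean =
    if (used + bytes.length > capacity) false
    else { blocks(id) = bytes; true }

  def contains(id: String): Boolean = blocks.contains(id)
}

object PutSketch {
  // Tries memory first and falls back to disk only if the level allows it.
  def put(id: String, bytes: Array[Byte], level: PutLevel,
          memory: ToyStore, disk: ToyStore): Boolean =
    if (level.useMemory) {
      val inMemory = memory.putBytes(id, bytes)
      if (!inMemory && level.useDisk) {
        println(s"WARN Persisting block $id to disk instead.")
        disk.putBytes(id, bytes)
      } else inMemory
    } else if (level.useDisk) disk.putBytes(id, bytes)
    else false
}
```

With a 4-byte memory store and an 8-byte block, a memory-and-disk level ends up on disk, while a memory-only level makes put return false.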

replicate

Caution
FIXME

doPutIterator

Caution
FIXME

doPut

doPut[T](
  blockId: BlockId,
  level: StorageLevel,
  classTag: ClassTag[_],
  tellMaster: Boolean,
  keepReadLock: Boolean)(putBody: BlockInfo => Option[T]): Option[T]

doPut is an internal helper method for doPutBytes and doPutIterator.

doPut executes the input putBody function with a new BlockInfo object for which BlockInfoManager managed to acquire a write lock.

If the block has already been created, the following WARN message is printed out to the logs:

WARN Block [blockId] already exists on this machine; not re-adding it

In that case, doPut releases the read lock for the block when the keepReadLock flag is disabled and returns None immediately.

Otherwise, putBody is executed.

If the result of putBody is None, the block is considered saved successfully.

For successful save and keepReadLock enabled, blockInfoManager.downgradeLock(blockId) is called.

For successful save and keepReadLock disabled, blockInfoManager.unlock(blockId) is called.

For unsuccessful save, blockInfoManager.removeBlock(blockId) is called and the following WARN message is printed out to the logs:

WARN Putting block [blockId] failed

Ultimately, the following DEBUG message is printed out to the logs:

DEBUG Putting block [blockId] [withOrWithout] replication took [usedTime] ms
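
The lock handling in doPut can be sketched as below. It is a single-threaded toy under stated assumptions: ToyLocks is a hypothetical stand-in for BlockInfoManager that only records lock states as strings, and putBody takes the block id instead of a BlockInfo.

```scala
import scala.collection.mutable

// Hypothetical stand-in for BlockInfoManager; it records lock states only
// and does no real synchronization.
final class ToyLocks {
  val state = mutable.Map.empty[String, String]

  // Takes a write lock and returns true if the block is new; otherwise
  // takes a read lock on the existing block and returns false.
  def lockNewBlockForWriting(id: String): Boolean =
    if (state.contains(id)) { state(id) = "read"; false }
    else { state(id) = "write"; true }

  def downgradeLock(id: String): Unit = { state(id) = "read" }
  def unlock(id: String): Unit = { state(id) = "unlocked" }
  def removeBlock(id: String): Unit = { state.remove(id); () }
}

object DoPutSketch {
  // putBody returns None on success, as described above.
  def doPut[T](id: String, keepReadLock: Boolean, locks: ToyLocks)
              (putBody: String => Option[T]): Option[T] =
    if (!locks.lockNewBlockForWriting(id)) {
      println(s"WARN Block $id already exists on this machine; not re-adding it")
      if (!keepReadLock) locks.unlock(id)
      None
    } else {
      val result = putBody(id)
      if (result.isEmpty) {
        if (keepReadLock) locks.downgradeLock(id) else locks.unlock(id)
      } else {
        locks.removeBlock(id)
        println(s"WARN Putting block $id failed")
      }
      result
    }
}
```

In this toy, a successful put with keepReadLock enabled leaves the block in the "read" state, and a failed put removes the block entry altogether.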

Removing Block From Memory and Disk (removeBlock method)

removeBlock(blockId: BlockId, tellMaster: Boolean = true): Unit

removeBlock removes the blockId block from the MemoryStore and DiskStore.

When executed, it prints out the following DEBUG message to the logs:

DEBUG Removing block [blockId]

It requests a write lock for the blockId block from BlockInfoManager. If it receives none, it prints out the following WARN message to the logs and quits.

WARN Asked to remove block [blockId], which does not exist

Otherwise, with a write lock for the block, the block is removed from MemoryStore and DiskStore (see Removing Block in MemoryStore and Removing Block in DiskStore).

If both removals fail, it prints out the following WARN message:

WARN Block [blockId] could not be removed as it was not found in either the disk, memory, or external block store

The block is removed from BlockInfoManager.

It then calculates the current block status and reports it to the driver (if the input tellMaster and the info’s tellMaster are both enabled, i.e. true). The current TaskContext metrics are updated with the change.

Removing RDD Blocks (removeRdd method)

removeRdd(rddId: Int): Int

removeRdd removes all the blocks that belong to the rddId RDD.

It prints out the following INFO message to the logs:

INFO Removing RDD [rddId]

It then requests RDD blocks from BlockInfoManager and removes them from memory and disk, without informing the driver.

The number of blocks removed is the final result.
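
Selecting and counting the blocks of one RDD can be sketched as follows. The block id types and the mutable set are hypothetical simplifications: in Spark the ids come from BlockInfoManager and each block is removed through removeBlock.

```scala
import scala.collection.mutable

// Hypothetical, simplified block ids.
sealed trait ToyBlockId
final case class ToyRddBlockId(rddId: Int, splitIndex: Int) extends ToyBlockId
final case class ToyBroadcastBlockId(broadcastId: Long) extends ToyBlockId

object RemoveRddSketch {
  // Removes every block of the given RDD from `blocks` and returns how many
  // blocks were removed.
  def removeRdd(rddId: Int, blocks: mutable.Set[ToyBlockId]): Int = {
    val toRemove = blocks.toList.collect {
      case id: ToyRddBlockId if id.rddId == rddId => id
    }
    toRemove.foreach(id => blocks.remove(id))
    toRemove.size
  }
}
```

The ids are copied into a list first so that the set is not modified while it is being traversed.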

Removing Broadcast Blocks (removeBroadcast method)

removeBroadcast(broadcastId: Long, tellMaster: Boolean): Int

removeBroadcast removes all the blocks of the input broadcastId broadcast.

Internally, it starts by printing out the following DEBUG message to the logs:

DEBUG Removing broadcast [broadcastId]

It then requests all the BroadcastBlockId objects that belong to the broadcastId broadcast from BlockInfoManager and removes them (from memory and disk).

The number of blocks removed is the final result.

Getting Block Status (getStatus method)

Caution
FIXME

Creating BlockManager Instance

A BlockManager needs the following services to be created:

Note
executorId is SparkContext.DRIVER_IDENTIFIER (i.e. driver) for the driver, and the value of the --executor-id command-line argument for CoarseGrainedExecutorBackend executors or MesosExecutorBackend.

Caution
FIXME Elaborate on the executor backends and executor ids.

When a BlockManager instance is created it sets the internal externalShuffleServiceEnabled flag to the value of spark.shuffle.service.enabled setting.

It creates an instance of DiskBlockManager (requesting deleteFilesOnStop when an external shuffle service is not in use).

It creates an instance of BlockInfoManager (as blockInfoManager).

It creates a block-manager-future daemon cached thread pool with a maximum of 128 threads (as futureExecutionContext).

It creates a MemoryStore and DiskStore.

MemoryManager gets the MemoryStore object assigned.

It requests the current maximum memory from MemoryManager (using maxOnHeapStorageMemory as maxMemory).

It calculates the port used by the external shuffle service (as externalShuffleServicePort).

Note
It is computed specially in Spark on YARN.

Caution
FIXME Describe the YARN-specific part.

It creates a client to read other executors' shuffle files (as shuffleClient). If the external shuffle service is used, an ExternalShuffleClient is created. Otherwise, the input BlockTransferService is used.

It registers BlockManagerSlaveEndpoint with the input RpcEnv, itself, and MapOutputTracker (as slaveEndpoint).

Note
A BlockManager instance is created while SparkEnv is being created.

shuffleClient

Caution
FIXME

(that is assumed to be an ExternalShuffleClient)

shuffleServerId

Caution
FIXME

Initializing BlockManager (initialize method)

initialize(appId: String): Unit

The initialize method is called to initialize the BlockManager instance on the driver and executors (see Creating SparkContext Instance and Creating Executor Instance, respectively).

Note
The method must be called before a BlockManager can be considered fully operable.

It does the following:

  1. It initializes BlockTransferService.

  2. It initializes a shuffle client, be it ExternalShuffleClient or BlockTransferService.

  3. It sets shuffleServerId to an instance of BlockManagerId given an executor id, host name and port for BlockTransferService.

  4. It creates the address of the server that serves this executor’s shuffle files (using shuffleServerId).

Caution
FIXME Describe shuffleServerId. Where is it used?

If the External Shuffle Service is used, the following INFO appears in the logs:

INFO external shuffle service port = [externalShuffleServicePort]

It registers itself with the driver’s BlockManagerMaster, passing the BlockManagerId, the maximum memory (as maxMemory), and the BlockManagerSlaveEndpoint.

Ultimately, if the initialization happens on an executor and the External Shuffle Service is used, it registers with the shuffle service.

Note
The method is called when the driver is launched (and SparkContext is created) and when an Executor is launched.

Registering Executor’s BlockManager with External Shuffle Server (registerWithExternalShuffleServer method)

registerWithExternalShuffleServer(): Unit

registerWithExternalShuffleServer is an internal helper method to register the BlockManager for an executor with an external shuffle server.

When executed, you should see the following INFO message in the logs:

INFO Registering executor with local external shuffle service.

It uses shuffleClient to register the block manager using shuffleServerId (i.e. the host, the port and the executorId) and an ExecutorShuffleInfo.

Note
The ExecutorShuffleInfo uses localDirs and subDirsPerLocalDir from DiskBlockManager and the class name of the ShuffleManager passed to the constructor.

It tries to register at most 3 times with 5-second sleeps in-between.

Note
The maximum number of attempts and the sleep time in-between are hard-coded, i.e. they are not configurable.

Any issues while connecting to the external shuffle service are reported as ERROR messages in the logs:

ERROR Failed to connect to external shuffle server, will retry [#attempts] more times after waiting 5 seconds...
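
The retry behaviour described above (a fixed number of attempts with a fixed sleep in-between) can be sketched as a small helper. RetrySketch and registerWithRetries are hypothetical names, and the register function stands in for the call made through shuffleClient. The attempt count and sleep time are parameters here only to keep the sketch testable.

```scala
import scala.util.control.NonFatal

object RetrySketch {
  // Runs `register` up to maxAttempts times, sleeping between failed attempts.
  // A failure on the last attempt is rethrown to the caller.
  // Returns the number of attempts that were needed.
  def registerWithRetries(maxAttempts: Int, sleepMillis: Long)(register: () => Unit): Int = {
    var attempt = 1
    var done = false
    while (!done) {
      try {
        register()
        done = true
      } catch {
        case NonFatal(_) if attempt < maxAttempts =>
          println("ERROR Failed to connect to external shuffle server, will retry " +
            s"${maxAttempts - attempt} more times after waiting ${sleepMillis / 1000} seconds...")
          Thread.sleep(sleepMillis)
          attempt += 1
      }
    }
    attempt
  }
}
```

Calling RetrySketch.registerWithRetries(3, 5000L)(...) corresponds to the 3 attempts and 5-second sleeps mentioned above.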

Re-registering Blocks to Driver (reregister method)

reregister(): Unit

When reregister is called, you should see the following INFO in the logs:

INFO BlockManager: BlockManager re-registering with master

It registers itself with the driver’s BlockManagerMaster (just as it did when BlockManager was initializing). It passes the BlockManagerId, the maximum memory (as maxMemory), and the BlockManagerSlaveEndpoint.

Caution
FIXME Where is maxMemory used once passed to the driver?

reregister will then report all the local blocks to the BlockManagerMaster.

You should see the following INFO message in the logs:

INFO BlockManager: Reporting [blockInfoManager.size] blocks to the master.

If there is an issue communicating with the BlockManagerMaster, you should see the following ERROR message in the logs:

ERROR BlockManager: Failed to report [blockId] to master; giving up.

After the ERROR message, reregister stops reporting.

Calculate Current Block Status (getCurrentBlockStatus method)

getCurrentBlockStatus(blockId: BlockId, info: BlockInfo): BlockStatus

getCurrentBlockStatus returns the current BlockStatus of the BlockId block (with the block’s current StorageLevel, memory and disk sizes). It uses MemoryStore and DiskStore for size and other information.

Note
Most of the information to build BlockStatus is already in BlockInfo except that it may not necessarily reflect the current state per MemoryStore and DiskStore.

Internally, it uses the input BlockInfo to know about the block’s storage level. If the storage level is not set (i.e. null), the returned BlockStatus assumes the default NONE storage level and the memory and disk sizes being 0.

If, however, the storage level is set, getCurrentBlockStatus checks with MemoryStore and DiskStore whether the block is actually stored there. If it is, it requests the block's size from the respective store (using getSize). Otherwise, it assumes 0.

Note
It is acceptable that the BlockInfo says to use memory or disk yet the block is not in the storages (yet or anymore). The method will give current status.
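
The reconciliation between what BlockInfo declares and what the stores currently hold can be sketched as follows. StatusLevel, ToyBlockStatus and the two size maps are hypothetical simplifications: an unset (null) storage level is modelled as None, and the maps stand in for MemoryStore and DiskStore.

```scala
// Hypothetical, trimmed-down counterparts of StorageLevel and BlockStatus.
final case class StatusLevel(useMemory: Boolean, useDisk: Boolean)
final case class ToyBlockStatus(level: Option[StatusLevel], memSize: Long, diskSize: Long)

object BlockStatusSketch {
  // `declared` is what BlockInfo says (None models an unset level);
  // `memSizes` and `diskSizes` model what the stores hold right now.
  def currentStatus(
      id: String,
      declared: Option[StatusLevel],
      memSizes: Map[String, Long],
      diskSizes: Map[String, Long]): ToyBlockStatus =
    declared match {
      case None => ToyBlockStatus(None, 0L, 0L)
      case Some(level) =>
        val inMem = level.useMemory && memSizes.contains(id)
        val onDisk = level.useDisk && diskSizes.contains(id)
        ToyBlockStatus(
          Some(StatusLevel(inMem, onDisk)),
          if (inMem) memSizes(id) else 0L,
          if (onDisk) diskSizes(id) else 0L)
    }
}
```

A block declared as memory-and-disk but found only on disk is reported with a disk-only level and a memory size of 0.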

Removing Blocks From Memory Only (dropFromMemory method)

dropFromMemory[T: ClassTag](
  blockId: BlockId,
  data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel

When dropFromMemory is executed, you should see the following INFO message in the logs:

INFO BlockManager: Dropping block [blockId] from memory

It then asserts that the blockId block is locked for writing.

If the block’s StorageLevel uses disk and the internal DiskStore object (diskStore) does not contain the block, the block is first written to disk. You should see the following INFO message in the logs:

INFO BlockManager: Writing block [blockId] to disk

Caution
FIXME Describe the case with saving a block to disk.

The block’s memory size is fetched and recorded (using MemoryStore.getSize).

The block is removed from memory if it exists. If not, you should see the following WARN message in the logs:

WARN BlockManager: Block [blockId] could not be dropped from memory as it does not exist

It then calculates the current storage status of the block and reports it to the driver. This happens only when info.tellMaster is enabled.

Caution
FIXME When would info.tellMaster be true?

A block is considered updated when it was written to disk or removed from memory or both. If either happened, the current TaskContext metrics are updated with the change.

Ultimately, dropFromMemory returns the current storage level of the block.

Note
dropFromMemory is part of the single-method BlockEvictionHandler interface.
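
The order of steps in dropFromMemory (write to disk if needed, then remove from memory) can be sketched with two maps. This is a toy under stated assumptions: DropSketch is a hypothetical name, the maps from block id to size stand in for MemoryStore and DiskStore, the size parameter stands in for the data function, and the result is the "block updated" flag rather than a StorageLevel.

```scala
import scala.collection.mutable

object DropSketch {
  // Returns true when the block was written to disk, removed from memory, or both.
  def dropFromMemory(
      id: String,
      useDisk: Boolean,
      size: Long,
      memory: mutable.Map[String, Long],
      disk: mutable.Map[String, Long]): Boolean = {
    println(s"INFO Dropping block $id from memory")
    var updated = false
    if (useDisk && !disk.contains(id)) {
      println(s"INFO Writing block $id to disk")
      disk(id) = size
      updated = true
    }
    if (memory.remove(id).isDefined) updated = true
    else println(s"WARN Block $id could not be dropped from memory as it does not exist")
    updated
  }
}
```

Dropping the same block a second time changes nothing in this toy, so the second call returns false and prints the WARN message.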

Reporting Current Storage Status of Block to Driver (reportBlockStatus method)

reportBlockStatus(
  blockId: BlockId,
  info: BlockInfo,
  status: BlockStatus,
  droppedMemorySize: Long = 0L): Unit

reportBlockStatus is an internal method for reporting a block status to the driver. If the driver tells it to re-register, it prints out the following INFO message to the logs:

INFO BlockManager: Got told to re-register updating block [blockId]

It then re-registers asynchronously (using asyncReregister).

In either case, it prints out the following DEBUG message to the logs:

DEBUG BlockManager: Told master about block [blockId]

Note
reportBlockStatus is called by doPutBytes, doPutIterator, dropFromMemory, and removeBlock.

tryToReportBlockStatus

def tryToReportBlockStatus(
  blockId: BlockId,
  info: BlockInfo,
  status: BlockStatus,
  droppedMemorySize: Long = 0L): Boolean

tryToReportBlockStatus is an internal method to report block status to the driver.

It executes BlockManagerMaster.updateBlockInfo only if the state changes should be reported to the driver (i.e. info.tellMaster is enabled).

It returns true when there is nothing to report (i.e. info.tellMaster is disabled), and the response of BlockManagerMaster.updateBlockInfo otherwise.
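
How reportBlockStatus and tryToReportBlockStatus fit together can be sketched as below. ReportSketch is a hypothetical name, the updateBlockInfo function stands in for BlockManagerMaster.updateBlockInfo, and a false answer is taken to mean that the driver asks for re-registration, as described above.

```scala
object ReportSketch {
  // Contacts the driver only when tellMaster is enabled; otherwise there is
  // nothing to report and the result is true.
  def tryToReport(tellMaster: Boolean)(updateBlockInfo: () => Boolean): Boolean =
    if (tellMaster) updateBlockInfo() else true

  // A false answer from the driver triggers re-registration.
  def report(tellMaster: Boolean, reregister: () => Unit)(updateBlockInfo: () => Boolean): Unit = {
    if (!tryToReport(tellMaster)(updateBlockInfo)) {
      println("INFO Got told to re-register")
      reregister()
    }
    println("DEBUG Told master about block")
  }
}
```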

BlockEvictionHandler

BlockEvictionHandler is a private[storage] Scala trait with a single method dropFromMemory.

dropFromMemory[T: ClassTag](
  blockId: BlockId,
  data: () => Either[Array[T], ChunkedByteBuffer]): StorageLevel

Note
A BlockManager is a BlockEvictionHandler.

Note
dropFromMemory is called when MemoryStore evicts blocks from memory to free space.

BlockManagerSlaveEndpoint

BlockManagerSlaveEndpoint is a thread-safe RPC endpoint for remote communication between executors and the driver.

Caution
FIXME the intro needs more love.

When a BlockManager is created, so is the BlockManagerSlaveEndpoint RPC endpoint (with the name BlockManagerEndpoint[randomId]) to handle RPC messages.

Tip

Enable DEBUG logging level for org.apache.spark.storage.BlockManagerSlaveEndpoint logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockManagerSlaveEndpoint=DEBUG

Refer to Logging.

RemoveBlock Message

RemoveBlock(blockId: BlockId)

When a RemoveBlock message comes in, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: removing block [blockId]

Note
Handling RemoveBlock messages happens on a separate thread. See BlockManagerSlaveEndpoint Thread Pool.

When the computation is successful, you should see the following DEBUG in the logs:

DEBUG BlockManagerSlaveEndpoint: Done removing block [blockId], response is [response]

A true response is then sent back. You should see the following DEBUG in the logs:

DEBUG BlockManagerSlaveEndpoint: Sent response: true to [senderAddress]

In case of failure, you should see the following ERROR in the logs and the stack trace.

ERROR BlockManagerSlaveEndpoint: Error in removing block [blockId]

RemoveRdd Message

RemoveRdd(rddId: Int)

When a RemoveRdd message comes in, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: removing RDD [rddId]

Note
Handling RemoveRdd messages happens on a separate thread. See BlockManagerSlaveEndpoint Thread Pool.

When the computation is successful, you should see the following DEBUG in the logs:

DEBUG BlockManagerSlaveEndpoint: Done removing RDD [rddId], response is [response]

The number of blocks removed is then sent back. You should see the following DEBUG in the logs:

DEBUG BlockManagerSlaveEndpoint: Sent response: [#blocks] to [senderAddress]

In case of failure, you should see the following ERROR in the logs and the stack trace.

ERROR BlockManagerSlaveEndpoint: Error in removing RDD [rddId]

RemoveShuffle Message

RemoveShuffle(shuffleId: Int)

When a RemoveShuffle message comes in, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: removing shuffle [shuffleId]

If MapOutputTracker was given (when the RPC endpoint was created), it calls MapOutputTracker to unregister the shuffleId shuffle.

Note
Handling RemoveShuffle messages happens on a separate thread. See BlockManagerSlaveEndpoint Thread Pool.

When the computation is successful, you should see the following DEBUG in the logs:

DEBUG BlockManagerSlaveEndpoint: Done removing shuffle [shuffleId], response is [response]

The result is then sent back. You should see the following DEBUG in the logs:

DEBUG BlockManagerSlaveEndpoint: Sent response: [response] to [senderAddress]

In case of failure, you should see the following ERROR in the logs and the stack trace.

ERROR BlockManagerSlaveEndpoint: Error in removing shuffle [shuffleId]

RemoveBroadcast Message

RemoveBroadcast(broadcastId: Long)

When a RemoveBroadcast message comes in, you should see the following DEBUG message in the logs:

DEBUG BlockManagerSlaveEndpoint: removing broadcast [broadcastId]

Note
Handling RemoveBroadcast messages happens on a separate thread. See BlockManagerSlaveEndpoint Thread Pool.

When the computation is successful, you should see the following DEBUG in the logs:

DEBUG BlockManagerSlaveEndpoint: Done removing broadcast [broadcastId], response is [response]

The result is then sent back. You should see the following DEBUG in the logs:

DEBUG BlockManagerSlaveEndpoint: Sent response: [response] to [senderAddress]

In case of failure, you should see the following ERROR in the logs and the stack trace.

ERROR BlockManagerSlaveEndpoint: Error in removing broadcast [broadcastId]

GetBlockStatus Message

GetBlockStatus(blockId: BlockId)

When a GetBlockStatus message comes in, it responds with the result of asking BlockManager for the status of blockId.

GetMatchingBlockIds Message

GetMatchingBlockIds(filter: BlockId => Boolean)

When a GetMatchingBlockIds message comes in, it responds with the result of asking BlockManager for the blocks that match filter.

TriggerThreadDump Message

When a TriggerThreadDump message comes in, a thread dump is generated and sent back.

BlockManagerSlaveEndpoint Thread Pool

BlockManagerSlaveEndpoint uses the block-manager-slave-async-thread-pool daemon thread pool (asyncThreadPool) for some messages to talk to other Spark services, i.e. BlockManager, MapOutputTracker, and ShuffleManager, in a non-blocking, asynchronous way.

The reason for the async thread pool is that block-related operations might take quite some time. To free the main RPC thread, other threads are spawned to talk to the external services and pass responses on to the clients.

Note
BlockManagerSlaveEndpoint uses Java’s java.util.concurrent.ThreadPoolExecutor.
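
The pattern of handing a slow block operation to a thread pool and replying once it completes can be sketched with Scala futures. AsyncReplySketch and doAsync are names chosen for this sketch, and the reply function stands in for sending the response back to the sender. Failure handling (the ERROR message and sending the failure back) is left out.

```scala
import scala.concurrent.{ExecutionContext, Future}

object AsyncReplySketch {
  // Runs `body` on `ec` and hands the outcome to `reply` once it is ready,
  // so the calling (RPC) thread can return immediately.
  def doAsync[T](action: String)(body: => T)(reply: T => Unit)
                (implicit ec: ExecutionContext): Future[Unit] =
    Future {
      println(s"DEBUG $action")
      body
    }.map { response =>
      println(s"DEBUG Done $action, response is $response")
      reply(response)
    }
}
```

An ExecutionContext backed by a java.util.concurrent thread pool can be created with ExecutionContext.fromExecutorService and passed in implicitly.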

Broadcast Values

When a new broadcast value is created, TorrentBroadcast blocks are put in the block manager.

You should see the following TRACE message:

TRACE Put for block [blockId] took [startTimeMs] to get into synchronized block

It puts the data in memory first and drops it to disk if the memory store can’t hold it.

DEBUG Put block [blockId] locally took [startTimeMs]

BlockManagerId

DiskBlockManager

DiskBlockManager creates and maintains the logical mapping between logical blocks and physical on-disk locations.

By default, one block is mapped to one file with a name given by its BlockId. It is however possible to have a block map to only a segment of a file.

Block files are hashed among the directories listed in spark.local.dir (or in SPARK_LOCAL_DIRS if set).
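
One way to hash a block file name into a local directory and a sub-directory is sketched below. It is an illustration under assumptions, not a statement of what DiskBlockManager does: LocalDirSketch and its methods are hypothetical, and the directory counts are plain parameters.

```scala
object LocalDirSketch {
  // Non-negative hash of a file name (Int.MinValue is mapped to 0 because
  // math.abs(Int.MinValue) is still negative).
  def nonNegativeHash(name: String): Int = {
    val h = name.hashCode
    if (h != Int.MinValue) math.abs(h) else 0
  }

  // Picks (index of the local directory, index of the sub-directory)
  // for a block file name.
  def locate(fileName: String, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, Int) = {
    require(numLocalDirs > 0 && subDirsPerLocalDir > 0, "directory counts must be positive")
    val hash = nonNegativeHash(fileName)
    val dirId = hash % numLocalDirs
    val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
    (dirId, subDirId)
  }
}
```

Because the result depends only on the file name and the directory counts, the same block always maps to the same location, and different blocks spread across the configured directories.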

Caution
FIXME Review me.

Execution Context

block-manager-future is the execution context for…​FIXME

Metrics

BlockManager uses the Spark Metrics System (via BlockManagerSource) to report metrics about its internal status.

The name of the source is BlockManager.

It emits the following numbers:

  • memory / maxMem_MB - the maximum memory configured

  • memory / remainingMem_MB - the remaining memory

  • memory / memUsed_MB - the memory used

  • memory / diskSpaceUsed_MB - the disk used

Misc

The underlying abstraction for blocks in Spark is a ByteBuffer, which limits the size of a block to 2GB (Integer.MAX_VALUE; see Why does FileChannel.map take up to Integer.MAX_VALUE of data? and SPARK-1476 2GB limit in spark for blocks). This has implications not just for managed blocks in use, but also for shuffle blocks (memory-mapped blocks are limited to 2GB, even though the API allows for long) and for ser-deser via byte array-backed output streams.
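
The arithmetic behind the limit can be sketched as follows: a ByteBuffer is sized with an Int, so anything larger has to be split across several buffers. BlockSizeLimitSketch and its methods are hypothetical helpers for illustration only.

```scala
import java.nio.ByteBuffer

object BlockSizeLimitSketch {
  // ByteBuffer.allocate takes an Int, so one buffer holds at most this many bytes.
  val maxBufferBytes: Long = Int.MaxValue.toLong

  // Splits `totalBytes` into buffer sizes of at most `chunkBytes` each.
  def chunkSizes(totalBytes: Long, chunkBytes: Int): List[Int] = {
    require(chunkBytes > 0, "chunkBytes must be positive")
    if (totalBytes <= 0) Nil
    else {
      val size = math.min(totalBytes, chunkBytes.toLong).toInt
      size :: chunkSizes(totalBytes - size, chunkBytes)
    }
  }

  // Allocates one buffer per chunk (only sensible for small inputs).
  def allocate(totalBytes: Long, chunkBytes: Int): List[ByteBuffer] =
    chunkSizes(totalBytes, chunkBytes).map(n => ByteBuffer.allocate(n))
}
```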

When a non-local executor starts, it initializes a BlockManager object for the spark.app.id id.
