BlockInfoManager

BlockInfoManager manages memory blocks (aka memory pages). It controls concurrent access to memory blocks by read and write locks (for existing and new ones).

Note
Locks are the mechanism to control concurrent access to data and prevent destructive interaction between operations that use the same resource.
Note
BlockInfoManager is a private[storage] class that belongs to org.apache.spark.storage package.
Tip

Enable TRACE logging level for org.apache.spark.storage.BlockInfoManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.storage.BlockInfoManager=TRACE

Refer to Logging.

Obtaining Read Lock (lockForReading method)

lockForReading(
  blockId: BlockId,
  blocking: Boolean = true): Option[BlockInfo]

lockForReading locks blockId memory block for reading when the block was registered earlier and no writer tasks use it.

When executed, lockForReading prints out the following TRACE message to the logs:

TRACE BlockInfoManager: Task [currentTaskAttemptId] trying to acquire read lock for [blockId]

It looks up the metadata (in infos registry).

If no metadata could be found, it returns None which means that the block does not exist or was removed (and anybody could acquire a write lock).

Otherwise, when the metadata was found, i.e. registered, it checks so-called writerTask. Only when the block has no writer tasks, a read lock can be acquired (i.e. BlockInfo.writerTask is BlockInfo.NO_WRITER). If so, the readerCount of the block metadata is incremented and the block is recorded in the internal readLocksByTask registry. You should see the following TRACE message in the logs:

TRACE BlockInfoManager: Task [taskAttemptId] acquired read lock for [blockId]

The BlockInfo for the blockId block is returned.

Note
-1024 is a special taskAttemptId used to mark a non-task thread, e.g. by a driver thread or by unit test code.

For blocks with writerTask other than NO_WRITER, when blocking is enabled, lockForReading waits (until another thread invokes the Object.notify method or the Object.notifyAll methods for this object).

With blocking enabled, it will repeat the waiting-for-read-lock sequence until either None or the lock is obtained.

When blocking is disabled and the lock could not be obtained, None is returned immediately.

Note
lockForReading is a synchronized method, i.e. no two objects can use this and other instance methods.

Obtaining Write Lock (lockForWriting method)

lockForWriting(
  blockId: BlockId,
  blocking: Boolean = true): Option[BlockInfo]

When executed, lockForWriting prints out the following TRACE message to the logs:

TRACE Task [currentTaskAttemptId] trying to acquire write lock for [blockId]

It looks up blockId in the internal infos registry. When no BlockInfo could be found, None is returned. Otherwise, BlockInfo is checked for writerTask to be BlockInfo.NO_WRITER with no readers (i.e. readerCount is 0) and only then the lock is returned.

When the write lock can be returned, BlockInfo.writerTask is set to currentTaskAttemptId and a new binding is added to the internal writeLocksByTask registry. You should see the following TRACE message in the logs:

TRACE Task [currentTaskAttemptId] acquired write lock for [blockId]

If, for some reason, blockId has a writer (i.e. info.writerTask is not BlockInfo.NO_WRITER) or the number of readers is positive (i.e. BlockInfo.readerCount is greater than 0), the method will wait (based on the input blocking flag) and attempt the write lock acquisition process until it finishes with a write lock.

Note
(deadlock possible) The method is synchronized and can block, i.e. wait that causes the current thread to wait until another thread invokes Object.notify or Object.notifyAll methods for this object.

lockForWriting return None for no blockId in the internal infos registry or when blocking flag is disabled and the write lock could not be acquired.

Obtaining Write Lock for New Block (lockNewBlockForWriting method)

lockNewBlockForWriting(
  blockId: BlockId,
  newBlockInfo: BlockInfo): Boolean

lockNewBlockForWriting obtains a write lock for blockId but only when the method could register the block.

Note
lockNewBlockForWriting is similar to lockForWriting method but for brand new blocks.

When executed, lockNewBlockForWriting prints out the following TRACE message to the logs:

TRACE Task [currentTaskAttemptId] trying to put [blockId]

If some other thread has already created the block, it finishes returning false. Otherwise, when the block does not exist, newBlockInfo is recorded in the internal infos registry and the block is locked for this client for writing. It then returns true.

Note
lockNewBlockForWriting executes itself in synchronized block so once the BlockInfoManager is locked the other internal registries should be available only for the currently-executing thread.

Unlocking Memory Block (unlock method)

Caution
FIXME

Releasing All Locks Obtained by Task (releaseAllLocksForTask method)

Caution
FIXME

Removing Memory Block (removeBlock method)

Caution
FIXME

assertBlockIsLockedForWriting

Caution
FIXME

Internal Registries

infos

infos is used to track BlockInfo per block (identified by BlockId).

readLocksByTask

readLocksByTask is used to track tasks (by TaskAttemptId) and the blocks they locked for reading (identified by BlockId)

writeLocksByTask

writeLocksByTask is used to track tasks (by TaskAttemptId) and the blocks they locked for writing (identified by BlockId).

results matching ""

    No results matching ""