TaskMemoryManager

TaskMemoryManager manages the memory allocated by an individual task.

It assumes that:

  • The number of bits to address pages (aka PAGE_NUMBER_BITS) is 13

  • The number of bits to encode offsets in data pages (aka OFFSET_BITS) is 51 (i.e. 64 bits - PAGE_NUMBER_BITS)

  • The number of entries in the page table and allocated pages (aka PAGE_TABLE_SIZE) is 8192 (i.e. 1 << PAGE_NUMBER_BITS)

  • The maximum page size (aka MAXIMUM_PAGE_SIZE_BYTES) is 15GB (i.e. ((1L << 31) - 1) * 8L)

Note
It is used to create a TaskContextImpl instance.
Tip

Enable INFO, DEBUG or even TRACE logging levels for org.apache.spark.memory.TaskMemoryManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.memory.TaskMemoryManager=TRACE

Refer to Logging.

Caution
FIXME How to trigger the messages in the logs? What to execute to have them printed out to the logs?

Creating TaskMemoryManager Instance

TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId)

A single TaskMemoryManager manages the memory of a single task (by the task’s taskAttemptId).

Note
Although the constructor parameter taskAttemptId refers to a task’s attempt id it is really a taskId. It should be changed perhaps?

When called, the constructor uses the input MemoryManager to know whether it is in Tungsten memory mode (disabled by default) and saves the MemoryManager and taskAttemptId for later use.

It also initializes the internal consumers to be empty.

Note
When a TaskRunner starts running, it creates a new instance of TaskMemoryManager for the task by taskId. It then assigns the TaskMemoryManager to the individual task before it runs.
spark taskmemorymanager creating instance.png
Figure 1. Creating TaskMemoryManager for Task

Acquire Execution Memory (acquireExecutionMemory method)

long acquireExecutionMemory(long required, MemoryConsumer consumer)

acquireExecutionMemory allocates up to required size of memory for consumer. When no memory could be allocated, it calls spill on every consumer, itself including. Finally, it returns the allocated memory.

Note
It synchronizes on itself, and so no other calls on the object could be completed.
Note
MemoryConsumer knows its mode — on- or off-heap.

It first calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode).

Tip
TaskMemoryManager is a mere wrapper of MemoryManager to track consumers?

When the memory obtained is less than requested (by required), it requests all consumers to spill the remaining required memory.

Note
It requests memory from consumers that work in the same mode except the requesting one.

You may see the following DEBUG message when spill released some memory:

DEBUG Task [taskAttemptId] released [bytes] from [consumer] for [consumer]

acquireExecutionMemory calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) again (it called it at the beginning).

It does the memory acquisition until it gets enough memory or there are no more consumers to request spill from.

You may also see the following ERROR message in the logs when there is an error while requesting spill with OutOfMemoryError followed.

ERROR error while calling spill() on [consumer]

If the earlier spill on the consumers did not work out and there is still not enough memory acquired, acquireExecutionMemory calls spill on the input consumer (that requested more memory!)

If the consumer releases some memory, you should see the following DEBUG message in the logs:

DEBUG Task [taskAttemptId] released [bytes] from itself ([consumer])

acquireExecutionMemory calls memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) once more.

Note
memoryManager.acquireExecutionMemory(required, taskAttemptId, mode) could have been called "three" times, i.e. at the very beginning, for each consumer, and on itself.

It records the consumer in consumers registry.

You should see the following DEBUG message in the logs:

DEBUG Task [taskAttemptId] acquired [bytes] for [consumer]
Note
acquireExecutionMemory is called when a MemoryConsumer tries to acquires a memory and allocatePage.

Getting Page (getPage method)

Caution
FIXME

Getting Page Offset (getOffsetInPage method)

Caution
FIXME

Freeing Memory Page (freePage method)

Caution
FIXME

cleanUpAllAllocatedMemory

It clears page table.

All recorded consumers are queried for the size of used memory. If the memory used is greater than 0, the following WARN message is printed out to the logs:

WARN TaskMemoryManager: leak [bytes] memory from [consumer]

The consumers collection is then cleared.

MemoryManager.releaseExecutionMemory is executed to release the memory that is not used by any consumer.

Before cleanUpAllAllocatedMemory returns, it calls MemoryManager.releaseAllExecutionMemoryForTask that in turn becomes the return value.

Caution
FIXME Image with the interactions to MemoryManager.

Allocating Memory Block for Tungsten Consumers (allocatePage method)

MemoryBlock allocatePage(long size, MemoryConsumer consumer)
Note
It only handles Tungsten Consumers, i.e. MemoryConsumers in tungstenMemoryMode mode.

allocatePage allocates a block of memory (aka page) smaller than MAXIMUM_PAGE_SIZE_BYTES maximum size.

It checks size against the internal MAXIMUM_PAGE_SIZE_BYTES maximum size. If it is greater than the maximum size, the following IllegalArgumentException is thrown:

Cannot allocate a page with more than [MAXIMUM_PAGE_SIZE_BYTES] bytes

It then acquires execution memory (for the input size and consumer).

It finishes by returning null when no execution memory could be acquired.

With the execution memory acquired, it finds the smallest unallocated page index and records the page number (using allocatedPages registry).

If the index is PAGE_TABLE_SIZE or higher, releaseExecutionMemory(acquired, consumer) is called and then the following IllegalStateException is thrown:

Have already allocated a maximum of [PAGE_TABLE_SIZE] pages

It then attempts to allocate a MemoryBlock from Tungsten MemoryAllocator (calling memoryManager.tungstenMemoryAllocator().allocate(acquired)).

Caution
FIXME What is MemoryAllocator?

When successful, MemoryBlock gets assigned pageNumber and it gets added to the internal pageTable registry.

You should see the following TRACE message in the logs:

TRACE Allocate page number [pageNumber] ([acquired] bytes)

The page is returned.

If a OutOfMemoryError is thrown when allocating a MemoryBlock page, the following WARN message is printed out to the logs:

WARN Failed to allocate a page ([acquired] bytes), try again.

And acquiredButNotUsed gets acquired memory space with the pageNumber cleared in allocatedPages (i.e. the index for pageNumber gets false).

Caution
FIXME Why is the code tracking acquiredButNotUsed?

Another allocatePage attempt is recursively tried.

Caution
FIXME Why is there a hope for being able to allocate a page?

releaseExecutionMemory

Caution
FIXME

Internal Registries

pageTable

pageTable is an internal array of size PAGE_TABLE_SIZE with indices being MemoryBlock objects.

When allocating a MemoryBlock page for Tungsten consumers, the index corresponds to pageNumber that points to the MemoryBlock page allocated.

allocatedPages

allocatedPages is an internal collection of flags (true or false values) of size PAGE_TABLE_SIZE with all bits initially disabled (i.e. false).

Tip
allocatedPages is java.util.BitSet.

When allocatePage is called, it will record the page in the registry by setting the bit at the specified index (that corresponds to the allocated page) to true.

consumers

consumers is an internal set of MemoryConsumers.

acquiredButNotUsed

acquiredButNotUsed tracks the size of memory allocated but not used.

pageSizeBytes method

Caution
FIXME

showMemoryUsage method

Caution
FIXME

results matching ""

    No results matching ""