Spark Listeners — Intercepting Events from Spark Scheduler

Spark listeners intercept events that the Spark scheduler emits while Spark applications execute.

A Spark listener is an implementation of SparkListener, a developer API class that extends SparkListenerInterface and implements every callback method as a no-op (it does nothing).

Spark itself uses listeners for the web UI, event persistence (for the History Server), dynamic allocation of executors, and other services.

You can develop your own custom Spark listeners using the SparkListener developer API and register them with the SparkContext.addSparkListener method or the spark.extraListeners setting. Because SparkListener already provides no-op callbacks, you override only the ones for the events you care about and process just that subset of scheduling events.
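The following is a minimal sketch of a custom listener, assuming spark-core is on the classpath. The names JobTimingListener, results and JobTimingListenerDemo are hypothetical, chosen for illustration. The listener overrides only onJobStart and onJobEnd; every other callback keeps the no-op implementation inherited from SparkListener.

```scala
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{JobSucceeded, SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Hypothetical example listener: records how long each job took and whether it succeeded.
// Only onJobStart and onJobEnd are overridden; all other callbacks remain no-ops.
class JobTimingListener extends SparkListener {
  // jobId -> start time (ms) of jobs that have started but not yet ended
  private val startTimes = mutable.Map.empty[Int, Long]
  // jobId -> (duration in ms, whether the job succeeded)
  private val finished = mutable.Map.empty[Int, (Long, Boolean)]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
    startTimes(jobStart.jobId) = jobStart.time
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
    // If the start event was never seen, fall back to the end time (duration 0)
    val start = startTimes.remove(jobEnd.jobId).getOrElse(jobEnd.time)
    finished(jobEnd.jobId) = (jobEnd.time - start, jobEnd.jobResult == JobSucceeded)
  }

  // Immutable snapshot, safe to read from a thread other than the listener bus thread
  def results: Map[Int, (Long, Boolean)] = synchronized { finished.toMap }
}

object JobTimingListenerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("listener-demo").setMaster("local[2]"))
    val listener = new JobTimingListener
    sc.addSparkListener(listener)     // register programmatically
    sc.parallelize(1 to 1000).count() // an action, so at least one job runs
    sc.stop()                         // stopping the context also stops the listener bus
    listener.results.foreach { case (jobId, (ms, ok)) =>
      println(s"job $jobId took $ms ms, succeeded = $ok")
    }
  }
}
```

Instead of calling addSparkListener, you can set spark.extraListeners to the listener's fully qualified class name (for example, a hypothetical com.example.JobTimingListener). Spark then instantiates the listener itself while the SparkContext starts, which requires a public constructor that takes either a single SparkConf or no arguments.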

Tip
Developing a custom SparkListener is an excellent introduction to the low-level details of Spark’s execution model. Check out the exercise Developing Custom SparkListener to monitor DAGScheduler in Scala.
Tip

Enable the INFO logging level for the org.apache.spark.SparkContext logger to see when custom Spark listeners are registered:

INFO SparkContext: Registered listener org.apache.spark.scheduler.StatsReportListener

SparkListenerInterface

SparkListenerInterface is an internal interface for listeners of events from the Spark scheduler.

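Table 1. SparkListenerInterface Methods
Method                     Description
onStageSubmitted           Receives SparkListenerStageSubmitted
onStageCompleted           Receives SparkListenerStageCompleted
onTaskStart                Receives SparkListenerTaskStart
onTaskGettingResult        Receives SparkListenerTaskGettingResult
onTaskEnd                  Receives SparkListenerTaskEnd
onJobStart                 Receives SparkListenerJobStart
onJobEnd                   Receives SparkListenerJobEnd
onEnvironmentUpdate        Receives SparkListenerEnvironmentUpdate
onBlockManagerAdded        Receives SparkListenerBlockManagerAdded
onBlockManagerRemoved      Receives SparkListenerBlockManagerRemoved
onUnpersistRDD             Receives SparkListenerUnpersistRDD
onApplicationStart         Receives SparkListenerApplicationStart
onApplicationEnd           Receives SparkListenerApplicationEnd
onExecutorMetricsUpdate    Receives SparkListenerExecutorMetricsUpdate
onExecutorAdded            Receives SparkListenerExecutorAdded
onExecutorRemoved          Receives SparkListenerExecutorRemoved
onBlockUpdated             Receives SparkListenerBlockUpdated
onOtherEvent               Receives any SparkListenerEvent not covered by the methods above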

Built-In Spark Listeners

Table 2. Built-In Spark Listeners
Spark Listener                      Description
EventLoggingListener                Logs JSON-encoded events to a file that the History Server can read later
StatsReportListener
SparkFirehoseListener               Lets users receive every SparkListenerEvent by overriding only the single onEvent method
ExecutorAllocationListener
HeartbeatReceiver
StreamingJobProgressListener
ExecutorsListener                   Prepares information for the Executors tab in the web UI
StorageStatusListener               For the web UI
RDDOperationGraphListener           For the web UI
EnvironmentListener                 For the web UI
BlockStatusListener                 For the web UI
StorageListener                     For the web UI
SpillListener
ApplicationEventListener
StreamingQueryListenerBus
SQLListener / SQLHistoryListener    Support for the History Server
StreamingListenerBus
JobProgressListener
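SparkFirehoseListener forwards each typed callback (onJobEnd, onUnpersistRDD, and so on) to a single onEvent method. The sketch below, with the hypothetical name EventCountingListener, uses that to count every event it receives by event class name; it assumes spark-core on the classpath.

```scala
import scala.collection.mutable

import org.apache.spark.SparkFirehoseListener
import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical example: counts every event it receives, keyed by the event's class name.
// SparkFirehoseListener routes each typed callback to onEvent, so one override sees everything.
class EventCountingListener extends SparkFirehoseListener {
  private val counts = mutable.Map.empty[String, Long].withDefaultValue(0L)

  override def onEvent(event: SparkListenerEvent): Unit = synchronized {
    counts(event.getClass.getSimpleName) += 1
  }

  // Immutable snapshot of the counts seen so far
  def snapshot: Map[String, Long] = synchronized { counts.toMap }
}
```

This is a convenient design for logging or debugging every event, at the cost of losing the typed callback signatures: onEvent has to inspect or pattern-match the event itself.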

SparkListenerEvents

Caution
FIXME Give a less code-centric description of the times for the events.

SparkListenerExecutorMetricsUpdate

Caution
FIXME

SparkListenerApplicationStart

SparkListenerApplicationStart(
  appName: String,
  appId: Option[String],
  time: Long,
  sparkUser: String,
  appAttemptId: Option[String],
  driverLogs: Option[Map[String, String]] = None)

SparkListenerApplicationStart is posted when SparkContext calls postApplicationStart.
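A listener can read the application's identity from this event. The sketch below uses the hypothetical name AppInfoListener; note that appId and appAttemptId are Options, so the code handles a missing value. Since postApplicationStart runs while the SparkContext is being created, a listener added with addSparkListener afterwards may miss this event, so spark.extraListeners is likely the safer way to register such a listener.

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

// Hypothetical example: builds a one-line summary from the application start event.
class AppInfoListener extends SparkListener {
  @volatile private var info: Option[String] = None

  def summary: Option[String] = info

  override def onApplicationStart(app: SparkListenerApplicationStart): Unit = {
    // appId and appAttemptId are Options, so fall back when they are absent
    val id = app.appId.getOrElse("<unknown>")
    val attempt = app.appAttemptId.map(a => s", attempt $a").getOrElse("")
    info = Some(s"${app.appName} ($id$attempt) started by ${app.sparkUser} at ${app.time}")
  }
}
```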

SparkListenerJobStart

SparkListenerJobStart(
  jobId: Int,
  time: Long,
  stageInfos: Seq[StageInfo],
  properties: Properties = null)

SparkListenerJobStart is posted when DAGScheduler calls handleJobSubmitted or handleMapStageSubmitted.

SparkListenerStageSubmitted

SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)

SparkListenerStageSubmitted is posted when DAGScheduler calls submitMissingTasks.

SparkListenerTaskStart

SparkListenerTaskStart(stageId: Int, stageAttemptId: Int, taskInfo: TaskInfo)

SparkListenerTaskStart is posted when DAGScheduler is informed that a task is being started.

SparkListenerTaskGettingResult

SparkListenerTaskGettingResult(taskInfo: TaskInfo)

SparkListenerTaskGettingResult is posted when DAGScheduler handles a GettingResultEvent.

SparkListenerTaskEnd

SparkListenerTaskEnd(
  stageId: Int,
  stageAttemptId: Int,
  taskType: String,
  reason: TaskEndReason,
  taskInfo: TaskInfo,
  // may be null if the task has failed
  @Nullable taskMetrics: TaskMetrics)

SparkListenerTaskEnd is posted when DAGScheduler handles a task completion.
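Because taskMetrics may be null, a listener that reads task metrics should guard against it. Below is a minimal sketch, assuming the Spark 2.x-era SparkListenerTaskEnd signature shown above; StageRunTimeListener and its accessors are hypothetical names, and executorRunTime is one of the TaskMetrics accessors.

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Hypothetical example: sums executor run time per stage and counts tasks that ended without metrics.
class StageRunTimeListener extends SparkListener {
  // stageId -> total executorRunTime (ms) of finished tasks
  private val runTimeByStage = mutable.Map.empty[Int, Long].withDefaultValue(0L)
  private var tasksWithoutMetrics = 0

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    // taskMetrics is @Nullable (it may be null for a failed task), so wrap it in an Option
    Option(taskEnd.taskMetrics) match {
      case Some(m) => runTimeByStage(taskEnd.stageId) += m.executorRunTime
      case None    => tasksWithoutMetrics += 1
    }
  }

  def runTimes: Map[Int, Long] = synchronized { runTimeByStage.toMap }
  def missingMetrics: Int = synchronized { tasksWithoutMetrics }
}
```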

SparkListenerStageCompleted

SparkListenerStageCompleted(stageInfo: StageInfo)

SparkListenerStageCompleted is posted when DAGScheduler calls markStageAsFinished.

SparkListenerJobEnd

SparkListenerJobEnd(
  jobId: Int,
  time: Long,
  jobResult: JobResult)

SparkListenerJobEnd is posted when DAGScheduler calls cleanUpAfterSchedulerStop, handleTaskCompletion, failJobAndIndependentStages, or markMapStageJobAsFinished.

SparkListenerApplicationEnd

SparkListenerApplicationEnd(time: Long)

SparkListenerApplicationEnd is posted when SparkContext calls postApplicationEnd.

SparkListenerEnvironmentUpdate

SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])

SparkListenerEnvironmentUpdate is posted when SparkContext calls postEnvironmentUpdate.
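environmentDetails groups (key, value) pairs by section name. The sketch below, with the hypothetical name SparkPropertiesListener, keeps only the Spark configuration entries; it assumes the section holding them is named "Spark Properties".

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEnvironmentUpdate}

// Hypothetical example: keeps the Spark properties reported in the environment update.
class SparkPropertiesListener extends SparkListener {
  @volatile private var props: Map[String, String] = Map.empty

  def sparkProperties: Map[String, String] = props

  override def onEnvironmentUpdate(env: SparkListenerEnvironmentUpdate): Unit = {
    // Assumption: SparkConf entries live under the "Spark Properties" section
    props = env.environmentDetails.getOrElse("Spark Properties", Seq.empty).toMap
  }
}
```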

SparkListenerBlockManagerAdded

SparkListenerBlockManagerAdded(
  time: Long,
  blockManagerId: BlockManagerId,
  maxMem: Long)

SparkListenerBlockManagerAdded is posted when BlockManagerMasterEndpoint registers a BlockManager.

SparkListenerBlockManagerRemoved

SparkListenerBlockManagerRemoved(
  time: Long,
  blockManagerId: BlockManagerId)

SparkListenerBlockManagerRemoved is posted when BlockManagerMasterEndpoint removes a BlockManager.

SparkListenerBlockUpdated

SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo)

SparkListenerBlockUpdated is posted when BlockManagerMasterEndpoint receives an UpdateBlockInfo message.

SparkListenerUnpersistRDD

SparkListenerUnpersistRDD(rddId: Int)

SparkListenerUnpersistRDD is posted when SparkContext calls unpersistRDD.

SparkListenerExecutorAdded

SparkListenerExecutorAdded(
  time: Long,
  executorId: String,
  executorInfo: ExecutorInfo)

SparkListenerExecutorAdded is posted when the DriverEndpoint RPC endpoint (of CoarseGrainedSchedulerBackend) handles a RegisterExecutor message, when MesosFineGrainedSchedulerBackend runs resourceOffers, and when LocalSchedulerBackendEndpoint starts.

SparkListenerExecutorRemoved

SparkListenerExecutorRemoved(
  time: Long,
  executorId: String,
  reason: String)

SparkListenerExecutorRemoved is posted when the DriverEndpoint RPC endpoint (of CoarseGrainedSchedulerBackend) calls removeExecutor, or when MesosFineGrainedSchedulerBackend calls removeExecutor.
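Together, the two executor events let a listener maintain a live view of the cluster. The sketch below uses the hypothetical name ExecutorTrackingListener and reads the executorHost and totalCores fields of ExecutorInfo; it assumes spark-core on the classpath.

```scala
import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Hypothetical example: tracks live executors, their cores, and why executors were removed.
class ExecutorTrackingListener extends SparkListener {
  // executorId -> (host, total cores) of executors currently registered
  private val live = mutable.Map.empty[String, (String, Int)]
  // (executorId, removal reason) in the order removals were observed
  private val removed = mutable.ArrayBuffer.empty[(String, String)]

  override def onExecutorAdded(added: SparkListenerExecutorAdded): Unit = synchronized {
    live(added.executorId) = (added.executorInfo.executorHost, added.executorInfo.totalCores)
  }

  override def onExecutorRemoved(gone: SparkListenerExecutorRemoved): Unit = synchronized {
    live -= gone.executorId
    removed.append((gone.executorId, gone.reason))
  }

  // Cores offered by executors that are still registered
  def totalCores: Int = synchronized { live.values.map(_._2).sum }
  def removals: Seq[(String, String)] = synchronized { removed.toList }
}
```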
