DirectKafkaInputDStream — Direct Kafka DStream

DirectKafkaInputDStream is an input dstream of KafkaRDD batches.

DirectKafkaInputDStream is also a CanCommitOffsets object.

As an input dstream, DirectKafkaInputDStream implements the mandatory abstract methods (from DStream Contract and InputDStream Contract):

  1. dependencies: List[DStream[_]] returns an empty collection, i.e. it has no dependencies on other streams (other than Kafka brokers to read data from).

  2. slideDuration: Duration passes all calls on to DStreamGraph.batchDuration.

  3. compute(validTime: Time): Option[RDD[T]] - consult Computing RDDs (using compute method) section.

  4. start to start polling for messages from Kafka.

  5. stop to close the Kafka consumer (and therefore polling for messages from Kafka).

The name of a DirectKafkaInputDStream is Kafka 0.10 direct stream [id] (that you can use to differentiate between the different implementations for Kafka 0.10+ and older releases).

Tip
You can find the name of a input dstream in the Streaming tab in web UI (in the details of a batch in Input Metadata section).

It uses spark.streaming.kafka.maxRetries setting while computing latestLeaderOffsets (i.e. a mapping of kafka.common.TopicAndPartition and LeaderOffset).

Tip

Enable INFO logging level for org.apache.spark.streaming.kafka010.DirectKafkaInputDStream logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.kafka010.DirectKafkaInputDStream=INFO

Refer to Logging.

Creating DirectKafkaInputDStream Instance

You can create a DirectKafkaInputDStream instance using KafkaUtils.createDirectStream factory method.

import org.apache.spark.streaming.kafka010.KafkaUtils

// WARN: Incomplete to show only relevant parts
val dstream = KafkaUtils.createDirectStream[String, String](
  ssc = streamingContext,
  locationStrategy = hosts,
  consumerStrategy = ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

Internally, when a DirectKafkaInputDStream instance is created, it initializes the internal executorKafkaParams using the input consumerStrategy's executorKafkaParams.

Tip
Use ConsumerStrategy for a Kafka Consumer configuration.

With WARN logging level enabled for the KafkaUtils logger, you may see the following WARN messages and one ERROR in the logs (the number of messages depends on how correct the Kafka Consumer configuration is):

WARN KafkaUtils: overriding enable.auto.commit to false for executor
WARN KafkaUtils: overriding auto.offset.reset to none for executor
ERROR KafkaUtils: group.id is null, you should probably set it
WARN KafkaUtils: overriding executor group.id to spark-executor-null
WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
Tip

You should always set group.id in Kafka parameters for DirectKafkaInputDStream.

It initializes the internal currentOffsets property.

It creates an instance of DirectKafkaInputDStreamCheckpointData as checkpointData.

It sets up rateController as DirectKafkaRateController when backpressure is enabled.

It sets up maxRateLimitPerPartition as spark.streaming.kafka.maxRatePerPartition.

It initializes commitQueue and commitCallback properties.

currentOffsets Property

currentOffsets: Map[TopicPartition, Long]

currentOffsets holds the latest (highest) available offsets for all the topic partitions the dstream is subscribed to (as set by latestOffsets and compute).

currentOffsets is initialized when DirectKafkaInputDStream is created afresh (it could also be re-created from a checkpoint).

The ConsumerStrategy (that was used to initialize DirectKafkaInputDStream) uses it to create a Kafka Consumer.

It is then set to the available offsets when DirectKafkaInputDStream is started.

commitCallback Property

commitCallback: AtomicReference[OffsetCommitCallback]

commitCallback is initialized when DirectKafkaInputDStream is created. It is set to a OffsetCommitCallback that is the input parameter of commitAsync when it is called (as part of the CanCommitOffsets contract that DirectKafkaInputDStream implements).

commitQueue Property

commitQueue: ConcurrentLinkedQueue[OffsetRange]

commitQueue is initialized when DirectKafkaInputDStream is created. It is used in commitAsync (that is part of the CanCommitOffsets contract that DirectKafkaInputDStream implements) to queue up offsets for commit to Kafka at a future time (i.e. when the internal commitAll is called).

executorKafkaParams method

Caution
FIXME

Starting DirectKafkaInputDStream — start method

start(): Unit

start creates a Kafka consumer and fetches available records in the subscribed list of topics and partitions (using Kafka’s Consumer.poll with 0 timeout that says to return immediately with any records that are available currently).

Note
start is part of the InputDStream Contract.

After the polling, start checks if the internal currentOffsets is empty, and if it is, it requests Kafka for topic (using Kafka’s Consumer.assignment) and builds a map with topics and their offsets (using Kafka’s Consumer.position).

Ultimately, start pauses all partitions (using Kafka’s Consumer.pause with the internal collection of topics and their current offsets).

Computing KafkaRDD for Batch Interval — compute method

compute(validTime: Time): Option[KafkaRDD[K, V]]
Note
compute is a part of the DStream Contract.

compute always computes a KafkaRDD (despite the return type that allows for no RDDs). It is that it is left to KafkaRDD itself to decide what to do when no Kafka records exist in topic partitions to process.

When compute is called, it calls latestOffsets and clamp. The result topic partition offsets are then mapped to OffsetRanges with a topic, a partition, and current offset for the given partition and the result offset. That in turn is used to create KafkaRDD (with the current SparkContext, executorKafkaParams, the OffsetRanges, preferred hosts, and useConsumerCache enabled).

Caution
FIXME We all would appreciate if Jacek made the above less technical.
Caution
FIXME What’s useConsumerCache?

With that, compute builds a StreamInputInfo that it then passes on to InputInfoTracker.reportInfo (removing empty offset ranges). The metadata for the StreamInputInfo holds information about the offsets, the number thereof and a human-friendly description.

At the end, compute sets the just-calculated offsets as current offsets, asynchronously commits all queued offsets (from commitQueue) and returns the newly-created KafkaRDD.

Committing Queued Offsets to Kafka — commitAll method

commitAll(): Unit

commitAll commits all queued OffsetRanges in commitQueue (using Kafka’s Consumer.commitAsync).

Note
commitAll is used for every batch interval (when compute is called to generate a KafkaRDD).

Internally, commitAll walks through OffsetRanges in commitQueue and calculates the offsets for every topic partition. It uses them to create a collection of Kafka’s TopicPartition and OffsetAndMetadata pairs for Kafka’s Consumer.commitAsync using the internal Kafka consumer reference.

clamp method

clamp(offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]

clamp calls maxMessagesPerPartition on the input offsets collection (of topic partitions with their offsets)…​

Caution
FIXME

maxMessagesPerPartition method

Caution
FIXME

Creating Kafka Consumer — consumer method

consumer(): Consumer[K, V]

consumer creates a Kafka Consumer with keys of type K and values of type V (specified when the DirectKafkaInputDStream is created).

consumer starts the ConsumerStrategy (that was used when the DirectKafkaInputDStream was created). It passes the internal collection of TopicPartitions and their offsets.

Caution
FIXME A note with What ConsumerStrategy is for?

Calculating Preferred Hosts Using LocationStrategy — getPreferredHosts method

getPreferredHosts: java.util.Map[TopicPartition, String]

getPreferredHosts calculates preferred hosts per topic partition (that are later used to map KafkaRDD partitions to host leaders of topic partitions that Spark executors read records from).

getPreferredHosts relies exclusively on the LocationStrategy that was passed in when creating a DirectKafkaInputDStream instance.

Table 1. DirectKafkaInputDStream.getPreferredHosts and Location Strategies
Location Strategy DirectKafkaInputDStream.getPreferredHosts

PreferBrokers

Calls Kafka broker(s) for topic partition assignments.

PreferConsistent

No host preference. Returns an empty collection of preferred hosts per topic partition.

It does not call Kafka broker(s) for topic assignments.

PreferFixed

Returns the preferred hosts that were passed in when PreferFixed was created.

It does not call Kafka broker(s) for topic assignments.

Note
getPreferredHosts is used when creating a KafkaRDD for a batch interval.

Requesting Partition Assignments from Kafka — getBrokers method

getBrokers: ju.Map[TopicPartition, String]

getBrokers uses the internal Kafka Consumer instance to request Kafka broker(s) for partition assignments, i.e. the leader host per topic partition.

Note
getBrokers uses Kafka’s Consumer.assignment().

Stopping DirectKafkaInputDStream — stop method

stop(): Unit

stop closes the internal Kafka consumer.

Note
stop is a part of the InputDStream Contract.

Requesting Latest Offsets from Kafka Brokers — latestOffsets method

latestOffsets(): Map[TopicPartition, Long]

latestOffsets uses the internal Kafka consumer to poll for the latest topic partition offsets, including partitions that have been added recently.

latestOffsets calculates the topic partitions that are new (comparing to current offsets) and adds them to currentOffsets.

Note
latestOffsets uses poll(0), assignment, position (twice for every TopicPartition), pause, seekToEnd method calls. They seem quite performance-heavy. Are they?

The new partitions are paused and the current offsets seekToEnded.

Caution
FIXME Why are new partitions paused? Make the description more user-friendly.
Note
latestOffsets is used when computing a KafkaRDD for batch intervals.

Back Pressure

Caution
FIXME

Back pressure for Direct Kafka input dstream can be configured using spark.streaming.backpressure.enabled setting.

Note
Back pressure is disabled by default.

results matching ""

    No results matching ""