KafkaRDD

The KafkaRDD class is an RDD of Kafka's ConsumerRecords from topics in Apache Kafka. It supports HasOffsetRanges.

Note
Kafka’s ConsumerRecord holds a topic name, a partition number, the offset of the record in the Kafka partition and the record itself (as a key-value pair).
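
Since KafkaRDD supports HasOffsetRanges, you can cast it to inspect the offsets it covers. The following is a minimal sketch (not taken from the codebase), assuming the spark-streaming-kafka 0.8 artifact and an RDD that is in fact a KafkaRDD, e.g. one handed to foreachRDD by a direct stream:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.HasOffsetRanges

// Sketch only: `rdd` is assumed to be backed by KafkaRDD; the cast exposes
// the Kafka offset ranges that back its partitions.
def printOffsets(rdd: RDD[_]): Unit = {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { range =>
    println(s"${range.topic} ${range.partition} ${range.fromOffset} -> ${range.untilOffset}")
  }
}
```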

It uses KafkaRDDPartition for its partitions, which know their preferred location as the host of the topic (not the port, however). It thus maps an RDD partition directly to a Kafka topic partition.

Note
KafkaRDD is a private[spark] class.

KafkaRDD overrides methods of the RDD class to base them on offsetRanges, i.e. the partitions (see the sketch below).
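
The idea can be sketched as follows (an illustration, not the actual implementation): since every partition carries its offset range, properties such as the number of records are known without scanning the data.

```scala
import org.apache.spark.streaming.kafka.OffsetRange

// Illustration only: the size of a KafkaRDD is fully determined by its
// offset ranges, so a count-like operation needs no job over the records.
def recordCount(offsetRanges: Array[OffsetRange]): Long =
  offsetRanges.map(range => range.untilOffset - range.fromOffset).sum
```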

You can create a KafkaRDD directly using KafkaUtils.createRDD, or indirectly as a dstream of KafkaRDDs, i.e. a DirectKafkaInputDStream, using KafkaUtils.createDirectStream.
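
For reference, here is a minimal sketch of creating a KafkaRDD with KafkaUtils.createRDD from the spark-streaming-kafka 0.8 artifact; the broker address, topic name and offsets are placeholders you would adjust to your setup:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val sc = new SparkContext(new SparkConf().setAppName("KafkaRDD-demo").setMaster("local[*]"))

// Placeholders: broker list, topic and offset range are illustrative only.
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val offsetRanges = Array(OffsetRange("my-topic", 0, 0L, 100L))

// An RDD of (key, value) pairs backed by KafkaRDD.
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
  sc, kafkaParams, offsetRanges)
println(rdd.count())
```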

Tip

Enable INFO logging level for org.apache.spark.streaming.kafka.KafkaRDD logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.kafka.KafkaRDD=INFO

Refer to Logging.

Computing Partitions

To compute a partition, KafkaRDD checks the validity of the beginning and ending offsets (so that they range over at least one element) and returns an (internal) KafkaRDDIterator.

You should see the following INFO message in the logs:

INFO KafkaRDD: Computing topic [topic], partition [partition] offsets [fromOffset] -> [toOffset]

Every time it is called, it creates a new KafkaCluster as well as kafka.serializer.Decoders for the key and the value (using their constructors that accept kafka.utils.VerifiableProperties).

It fetches batches of up to kc.config.fetchMessageMaxBytes bytes per topic, partition, and offset (using the kafka.consumer.SimpleConsumer.fetch(kafka.api.FetchRequest) method).
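
For orientation, the low-level fetch the iterator relies on looks roughly like the following sketch with the legacy kafka.consumer.SimpleConsumer API; the host, port, client id and sizes are illustrative placeholders, not values from the Spark code:

```scala
import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

// Sketch only: a single fetch of up to fetchSize bytes from one topic
// partition, starting at a given offset, using the legacy SimpleConsumer API.
val consumer = new SimpleConsumer("localhost", 9092, 10000, 64 * 1024, "example-client")
try {
  val request = new FetchRequestBuilder()
    .clientId("example-client")
    .addFetch("my-topic", 0, 0L, 1024 * 1024) // topic, partition, fromOffset, fetchSize
    .build()
  val response = consumer.fetch(request)
  val messages = response.messageSet("my-topic", 0)
  messages.iterator.foreach(m => println(s"offset ${m.offset}"))
} finally {
  consumer.close()
}
```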

Caution
FIXME Review
