KafkaUtils — Creating Kafka DStreams and RDDs

KafkaUtils is the object with the factory methods to create dstreams and RDDs from messages in Apache Kafka.

import org.apache.spark.streaming.kafka010.KafkaUtils
Tip: Enable DEBUG logging level for the org.apache.spark.streaming.kafka010.KafkaUtils logger to see what happens inside. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.kafka010.KafkaUtils=DEBUG

Refer to Logging.
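For the RDD side, here is a minimal sketch of KafkaUtils.createRDD, which reads a fixed range of offsets as an RDD of ConsumerRecords. It is not part of the original example; the topic name, offsets, broker address, and group id are placeholder values.

import java.{util => ju}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

val sc = SparkContext.getOrCreate

// Kafka consumer configuration (createRDD expects a Java Map)
val kafkaParams = new ju.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.deserializer", classOf[StringDeserializer])
kafkaParams.put("value.deserializer", classOf[StringDeserializer])
kafkaParams.put("group.id", "kafka-rdd-notes")

// Read offsets 0 (inclusive) to 10 (exclusive) from partition 0 of topic1
val offsetRanges = Array(OffsetRange.create("topic1", 0, 0, 10))

val rdd: RDD[ConsumerRecord[String, String]] = KafkaUtils.createRDD[String, String](
  sc, kafkaParams, offsetRanges, LocationStrategies.PreferConsistent)

// Print a few (key, value) pairs from the batch
rdd.map(record => (record.key, record.value)).take(5).foreach(println)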
Creating Kafka DStream — createDirectStream Method
createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]): InputDStream[ConsumerRecord[K, V]]
createDirectStream creates a DirectKafkaInputDStream from a StreamingContext, a LocationStrategy, and a ConsumerStrategy.
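As a quick reference, the sketch below (not from the original notes) enumerates the available LocationStrategies and shows ConsumerStrategies.Assign as an alternative to Subscribe; the host and topic names are placeholders.

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, LocationStrategies}

// Location strategies control where Kafka consumers are scheduled
val preferConsistent = LocationStrategies.PreferConsistent  // spread partitions evenly across executors
val preferBrokers = LocationStrategies.PreferBrokers        // prefer executors running on the Kafka brokers
val preferFixed = LocationStrategies.PreferFixed(
  Map(new TopicPartition("topic1", 0) -> "host1"))          // pin given partitions to given hosts

// Consumer strategy alternative to Subscribe: Assign consumes a fixed set of partitions
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-notes")
val assign = ConsumerStrategies.Assign[String, String](
  List(new TopicPartition("topic1", 0)), kafkaParams)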
Tip: Enable DEBUG logging level for the org.apache.spark.streaming.kafka010.DirectKafkaInputDStream logger to see what happens inside when a Kafka dstream is created and started. Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.streaming.kafka010.DirectKafkaInputDStream=DEBUG

Refer to Logging.
Using KafkaUtils.createDirectStream to Connect to Kafka Brokers
// Include org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0-SNAPSHOT dependency in the CLASSPATH, e.g.
// $ ./bin/spark-shell --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0-SNAPSHOT
import org.apache.spark.streaming._
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate
val ssc = new StreamingContext(sc, Seconds(5))
import org.apache.spark.streaming.kafka010._
val preferredHosts = LocationStrategies.PreferConsistent
val topics = List("topic1", "topic2", "topic3")
import org.apache.kafka.common.serialization.StringDeserializer
val kafkaParams = Map(
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-notes",
  "auto.offset.reset" -> "earliest"
)
import org.apache.kafka.common.TopicPartition
// Start reading partition 0 of topic3 at offset 2 (other partitions follow auto.offset.reset)
val offsets = Map(new TopicPartition("topic3", 0) -> 2L)
val dstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  preferredHosts,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

dstream.foreachRDD { rdd =>
  // Get the offset ranges in the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
  }
}
ssc.start
// The above code prints the per-partition offset ranges every 5 seconds
// until you stop it.
ssc.stop(stopSparkContext = false)
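If you want to store the processed offsets back in Kafka rather than only print them, the direct dstream also implements CanCommitOffsets. A minimal sketch of that variant of the foreachRDD loop (register it before calling ssc.start):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

dstream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the records in rdd ...
  // Asynchronously commit the offsets to Kafka once the batch has been processed
  dstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}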