15.3 Kafka Data Source
Usage and Notes
To use it, add the Maven dependency spark-streaming-kafka-0-8_2.11 to the project (the artifact name encodes both the Kafka API version, 0-8, and the Scala version, 2.11). The KafkaUtils object provided by this package creates a DStream from your Kafka messages via a StreamingContext or JavaStreamingContext.
Two core classes: KafkaUtils and KafkaCluster.
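KafkaUtils is exercised in the example below; KafkaCluster is the lower-level helper typically used to read and commit consumer offsets by hand. A minimal sketch of that pattern, assuming the 0-8 integration's Either-based API (getPartitions, getConsumerOffsets, setConsumerOffsets; treat the exact signatures as assumptions, and OffsetSketch itself is a hypothetical helper):

import kafka.common.TopicAndPartition
import org.apache.spark.streaming.kafka.KafkaCluster

object OffsetSketch {
  // Read the group's committed offsets for a topic, defaulting to 0 per partition
  def readOffsets(kafkaParams: Map[String, String], group: String, topic: String): Map[TopicAndPartition, Long] = {
    val cluster = new KafkaCluster(kafkaParams)
    cluster.getPartitions(Set(topic)) match {
      case Right(partitions) =>
        cluster.getConsumerOffsets(group, partitions) match {
          case Right(offsets) => offsets
          case Left(_)        => partitions.map(tp => tp -> 0L).toMap // nothing committed yet
        }
      case Left(errs) => throw new RuntimeException(errs.toString)
    }
  }

  // Commit offsets back so a restarted job can resume where it left off
  def saveOffsets(kafkaParams: Map[String, String], group: String, offsets: Map[TopicAndPartition, Long]): Unit =
    new KafkaCluster(kafkaParams).setConsumerOffsets(group, offsets)
}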
Import the dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.1.1</version>
</dependency>
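If the project is built with sbt rather than Maven, the equivalent coordinate (an assumption; only the Maven form appears here) would be:

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-8_2.11" % "2.1.1"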
Code
package com.atguigu.streaming.kafka

import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HighKafka {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")
    // Batch interval of 3 seconds
    val ssc = new StreamingContext(conf, Seconds(3))

    // Kafka parameters
    val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"
    val topic = "first"
    val group = "bigdata"
    val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"
    val kafkaParams = Map(
      ConsumerConfig.GROUP_ID_CONFIG -> group,
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization
    )

    // Create a direct DStream of (key, value) pairs; the StringDecoder type
    // parameters tell the 0-8 integration how to decode keys and values
    val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set(topic))
    dStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
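Each 3-second batch prints as (key, value) tuples. A typical follow-up (a sketch, not part of the original example) keeps only the message values and counts words per batch; these lines would replace dStream.print() above, before ssc.start():

    // dStream carries (key, value) pairs; keep only the message values
    val lines = dStream.map(_._2)
    // Split each message into words and count them within the batch
    val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    wordCounts.print()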