4.3 Kafka source
参考文档: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
导入依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>2.4.3</version>
</dependency
4.3.1 以 Streaming 模式创建 Kafka 工作流
package com.atguigu.ss
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* Author lzc
* Date 2019/8/13 10:23 AM
*/
object KafkaSourceDemo {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("KafkaSourceDemo")
.getOrCreate()
// 得到的 df 的 schema 是固定的: key,value,topic,partition,offset,timestamp,timestampType
val df: DataFrame = spark.readStream
.format("kafka") // 设置 kafka 数据源
.option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
.option("subscribe", "topic1") // 也可以订阅多个主题: "topic1,topic2"
.load
df.writeStream
.outputMode("update")
.format("console")
.trigger(Trigger.Continuous(1000))
.start
.awaitTermination()
}
}
package com.atguigu.ss
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Author lzc
* Date 2019/8/13 10:23 AM
*/
object KafkaSourceDemo2 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("KafkaSourceDemo")
.getOrCreate()
import spark.implicits._
// 得到的 df 的 schema 是固定的: key,value,topic,partition,offset,timestamp,timestampType
val lines: Dataset[String] = spark.readStream
.format("kafka") // 设置 kafka 数据源
.option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
.option("subscribe", "topic1") // 也可以订阅多个主题: "topic1,topic2"
.load
.selectExpr("CAST(value AS 26)")
.as[String]
val query: DataFrame = lines.flatMap(_.split("\\W+")).groupBy("value").count()
query.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", "./ck1") // 下次启动的时候, 可以从上次的位置开始读取
.start
.awaitTermination()
}
}
4.3.2 通过 Batch 模式创建 Kafka 工作流
这种模式一般需要设置消费的其实偏移量和结束偏移量, 如果不设置 checkpoint 的情况下, 默认起始偏移量 earliest, 结束偏移量为 latest.
该模式为一次性作业(批处理), 而非持续性的处理数据.
package com.atguigu.ss
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
/**
* Author lzc
* Date 2019/8/13 10:23 AM
*/
object KafkaSourceDemo3 {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("KafkaSourceDemo")
.getOrCreate()
import spark.implicits._
val lines: Dataset[String] = spark.read // 使用 read 方法,而不是 readStream 方法
.format("kafka") // 设置 kafka 数据源
.option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
.option("subscribe", "topic1")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load
.selectExpr("CAST(value AS STRING)")
.as[String]
val query: DataFrame = lines.flatMap(_.split("\\W+")).groupBy("value").count()
query.write // 使用 write 而不是 writeStream
.format("console")
.save()
}
}