4.3 Kafka source

参考文档: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

导入依赖:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>2.4.3</version>
</dependency

4.3.1 以 Streaming 模式创建 Kafka 工作流

package com.atguigu.ss

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Author lzc
  * Date 2019/8/13 10:23 AM
  */
object KafkaSourceDemo {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("KafkaSourceDemo")
            .getOrCreate()

        // 得到的 df 的 schema 是固定的: key,value,topic,partition,offset,timestamp,timestampType
        val df: DataFrame = spark.readStream
            .format("kafka") // 设置 kafka 数据源
            .option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
            .option("subscribe", "topic1") // 也可以订阅多个主题:   "topic1,topic2"
            .load


        df.writeStream
            .outputMode("update")
            .format("console")
            .trigger(Trigger.Continuous(1000))
            .start
            .awaitTermination()


    }
}

package com.atguigu.ss

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Author lzc
  * Date 2019/8/13 10:23 AM
  */
object KafkaSourceDemo2 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("KafkaSourceDemo")
            .getOrCreate()
        import spark.implicits._
        // 得到的 df 的 schema 是固定的: key,value,topic,partition,offset,timestamp,timestampType
        val lines: Dataset[String] = spark.readStream
            .format("kafka") // 设置 kafka 数据源
            .option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
            .option("subscribe", "topic1") // 也可以订阅多个主题:   "topic1,topic2"
            .load
            .selectExpr("CAST(value AS 26)")
            .as[String]
        val query: DataFrame = lines.flatMap(_.split("\\W+")).groupBy("value").count()
        query.writeStream
            .outputMode("complete")
            .format("console")
            .option("checkpointLocation", "./ck1")  // 下次启动的时候, 可以从上次的位置开始读取
            .start
            .awaitTermination()
    }
}

4.3.2 通过 Batch 模式创建 Kafka 工作流

这种模式一般需要设置消费的其实偏移量和结束偏移量, 如果不设置 checkpoint 的情况下, 默认起始偏移量 earliest, 结束偏移量为 latest.

该模式为一次性作业(批处理), 而非持续性的处理数据.

package com.atguigu.ss

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
  * Author lzc
  * Date 2019/8/13 10:23 AM
  */
object KafkaSourceDemo3 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("KafkaSourceDemo")
            .getOrCreate()
        import spark.implicits._

        val lines: Dataset[String] = spark.read  // 使用 read 方法,而不是 readStream 方法
            .format("kafka") // 设置 kafka 数据源
            .option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
            .option("subscribe", "topic1")
            .option("startingOffsets", "earliest")
            .option("endingOffsets", "latest")
            .load
            .selectExpr("CAST(value AS STRING)")
            .as[String]

        val query: DataFrame = lines.flatMap(_.split("\\W+")).groupBy("value").count()

        query.write   // 使用 write 而不是 writeStream
            .format("console")
            .save()
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-09-26 14:46:14

results matching ""

    No results matching ""