6.2.6 ForeachBatch Sink

ForeachBatch Sink 是 Spark 2.4 新增的功能. 它把每个微批次(micro-batch)的输出结果作为一个普通的静态 DataFrame 交给用户函数处理, 因此可以直接复用批处理的写出 API(例如 json、jdbc), 把流式结果写到本身不支持流式写入的数据源中.

示例: 将单词计数结果同时输出到本地文件(JSON)和 MySQL 中.

package com.atguigu.ss

import java.util.Properties

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Word-count demo for the Structured Streaming `foreachBatch` sink.
  *
  * Reads text lines from a socket (hadoop201:10000), counts words, and on every
  * micro-batch writes the complete result table both to a local JSON directory
  * named after the batch id and to the MySQL table `ss.word_count`.
  *
  * Author lzc
  * Date 2019/8/14 7:39 PM
  */
object ForeachBatchSink {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[2]")
            .appName("ForeachBatchSink")
            .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
            .format("socket") // socket source
            .option("host", "hadoop201")
            .option("port", 10000)
            .load

        val wordCount: DataFrame = lines.as[String]
            // split yields a leading "" for lines starting with a non-word char
            // (and for empty lines); drop those so "" is not counted as a word.
            .flatMap(_.split("\\W+").filter(_.nonEmpty))
            .groupBy("value")
            .count()

        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "aaa")
        val query: StreamingQuery = wordCount.writeStream
            .outputMode("complete")
            // Explicit parameter types select the Scala-function overload of
            // foreachBatch; an untyped lambda is ambiguous with the Java
            // VoidFunction2 overload when compiled with Scala 2.12.
            .foreachBatch((df: DataFrame, batchId: Long) => {
                // df: this micro-batch's result as a static DataFrame; batchId: the micro-batch id.
                // Cache before the first action so count() and both writes share one computation.
                df.cache()
                try {
                    if (df.count() != 0) {
                        // Overwrite so re-running the same batch id (e.g. after a restart)
                        // does not fail because ./<batchId> already exists.
                        df.write.mode("overwrite").json(s"./$batchId")
                        df.write.mode("overwrite").jdbc("jdbc:mysql://hadoop201:3306/ss", "word_count", props)
                    }
                } finally {
                    // Release this batch's cache; otherwise every micro-batch leaks cached blocks.
                    df.unpersist()
                }
            })
            .start()

        query.awaitTermination()
    }
}
Copyright © 尚硅谷大数据 2019 all rights reserved, powered by Gitbook
该文件最后修订时间: 2019-09-26 14:46:14

results matching ""

    No results matching ""