6.2.4 memory sink

Like the console sink, this sink is intended for testing: it writes the full result of the query to a named in-memory table, which can then be queried with SQL.

Because the entire result set is kept in the driver's memory, a very large amount of data may cause an out-of-memory error.

The example below counts words from a socket source, writes the running counts to an in-memory table named word_count, and queries that table on a timer:

package com.atguigu.ss

import java.util.{Timer, TimerTask}

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * Author lzc
  * Date 2019/8/14 7:39 PM
  */
object MemorySink {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[2]")
            .appName("MemorySink")
            .getOrCreate()
        import spark.implicits._

        val lines: DataFrame = spark.readStream
            .format("socket") // 设置数据源
            .option("host", "localhost")
            .option("port", 10000)
            .load

        // Split each line into words and maintain a running count per word
        val words: DataFrame = lines.as[String]
            .flatMap(_.split("\\W+"))
            .groupBy("value")
            .count()

        val query: StreamingQuery = words.writeStream
            .outputMode("complete")
            .format("memory") // memory sink
            .queryName("word_count") // 内存临时表名
            .start

        // For testing: use a daemon timer to query the in-memory table periodically
        val timer = new Timer(true)
        val task: TimerTask = new TimerTask {
            override def run(): Unit = spark.sql("select * from word_count").show
        }
        timer.scheduleAtFixedRate(task, 0, 2000) // first run immediately, then every 2000 ms

        query.awaitTermination()

    }
}
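To try it, start a socket server with nc -lk 10000, run the program, and type some words into the socket. Every two seconds the timer prints the current contents of the word_count table; assuming the input hello world hello, the output would look roughly like this:

+-----+-----+
|value|count|
+-----+-----+
|hello|    2|
|world|    1|
+-----+-----+

Note that the name passed to spark.sql must match the queryName set on the writeStream; the same table can also be read as a DataFrame with spark.table("word_count").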