14.1 wordcount 案例

需求

使用 netcat 工具向 9999 端口不断的发送数据,通过 Spark Streaming 读取端口数据并统计不同单词出现的次数

添加依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

编写代码

package day06

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object StreamingWordCount {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
        // 1. 创建SparkStreaming的入口对象: StreamingContext  参数2: 表示事件间隔   内部会创建 SparkContext
        val ssc = new StreamingContext(conf, Seconds(3))
        // 2. 创建一个DStream
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop201", 9999)
        // 3. 一个个的单词
        val words: DStream[String] = lines.flatMap(_.split("""\s+"""))
        // 4. 单词形成元组
        val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
        // 5. 统计单词的个数
        val count: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
        //6. 显示
        println("aaa")
        count.print
        //7. 开始接受数据并计算
        ssc.start()
        //8. 等待计算结束(要么手动退出,要么出现异常)才退出主程序
        ssc.awaitTermination()
    }
}

测试

  1. hadoop201上启动 netcat
nc -lk 9999
  1. 可以打包到 linux 启动我们的 wordcount, 也可以在 idea 直接启动.

  2. 查看输出结果. 每 3 秒统计一次数据的输入情况.

注意:

  • 日志太多, 可以把日志级别修改为ERROR

几点需要注意的

  • 一旦StreamingContext已经启动, 则不能再添加添加新的 streaming computations

  • 一旦一个StreamingContext已经停止(StreamingContext.stop()), 他也不能再重启

  • 在一个 JVM 内, 同一时间只能启动一个StreamingContext

  • stop() 的方式停止StreamingContext, 也会把SparkContext停掉. 如果仅仅想停止StreamingContext, 则应该这样: stop(false)

  • 一个SparkContext可以重用去创建多个StreamingContext, 前提是以前的StreamingContext已经停掉,并且SparkContext没有被停掉

Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-08-09 00:21:43

results matching ""

    No results matching ""