14.1 wordcount 案例
需求
使用 netcat 工具向 9999 端口不断的发送数据,通过 Spark Streaming 读取端口数据并统计不同单词出现的次数
添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.1</version>
</dependency>
编写代码
package day06
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object StreamingWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
// 1. 创建SparkStreaming的入口对象: StreamingContext 参数2: 表示事件间隔 内部会创建 SparkContext
val ssc = new StreamingContext(conf, Seconds(3))
// 2. 创建一个DStream
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop201", 9999)
// 3. 一个个的单词
val words: DStream[String] = lines.flatMap(_.split("""\s+"""))
// 4. 单词形成元组
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
// 5. 统计单词的个数
val count: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//6. 显示
println("aaa")
count.print
//7. 开始接受数据并计算
ssc.start()
//8. 等待计算结束(要么手动退出,要么出现异常)才退出主程序
ssc.awaitTermination()
}
}
测试
hadoop201
上启动 netcat
nc -lk 9999
可以打包到 linux 启动我们的 wordcount, 也可以在 idea 直接启动.
查看输出结果. 每 3 秒统计一次数据的输入情况.
注意:
- 日志太多, 可以把日志级别修改为
ERROR
几点需要注意的
一旦
StreamingContext
已经启动, 则不能再添加添加新的 streaming computations一旦一个
StreamingContext
已经停止(StreamingContext.stop()
), 他也不能再重启在一个 JVM 内, 同一时间只能启动一个
StreamingContext
stop()
的方式停止StreamingContext
, 也会把SparkContext
停掉. 如果仅仅想停止StreamingContext
, 则应该这样:stop(false)
一个
SparkContext
可以重用去创建多个StreamingContext
, 前提是以前的StreamingContext
已经停掉,并且SparkContext
没有被停掉