15.2 自定义数据源
使用及说明
其实就是自定义接收器
需要继承Receiver
,并实现onStart
、onStop
方法来自定义数据源采集。
案例实操
需求:
自定义数据源,实现监控某个端口号,获取该端口号内容。
代码
自定义数据源
object MySource{
def apply(host: String, port: Int): MySource = new MySource(host, port)
}
class MySource(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){
/*
接收器启动的时候调用该方法. This function must initialize all resources (threads, buffers, etc.) necessary for receiving data.
这个函数内部必须初始化一些读取数据必须的资源
该方法不能阻塞, 所以 读取数据要在一个新的线程中进行.
*/
override def onStart(): Unit = {
// 启动一个新的线程来接收数据
new Thread("Socket Receiver"){
override def run(): Unit = {
receive()
}
}.start()
}
// 此方法用来接收数据
def receive()={
val socket = new Socket(host, port)
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
var line: String = null
// 当 receiver没有关闭, 且reader读取到了数据则循环发送给spark
while (!isStopped && (line = reader.readLine()) != null ){
// 发送给spark
store(line)
}
// 循环结束, 则关闭资源
reader.close()
socket.close()
// 重启任务
restart("Trying to connect again")
}
override def onStop(): Unit = {
}
}
使用自定义数据源
object MySourceDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")
// 1. 创建SparkStreaming的入口对象: StreamingContext 参数2: 表示事件间隔
val ssc = new StreamingContext(conf, Seconds(5))
// 2. 创建一个DStream
val lines: ReceiverInputDStream[String] = ssc.receiverStream[String](MySource("hadoop201", 9999))
// 3. 一个个的单词
val words: DStream[String] = lines.flatMap(_.split("""\s+"""))
// 4. 单词形成元组
val wordAndOne: DStream[(String, Int)] = words.map((_, 1))
// 5. 统计单词的个数
val count: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)
//6. 显示
count.print
//7. 启动流式任务开始计算
ssc.start()
//8. 等待计算结束才退出主程序
ssc.awaitTermination()
ssc.stop(false)
}
}
开启端口
nc -lk 10000