第 11 章 新增需求: 最近 1 小时广告点击量实时统计
统计各广告最近 1 小时内的点击量趋势:各广告最近 1 小时内各分钟的点击量
11.1 目标数据字段
redis
保存为一条hash
结构的kv
field
保存广告Id
, value
保存分钟和每分钟点击数据的json
串
11.2 思路
窗口操作
Spark Streaming 提供了窗口计算,允许在数据的滑动窗口上应用转换,下图说明了这个滑动窗口:
如图所示,每当窗口滑过源 DStream 时,落在窗口内的源 RDD 被组合并运行,以产生窗口 DStream 的 RDD。
在这种具体情况下,操作应用于最近 3 个时间单位的数据,并以 2 个时间单位滑动。 这表明任何窗口操作都需要指定两个参数。
- 窗口长度 - 窗口的持续时间(此图中窗口长度为 3)。
- 滑动间隔 - 执行窗口操作的间隔(此图中滑动间隔为 2)。
这两个参数必须是源 DStream 的 batch 间隔的倍数(此图中 batch 间隔为 1)。 batch 间隔为切割 RDD 的间隔,滑动间隔为每隔多长时间来计算一次,窗口长度为每次计算的数据量是多少。
11.3 具体实现
package com.atgugu.sparkmall.realtime.app
import java.text.SimpleDateFormat
import com.atgugu.sparkmall.realtime.bean.AdsInfo
import com.atguigu.sparkmall.common.util.RedisUtil
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Minutes, Seconds}
import org.json4s.jackson.JsonMethods
import redis.clients.jedis.Jedis
object LastHourAdsClickApp {
def statLastHourAdsClick(filteredDStream: DStream[AdsInfo]) = {
val windowDStream: DStream[AdsInfo] = filteredDStream.window(Minutes(60), Seconds(5))
val groupAdsCountDStream: DStream[(String, Iterable[(String, Int)])] = windowDStream.map(adsInfo => {
val houreMinutes = new SimpleDateFormat("HH:mm").format(adsInfo.ts)
((adsInfo.adsId, houreMinutes), 1)
}).reduceByKey(_ + _).map {
case ((adsId, hourMinutes), count) => (adsId, (hourMinutes, count))
}.groupByKey
val jsonCountDStream: DStream[(String, String)] = groupAdsCountDStream.map {
case (adsId, it) => {
import org.json4s.JsonDSL._
val hourMinutesJson: String = JsonMethods.compact(JsonMethods.render(it))
(adsId, hourMinutesJson)
}
}
jsonCountDStream.foreachRDD(rdd => {
val result: Array[(String, String)] = rdd.collect
import collection.JavaConversions._
val client: Jedis = RedisUtil.getJedisClient
client.hmset("last_hour_ads_click", result.toMap)
client.close()
})
}
}