第 2 章 最近 1 小时广告点击量实时统计
统计各广告最近 1 小时内的点击量趋势，即各广告在最近 1 小时内每一分钟的点击量。
具体实现
import java.text.SimpleDateFormat
import com.atguigu.realtime.bean.AdsInfo
import com.atguigu.realtime.uitl.RedisUtil
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream
import org.json4s.jackson.JsonMethods
import redis.clients.jedis.Jedis
/**
 * Real-time statistic: for every ad, the number of clicks in each minute of the last hour.
 *
 * Author lzc
 * Date 2019/9/26 10:36 PM
 */
object LastHourAdsClickApp {

  /** Redis hash holding the result: field = adsId, value = JSON of that ad's per-minute counts. */
  private val RedisKey: String = "last_hour_ads_click"

  /**
   * Counts clicks per ad per minute over a sliding 60-minute window, re-evaluated every
   * 5 seconds, and publishes the result to the Redis hash `last_hour_ads_click`.
   *
   * Each ad's per-minute counts are sorted by minute and rendered to compact JSON with the
   * json4s JsonDSL. The JSON is stored as one hash field keyed by the ad id. The hash is
   * rebuilt atomically on every slide, so ads with no clicks in the current window disappear.
   *
   * @param adsInfoDSteam stream of parsed ad-click records
   */
  def statLastHourAdsClick(adsInfoDSteam: DStream[AdsInfo]): Unit = {
    // Last 60 minutes of data, recomputed every 5 seconds.
    // NOTE(review): both durations must be multiples of the StreamingContext batch interval,
    // which is configured outside this file. Because the window end is not minute-aligned,
    // the oldest (and newest) minute bucket is presumably only partially covered.
    val windowDStream: DStream[AdsInfo] = adsInfoDSteam.window(Minutes(60), Seconds(5))

    // ((adsId, minute), 1) -> ((adsId, minute), count) -> (adsId, [(minute, count), ...])
    val groupAdsCountDStream: DStream[(String, Iterable[(String, Int)])] = windowDStream
      .map(adsInfo => ((adsInfo.adsId, adsInfo.hmString), 1))
      .reduceByKey(_ + _)
      .map { case ((adsId, hourMinutes), count) => (adsId, (hourMinutes, count)) }
      .groupByKey()

    val jsonCountDStream: DStream[(String, String)] = groupAdsCountDStream.map {
      case (adsId, it) =>
        import org.json4s.JsonDSL._
        // groupByKey guarantees no order; sort so the trend reads chronologically.
        // NOTE(review): assumes hmString is a zero-padded "HH:mm"-style value, so lexical
        // order equals time order. Confirm against AdsInfo.
        val byMinute: List[(String, Int)] = it.toList.sortBy(_._1)
        (adsId, JsonMethods.compact(JsonMethods.render(byMinute)))
    }

    jsonCountDStream.foreachRDD { rdd =>
      import scala.collection.JavaConverters._
      // One entry per ad, so the window result is small enough to collect to the driver.
      val result: Map[String, String] = rdd.collect().toMap
      val client: Jedis = RedisUtil.getJedisClient
      try {
        // Replace the whole hash in one MULTI/EXEC so readers never see a half-written state:
        //  - DEL drops ads that no longer have clicks in the window (HMSET alone would leave
        //    their stale entries behind);
        //  - HMSET is skipped for an empty window because Redis rejects HMSET with no fields.
        val tx = client.multi()
        tx.del(RedisKey)
        if (result.nonEmpty) tx.hmset(RedisKey, result.asJava)
        tx.exec()
      } finally {
        // Always release the client, even when the Redis call fails.
        client.close()
      }
    }
  }
}
/*
最近一小时内各广告每分钟的点击量：结果写入 Redis 哈希 last_hour_ads_click，field 为广告 id，value 为该广告各分钟点击量的 JSON。
*/