Chapter 9 Requirement 6: Real-Time Ad Click Statistics
9.1 Requirement Overview
Count ad clicks in real time, accumulated per day, area, city, and ad, and keep the running totals in Redis for downstream display.
9.2 Approach
Store the results in Redis; the value type is a hash.
key -> "day:area:city:adsCount"
kv -> field: 2018-11-26:华北:北京:5  value: 12001
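As a side note, a minimal sketch of reading these counts back with Jedis's hgetAll; the object name ReadAdsCountDemo is only for illustration, while RedisUtil is the project's common-module helper:

package com.atguigu.sparkmall.realtime.app

import scala.collection.JavaConverters._
import com.atguigu.sparkmall.common.util.RedisUtil
import redis.clients.jedis.Jedis

object ReadAdsCountDemo {
  def main(args: Array[String]): Unit = {
    val jedisClient: Jedis = RedisUtil.getJedisClient
    // hgetAll fetches every field -> value pair of the hash in one call,
    // e.g. "2018-11-26:华北:北京:5" -> "12001"
    jedisClient.hgetAll("day:area:city:adsCount").asScala.foreach {
      case (field, count) => println(s"$field -> $count")
    }
    jedisClient.close()
  }
}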
rdd[AdsInfo] => rdd[(date:area:city:ads, 1)] => reduceByKey(_ + _)
=> updateStateByKey(...)
Set a checkpoint directory (updateStateByKey keeps state across batches, so Spark requires one).
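Before looking at the full code, here is the update function from the pipeline above evaluated by hand, as a plain-Scala sketch of how updateStateByKey accumulates a running total per key:

// seq holds the key's new values from the current batch;
// opt holds the key's accumulated state (None for a brand-new key)
val update: (Seq[Long], Option[Long]) => Option[Long] =
  (seq, opt) => Some(seq.sum + opt.getOrElse(0L))

update(Seq(1L, 1L, 1L), None)   // first batch:       Some(3)
update(Seq(1L, 1L), Some(3L))   // subsequent batch:  Some(5)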
9.3 Business Logic Implementation
package com.atguigu.sparkmall.realtime.app

import com.atguigu.sparkmall.realtime.bean.AdsInfo
import com.atguigu.sparkmall.common.util.RedisUtil
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream
import redis.clients.jedis.Jedis

object AreaCityAdsPerDay {
  def statAreaCityAdsPerDay(filteredAdsInfoDStream: DStream[AdsInfo], sc: SparkContext) = {
    // 1. Aggregate: count clicks per (day, area, city, ad), carrying state across batches
    sc.setCheckpointDir("./checkpoint")
    val resultDStream: DStream[(String, Long)] = filteredAdsInfoDStream.map(adsInfo => {
      (s"${adsInfo.dayString}:${adsInfo.area}:${adsInfo.city}:${adsInfo.adsId}", 1L)
    }).reduceByKey(_ + _).updateStateByKey((seq: Seq[Long], opt: Option[Long]) => {
      // seq: this batch's counts for the key; opt: the total accumulated so far
      Some(seq.sum + opt.getOrElse(0L))
    })

    // 2. Write the accumulated counts into the Redis hash
    resultDStream.foreachRDD(rdd => {
      val jedisClient: Jedis = RedisUtil.getJedisClient
      val totalCountArr: Array[(String, Long)] = rdd.collect
      totalCountArr.foreach {
        case (field, count) => jedisClient.hset("day:area:city:adsCount", field, count.toString)
      }
      jedisClient.close()
    })
  }
}
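For completeness, a hypothetical driver showing how this object might be wired up; the StreamingContext settings are assumptions, and a queueStream stands in for the Kafka-backed DStream built in the earlier chapters:

package com.atguigu.sparkmall.realtime.app

import scala.collection.mutable
import com.atguigu.sparkmall.realtime.bean.AdsInfo
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object AreaCityAdsPerDayDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("AreaCityAdsPerDayDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // A queueStream stands in for the Kafka source; enqueue an empty RDD so batches run
    val queue = mutable.Queue(ssc.sparkContext.parallelize(Seq.empty[AdsInfo]))
    val adsInfoDStream: DStream[AdsInfo] = ssc.queueStream(queue)

    AreaCityAdsPerDay.statAreaCityAdsPerDay(adsInfoDStream, ssc.sparkContext)

    ssc.start()
    ssc.awaitTermination()
  }
}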
Note:
- If you want the checkpoint directory to live on HDFS, you must set HADOOP_USER_NAME before creating the SparkConf: System.setProperty("HADOOP_USER_NAME", "atguigu"). Otherwise the job will fail with insufficient permissions.
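A minimal sketch of that ordering (the main object name and the HDFS NameNode address are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RealtimeApp {
  def main(args: Array[String]): Unit = {
    // Must run before the SparkConf / StreamingContext are created
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    val conf = new SparkConf().setMaster("local[*]").setAppName("RealtimeApp")
    val ssc = new StreamingContext(conf, Seconds(5))
    // Placeholder NameNode address; point it at your own cluster
    ssc.sparkContext.setCheckpointDir("hdfs://hadoop201:9000/sparkmall/checkpoint")
  }
}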