第 9 章 需求6: 广告点击量实时统计

9.1 需求简介

每天各地区各城市各广告的点击流量实时统计


9.2 思路

结果保存在 redis 中, 值的类型为 hash.

key ->  "date:area:city:ads"    
kv -> field:   2018-11-26:华北:北京:5     value: 12001
rdd[AdsInfo] => rdd[date:area:city:ads, 1] => reduceByKey(_+_)

=> updaeStateByKey(...)

设置 checkpoint

9.3 具体的业务实现

package com.atgugu.sparkmall.realtime.app

import com.atgugu.sparkmall.realtime.bean.AdsInfo
import com.atguigu.sparkmall.common.util.RedisUtil
import org.apache.spark.SparkContext
import org.apache.spark.streaming.dstream.DStream
import redis.clients.jedis.Jedis

object AreaCityAdsPerDay {
    def statAreaCityAdsPerDay(filteredAdsInfoDStream: DStream[AdsInfo], sc: SparkContext) = {
        // 1. 统计数据

        sc.setCheckpointDir("./checkpoint")
        val resultDSteam: DStream[(String, Long)] = filteredAdsInfoDStream.map(adsInfo => {
            (s"${adsInfo.dayString}:${adsInfo.area}:${adsInfo.city}:${adsInfo.adsId}", 1L)
        }).reduceByKey(_ + _).updateStateByKey((seq: Seq[Long], opt:Option[Long]) => {
            Some(seq.sum + opt.getOrElse(0L))
        })

        // 2. 写入到 redis
        resultDSteam.foreachRDD(rdd => {
            val jedisClient: Jedis = RedisUtil.getJedisClient
            val totalCountArr: Array[(String, Long)] = rdd.collect
            totalCountArr.foreach {
                case (field, count) => jedisClient.hset("day:area:city:adsCount", field, count.toString)
            }
            jedisClient.close()
        })
    }
}

/*
1. 统计数据
    rdd[AdsInfo] => rdd[date:area:city:ads, 1] => reduceByKey(_+_)

    => updaeStateByKey(...)

    设置 checkpoint

2. 写入到 redis
*/

注意:

  • 如果checkpoint的目录想要设置在hdfs上, 则一定要在创建SparkConf前设置HADOOP_USER_HOME: System.setProperty("HADOOP_USER_NAME", "atguigu"). 否则会出现权限不足的情况.
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-08-20 02:00:56

results matching ""

    No results matching ""