第 2 章 最近 1 小时广告点击量实时统计

统计各广告最近 1 小时内的点击量趋势:各广告最近 1 小时内各分钟的点击量

具体实现


import java.text.SimpleDateFormat

import com.atguigu.realtime.bean.AdsInfo
import com.atguigu.realtime.uitl.RedisUtil
import org.apache.spark.streaming.{Minutes, Seconds}
import org.apache.spark.streaming.dstream.DStream
import org.json4s.jackson.JsonMethods
import redis.clients.jedis.Jedis

/**
  * Real-time statistics of per-ad click counts over the most recent hour,
  * broken down by minute.
  *
  * Author lzc
  * Date 2019/9/26 10:36 PM
  */
object LastHourAdsClickApp {
    /**
      * Counts clicks per ad and per minute over a sliding one-hour window and
      * publishes the result to the Redis hash `last_hour_ads_click`
      * (field = ad id, value = JSON rendered from that ad's (minute, count) pairs).
      *
      * The window is 60 minutes long and is re-evaluated every 5 seconds.
      *
      * @param adsInfoDSteam stream of ad click events; `adsId` and `hmString` are read
      *                      from each event (hmString is presumably an hour:minute
      *                      label -- confirm against AdsInfo)
      */
    def statLastHourAdsClick(adsInfoDSteam: DStream[AdsInfo]): Unit = {
        // Last hour of data (per-minute click counts), recomputed every 5 seconds
        val windowDStream: DStream[AdsInfo] = adsInfoDSteam.window(Minutes(60), Seconds(5))

        // ((adsId, minute), 1) -> summed per key -> regrouped as adsId -> [(minute, count)]
        val groupAdsCountDStream: DStream[(String, Iterable[(String, Int)])] = windowDStream.map(adsInfo => {
            ((adsInfo.adsId, adsInfo.hmString), 1)
        }).reduceByKey(_ + _).map {
            case ((adsId, hourMinutes), count) => (adsId, (hourMinutes, count))
        }.groupByKey

        // Render each ad's per-minute counts as a compact JSON string
        val jsonCountDStream: DStream[(String, String)] = groupAdsCountDStream.map {
            case (adsId, it) => {
                // JsonDSL supplies the implicit conversion from the pairs to a JValue
                import org.json4s.JsonDSL._
                val hourMinutesJson: String = JsonMethods.compact(JsonMethods.render(it))
                (adsId, hourMinutesJson)
            }
        }

        jsonCountDStream.foreachRDD(rdd => {
            // One entry per ad, so the result is collected to the driver and written once
            val result: Array[(String, String)] = rdd.collect
            // HMSET with no field/value pairs is rejected by Redis, so skip empty windows.
            // NOTE(review): fields of ads that no longer appear in the window are never
            // removed from the hash -- confirm whether readers expect stale ads to linger.
            if (result.nonEmpty) {
                // Explicit .asJava instead of the deprecated implicit JavaConversions
                import scala.collection.JavaConverters._
                val client: Jedis = RedisUtil.getJedisClient
                try {
                    client.hmset("last_hour_ads_click", result.toMap.asJava)
                } finally {
                    // Always release the connection, even if the write fails
                    client.close()
                }
            }
        })
    }
}
/*
最近一小时每分钟的点击量



 */
Copyright © 尚硅谷大数据 2019 all rights reserved, powered by Gitbook
该文件最后修订时间: 2019-09-26 14:46:14

results matching ""

    No results matching ""