第 11 章 新增需求: 最近 1 小时广告点击量实时统计

统计各广告最近 1 小时内的点击量趋势:各广告最近 1 小时内各分钟的点击量

11.1 目标数据字段

redis 保存为一条hash结构的kv

field保存广告Idvalue保存分钟和每分钟点击数据的json

11.2 思路

窗口操作

Spark Streaming 提供了窗口计算,允许在数据的滑动窗口上应用转换,下图说明了这个滑动窗口:

如图所示,每当窗口滑过源 DStream 时,落在窗口内的源 RDD 被组合并运行,以产生窗口 DStream 的 RDD。

在这种具体情况下,操作应用于最近 3 个时间单位的数据,并以 2 个时间单位滑动。 这表明任何窗口操作都需要指定两个参数。

  • 窗口长度 - 窗口的持续时间(此图中窗口长度为 3)。
  • 滑动间隔 - 执行窗口操作的间隔(此图中滑动间隔为 2)。

这两个参数必须是源 DStream 的 batch 间隔的倍数(此图中 batch 间隔为 1)。 batch 间隔为切割 RDD 的间隔,滑动间隔为每隔多长时间来计算一次,窗口长度为每次计算的数据量是多少。

11.3 具体实现

package com.atgugu.sparkmall.realtime.app

import java.text.SimpleDateFormat

import com.atgugu.sparkmall.realtime.bean.AdsInfo
import com.atguigu.sparkmall.common.util.RedisUtil
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Minutes, Seconds}
import org.json4s.jackson.JsonMethods
import redis.clients.jedis.Jedis

object LastHourAdsClickApp {
    def statLastHourAdsClick(filteredDStream: DStream[AdsInfo]) = {
        val windowDStream: DStream[AdsInfo] = filteredDStream.window(Minutes(60), Seconds(5))

        val groupAdsCountDStream: DStream[(String, Iterable[(String, Int)])] = windowDStream.map(adsInfo => {
            val houreMinutes = new SimpleDateFormat("HH:mm").format(adsInfo.ts)
            ((adsInfo.adsId, houreMinutes), 1)
        }).reduceByKey(_ + _).map {
            case ((adsId, hourMinutes), count) => (adsId, (hourMinutes, count))
        }.groupByKey

        val jsonCountDStream: DStream[(String, String)] = groupAdsCountDStream.map {
            case (adsId, it) => {
                import org.json4s.JsonDSL._
                val hourMinutesJson: String = JsonMethods.compact(JsonMethods.render(it))
                (adsId, hourMinutesJson)
            }
        }

        jsonCountDStream.foreachRDD(rdd => {
            val result: Array[(String, String)] = rdd.collect
            import collection.JavaConversions._
            val client: Jedis = RedisUtil.getJedisClient
            client.hmset("last_hour_ads_click", result.toMap)
            client.close()
        })
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-08-20 02:00:56

results matching ""

    No results matching ""