Chapter 1 Top 3 Hot Ads per Area per Day

Because this involves a topN computation, which Structured Streaming does not yet support, this requirement is implemented with Spark Streaming.

Final data format: stored in Redis as a hash.
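
For example (illustrative values), one day's result is stored with one Redis key per day, one hash field per area, and a JSON string mapping the top-3 ad ids to their click counts as the value (the exact JSON shape depends on how json4s renders the list in the implementation below):

key:   area:day:top3:2019-09-25
field: 东北
value: {"3": 1000, "2": 800, "10": 500}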

MyKafkaUtil

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyKafkaUtil {

    // Kafka consumer configuration
    val kafkaParam = Map(
        "bootstrap.servers" -> "hadoop201:9092,hadoop202:9092,hadoop203:9092", // addresses used to bootstrap the connection to the cluster
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        // identifies which consumer group this consumer belongs to
        "group.id" -> "commerce-consumer-group",
        // used when there is no initial offset, or the current offset no longer exists on the server;
        // "latest" resets the offset to the latest available offset
        "auto.offset.reset" -> "latest",
        // if true, this consumer's offsets are committed automatically in the background,
        // but data can be lost if Kafka goes down; if false, offsets must be maintained manually.
        // Here we keep automatic offset commits.
        "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    /*
     Create a DStream that returns the received input data.
     LocationStrategies: control where consumers for the given topics are scheduled on executors.
     LocationStrategies.PreferConsistent: distribute partitions consistently across all executors.
     ConsumerStrategies: choose how the Kafka consumer is created and configured on the driver and executors.
     ConsumerStrategies.Subscribe: subscribe to a collection of topics.
     */

    def getDStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, String]] = {
        KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,  // the standard choice: use it whenever Kafka and Spark are not deployed on the same machines
            ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
    }
}

Entry point: read data from Kafka


import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.realtime.app.AreaAdsClickTop3App
import com.atguigu.realtime.bean.AdsInfo
import com.atguigu.realtime.uitl.MyKafkaUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Author lzc
  * Date 2019/9/26 8:03 PM
  */
object RealtimeApp {
    def main(args: Array[String]): Unit = {
        // 1. Create the SparkConf
        val conf: SparkConf = new SparkConf()
            .setAppName("RealTimeApp")
            .setMaster("local[*]")
        // 2. Create the SparkContext
        val sc = new SparkContext(conf)
        // 3. Create the StreamingContext with a 5-second batch interval
        val ssc = new StreamingContext(sc, Seconds(5))
        ssc.checkpoint("./ck1") // checkpoint directory, required by updateStateByKey
        ssc.sparkContext.setLogLevel("WARN")
        // 4. Get the DStream from Kafka
        val recordDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getDStream(ssc, "ads_log")

        // Read data from Kafka; to simplify later processing, wrap each record into an AdsInfo case class
        val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
        val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm")
        // 5. Parse each consumed string and wrap it into an AdsInfo object
        val adsInfoDStream: DStream[AdsInfo] = recordDStream.map {
            record =>
                val split: Array[String] = record.value.split(",")
                val date: Date = new Date(split(0).toLong)
                AdsInfo(
                    split(0).toLong,
                    new Timestamp(split(0).toLong),
                    dayStringFormatter.format(date),
                    hmStringFormatter.format(date),
                    split(1),
                    split(2),
                    split(3),
                    split(4))

        }
        // Requirement 1: top 3 ads per area per day
        AreaAdsClickTop3App.statAreaClickTop3(adsInfoDStream)
        ssc.start()
        ssc.awaitTermination()
    }
}
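
The AdsInfo case class used above is not shown in this section. Below is a minimal sketch consistent with how it is constructed and consumed; dayString, area and adsId are the fields actually referenced later, while the remaining names (ts, timestamp, hmString, city, userId) are assumptions inferred from the record layout:

import java.sql.Timestamp

case class AdsInfo(ts: Long,              // raw click timestamp in milliseconds (assumed name)
                   timestamp: Timestamp,  // same instant as java.sql.Timestamp (assumed name)
                   dayString: String,     // "yyyy-MM-dd", used in the Redis key
                   hmString: String,      // "HH:mm" (assumed name)
                   area: String,          // area where the ad was clicked
                   city: String,          // city where the ad was clicked (assumed)
                   userId: String,        // id of the user who clicked (assumed)
                   adsId: String)         // id of the clicked ad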

Implementation of this requirement


import com.atguigu.realtime.bean.AdsInfo
import com.atguigu.realtime.uitl.RedisUtil
import org.apache.spark.streaming.dstream.DStream
import org.json4s.jackson.JsonMethods
import redis.clients.jedis.Jedis

object AreaAdsClickTop3App {
    def statAreaClickTop3(adsInfoDStream: DStream[AdsInfo]) = {

        // 1. Click count per day, per area, per ad
        val dayAreaCount: DStream[((String, String), (String, Int))] = adsInfoDStream
            .map(adsInfo => ((adsInfo.dayString, adsInfo.area, adsInfo.adsId), 1)) // ((day, area, adsId), 1)
            .updateStateByKey((seq: Seq[Int], option: Option[Int]) => Some(seq.sum + option.getOrElse(0))) // ((day, area, adsId), cumulative count)
            .map {
                case ((day, area, adsId), count) =>
                    ((day, area), (adsId, count))
            }

        // 2. Group by (day, area), sort within each group, take the top 3
        val dayAreaAdsClickTop3: DStream[(String, String, List[(String, Int)])] = dayAreaCount
            .groupByKey
            .map {
                case ((day, area), adsCountIt) =>
                    (day, area, adsCountIt.toList.sortBy(-_._2).take(3))

            }

        // 3. Write to Redis
        dayAreaAdsClickTop3.foreachRDD(rdd => {
            // Get a connection to Redis
            val jedisClient: Jedis = RedisUtil.getJedisClient
            // The top-3 result per (day, area) is small, so collecting it to the driver is acceptable
            val arr: Array[(String, String, List[(String, Int)])] = rdd.collect
            // Write to Redis:  key   -> "area:day:top3:" + 2019-09-25
            //                  field -> area, e.g. 东北
            //                  value -> JSON string such as {"3": 1000, "2": 800, "10": 500}
            arr.foreach{
                case (day, area, adsIdCountList) => {
                    import org.json4s.JsonDSL._
                    // convert the (adsId, count) list to a JSON string
                    val adsCountJsonString = JsonMethods.compact(JsonMethods.render(adsIdCountList))
                    jedisClient.hset(s"area:day:top3:$day", area, adsCountJsonString)
                }
            }

            jedisClient.close()
        })

    }
}
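
RedisUtil is not shown in this section either. Below is a minimal sketch of getJedisClient based on a Jedis connection pool, assuming a Redis instance at hadoop201:6379 (host and port are assumptions, adjust them to your environment):

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object RedisUtil {
    private val host = "hadoop201" // assumed Redis host
    private val port = 6379        // default Redis port

    private val config = new JedisPoolConfig
    config.setMaxTotal(100)      // maximum number of connections in the pool
    config.setMaxIdle(20)        // maximum number of idle connections
    config.setTestOnBorrow(true) // validate a connection before handing it out

    private lazy val pool = new JedisPool(config, host, port)

    // Borrow a connection from the pool; callers are responsible for calling close()
    def getJedisClient: Jedis = pool.getResource
}

Once the job is running, the stored result can be inspected with HGETALL area:day:top3:2019-09-25 in redis-cli, or with jedisClient.hgetAll from Scala.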