Chapter 1 Daily Top 3 Ads per Region
Because this requirement involves a topN (a per-group sort), which Structured Streaming does not yet support, it is implemented with Spark Streaming instead.
Final data format: stored in Redis, using a hash.
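For example, one day's result could be read back with Jedis like this (a minimal sketch; the host, port, and the sample field/value in the comments are illustrative assumptions, and the key layout follows the write code at the end of this chapter):

import redis.clients.jedis.Jedis

object ReadTop3Sketch {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("hadoop201", 6379) // assumed Redis host/port
    // one hash per day; each field is an area, each value a JSON string of that area's top 3 ads
    val top3ByArea: java.util.Map[String, String] = jedis.hgetAll("area:day:top3:2019-09-25")
    // e.g. field "华北" -> value """[{"2":680},{"9":520},{"1":340}]""" (illustrative)
    jedis.close()
  }
}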
MyKafkaUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyKafkaUtil {
  // Kafka consumer configuration
  val kafkaParam = Map(
    "bootstrap.servers" -> "hadoop201:9092,hadoop202:9092,hadoop203:9092", // brokers used to bootstrap the connection to the cluster
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    // identifies the consumer group this consumer belongs to
    "group.id" -> "commerce-consumer-group",
    // used when there is no initial offset, or the current offset no longer exists on the server;
    // "latest" resets the offset to the latest available offset
    "auto.offset.reset" -> "latest",
    // if true, the consumer's offsets are committed automatically in the background,
    // but data can be lost if Kafka goes down;
    // if false, offsets must be maintained manually. Here we keep automatic commits.
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )

  /*
  Create a DStream that returns the received input data.
  LocationStrategies: controls on which executors the consumers for the given topics and cluster are scheduled
  LocationStrategies.PreferConsistent: distribute partitions consistently across all executors
  ConsumerStrategies: controls how Kafka consumers are created and configured on the driver and executors
  ConsumerStrategies.Subscribe: subscribe to a collection of topics
  */
  def getDStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, String]] = {
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // the standard choice whenever Kafka and Spark are not co-located on the same machines
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
  }
}
Entry point: read the data from Kafka
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date

import com.atguigu.realtime.app.AreaAdsClickTop3App
import com.atguigu.realtime.bean.AdsInfo
import com.atguigu.realtime.uitl.MyKafkaUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Author lzc
 * Date 2019/9/26 8:03 PM
 */
object RealtimeApp {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object
    val conf: SparkConf = new SparkConf()
      .setAppName("RealTimeApp")
      .setMaster("local[*]")
    // 2. Create the SparkContext object
    val sc = new SparkContext(conf)
    // 3. Create the StreamingContext; updateStateByKey (used later) requires a checkpoint directory
    val ssc = new StreamingContext(sc, Seconds(5))
    ssc.checkpoint("./ck1")
    ssc.sparkContext.setLogLevel("warn")
    // 4. Get the DStream
    val recordDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getDStream(ssc, "ads_log")
    // formatters to derive the day string and the hour:minute string from the event timestamp
    val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
    val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm")
    // 5. To simplify later processing, wrap each consumed string in an AdsInfo case class
    val adsInfoDStream: DStream[AdsInfo] = recordDStream.map {
      record =>
        val split: Array[String] = record.value.split(",")
        val date: Date = new Date(split(0).toLong)
        AdsInfo(
          split(0).toLong,
          new Timestamp(split(0).toLong),
          dayStringFormatter.format(date),
          hmStringFormatter.format(date),
          split(1),
          split(2),
          split(3),
          split(4))
    }
    // Requirement 1: daily top 3 ads per region
    AreaAdsClickTop3App.statAreaClickTop3(adsInfoDStream)
    ssc.start()
    ssc.awaitTermination()
  }
}
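The AdsInfo bean imported above is not listed in this section. A minimal sketch consistent with the constructor call in RealtimeApp (only dayString, area, and adsId are confirmed by the code in this chapter; the remaining field names are assumptions inferred from how the values are built):

import java.sql.Timestamp

// Hypothetical sketch of com.atguigu.realtime.bean.AdsInfo; the field order matches
// the constructor call above, but ts, timestamp, hmString, city, and userId are assumed names.
case class AdsInfo(ts: Long,           // event time in epoch milliseconds
                   timestamp: Timestamp,
                   dayString: String,  // "yyyy-MM-dd"
                   hmString: String,   // "HH:mm"
                   area: String,
                   city: String,
                   userId: String,
                   adsId: String)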
The concrete implementation of the requirement:
import com.atguigu.realtime.bean.AdsInfo
import com.atguigu.realtime.uitl.RedisUtil
import org.apache.spark.streaming.dstream.DStream
import org.json4s.jackson.JsonMethods
import redis.clients.jedis.Jedis

object AreaAdsClickTop3App {
  def statAreaClickTop3(adsInfoDStream: DStream[AdsInfo]): Unit = {
    // 1. Click count per day, per area, per ad
    val dayAreaCount: DStream[((String, String), (String, Int))] = adsInfoDStream
      .map(adsInfo => ((adsInfo.dayString, adsInfo.area, adsInfo.adsId), 1)) // ((day, area, adsId), 1)
      .updateStateByKey((seq: Seq[Int], option: Option[Int]) => Some(seq.sum + option.getOrElse(0))) // ((day, area, adsId), accumulated count, e.g. 1000)
      .map {
        case ((day, area, adsId), count) =>
          ((day, area), (adsId, count))
      }
    // 2. Group by (day, area), sort within each group, take the top 3
    val dayAreaAdsClickTop3: DStream[(String, String, List[(String, Int)])] = dayAreaCount
      .groupByKey
      .map {
        case ((day, area), adsCountIt) =>
          (day, area, adsCountIt.toList.sortBy(-_._2).take(3))
      }
    // 3. Write the results to Redis
    dayAreaAdsClickTop3.foreachRDD(rdd => {
      // open a connection to Redis (on the driver, since the results are collected there first)
      val jedisClient: Jedis = RedisUtil.getJedisClient
      val arr: Array[(String, String, List[(String, Int)])] = rdd.collect
      // write to Redis: key -> "area:day:top3:" + 2019-09-25
      //   field    value
      //   东北      {3: 1000, 2: 800, 10: 500}
      arr.foreach {
        case (day, area, adsIdCountList) =>
          import org.json4s.JsonDSL._
          // convert the list of (adsId, count) pairs to a JSON string
          val adsCountJsonString = JsonMethods.compact(JsonMethods.render(adsIdCountList))
          jedisClient.hset(s"area:day:top3:$day", area, adsCountJsonString)
      }
      jedisClient.close()
    })
  }
}
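RedisUtil is likewise referenced but not shown in this section. A minimal sketch backed by a Jedis connection pool (the host, port, and pool sizes below are assumptions, not values from the source):

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

// Hypothetical sketch of com.atguigu.realtime.uitl.RedisUtil.
object RedisUtil {
  private val config = new JedisPoolConfig
  config.setMaxTotal(100) // assumed cap on total connections
  config.setMaxIdle(20)   // assumed cap on idle connections

  private val pool = new JedisPool(config, "hadoop201", 6379) // assumed host/port

  def getJedisClient: Jedis = pool.getResource
}

A pool is worth the small extra code because foreachRDD runs once per 5-second batch, so connections are requested frequently. Note also that rdd.collect brings each batch's results to the driver; this is acceptable here only because the top-3 result set (a few rows per day and area) stays small.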