第 3 章 需求 2: Top10热门品类中每个品类的 Top10 活跃 Session 统计
3.1 需求分析
对于排名前 10 的品类,分别获取
这个就是说,对于 top10 的品类,每一个都要获取对它点击次数排名前 10 的 sessionId。
这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的 session 的行为。
3.2 思路
过滤出来 category Top10的日志
- 需要用到
需求1
的结果, 然后只需要得到categoryId
就可以了
- 需要用到
转换结果为
RDD[(categoryId, sessionId), 1]
然后统计数量=> RDD[(categoryId, sessionId), count]
统计每个品类 top10.
=> RDD[categoryId, (sessionId, count)] => RDD[categoryId, Iterable[(sessionId, count)]]
对每个
Iterable[(sessionId, count)]
进行排序, 并取每个Iterable
的前10把数据封装到
CategorySession
中
3.3 具体代码实现
1. bean
类
CategorySession
类
封装最终写入到数据库的数据
case class CategorySession(categoryId: String,
sessionId: String,
clickCount: Long)
2. 具体实现
package com.atguigu.practice.app
import com.atguigu.practice.app.bean.{CategoryCountInfo, CategorySession, UserVisitAction}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
/**
* Author lzc
* Date 2019/8/9 10:49 AM
*/
object CategorySessionApp {
def statCategoryTop10Session(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction], categoryTop10: List[CategoryCountInfo]) = {
// 1. 得到top10的品类的id
val categoryIdTop10: List[String] = categoryTop10.map(_.categoryId)
// 2. 过去出来只包含 top10 品类id的那些用户行为
val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(UserVisitAction => {
categoryIdTop10.contains(UserVisitAction.click_category_id.toString)
})
// 3. 聚合操作
// => RDD[(品类id, sessionId))] map
// => RDD[(品类id, sessionId), 1)]
val categorySessionOne: RDD[((Long, String), Int)] = filteredUserVisitActionRDD
.map(userVisitAction => ((userVisitAction.click_category_id, userVisitAction.session_id), 1))
// RDD[(品类id, sessionId), count)]
val categorySessionCount: RDD[(Long, (String, Int))] =
categorySessionOne.reduceByKey(_ + _).map {
case ((cid, sid), count) => (cid, (sid, count))
}
// 4. 按照品类 id 进行分组
// RDD[品类id, Iterator[(sessionId, count)]]
val categorySessionCountGrouped: RDD[(Long, Iterable[(String, Int)])] = categorySessionCount.groupByKey
// 5. 排序取前 10
val categorySessionRDD: RDD[CategorySession] = categorySessionCountGrouped.flatMap {
case (cid, it) => {
val list: List[(String, Int)] = it.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
val result: List[CategorySession] = list.map {
case (sid, count) => CategorySession(cid.toString, sid, count)
}
result
}
}
categorySessionRDD.collect.foreach(println)
}
}
/*
1. 得到top10的品类的id
2. 过去出来只包含 top10 品类id的那些用户行为
3. 分组计算
=> RDD[(品类id, sessionId))] map
=> RDD[(品类id, sessionId), 1)] reduceByKey
=> RDD[(品类id, sessionId), count)] map
=> RDD[品类id, (sessionId, count)] groupByKey
RDD[品类id, Iterator[(sessionId, count)]]
*/
3. 前面的实现存在的问题
下面的代码可能存在的问题:
// 5. 排序取前 10
val categorySessionRDD: RDD[CategorySession] = categorySessionCountGrouped.flatMap {
case (cid, it) => {
val list: List[(String, Int)] = it.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
val result: List[CategorySession] = list.map {
case (sid, count) => CategorySession(cid.toString, sid, count)
}
result
}
}
上面的操作中, 有一个操作是把迭代器中的数据转换成List
之后再进行排序, 这里存在内存溢出的可能. 如果迭代器的数据足够大, 当转变成 List 的时候, 会把这个迭代器的所有数据都加载到内存中, 所以有可能造成内存的溢出.
前面的排序是使用的 Scala 的排序操作, 由于 scala 排序的时候需要把数据全部加载到内存中才能完成排序, 所以理论上都存在内存溢出的风险.
如果使用 RDD 提供的排序功能, 可以避免内存溢出的风险, 因为 RDD 的排序需要 shuffle, 是采用了内存+磁盘来完成的排序.
4. 解决方案 1
使用 RDD 的排序功能, 但是由于 RDD 排序是对所有的数据整体排序, 所以一次只能针对一个 CategoryId 进行排序操作.
参考下面的代码:
def statCategoryTop10Session_1(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction], categoryTop10: List[CategoryCountInfo]) = {
// 1. 得到top10的品类的id
val categoryIdTop10: List[String] = categoryTop10.map(_.categoryId)
// 2. 过去出来只包含 top10 品类id的那些用户行为
val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(UserVisitAction => {
categoryIdTop10.contains(UserVisitAction.click_category_id.toString)
})
// 3. 聚合操作
// => RDD[(品类id, sessionId))] map
// => RDD[(品类id, sessionId), 1)]
val categorySessionOne: RDD[((Long, String), Int)] = filteredUserVisitActionRDD
.map(userVisitAction => ((userVisitAction.click_category_id, userVisitAction.session_id), 1))
// RDD[(品类id, sessionId), count)]
val categorySessionCount: RDD[(Long, (String, Int))] =
categorySessionOne.reduceByKey(_ + _).map {
case ((cid, sid), count) => (cid, (sid, count))
}
// 4. 每个品类 id 排序取前 10的 session
categoryIdTop10.foreach(cid => {
// 针对某个具体的 CategoryId, 过滤出来只只包含这个CategoryId的 RDD, 然后整体j降序p排列
val top10: Array[CategorySession] = categorySessionCount
.filter(_._1 == cid.toLong)
.sortBy(_._2._2, ascending = false)
.take(10)
.map {
case (cid, (sid, count)) => CategorySession(cid.toString, sid, count)
}
top10.foreach(println)
})
}
5. 解决方案 2
方案 1 解决了内存溢出的问题, 但是也有另外的问题: 提交的 job 比较多, 有一个品类 id 就有一个 job, 在本案例中就有了 10 个 job.
有没有更加好的方案呢?
可以把同一个品类的数据都进入到同一个分区内, 然后对每个分区的数据进行排序!
需要用到自定义分区器.
自定义分区器
class MyPartitioner(categoryIdTop10: List[String]) extends Partitioner {
// 给每个 cid 配一个分区号(使用他们的索引就行了)
private val cidAndIndex: Map[String, Int] = categoryIdTop10.zipWithIndex.toMap
override def numPartitions: Int = categoryIdTop10.size
override def getPartition(key: Any): Int = {
key match {
case (cid: Long, _) => cidAndIndex(cid.toString)
}
}
}
CategorySession
修改
case class CategorySession(categoryId: String,
sessionId: String,
clickCount: Long) extends Ordered[CategorySession] {
override def compare(that: CategorySession): Int = {
if (this.clickCount <= that.clickCount) 1
else -1
}
}
具体方法
def statCategoryTop10Session_2(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction], categoryTop10: List[CategoryCountInfo]) = {
// 1. 得到top10的品类的id
val categoryIdTop10: List[String] = categoryTop10.map(_.categoryId)
// 2. 过去出来只包含 top10 品类id的那些用户行为
val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(UserVisitAction => {
categoryIdTop10.contains(UserVisitAction.click_category_id.toString)
})
// 3. 聚合操作
// => RDD[(品类id, sessionId))] map
// => RDD[(品类id, sessionId), 1)]
val categorySessionOne: RDD[((Long, String), Int)] = filteredUserVisitActionRDD
.map(userVisitAction => ((userVisitAction.click_category_id, userVisitAction.session_id), 1))
// RDD[(品类id, sessionId), count)] 在 reduceByKey 的时候指定分区器
val categorySessionCount: RDD[CategorySession] = categorySessionOne
.reduceByKey(new MyPartitioner(categoryIdTop10), _ + _) // 指定分区器 (相比以前有变化)
.map {
case ((cid, sid), count) => CategorySession(cid.toString, sid, count)
}
// 4. 对每个分区内的数据排序取前 10(相比以前有变化)
val categorySessionRDD: RDD[CategorySession] = categorySessionCount.mapPartitions(it => {
// 这个时候也不要把 it 变化 list 之后再排序, 否则仍然会有可能出现内存溢出.
// 我们可以把数据存储到能够自动排序的集合中 比如 TreeSet 或者 TreeMap 中, 并且永远保持这个集合的长度为 10
// 让TreeSet默认安装 count 的降序排列, 需要让CategorySession现在 Ordered 接口(Comparator)
var top10: mutable.TreeSet[CategorySession] = mutable.TreeSet[CategorySession]()
it.foreach(cs => {
top10 += cs // 把 CategorySession 添加到 TreeSet 中
if (top10.size > 10) { // 如果 TreeSet 的长度超过 10, 则移除最后一个
top10 = top10.take(10)
}
})
top10.toIterator
})
categorySessionRDD.collect.foreach(println)
}