第 5 章 需求2: Top10热门品类中每个品类的 Top10 活跃 Session 统计
5.1 需求分析
对于排名前 10 的品类,分别获取
这个就是说,对于 top10 的品类,每一个都要获取对它点击次数排名前 10 的 sessionId。
这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的 session 的行为。计算完成之后,将数据保存到 MySQL 数据库中。
5.2 思路
过滤出来 category Top10的日志
- 需要用到
需求1
的结果, 然后只需要得到categoryId
就可以了
- 需要用到
转换结果为
RDD[(categoryId, sessionId), 1]
然后统计数量=> RDD[(categoryId, sessionId), count]
统计每个品类 top10.
=> RDD[categoryId, (sessionId, count)] => RDD[categoryId, Iterable[(sessionId, count)]]
对每个
Iterable[(sessionId, count)]
进行排序, 并取每个Iterable
的前10把数据封装到
CategorySession
中写入到 mysql 数据库
首先创建 Mysql 表
-- ---------------------------- -- create table category_top10_session_count -- ---------------------------- CREATE TABLE `category_top10_session_count` ( `taskId` TEXT, `categoryId` TEXT, `sessionId` TEXT, `clickCount` BIGINT(20) DEFAULT NULL ) ENGINE=INNODB DEFAULT CHARSET=utf8
5.3 具体代码实现
1. bean
类
CategorySession
类
封装最终写入到数据库的数据
package com.atguigu.sparkmall.offline.bean
case class CategorySession(taskId: String,
categoryId: String,
sessionId: String,
clickCount: Long)
2. 业务实现类
入口的类的变更
OfflineApp
package com.atguigu.sparkmall.offline
import java.util.UUID
import com.alibaba.fastjson.JSON
import com.atguigu.sparkmall.common.bean.UserVisitAction
import com.atguigu.sparkmall.common.util.ConfigurationUtil
import com.atguigu.sparkmall.offline.app.{CategorySessionApp, CategoryTop10App}
import com.atguigu.sparkmall.offline.bean.Condition
import org.apache.spark.sql.SparkSession
object OfflineApp {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[*]")
.appName("OfflineApp")
.enableHiveSupport()
.config("spark.sql.warehouse.dir", "hdfs://hadoop201:9000/user/hive/warehouse")
.getOrCreate()
val taskId = UUID.randomUUID().toString
// 根据条件过滤取出需要的 RDD, 过滤条件定义在配置文件中
val userVisitActionRDD = readUserVisitActionRDD(spark, readConditions)
userVisitActionRDD.cache // 做缓存
println("任务1: 开始")
// 保存任务1的结果, 任务2 要用. 需要去 CategoryTop10App类中添加返回值
val categoryTop10 = CategoryTop10App.statCategoryTop10(spark, userVisitActionRDD, taskId) println("任务1: 结束")
println("任务2: 开始")
CategorySessionApp.statCategoryTop10Session(spark, categoryTop10, userVisitActionRDD, taskId)
println("任务2: 结束")
}
}
具体业务类
CategorySessionApp
package com.atguigu.sparkmall.offline.app
import com.atguigu.sparkmall.common.bean.UserVisitAction
import com.atguigu.sparkmall.common.util.JDBCUtil
import com.atguigu.sparkmall.offline.bean.{CategoryCountInfo, CategorySession}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object CategorySessionApp {
def statCategoryTop10Session(spark: SparkSession, categoryTop10: List[CategoryCountInfo], userVisitActionRDD: RDD[UserVisitAction], taskId: String) = {
// 1. 过滤掉 category 不在前 10 的日志
// 1.1 得到 top10 的 categoryId
val categoryIdTop10 = categoryTop10.map(_.categoryId)
val categoryIdTop10BD = spark.sparkContext.broadcast(categoryIdTop10) // 广播变量
// 1.2 过滤出来categorytop10的日志
val filteredActionRDD = userVisitActionRDD.filter(info => categoryIdTop10BD.value.contains(info.click_category_id + ""))
// 2. 转换结果为 RDD[(categoryId, sessionId), 1] 然后统计数量
val categorySessionCountRDD = filteredActionRDD
.map(userAction => ((userAction.click_category_id, userAction.session_id), 1))
.reduceByKey(_ + _)
// 3. 统计每个品类top10. => RDD[categoryId, (sessionId, count)] => RDD[categoryId, Iterable[(sessionId, count)]]
val categorySessionGrouped = categorySessionCountRDD.map {
case ((cid, sid), count) => (cid, (sid, count))
}.groupByKey
// 4. 对每个 Iterable[(sessionId, count)]进行排序, 并取每个Iterable的前10
// 5. 把数据封装到 CategorySession 中
val sortedCategorySession = categorySessionGrouped.flatMap {
case (cid, it) => {
it.toList.sortBy(_._2)(Ordering.Int.reverse).take(10).map{
item => CategorySession(taskId, cid.toString, item._1, item._2)
}
}
}
// 6. 写入到 mysql 数据库
val categorySessionArr = sortedCategorySession.collect.map(item => Array(item.taskId, item.categoryId, item.sessionId, item.clickCount))
JDBCUtil.executeUpdate("truncate category_top10_session_count", null)
JDBCUtil.executeBatchUpdate("insert into category_top10_session_count values(?, ?, ?, ?)", categorySessionArr)
}
}