第 2 章 需求 1: Top10 热门品类
2.1 需求 1 简介
品类是指的产品的的分类, 一些电商品类分多级, 咱们的项目中品类类只有一级. 不同的公司可能对热门的定义不一样. 我们按照每个品类的 点击、下单、支付 的量来统计热门品类.
2.2 需求1 思路
2.2.1 思路 1
分别统计每个品类点击的次数, 下单的次数和支付的次数.
缺点: 统计 3 次, 需要启动 3 个 job, 每个 job 都有对原始数据遍历一次, 非常好使
2.2.2 思路 2
最好的办法应该是遍历一次能够计算出来上述的 3 个指标.
使用累加器可以达成我们的需求.
遍历全部日志数据, 根据品类 id 和操作类型分别累加. 需要用到累加器
- 定义累加器
- 当碰到订单和支付业务的时候注意拆分字段才能得到品类 id
遍历完成之后就得到每个每个品类 id 和操作类型的数量.
按照点击下单支付的顺序来排序
取出 Top10
2.3 需求 1 具体实现
1. 用来封装用户行为的bean
类
/**
* 用户访问动作表
*
* @param date 用户点击行为的日期
* @param user_id 用户的ID
* @param session_id Session的ID
* @param page_id 某个页面的ID
* @param action_time 动作的时间点
* @param search_keyword 用户搜索的关键词
* @param click_category_id 某一个商品品类的ID
* @param click_product_id 某一个商品的ID
* @param order_category_ids 一次订单中所有品类的ID集合
* @param order_product_ids 一次订单中所有商品的ID集合
* @param pay_category_ids 一次支付中所有品类的ID集合
* @param pay_product_ids 一次支付中所有商品的ID集合
* @param city_id 城市 id
*/
case class UserVisitAction(date: String,
user_id: Long,
session_id: String,
page_id: Long,
action_time: String,
search_keyword: String,
click_category_id: Long,
click_product_id: Long,
order_category_ids: String,
order_product_ids: String,
pay_category_ids: String,
pay_product_ids: String,
city_id: Long)
case class CategoryCountInfo(categoryId: String,
clickCount: Long,
orderCount: Long,
payCount: Long)
2. 定义用到的累加器
需要统计每个品类的点击量, 下单量和支付量, 所以我们在累加器中使用 Map 来存储这些数据: Map(cid, "click"-> 100, cid, "order"-> 50, ....)
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable
class MapAccumulator extends AccumulatorV2[(String, String), mutable.Map[(String, String), Long]] {
val map: mutable.Map[(String, String), Long] = mutable.Map[(String, String), Long]()
override def isZero: Boolean = map.isEmpty
override def copy(): AccumulatorV2[(String, String), mutable.Map[(String, String), Long]] = {
val newAcc = new MapAccumulator
map.synchronized {
newAcc.map ++= map
}
newAcc
}
override def reset(): Unit = map.clear
override def add(v: (String, String)): Unit = {
map(v) = map.getOrElseUpdate(v, 0) + 1
}
// otherMap: (1, click) -> 20 this: (1, click) -> 10 thisMap: (1,2) -> 30
// otherMap: (1, order) -> 5 thisMap: (1,3) -> 5
override def merge(other: AccumulatorV2[(String, String), mutable.Map[(String, String), Long]]): Unit = {
val otherMap: mutable.Map[(String, String), Long] = other.value
otherMap.foreach {
kv => map.put(kv._1, map.getOrElse(kv._1, 0L) + kv._2)
}
}
override def value: mutable.Map[(String, String), Long] = map
}
3. 具体实现
整体入口
import com.atguigu.practice.app.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
* Author lzc
* Date 2019/8/9 10:56 AM
*/
object PracticeApp {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("Practice").setMaster("local[2]")
val sc = new SparkContext(conf)
// 1. 读取文件中的数据
val lineRDD: RDD[String] = sc.textFile("/Users/lzc/Desktop/user_visit_action.txt")
// 2. 类型调整
val userVisitActionRDD: RDD[UserVisitAction] = lineRDD.map(line => {
val splits: Array[String] = line.split("_")
UserVisitAction(
splits(0),
splits(1).toLong,
splits(2),
splits(3).toLong,
splits(4),
splits(5),
splits(6).toLong,
splits(7).toLong,
splits(8),
splits(9),
splits(10),
splits(11),
splits(12).toLong)
})
// 需求 1
val categoryTop10: List[CategoryCountInfo] = CategoryTop10App.statCategoryTop10(sc, userVisitActionRDD)
println(CategoryCountInfoList)
sc.stop()
}
}
需求 1 的具体实现
import com.atguigu.practice.app.acc.MapAccumulator
import com.atguigu.practice.app.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scala.collection.mutable
object CategoryTop10App {
def statCategoryTop10(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction]): List[CategoryCountInfo] = {
// 1. 注册累加器
val acc = new MapAccumulator
sc.register(acc, "CategoryActionAcc")
// 2. 遍历日志
userVisitActionRDD.foreach {
visitAction => {
if (visitAction.click_category_id != -1) {
acc.add((visitAction.click_category_id.toString, "click"))
} else if (visitAction.order_category_ids != "null") {
visitAction.order_category_ids.split(",").foreach {
oid => acc.add((oid, "order"))
}
} else if (visitAction.pay_category_ids != "null") {
visitAction.pay_category_ids.split(",").foreach {
pid => acc.add((pid, "pay"))
}
}
}
}
// 3. 遍历完成之后就得到每个每个品类 id 和操作类型的数量. 然后按照 CategoryId 进行进行分组
val actionCountByCategoryIdMap: Map[String, mutable.Map[(String, String), Long]] = acc.value.groupBy(_._1._1)
// 4. 转换成 CategoryCountInfo 类型的集合, 方便后续处理
val categoryCountInfoList: List[CategoryCountInfo] = actionCountByCategoryIdMap.map {
case (cid, actionMap) => CategoryCountInfo(
cid,
actionMap.getOrElse((cid, "click"), 0),
actionMap.getOrElse((cid, "order"), 0),
actionMap.getOrElse((cid, "pay"), 0)
)
}.toList
// 5. 按照 点击 下单 支付 的顺序降序来排序
val sortedCategoryInfoList: List[CategoryCountInfo] = categoryCountInfoList.sortBy(info => (info.clickCount, info.orderCount, info.payCount))(Ordering.Tuple3(Ordering.Long.reverse, Ordering.Long.reverse, Ordering.Long.reverse))
// 6. 截取前 10
val top10: List[CategoryCountInfo] = sortedCategoryInfoList.take(10)
// 7. 返回 top10 品类 id
top10
}
}