第 2 章 需求 1: Top10 热门品类

2.1 需求 1 简介

品类是指的产品的的分类, 一些电商品类分多级, 咱们的项目中品类类只有一级. 不同的公司可能对热门的定义不一样. 我们按照每个品类的 点击、下单、支付 的量来统计热门品类.

2.2 需求1 思路

2.2.1 思路 1

分别统计每个品类点击的次数, 下单的次数和支付的次数.

缺点: 统计 3 次, 需要启动 3 个 job, 每个 job 都有对原始数据遍历一次, 非常好使

2.2.2 思路 2

最好的办法应该是遍历一次能够计算出来上述的 3 个指标.

使用累加器可以达成我们的需求.

  1. 遍历全部日志数据, 根据品类 id 和操作类型分别累加. 需要用到累加器

    • 定义累加器
    • 当碰到订单和支付业务的时候注意拆分字段才能得到品类 id
  2. 遍历完成之后就得到每个每个品类 id 和操作类型的数量.

  3. 按照点击下单支付的顺序来排序

  4. 取出 Top10

2.3 需求 1 具体实现

1. 用来封装用户行为的bean

/**
  * 用户访问动作表
  *
  * @param date               用户点击行为的日期
  * @param user_id            用户的ID
  * @param session_id         Session的ID
  * @param page_id            某个页面的ID
  * @param action_time        动作的时间点
  * @param search_keyword     用户搜索的关键词
  * @param click_category_id  某一个商品品类的ID
  * @param click_product_id   某一个商品的ID
  * @param order_category_ids 一次订单中所有品类的ID集合
  * @param order_product_ids  一次订单中所有商品的ID集合
  * @param pay_category_ids   一次支付中所有品类的ID集合
  * @param pay_product_ids    一次支付中所有商品的ID集合
  * @param city_id            城市 id
  */
case class UserVisitAction(date: String,
                           user_id: Long,
                           session_id: String,
                           page_id: Long,
                           action_time: String,
                           search_keyword: String,
                           click_category_id: Long,
                           click_product_id: Long,
                           order_category_ids: String,
                           order_product_ids: String,
                           pay_category_ids: String,
                           pay_product_ids: String,
                           city_id: Long)
case class CategoryCountInfo(categoryId: String,
                             clickCount: Long,
                             orderCount: Long,
                             payCount: Long)

2. 定义用到的累加器

需要统计每个品类的点击量, 下单量和支付量, 所以我们在累加器中使用 Map 来存储这些数据: Map(cid, "click"-> 100, cid, "order"-> 50, ....)

import org.apache.spark.util.AccumulatorV2

import scala.collection.mutable

class MapAccumulator extends AccumulatorV2[(String, String), mutable.Map[(String, String), Long]] {
    val map: mutable.Map[(String, String), Long] = mutable.Map[(String, String), Long]()

    override def isZero: Boolean = map.isEmpty

    override def copy(): AccumulatorV2[(String, String), mutable.Map[(String, String), Long]] = {
        val newAcc = new MapAccumulator
        map.synchronized {
            newAcc.map ++= map
        }
        newAcc
    }

    override def reset(): Unit = map.clear


    override def add(v: (String, String)): Unit = {
        map(v) = map.getOrElseUpdate(v, 0) + 1
    }

    // otherMap: (1, click) -> 20 this: (1, click) -> 10 thisMap: (1,2) -> 30
    // otherMap: (1, order) -> 5 thisMap: (1,3) -> 5
    override def merge(other: AccumulatorV2[(String, String), mutable.Map[(String, String), Long]]): Unit = {
        val otherMap: mutable.Map[(String, String), Long] = other.value
        otherMap.foreach {
            kv => map.put(kv._1, map.getOrElse(kv._1, 0L) + kv._2)
        }
    }

    override def value: mutable.Map[(String, String), Long] = map
}

3. 具体实现

整体入口

import com.atguigu.practice.app.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
  * Author lzc
  * Date 2019/8/9 10:56 AM
  */
object PracticeApp {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("Practice").setMaster("local[2]")
        val sc = new SparkContext(conf)
        // 1. 读取文件中的数据
        val lineRDD: RDD[String] = sc.textFile("/Users/lzc/Desktop/user_visit_action.txt")
        // 2. 类型调整
        val userVisitActionRDD: RDD[UserVisitAction] = lineRDD.map(line => {
            val splits: Array[String] = line.split("_")
            UserVisitAction(
                splits(0),
                splits(1).toLong,
                splits(2),
                splits(3).toLong,
                splits(4),
                splits(5),
                splits(6).toLong,
                splits(7).toLong,
                splits(8),
                splits(9),
                splits(10),
                splits(11),
                splits(12).toLong)
        })

        // 需求 1
        val categoryTop10: List[CategoryCountInfo] = CategoryTop10App.statCategoryTop10(sc, userVisitActionRDD)
        println(CategoryCountInfoList)

        sc.stop()
    }
}

需求 1 的具体实现

import com.atguigu.practice.app.acc.MapAccumulator
import com.atguigu.practice.app.bean.{CategoryCountInfo, UserVisitAction}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object CategoryTop10App {
    def statCategoryTop10(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction]): List[CategoryCountInfo] = {
        // 1. 注册累加器
        val acc = new MapAccumulator
        sc.register(acc, "CategoryActionAcc")

        // 2. 遍历日志
        userVisitActionRDD.foreach {
            visitAction => {
                if (visitAction.click_category_id != -1) {
                    acc.add((visitAction.click_category_id.toString, "click"))
                } else if (visitAction.order_category_ids != "null") {
                    visitAction.order_category_ids.split(",").foreach {
                        oid => acc.add((oid, "order"))
                    }
                } else if (visitAction.pay_category_ids != "null") {
                    visitAction.pay_category_ids.split(",").foreach {
                        pid => acc.add((pid, "pay"))
                    }
                }
            }
        }

        // 3. 遍历完成之后就得到每个每个品类 id 和操作类型的数量. 然后按照 CategoryId 进行进行分组
        val actionCountByCategoryIdMap: Map[String, mutable.Map[(String, String), Long]] = acc.value.groupBy(_._1._1)

        // 4. 转换成 CategoryCountInfo 类型的集合, 方便后续处理
        val categoryCountInfoList: List[CategoryCountInfo] = actionCountByCategoryIdMap.map {
            case (cid, actionMap) => CategoryCountInfo(
                cid,
                actionMap.getOrElse((cid, "click"), 0),
                actionMap.getOrElse((cid, "order"), 0),
                actionMap.getOrElse((cid, "pay"), 0)
            )
        }.toList

        // 5. 按照 点击 下单 支付 的顺序降序来排序
        val sortedCategoryInfoList: List[CategoryCountInfo] = categoryCountInfoList.sortBy(info => (info.clickCount, info.orderCount, info.payCount))(Ordering.Tuple3(Ordering.Long.reverse, Ordering.Long.reverse, Ordering.Long.reverse))

        // 6. 截取前 10
        val top10: List[CategoryCountInfo] = sortedCategoryInfoList.take(10)
        // 7. 返回 top10 品类 id
        top10
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-09-26 14:46:14

results matching ""

    No results matching ""