第 5 章 需求2: Top10热门品类中每个品类的 Top10 活跃 Session 统计

5.1 需求分析

对于排名前 10 的品类,分别获取每个品类点击次数排名前 10 的 sessionId。

这个就是说,对于 top10 的品类,每一个都要获取对它点击次数排名前 10 的 sessionId。

这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的 session 的行为。计算完成之后,将数据保存到 MySQL 数据库中。


5.2 思路

  1. 过滤出来 category Top10的日志

    • 需要用到需求1的结果, 然后只需要得到categoryId就可以了
  2. 转换结果为 RDD[(categoryId, sessionId), 1] 然后统计数量 => RDD[(categoryId, sessionId), count]

  3. 统计每个品类 top10. => RDD[categoryId, (sessionId, count)] => RDD[categoryId, Iterable[(sessionId, count)]]

  4. 对每个 Iterable[(sessionId, count)]进行排序, 并取每个Iterable的前10

  5. 把数据封装到 CategorySession

  6. 写入到 mysql 数据库

  7. 首先创建 Mysql 表

     -- ----------------------------
     -- create table category_top10_session_count
     -- ----------------------------
     CREATE TABLE `category_top10_session_count` (
       `taskId` TEXT,
       `categoryId` TEXT,
       `sessionId` TEXT,
       `clickCount` BIGINT(20) DEFAULT NULL
     ) ENGINE=INNODB DEFAULT CHARSET=utf8
    

5.3 具体代码实现

1. bean

CategorySession

封装最终写入到数据库的数据

package com.atguigu.sparkmall.offline.bean

case class CategorySession(taskId: String,
                           categoryId: String,
                           sessionId: String,
                           clickCount: Long)

2. 业务实现类

入口的类的变更

OfflineApp

package com.atguigu.sparkmall.offline

import java.util.UUID

import com.alibaba.fastjson.JSON
import com.atguigu.sparkmall.common.bean.UserVisitAction
import com.atguigu.sparkmall.common.util.ConfigurationUtil
import com.atguigu.sparkmall.offline.app.{CategorySessionApp, CategoryTop10App}
import com.atguigu.sparkmall.offline.bean.Condition
import org.apache.spark.sql.SparkSession

object OfflineApp {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
            .builder()
            .master("local[*]")
            .appName("OfflineApp")
            .enableHiveSupport()
            .config("spark.sql.warehouse.dir", "hdfs://hadoop201:9000/user/hive/warehouse")
            .getOrCreate()
        val taskId = UUID.randomUUID().toString
        // 根据条件过滤取出需要的 RDD, 过滤条件定义在配置文件中
        val userVisitActionRDD = readUserVisitActionRDD(spark, readConditions)
        userVisitActionRDD.cache // 做缓存

        println("任务1: 开始")
        // 保存任务1的结果, 任务2 要用. 需要去 CategoryTop10App类中添加返回值
        val categoryTop10 = CategoryTop10App.statCategoryTop10(spark, userVisitActionRDD, taskId)          println("任务1: 结束")

        println("任务2: 开始")
        CategorySessionApp.statCategoryTop10Session(spark, categoryTop10, userVisitActionRDD, taskId)
        println("任务2: 结束")

    }
}

具体业务类

CategorySessionApp

package com.atguigu.sparkmall.offline.app

import com.atguigu.sparkmall.common.bean.UserVisitAction
import com.atguigu.sparkmall.common.util.JDBCUtil
import com.atguigu.sparkmall.offline.bean.{CategoryCountInfo, CategorySession}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object CategorySessionApp {

    def statCategoryTop10Session(spark: SparkSession, categoryTop10: List[CategoryCountInfo], userVisitActionRDD: RDD[UserVisitAction], taskId: String) = {
        // 1. 过滤掉 category 不在前 10 的日志
        // 1.1 得到 top10 的 categoryId
        val categoryIdTop10 = categoryTop10.map(_.categoryId)
        val categoryIdTop10BD = spark.sparkContext.broadcast(categoryIdTop10) // 广播变量
        // 1.2 过滤出来categorytop10的日志
        val filteredActionRDD = userVisitActionRDD.filter(info => categoryIdTop10BD.value.contains(info.click_category_id + ""))

        // 2. 转换结果为 RDD[(categoryId, sessionId), 1]  然后统计数量
        val categorySessionCountRDD = filteredActionRDD
            .map(userAction => ((userAction.click_category_id, userAction.session_id), 1))
            .reduceByKey(_ + _)

        // 3. 统计每个品类top10.  => RDD[categoryId, (sessionId, count)] => RDD[categoryId, Iterable[(sessionId, count)]]
        val categorySessionGrouped = categorySessionCountRDD.map {
            case ((cid, sid), count) => (cid, (sid, count))
        }.groupByKey

        // 4. 对每个 Iterable[(sessionId, count)]进行排序, 并取每个Iterable的前10
        // 5. 把数据封装到 CategorySession 中
        val sortedCategorySession = categorySessionGrouped.flatMap {
            case (cid, it) => {
                it.toList.sortBy(_._2)(Ordering.Int.reverse).take(10).map{
                    item => CategorySession(taskId, cid.toString, item._1, item._2)
                }
            }
        }
        // 6. 写入到 mysql 数据库
        val categorySessionArr = sortedCategorySession.collect.map(item => Array(item.taskId, item.categoryId, item.sessionId, item.clickCount))
        JDBCUtil.executeUpdate("truncate category_top10_session_count", null)
        JDBCUtil.executeBatchUpdate("insert into category_top10_session_count values(?, ?, ?, ?)", categorySessionArr)
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-08-20 02:00:56

results matching ""

    No results matching ""