第 3 章 需求 2: Top10热门品类中每个品类的 Top10 活跃 Session 统计

3.1 需求分析

对于排名前 10 的品类,分别获取每个品类点击次数排名前 10 的 sessionId。(注意: 这里我们只关注点击次数, 不关心下单和支付次数)

这个就是说,对于 top10 的品类,每一个都要获取对它点击次数排名前 10 的 sessionId。

这个功能,可以让我们看到,对某个用户群体最感兴趣的品类,各个品类最感兴趣最典型的用户的 session 的行为。

3.2 思路

  1. 过滤出来 category Top10的日志

    • 需要用到需求1的结果, 然后只需要得到categoryId就可以了
  2. 转换结果为 RDD[(categoryId, sessionId), 1] 然后统计数量 => RDD[(categoryId, sessionId), count]

  3. 统计每个品类 top10. => RDD[categoryId, (sessionId, count)] => RDD[categoryId, Iterable[(sessionId, count)]]

  4. 对每个 Iterable[(sessionId, count)]进行排序, 并取每个Iterable的前10

  5. 把数据封装到 CategorySession

3.3 具体代码实现

1. bean

CategorySession

封装最终写入到数据库的数据

case class CategorySession(categoryId: String,
                           sessionId: String,
                           clickCount: Long)

2. 具体实现

package com.atguigu.practice.app

import com.atguigu.practice.app.bean.{CategoryCountInfo, CategorySession, UserVisitAction}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

/**
  * Author lzc
  * Date 2019/8/9 10:49 AM
  */
object CategorySessionApp {
    def statCategoryTop10Session(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction], categoryTop10: List[CategoryCountInfo]) = {
        // 1. 得到top10的品类的id
        val categoryIdTop10: List[String] = categoryTop10.map(_.categoryId)
        // 2. 过去出来只包含 top10 品类id的那些用户行为
        val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(UserVisitAction => {
            categoryIdTop10.contains(UserVisitAction.click_category_id.toString)
        })
        // 3. 聚合操作
        //  => RDD[(品类id, sessionId))] map
        //    => RDD[(品类id, sessionId), 1)]
        val categorySessionOne: RDD[((Long, String), Int)] = filteredUserVisitActionRDD
            .map(userVisitAction => ((userVisitAction.click_category_id, userVisitAction.session_id), 1))
        // RDD[(品类id, sessionId), count)]
        val categorySessionCount: RDD[(Long, (String, Int))] =
            categorySessionOne.reduceByKey(_ + _).map {
                case ((cid, sid), count) => (cid, (sid, count))
            }
        // 4. 按照品类 id 进行分组
        // RDD[品类id, Iterator[(sessionId, count)]]
        val categorySessionCountGrouped: RDD[(Long, Iterable[(String, Int)])] = categorySessionCount.groupByKey

        // 5. 排序取前 10
        val categorySessionRDD: RDD[CategorySession] = categorySessionCountGrouped.flatMap {
            case (cid, it) => {
                val list: List[(String, Int)] = it.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
                val result: List[CategorySession] = list.map {
                    case (sid, count) => CategorySession(cid.toString, sid, count)
                }
                result
            }
        }
        categorySessionRDD.collect.foreach(println)
    }
}
/*
1. 得到top10的品类的id

2. 过去出来只包含 top10 品类id的那些用户行为

3. 分组计算
    => RDD[(品类id, sessionId))] map
    => RDD[(品类id, sessionId), 1)]  reduceByKey
    => RDD[(品类id, sessionId), count)]    map
    => RDD[品类id, (sessionId, count)]     groupByKey
    RDD[品类id, Iterator[(sessionId, count)]]
 */

3. 前面的实现存在的问题

下面的代码可能存在的问题:

 // 5. 排序取前 10
val categorySessionRDD: RDD[CategorySession] = categorySessionCountGrouped.flatMap {
    case (cid, it) => {
        val list: List[(String, Int)] = it.toList.sortBy(_._2)(Ordering.Int.reverse).take(10)
        val result: List[CategorySession] = list.map {
            case (sid, count) => CategorySession(cid.toString, sid, count)
        }
        result
    }
}

上面的操作中, 有一个操作是把迭代器中的数据转换成List之后再进行排序, 这里存在内存溢出的可能. 如果迭代器的数据足够大, 当转变成 List 的时候, 会把这个迭代器的所有数据都加载到内存中, 所以有可能造成内存的溢出.

前面的排序是使用的 Scala 的排序操作, 由于 scala 排序的时候需要把数据全部加载到内存中才能完成排序, 所以理论上都存在内存溢出的风险.

如果使用 RDD 提供的排序功能, 可以避免内存溢出的风险, 因为 RDD 的排序需要 shuffle, 是采用了内存+磁盘来完成的排序.

4. 解决方案 1

使用 RDD 的排序功能, 但是由于 RDD 排序是对所有的数据整体排序, 所以一次只能针对一个 CategoryId 进行排序操作.

参考下面的代码:

def statCategoryTop10Session_1(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction], categoryTop10: List[CategoryCountInfo]) = {
    // 1. 得到top10的品类的id
    val categoryIdTop10: List[String] = categoryTop10.map(_.categoryId)
    // 2. 过去出来只包含 top10 品类id的那些用户行为
    val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(UserVisitAction => {
        categoryIdTop10.contains(UserVisitAction.click_category_id.toString)
    })
    // 3. 聚合操作
    //  => RDD[(品类id, sessionId))] map
    //    => RDD[(品类id, sessionId), 1)]
    val categorySessionOne: RDD[((Long, String), Int)] = filteredUserVisitActionRDD
        .map(userVisitAction => ((userVisitAction.click_category_id, userVisitAction.session_id), 1))
    // RDD[(品类id, sessionId), count)]
    val categorySessionCount: RDD[(Long, (String, Int))] =
        categorySessionOne.reduceByKey(_ + _).map {
            case ((cid, sid), count) => (cid, (sid, count))
        }

    // 4. 每个品类 id 排序取前 10的 session
    categoryIdTop10.foreach(cid => {
        // 针对某个具体的 CategoryId, 过滤出来只只包含这个CategoryId的 RDD, 然后整体j降序p排列
        val top10: Array[CategorySession] = categorySessionCount
            .filter(_._1 == cid.toLong)
            .sortBy(_._2._2, ascending = false)
            .take(10)
            .map {
                case (cid, (sid, count)) => CategorySession(cid.toString, sid, count)
            }
        top10.foreach(println)

    })

}

5. 解决方案 2

方案 1 解决了内存溢出的问题, 但是也有另外的问题: 提交的 job 比较多, 有一个品类 id 就有一个 job, 在本案例中就有了 10 个 job.

有没有更加好的方案呢?

可以把同一个品类的数据都进入到同一个分区内, 然后对每个分区的数据进行排序!

需要用到自定义分区器.

自定义分区器

class MyPartitioner(categoryIdTop10: List[String]) extends Partitioner {
    // 给每个 cid 配一个分区号(使用他们的索引就行了)
    private val cidAndIndex: Map[String, Int] = categoryIdTop10.zipWithIndex.toMap

    override def numPartitions: Int = categoryIdTop10.size

    override def getPartition(key: Any): Int = {
        key match {
            case (cid: Long, _) => cidAndIndex(cid.toString)
        }
    }
}

CategorySession修改

case class CategorySession(categoryId: String,
                           sessionId: String,
                           clickCount: Long) extends Ordered[CategorySession] {
    override def compare(that: CategorySession): Int = {
        if (this.clickCount <= that.clickCount) 1
        else -1
    }

}

具体方法

def statCategoryTop10Session_2(sc: SparkContext, userVisitActionRDD: RDD[UserVisitAction], categoryTop10: List[CategoryCountInfo]) = {
    // 1. 得到top10的品类的id
    val categoryIdTop10: List[String] = categoryTop10.map(_.categoryId)
    // 2. 过去出来只包含 top10 品类id的那些用户行为
    val filteredUserVisitActionRDD: RDD[UserVisitAction] = userVisitActionRDD.filter(UserVisitAction => {
        categoryIdTop10.contains(UserVisitAction.click_category_id.toString)
    })
    // 3. 聚合操作
    //  => RDD[(品类id, sessionId))] map
    //    => RDD[(品类id, sessionId), 1)]
    val categorySessionOne: RDD[((Long, String), Int)] = filteredUserVisitActionRDD
        .map(userVisitAction => ((userVisitAction.click_category_id, userVisitAction.session_id), 1))
    // RDD[(品类id, sessionId), count)]  在 reduceByKey 的时候指定分区器
    val categorySessionCount: RDD[CategorySession] = categorySessionOne
        .reduceByKey(new MyPartitioner(categoryIdTop10), _ + _) //  指定分区器  (相比以前有变化)
        .map {
        case ((cid, sid), count) => CategorySession(cid.toString, sid, count)
    }

    // 4. 对每个分区内的数据排序取前 10(相比以前有变化)
    val categorySessionRDD: RDD[CategorySession] = categorySessionCount.mapPartitions(it => {

        // 这个时候也不要把 it 变化 list 之后再排序, 否则仍然会有可能出现内存溢出.
        // 我们可以把数据存储到能够自动排序的集合中 比如 TreeSet 或者 TreeMap 中, 并且永远保持这个集合的长度为 10
        // 让TreeSet默认安装 count 的降序排列, 需要让CategorySession现在 Ordered 接口(Comparator)
        var top10: mutable.TreeSet[CategorySession] = mutable.TreeSet[CategorySession]()

        it.foreach(cs => {
            top10 += cs // 把 CategorySession 添加到 TreeSet 中
            if (top10.size > 10) { // 如果 TreeSet 的长度超过 10, 则移除最后一个
                top10 = top10.take(10)
            }
        })
        top10.toIterator
    })
    categorySessionRDD.collect.foreach(println)

}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-09-26 14:46:14

results matching ""

    No results matching ""