第 7 章 需求4: 各区域热门商品 Top3

7.1 需求简介

这里的热门商品是从点击量的维度来看的.

计算各个区域前三大热门商品,并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。

例如:

地区 商品名称 点击次数 城市备注
华北 商品A 100000 北京21.2%,天津13.2%,其他65.6%
华北 商品P 80200 北京63.0%,太原10%,其他27.0%
华北 商品M 40000 北京63.0%,太原10%,其他27.0%
东北 商品J 92000 大连28%,辽宁17.0%,其他 55.0%

7.2 思路分析

该需求我们不使用 RDD, 使用 sql 来完成. 碰到复杂的需求, 可以使用 udf 或 udaf

  1. 查询出来所有的点击记录, 并与 city_info 表连接, 得到每个城市所在的地区.

  2. 按照地区和商品 id 分组, 统计出每个商品在每个地区的总点击次数

  3. 每个地区内按照点击次数降序排列

  4. 只取前三名. 并把结果保存在数据库中

  5. 城市备注需要自定义 UDAF 函数


7.3 具体业务实现

1. 具体业务实现

package com.atguigu.sparkmall.offline.app

import java.util.Properties

import com.atguigu.sparkmall.common.util.ConfigurationUtil
import com.atguigu.sparkmall.offline.udf.CityClickCountUDAF
import org.apache.spark.sql.{SaveMode, SparkSession}

object AreaClickTop3App {
    def statAreaClickTop3Product(spark: SparkSession) = {
        // 注册 UDAF 函数
        spark.udf.register("city_remark", new CityClickCountUDAF)
        spark.sql("use sparkmall")
        // 1. 查询出来所有的点击记录, 并与 city_info 表连接, 得到每个城市所在的地区.
        spark.sql(
            """
              |select
              |    c.*,
              |    v.click_product_id
              |from city_info c join user_visit_action v on c.city_id=v.city_id
              |where v.click_product_id > -1
            """.stripMargin).createOrReplaceTempView("t1")
        // 2. 按照地区商品 id 分组, 统计出每个商品的总点击次数
        spark.sql(
            """
              |select
              |    t1.area,
              |    t1.click_product_id,
              |    count(*) click_count,
              |    city_remark(t1.city_name) remark
              |from t1
              |group by t1.area, t1.click_product_id
            """.stripMargin).createOrReplaceTempView("t2")
        // 3. 每个地区内按照点击次数降序排列
        spark.sql(
            """
              |select
              |    *,
              |    rank() over(partition by t2.area sort by t2.click_count desc) rank
              |from t2
            """.stripMargin).createOrReplaceTempView("t3")
        // 4. 只取前三名. 并把结果保存在数据库中
        val conf = ConfigurationUtil("config.properties")
        val props = new Properties()
        props.setProperty("user", "root")
        props.setProperty("password", "aaa")
        spark.sql(
            """
              |select
              |    t3.area,
              |    p.product_name,
              |    t3.click_count,
              |    t3.remark,
              |    t3.rank
              |from t3 join product_info p on t3.click_product_id=p.product_id
              |where t3.rank <= 3
            """.stripMargin)
            .write.mode(SaveMode.Overwrite)
            .jdbc(conf.getString("jdbc.url"), "area_click_top10", props)
    }
}

/*
1. 查询出来所有的点击记录, 并与 city 变连接, 得到每个城市所在的地区
    select c.*
    from city_info c join user_visit_action v on c.cit_id=v.city_id
    where v.click_product_id > -1

    t1
2. 按照地区分组和商品 id, 统计出来商品的点击次数

    select
        t1.area,
        t1.click_product_id,
        count(*) click_count
    from t1
    group by t1.area, t1.click_product_id

    t2

3. 每个地区内按照点击次数进行排名
    select
        *,
        rank() over(partition by t2.area sort by t2.click_count desc) rank
    from t2

    t3
4. 每个地区取 3 名

    select
        *
    from t3
    where rank <= 3

    t4

5. 前3名的商品中每个城市的点击量百分比


6. 自定义聚合函数

 */

2. UDAF 函数

package com.atguigu.sparkmall.offline.udf

import java.text.DecimalFormat

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class CityClickCountUDAF extends UserDefinedAggregateFunction {


    // 输出数据的类型 北京:String
    override def inputSchema: StructType = {
        StructType(StructField("city_name", StringType) :: Nil)
    }

    // 存储类型 map[北京, count]  总点击数:total_count
    override def bufferSchema: StructType = {
        StructType(StructField("city_count", MapType(StringType, LongType)) :: StructField("total_count", LongType) :: Nil)
    }

    // 输出的数据类型  String
    override def dataType: DataType = StringType

    // 校验 相同输入是否有相同的输出 true
    override def deterministic: Boolean = true

    // 初始化  给 map 和 totalCount 赋初值
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
        // 初始化 map
        buffer(0) = Map[String, Long]()
        // 初始化总值
        buffer(1) = 0L
    }

    // 分区执行更新操作.  executor 内的合并
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        val cityName: String = input.getString(0)
        val map: Map[String, Long] = buffer.getAs[Map[String, Long]](0)
        // 更新各个城市的计数
        buffer(0) = map + (cityName -> (map.getOrElse(cityName, 0L) + 1L))
        // 更新总数
        buffer(1) = buffer.getLong(1) + 1
    }

    // 多个 buffer 合并. 跨 executor 合并
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        // 合并各个城市的累计
        val map1: Map[String, Long] = buffer1.getAs[Map[String, Long]](0)
        val map2: Map[String, Long] = buffer2.getAs[Map[String, Long]](0)

        buffer1(0) = map1.foldLeft(map2) {
            case (m, (cityName, cityCount)) => {
                m + (cityName -> (m.getOrElse(cityName, 0L) + cityCount))
            }
        }
        // 合并总数
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }

    // 最终的输出  把 buffer 中的数据展示成字符串
    override def evaluate(buffer: Row): Any = {
        // 1. 取值
        val map: Map[String, Long] = buffer.getAs[Map[String, Long]](0)
        val totalCount = buffer.getLong(1)
        // 2. 排序之后截取前2个, 并统计他们的比值
        var cityTop2: List[(String, Long)] = map.toList.sortBy(_._2)(Ordering.Long.reverse).take(2)
        var cityRemarks: List[CityRemark] = cityTop2.map {
            case (cityName, cityCount) => {
                val ratio: Double = cityCount.toDouble / totalCount
                CityRemark(cityName, ratio)
            }
        }
        // 3. 剩下的合并的其他
        var otherRadio = 1D
        cityRemarks.foreach(otherRadio -= _.cityRatio)
        cityRemarks = cityRemarks :+ CityRemark("其他", otherRadio)
        // 4. 拼接成字符串
        cityRemarks.mkString(", ")
    }

    case class CityRemark(cityName: String, cityRatio: Double) {
        val formatter = new DecimalFormat("0.00%")

        override def toString: String = s"$cityName:${formatter.format(cityRatio)}"
    }


}

Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-08-20 02:00:56

results matching ""

    No results matching ""