3.3 创建模拟数据子模块

本模块用于生成模拟数据(用户、城市、商品及用户行为数据), 并将这些数据写入 Hive 表中。

模块名: sparkmall-mock

3.3.1 添加依赖

<dependencies>
    <!-- Shared project module; presumably provides the com.atguigu.sparkmall.common.* classes imported by the mock code -->
    <dependency>
        <groupId>com.atguigu</groupId>
        <artifactId>sparkmall-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

    <!-- Spark core / SQL / Hive built for Scala 2.11. No version is given here: presumably managed by the parent POM's dependencyManagement - confirm -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- Compiles the Scala sources; version/configuration presumably inherited from the parent POM - confirm -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

3.3.2 创建需要的类

1. 创建生成随机数据的工具类

文件: RandomNumUtil.scala

package com.atguigu.sparkmall.mock.util


import java.util.Date

import scala.collection.mutable
import scala.util.Random

/**
  * Helpers for generating random numbers used by the mock-data generator.
  */
object RandomNumUtil {
    /** Shared random number generator. */
    private val random = new Random()

    /**
      * Returns a random Int in the closed interval [from, to].
      *
      * @param from lower bound (inclusive)
      * @param to   upper bound (inclusive)
      * @return a value v with from <= v <= to
      * @throws IllegalArgumentException if from > to
      */
    def randomInt(from: Int, to: Int): Int = {
        if (from > to) throw new IllegalArgumentException(s"from: $from 不能大于 to: $to")
        // Compute the span in Long: for very wide ranges (e.g. [Int.MinValue, Int.MaxValue])
        // `to - from + 1` overflows Int and nextInt would reject the non-positive bound.
        val span = to.toLong - from + 1
        if (span <= Int.MaxValue) random.nextInt(span.toInt) + from
        else randomLong(from, to).toInt
    }

    /**
      * Creates several random Int values, each in the closed interval [from, to].
      *
      * @param from      lower bound (inclusive)
      * @param to        upper bound (inclusive)
      * @param count     how many values to create (a non-positive count yields an empty list)
      * @param canRepeat whether the same value may appear more than once
      * @return List[Int] holding `count` values
      * @throws IllegalArgumentException if from > to (and count > 0), or if repeats are not
      *                                  allowed and the range holds fewer than `count` distinct values
      */
    def randomMultiInt(from: Int, to: Int, count: Int, canRepeat: Boolean = true): List[Int] = {
        if (canRepeat) {
            List.fill(count)(randomInt(from, to))
        } else {
            // Without this check the loop below never terminates when the range cannot
            // supply `count` distinct values.
            val distinctAvailable = to.toLong - from + 1
            if (from <= to && count > distinctAvailable)
                throw new IllegalArgumentException(
                    s"count: $count 不能大于区间 [$from, $to] 中不同整数的个数: $distinctAvailable")
            val set = mutable.Set[Int]()
            while (set.size < count) {
                set += randomInt(from, to)
            }
            set.toList
        }
    }

    /**
      * Returns a random Long in the closed interval [from, to].
      *
      * @param from lower bound (inclusive)
      * @param to   upper bound (inclusive)
      * @return a value v with from <= v <= to
      * @throws IllegalArgumentException if from > to
      */
    def randomLong(from: Long, to: Long): Long = {
        if (from > to) throw new IllegalArgumentException(s"from: $from 不能大于 to: $to")
        val span = to - from + 1
        if (span > 0) {
            // Clear the sign bit rather than using math.abs: math.abs(Long.MinValue) is still
            // negative, which would occasionally have produced a result below `from`.
            (random.nextLong() & Long.MaxValue) % span + from
        } else {
            // `span` overflowed, so the range holds more than Long.MaxValue values and a raw
            // draw lands inside it with probability >= 1/2; resample until it does.
            var candidate = random.nextLong()
            while (candidate < from || candidate > to) candidate = random.nextLong()
            candidate
        }
    }
}

文件: RandomOptions.scala

这个类根据为每个值设定的比重(权重)随机返回其中一个预定义的值: 比重越大, 被选中的概率越高。

package com.atguigu.sparkmall.mock.util

import scala.collection.mutable.ListBuffer

/**
  * Factory for weighted random pickers: build one from (value, weight) pairs, then call
  * getRandomOption to obtain one of the predefined values, where a value of weight w is
  * chosen with probability w / (sum of all weights).
  */
object RandomOptions {
    /** Generator shared by all RandomOptions instances for picking an index. */
    private val random = new scala.util.Random()

    /**
      * Creates a RandomOptions from (value, weight) pairs.
      *
      * @param opts value/weight pairs; every weight must be >= 0
      * @tparam T type of the values
      * @return a RandomOptions whose `options` holds each value repeated `weight` times
      * @throws IllegalArgumentException if any weight is negative
      */
    def apply[T](opts: (T, Int)*): RandomOptions[T] = {
        // A negative weight would make totalWeight disagree with the number of stored options.
        require(opts.forall(_._2 >= 0), s"权重不能为负数: ${opts.mkString(", ")}")
        val randomOptions = new RandomOptions[T]()
        randomOptions.totalWeight = opts.map(_._2).sum // total of all weights
        opts.foreach { case (value, weight) =>
            // Repeat each value `weight` times so that a uniform index draw honours the weights.
            randomOptions.options ++= Seq.fill(weight)(value)
        }
        randomOptions
    }


    def main(args: Array[String]): Unit = {
        // Manual smoke test
        val opts = RandomOptions(("张三", 10), ("李四", 30))
        (0 to 50).foreach(_ => println(opts.getRandomOption()))
    }
}

/**
  * Weighted random picker; create instances through `RandomOptions(...)`.
  *
  * @tparam T type of the values
  */
class RandomOptions[T] {
    /** Sum of the weights passed to the factory. */
    var totalWeight: Int = _
    /** Every value repeated `weight` times; a uniform draw over it is a weighted draw. */
    var options = ListBuffer[T]()

    /**
      * Returns one of the predefined values, chosen according to its weight.
      *
      * @return a randomly chosen value
      * @throws NoSuchElementException if there are no options (e.g. every weight is 0)
      */
    def getRandomOption(): T = {
        if (options.isEmpty) throw new NoSuchElementException("RandomOptions 中没有可选的值")
        // Bound by the actual buffer size so the index is always valid, even if the public
        // `options` / `totalWeight` vars were changed directly.
        options(RandomOptions.random.nextInt(options.size))
    }
}

文件: RandomDate.scala

package com.atguigu.sparkmall.mock.util

import java.util.Date

/**
  * Factory for [[RandomDate]]: a generator of random timestamps that starts at `startDate`
  * and advances by a random step on every call.
  */
object RandomDate {
    /**
      * Creates a RandomDate positioned at `startDate`.
      *
      * @param startDate first timestamp
      * @param stopDate  end date; used together with `step` to derive the average step length
      * @param step      number of steps the [startDate, stopDate] interval is divided into; must be > 0
      * @return a RandomDate whose maximum step is 4 times the average step
      * @throws IllegalArgumentException if step <= 0 or stopDate is before startDate
      */
    def apply(startDate: Date, stopDate: Date, step: Int): RandomDate = {
        // Previously step == 0 failed with "/ by zero", and a reversed interval only failed
        // later inside getRandomDate; reject both up front with a clear message.
        require(step > 0, s"step: $step 必须大于 0")
        require(!stopDate.before(startDate), s"stopDate: $stopDate 不能早于 startDate: $startDate")
        val randomDate = new RandomDate
        val avgStepTime = (stopDate.getTime - startDate.getTime) / step
        // Each step is drawn from [0, 4 * avg], i.e. about 2 * avg on average, so after `step`
        // draws the timeline typically runs past stopDate. NOTE(review): confirm the factor 4 is intended.
        randomDate.maxStepTime = 4 * avgStepTime
        randomDate.lastDateTIme = startDate.getTime
        randomDate
    }
}

/**
  * Stateful random date generator: every call to getRandomDate advances the stored
  * timestamp by a random step and returns it.
  */
class RandomDate {
    /** Epoch-millisecond time of the most recently generated date (name kept for compatibility). */
    var lastDateTIme: Long = _
    /** Largest step, in milliseconds, that a single call may advance the time by. */
    var maxStepTime: Long = _

    /**
      * Advances the stored time by a random step drawn from [0, maxStepTime] and returns it.
      *
      * @return the new date
      */
    def getRandomDate: Date = {
        // Step length of this action relative to the previous one
        val timeStep = RandomNumUtil.randomLong(0, maxStepTime)
        lastDateTIme += timeStep
        new Date(lastDateTIme)
    }
}

2. 生成模拟数据并写入 Hive (对象 MockOffline)

package com.atguigu.sparkmall.mock

import java.text.SimpleDateFormat
import java.util.UUID

import com.atguigu.sparkmall.common.bean.{CityInfo, ProductInfo, UserInfo, UserVisitAction}
import com.atguigu.sparkmall.common.util.ConfigurationUtil
import com.atguigu.sparkmall.mock.util.{RandomDate, RandomNumUtil, RandomOptions}
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer

/**
  * Generates the offline mock data set (users, cities, products and user-visit actions)
  * and writes each data set into its own Hive table.
  */
object MockOffline {
    // --- User parameters: start ---
    // Number of users
    val userNum = 100
    // Age range (inclusive)
    val ageFrom = 20
    val ageTo = 60
    // Profession, with relative weights
    val professionOpts = RandomOptions(("学生", 4), ("程序员", 3), ("经理", 2), ("老师", 1))
    // Gender, with relative weights
    val genderOpts = RandomOptions(("男", 6), ("女", 4))
    // --- User parameters: end ---

    // --- Product parameters: start ---
    // Number of product categories (the misspelled name is kept for compatibility)
    val cargoryNum = 20
    // Number of products
    val productNum = 100
    // --- Product parameters: end ---


    // --- User-action parameters: start ---
    // Search keywords, with relative weights
    val searchKeywordsOpts = RandomOptions(("手机", 30), ("笔记本", 70), ("内存", 70), ("i7", 70), ("苹果", 70), ("吃鸡", 70))
    // Action types, with relative weights; drawing "quit" ends the current session
    val actionOpts = RandomOptions(("search", 20), ("click", 60), ("order", 6), ("pay", 4), ("quit", 10))
    // Number of sessions
    val sessionNum = 10000
    // Number of pages in the site
    val pageNum = 50
    // Approximate number of log entries, used to spread action times over the date range
    val logAboutNum = 100000
    // --- User-action parameters: end ---


    /**
      * Generates the UserInfo data.
      *
      * @return a List with one UserInfo per user id in [1, userNum]
      */
    def mockUserInfo: List[UserInfo] = {

        (1 to userNum).toList.map(
            i => UserInfo(i,
                s"user_$i",
                s"name_$i",
                RandomNumUtil.randomInt(ageFrom, ageTo),
                professionOpts.getRandomOption(),
                genderOpts.getRandomOption()
            ))
    }

    /**
      * Generates the CityInfo data (a fixed list of cities with their regions).
      *
      * @return a List of all CityInfo objects; ids are 1 to the list size
      */
    def mockCityInfo: List[CityInfo] = {
        List(CityInfo(1L, "北京", "华北"),
            CityInfo(2L, "上海", "华东"),
            CityInfo(3L, "深圳", "华南"),
            CityInfo(4L, "广州", "华南"),
            CityInfo(5L, "武汉", "华中"),
            CityInfo(6L, "南京", "华东"),
            CityInfo(7L, "天津", "华北"),
            CityInfo(8L, "成都", "西南"),
            CityInfo(9L, "哈尔滨", "东北"),
            CityInfo(10L, "大连", "东北"),
            CityInfo(11L, "沈阳", "东北"),
            CityInfo(12L, "西安", "西北"),
            CityInfo(13L, "长沙", "华中"),
            CityInfo(14L, "重庆", "西南"),
            CityInfo(15L, "济南", "华东"),
            CityInfo(16L, "石家庄", "华北"),
            CityInfo(17L, "银川", "西北"),
            CityInfo(18L, "杭州", "华东"),
            CityInfo(19L, "保定", "华北"),
            CityInfo(20L, "福州", "华南"),
            CityInfo(21L, "贵阳", "西南"),
            CityInfo(22L, "青岛", "华东"),
            CityInfo(23L, "苏州", "华东"),
            CityInfo(24L, "郑州", "华北"),
            CityInfo(25L, "无锡", "华东"),
            CityInfo(26L, "厦门", "华南"))
    }

    /**
      * Generates the ProductInfo data.
      *
      * @return a List with one ProductInfo per product id in [1, productNum]
      */
    def mockProductInfo: List[ProductInfo] = {

        // Product source: self-operated (自营) vs third-party seller (第三方), with weights
        val productExtendOpts = RandomOptions(("自营", 70), ("第三方", 30))
        (1 to productNum).toList.map(
            i => ProductInfo(i, s"商品_$i", productExtendOpts.getRandomOption())
        )
    }

    /**
      * Generates the UserVisitAction data: for each of `sessionNum` sessions, random actions
      * are drawn until "quit" comes up; every non-quit action becomes one row.
      *
      * @return all generated actions, in generation order
      */
    def mockUserVisitAction: List[UserVisitAction] = {
        // "yyyy" is the standard year pattern; the former "yyy" only formatted four-digit
        // years correctly because SimpleDateFormat pads rather than truncates.
        val dateFormatter = new SimpleDateFormat("yyyy-MM-dd")
        val timeFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        // Start date
        val fromDate = dateFormatter.parse("2019-03-20")
        // End date (only used to derive the average time step, see RandomDate)
        val toDate = dateFormatter.parse("2019-03-23")


        val randomDate = RandomDate(fromDate, toDate, logAboutNum)
        // City ids are drawn from [1, number of cities] rather than a hard-coded bound, so
        // they stay in sync with mockCityInfo.
        val cityCount: Long = mockCityInfo.size
        val rows = ListBuffer[UserVisitAction]()
        // Create the actions of each session
        for (_ <- 1 to sessionNum) {
            val userId = RandomNumUtil.randomInt(1, userNum)
            val sessionId = UUID.randomUUID().toString
            var isQuit = false
            while (!isQuit) {
                val action = actionOpts.getRandomOption()
                if (action == "quit") {
                    isQuit = true
                } else {
                    val date = randomDate.getRandomDate
                    val actionDateString = dateFormatter.format(date)
                    val actionTimeString = timeFormatter.format(date)

                    // Fields unrelated to the current action keep these placeholder values
                    // (null for strings, -1 for ids).
                    var searchKeyword: String = null
                    var clickCategoryId: Long = -1
                    var clickProductId: Long = -1
                    var orderCategoryIds: String = null
                    var orderProductIds: String = null
                    var payCategoryIds: String = null
                    var payProductIds: String = null

                    val cityId: Long = RandomNumUtil.randomLong(1, cityCount)
                    action match {
                        case "search" => searchKeyword = searchKeywordsOpts.getRandomOption()
                        case "click" =>
                            clickCategoryId = RandomNumUtil.randomInt(1, cargoryNum)
                            clickProductId = RandomNumUtil.randomInt(1, productNum)
                        case "order" =>
                            // 1-5 distinct categories and 1-3 distinct products, comma separated
                            orderCategoryIds = RandomNumUtil.randomMultiInt(1, cargoryNum, RandomNumUtil.randomInt(1, 5), false).mkString(",")
                            orderProductIds = RandomNumUtil.randomMultiInt(1, productNum, RandomNumUtil.randomInt(1, 3), false).mkString(",")
                        case "pay" =>
                            // Same shape as "order"
                            payCategoryIds = RandomNumUtil.randomMultiInt(1, cargoryNum, RandomNumUtil.randomInt(1, 5), false).mkString(",")
                            payProductIds = RandomNumUtil.randomMultiInt(1, productNum, RandomNumUtil.randomInt(1, 3), false).mkString(",")
                    }
                    rows += UserVisitAction(actionDateString,
                        userId,
                        sessionId,
                        RandomNumUtil.randomInt(1, pageNum),
                        actionTimeString,
                        searchKeyword,
                        clickCategoryId,
                        clickProductId,
                        orderCategoryIds,
                        orderProductIds,
                        payCategoryIds,
                        payProductIds,
                        cityId)
                }
            }
        }
        rows.toList
    }

    def main(args: Array[String]): Unit = {
        // Generate the mock data on the driver
        val userVisitActionData = mockUserVisitAction
        val userInfoData = mockUserInfo
        val productInfoData = mockProductInfo
        val cityInfoData = mockCityInfo

        val spark: SparkSession = SparkSession
            .builder()
            .master("local[1]")
            .appName("MockOffline")
            .enableHiveSupport()
            .config("spark.sql.warehouse.dir", "hdfs://hadoop201:9000/user/hive/warehouse")
            .getOrCreate()
        try {
            import spark.implicits._
            val sc = spark.sparkContext
            val userVisitActionDF = sc.makeRDD(userVisitActionData).toDF
            val userInfoDF = sc.makeRDD(userInfoData).toDF
            val productInfoDF = sc.makeRDD(productInfoData).toDF
            val cityInfoDF = sc.makeRDD(cityInfoData).toDF

            insertIntoHive(spark, "user_visit_action", userVisitActionDF)
            insertIntoHive(spark, "user_info", userInfoDF)
            insertIntoHive(spark, "product_info", productInfoDF)
            insertIntoHive(spark, "city_info", cityInfoDF)
        } finally {
            // Release the SparkContext even if one of the writes fails.
            spark.stop()
        }
    }

    /**
      * Writes `df` as table `tableName` into the Hive database named by the `hive.database`
      * key of config.properties, replacing any existing table of that name.
      *
      * @param spark     session used to run the SQL statements and write the table
      * @param tableName name of the target table
      * @param df        data to write
      */
    def insertIntoHive(spark: SparkSession, tableName: String, df: DataFrame): Unit = {
        val database = ConfigurationUtil("config.properties").getString("hive.database")
        spark.sql(s"use $database") // switch to the target database
        spark.sql(s"drop table if exists $tableName") // drop the table if it already exists
        df.write.saveAsTable(tableName) // create the table and write the data
        // Debug: spark.sql(s"select * from $tableName").show(10000)
        println(s"$tableName 数据写入完毕!")
    }
}
Copyright © 尚硅谷大数据 2019 all rights reserved, powered by Gitbook
该文件最后修订时间: 2019-08-20 02:00:56

results matching ""

    No results matching ""