1.2 数据生成模块

模拟出来的数据格式:

时间戳,地区,城市,用户 id,广告 id
1566035129449,华南,深圳,101,2

步骤1: 开启集群

启动 ZooKeeper 和 Kafka

步骤2: 创建 Topic

在 Kafka 中创建 topic: ads_log

步骤3: 循环不断地产生数据并发送到指定的 topic

创建 spark-realtime 模块

1. 工具类: RandomNumUtil

用于生成随机数

package com.atguigu.realtime.util

import java.util.Random

import scala.collection.mutable

/**
  * Author lzc
  * Date 2019-07-17 14:07
  *
  * 随机生成整数的工具类
  */
object RandomNumUtil {
    /** Shared random source used by every helper in this object. */
    val random = new Random()

    /**
      * Returns a uniformly distributed random integer in the closed range [from, to].
      *
      * @param from lower bound (inclusive)
      * @param to   upper bound (inclusive)
      * @return a random Int `x` with `from <= x <= to`
      * @throws IllegalArgumentException if `from > to`
      */
    def randomInt(from: Int, to: Int): Int = {
        if (from > to) throw new IllegalArgumentException(s"from = $from 应该小于 to = $to")
        // Compute the range size as a Long: `to - from + 1` overflows Int for very wide
        // ranges (e.g. Int.MinValue to Int.MaxValue), which would make nextInt throw.
        val span = to.toLong - from.toLong + 1
        if (span <= Int.MaxValue) {
            // nextInt(span) is in [0, span); shifting by `from` maps it onto [from, to].
            random.nextInt(span.toInt) + from
        } else {
            // The range covers more than half of all Ints, so rejection sampling over
            // the full Int domain accepts at least half of the draws.
            Iterator.continually(random.nextInt()).dropWhile(x => x < from || x > to).next()
        }
    }

    /**
      * Returns a random Long in the closed range [from, to].
      *
      * @param from lower bound (inclusive)
      * @param to   upper bound (inclusive)
      * @return a random Long `x` with `from <= x <= to`
      * @throws IllegalArgumentException if `from > to`
      */
    def randomLong(from: Long, to: Long): Long = {
        if (from > to) throw new IllegalArgumentException(s"from = $from 应该小于 to = $to")
        // May overflow to zero or a negative value when the range spans more than Long.MaxValue values.
        val span = to - from + 1
        if (span > 0) {
            // floorMod always yields a value in [0, span). The previous `nextLong().abs % span`
            // is negative when nextLong() returns Long.MinValue (whose abs is itself).
            java.lang.Math.floorMod(random.nextLong(), span) + from
        } else {
            // Range covers more than half of all Longs: rejection sampling accepts at
            // least half of the draws.
            Iterator.continually(random.nextLong()).dropWhile(x => x < from || x > to).next()
        }
    }

    /**
      * Generates `count` random integers from the closed range [from, to].
      *
      * @param from    lower bound (inclusive)
      * @param to      upper bound (inclusive)
      * @param count   how many values to produce (a non-positive count yields an empty list)
      * @param canReat whether duplicate values are allowed ("can repeat")
      * @return the generated values, in the order they were drawn
      * @throws IllegalArgumentException if `from > to` and `count > 0`, or if `canReat` is
      *                                  false and `count` exceeds the number of integers in [from, to]
      */
    def randomMultiInt(from: Int, to: Int, count: Int, canReat: Boolean = true): List[Int] = {
        if (canReat) {
            List.fill(count)(randomInt(from, to))
        } else {
            val available = to.toLong - from.toLong + 1
            // Without this guard, asking for more distinct values than the range contains
            // would make the loop below spin forever.
            if (from <= to && count > available)
                throw new IllegalArgumentException(s"count = $count 不能大于 [$from, $to] 中整数的个数 $available")
            // LinkedHashSet keeps the values in draw order; a plain hash set would return
            // them in hash-table iteration order instead.
            val picked = mutable.LinkedHashSet[Int]()
            while (picked.size < count) {
                picked += randomInt(from, to)
            }
            picked.toList
        }
    }


    /** Small manual demo of the helpers above. */
    def main(args: Array[String]): Unit = {
        println(randomMultiInt(1, 15, 10))
        // Only 8 distinct values exist in [1, 8], so request no more than that.
        println(randomMultiInt(1, 8, 5, canReat = false))
    }
}

2. 工具类: RandomOptions

用于生成带有比重的随机选项

package com.atguigu.realtime.util

import scala.collection.mutable.ListBuffer

/**
  * 根据提供的值和比重, 来创建RandomOptions对象.
  * 然后可以通过getRandomOption来获取一个随机的预定义的值
  */
object RandomOptions {
    /**
      * Builds a [[RandomOptions]] from `(value, weight)` pairs.
      *
      * Each value is stored `weight` times, so a later draw returns it with probability
      * `weight / totalWeight`.
      *
      * @param opts value/weight pairs; every weight must be >= 0
      * @tparam T type of the values
      * @return a RandomOptions ready for getRandomOption()
      * @throws IllegalArgumentException if any weight is negative
      */
    def apply[T](opts: (T, Int)*): RandomOptions[T] = {
        opts.foreach { case (value, weight) =>
            // A negative weight would lower totalWeight without adding any entries,
            // leaving totalWeight out of sync with options and skewing every draw.
            if (weight < 0) throw new IllegalArgumentException(s"weight = $weight of $value must be >= 0")
        }
        val randomOptions = new RandomOptions[T]()
        randomOptions.totalWeight = opts.map(_._2).sum
        opts.foreach { case (value, weight) =>
            randomOptions.options ++= Seq.fill(weight)(value)
        }
        randomOptions
    }


    def main(args: Array[String]): Unit = {
        // Manual test
        val opts = RandomOptions(("张三", 10), ("李四", 30), ("ww", 20))

        println(opts.getRandomOption())
        println(opts.getRandomOption())
    }
}

// Example weights: engineer 10, programmer 10, teacher 20 -> teacher is drawn half the time.
class RandomOptions[T] {
    /** Sum of all weights; equals options.size when built through RandomOptions.apply. */
    var totalWeight: Int = _
    /** Every value repeated `weight` times, so a uniform index pick is a weighted pick. */
    var options = ListBuffer[T]()

    /**
      * Returns one of the configured values, chosen at random according to its weight.
      *
      * @return a randomly chosen value
      * @throws IllegalArgumentException if the total weight is not positive (nothing to choose from)
      */
    def getRandomOption(): T = {
        require(totalWeight > 0, s"totalWeight = $totalWeight: no option can be chosen")
        options(RandomNumUtil.randomInt(0, totalWeight - 1))
    }
}

3. 样例类: CityInfo

package com.atguigu.realtime


/**
  * 城市表
  *
  * @param city_id   城市 id
  * @param city_name 城市名
  * @param area      城市区域
  */
/**
  * One row of the city table.
  *
  * @param city_id   numeric identifier of the city
  * @param city_name display name of the city
  * @param area      region the city belongs to
  */
case class CityInfo(city_id: Long, city_name: String, area: String)

4. 生成模拟数据: MockRealtime

package com.atguigu.realtime.mock

import java.util.Properties

import com.atguigu.realtime.CityInfo
import com.atguigu.realtime.util.{RandomNumUtil, RandomOptions}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.mutable.ArrayBuffer

/**
  * 生成实时的模拟数据
  */
object MockRealtime {
    /*
    Record format, one comma-separated line per event:
    timestamp,area,city,userid,adid
    i.e. at some moment, in some area and city, some user, some ad.
    */

    /** Number of records produced by each call to mockRealTimeData(). */
    private val BatchSize = 50

    /** Kafka broker list used by createKafkaProducer. */
    private val BootstrapServers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"

    /**
      * Weighted pool of cities to sample from (the Int is the relative weight).
      * Built once here instead of being rebuilt on every batch.
      */
    private val cityOptions: RandomOptions[CityInfo] = RandomOptions(
        (CityInfo(1, "北京", "华北"), 30),
        (CityInfo(2, "上海", "华东"), 30),
        (CityInfo(3, "广州", "华南"), 10),
        (CityInfo(4, "深圳", "华南"), 20),
        // city_id was 4 here, colliding with 深圳; each city gets its own id.
        // NOTE(review): 杭州 is geographically in 华东 — confirm whether 华中 is intended.
        (CityInfo(5, "杭州", "华中"), 10))

    /**
      * Produces one batch of mock click records.
      *
      * @return BatchSize lines of the form "timestamp,area,city,userid,adid"
      */
    def mockRealTimeData(): ArrayBuffer[String] =
        ArrayBuffer.fill(BatchSize)(nextRecord())

    /** Builds a single record; sleeps 10 ms afterwards so timestamps spread out. */
    private def nextRecord(): String = {
        val timestamp = System.currentTimeMillis()
        val cityInfo = cityOptions.getRandomOption()
        val userid = RandomNumUtil.randomInt(100, 105)
        val adid = RandomNumUtil.randomInt(1, 5)
        val record = s"$timestamp,${cityInfo.area},${cityInfo.city_name},$userid,$adid"
        Thread.sleep(10)
        record
    }

    /**
      * Creates a String/String Kafka producer connected to BootstrapServers.
      *
      * @return a new producer; the caller is responsible for closing it
      */
    def createKafkaProducer: KafkaProducer[String, String] = {
        val props: Properties = new Properties
        // Host names and ports of the Kafka brokers
        props.put("bootstrap.servers", BootstrapServers)
        // Key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // Value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        new KafkaProducer[String, String](props)
    }

    /**
      * Sends mock batches to Kafka forever.
      *
      * @param args optional; args(0) overrides the target topic (default "ads_log")
      */
    def main(args: Array[String]): Unit = {
        val topic = args.headOption.getOrElse("ads_log")
        val producer: KafkaProducer[String, String] = createKafkaProducer
        try {
            while (true) {
                mockRealTimeData().foreach { msg =>
                    // send() is asynchronous; its returned Future is not inspected here.
                    producer.send(new ProducerRecord(topic, msg))
                    Thread.sleep(100)
                }
                Thread.sleep(1000)
            }
        } finally {
            // Runs when the loop exits via an exception: flush buffered records and
            // release the producer's resources instead of leaking them.
            producer.close()
        }
    }
}

步骤 4: 确认 Kafka 中的数据是否生成成功(例如使用 kafka-console-consumer.sh 消费 ads_log 这个 topic 进行查看)

Copyright © 尚硅谷大数据 2019 all rights reserved, powered by Gitbook
该文件最后修订时间: 2019-09-26 14:46:14

results matching ""

    No results matching ""