8.2 Real-Time Environment Setup

Step 1: Start the cluster

Start Zookeeper and then the Kafka brokers on every node, e.g. with zkServer.sh start and kafka-server-start.sh -daemon config/server.properties (adjust the paths to your own installation).


Step 2: Create the topic

Create a topic named ads_log in Kafka (for example with the kafka-topics.sh --create tool, choosing partition and replication-factor settings that fit your cluster).


Step 3: Continuously produce data into the specified topic

Create MockRealTime in the sparkmall-mock module; once it is started it keeps producing data.

Code:

package com.atguigu.sparkmall.mock

import java.util.Properties

import com.atguigu.sparkmall.common.bean.CityInfo
import com.atguigu.sparkmall.mock.util.{RandomNumUtil, RandomOptions}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.collection.mutable.ArrayBuffer

/**
  * Generate simulated real-time data
  */
object MockRealTime {
    /*
    Record format:
        timestamp   area    city    userid  adid
        event time  area    city    user id ad id

     */
    def mockRealTimeData() = {
        // buffer holding the simulated real-time records
        val array = ArrayBuffer[String]()
        // weighted city information
        val randomOpts = RandomOptions(
            (CityInfo(1, "北京", "华北"), 30),
            (CityInfo(2, "上海", "华东"), 30),
            (CityInfo(3, "广州", "华南"), 10),
            (CityInfo(4, "深圳", "华南"), 20),
            (CityInfo(5, "杭州", "华东"), 10))
        (1 to 50).foreach {
            i => {
                val timestamp = System.currentTimeMillis()
                val cityInfo = randomOpts.getRandomOption()
                val area = cityInfo.area
                val city = cityInfo.city_name
                val userid = RandomNumUtil.randomInt(100, 105)
                val adid = RandomNumUtil.randomInt(1, 5)
                array += s"$timestamp,$area,$city,$userid,$adid"
                Thread.sleep(10)
            }
        }
        array
    }

    def createKafkaProducer: KafkaProducer[String, String] = {
        val props = new Properties
        // hostnames and ports of the Kafka brokers
        props.put("bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
        // acks = 1: wait only for the partition leader's acknowledgement
        props.put("acks", "1")
        // maximum number of retries
        props.put("retries", "0")

        // batch size in bytes
        props.put("batch.size", "16384")
        // how long to wait before sending a batch (ms)
        props.put("linger.ms", "1")
        // size of the send buffer in bytes
        props.put("buffer.memory", "33554432")
        // key serializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // value serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        new KafkaProducer[String, String](props)
    }

    def main(args: Array[String]): Unit = {
        val topic = "ads_log"
        val producer: KafkaProducer[String, String] = createKafkaProducer
        while (true) {
            mockRealTimeData().foreach {
                msg => {
                    producer.send(new ProducerRecord(topic, msg))
                    Thread.sleep(100)
                }
            }
            Thread.sleep(1000)
        }
    }
}
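
Once MockRealTime is running, every message sent to ads_log is a single comma-separated line in the format described above. A produced record looks roughly like the following (the concrete values are random; the area and city strings stay in Chinese as defined in CityInfo):

1566266487076,华北,北京,103,2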

Step 4: Check in a console consumer that data is being produced (for example with kafka-console-consumer.sh --bootstrap-server hadoop201:9092 --topic ads_log)


Step 5: Read the data from Kafka as a DStream

Define MyKafkaUtil in the sparkmall-common module; it reads data from Kafka and returns a DStream.

package com.atguigu.sparkmall.common.util

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object MyKafkaUtil {
    val config = ConfigurationUtil("config.properties")
    val broker_list = config.getString("kafka.broker.list")

    // Kafka consumer configuration
    val kafkaParam = Map(
        "bootstrap.servers" -> broker_list, // brokers used to bootstrap the connection to the cluster
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        // identifies the consumer group this consumer belongs to
        "group.id" -> "commerce-consumer-group",
        // where to start when there is no initial offset, or the current offset no longer exists on the server;
        // "latest" resets the offset to the newest available record
        "auto.offset.reset" -> "latest",
        // if true, the consumer's offsets are committed automatically in the background, which can lose
        // data if Kafka goes down; if false, the offsets have to be maintained manually.
        // Here we keep the automatic offset commits.
        "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    /*
     Create the DStream and return the received input data.
     LocationStrategies: control on which executors the Kafka partitions are consumed.
     LocationStrategies.PreferConsistent: distribute the partitions evenly across all executors.
     ConsumerStrategies: choose how the Kafka consumer is created and configured on the driver and the executors.
     ConsumerStrategies.Subscribe: subscribe to a collection of topics.
     */

    def getDStream(ssc: StreamingContext, topic: String) = {
        KafkaUtils.createDirectStream[String, String](
            ssc,
            LocationStrategies.PreferConsistent,  // the standard choice: use it unless the Kafka brokers and the Spark executors run on the same machines
            ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
    }
}
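
MyKafkaUtil reads the broker list from the kafka.broker.list entry of config.properties in sparkmall-common. Assuming the same brokers as the mock producer above, that entry would look like:

kafka.broker.list=hadoop201:9092,hadoop202:9092,hadoop203:9092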

Step 6: Create the real-time submodule

Module name: sparkmall-realtime

1. Add the dependencies

<dependencies>

    <dependency>
        <groupId>com.atguigu</groupId>
        <artifactId>sparkmall-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
    </dependency>

</dependencies>

2. Write RealTimeApp

The code below also needs spark-streaming and spark-streaming-kafka-0-10 on the classpath; they are assumed to come in transitively through sparkmall-common, otherwise add them to this module's dependencies as well.

package com.atguigu.sparkmall.realtime

import com.atguigu.sparkmall.realtime.app.AdsInfo
import com.atguigu.sparkmall.common.util.MyKafkaUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object RealTimeApp {
    def main(args: Array[String]): Unit = {
        // 1. Create the SparkConf object
        val conf: SparkConf = new SparkConf()
            .setAppName("RealTimeApp")
            .setMaster("local[*]")
        // 2. Create the SparkContext object
        val sc = new SparkContext(conf)
        // 3. Create the StreamingContext
        val ssc = new StreamingContext(sc, Seconds(1))
        // 4. Get the DStream from Kafka
        val recordDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getDStream(ssc, "ads_log")

        // 5. To simplify the computations that follow, wrap each consumed string in an AdsInfo object
        val adsInfoDStream: DStream[AdsInfo] = recordDStream.map {
            record =>
                val split: Array[String] = record.value.split(",")
                AdsInfo(split(0).toLong, split(1), split(2), split(3), split(4))
        }

        // A StreamingContext needs at least one output operation before it is started;
        // print a few records here to verify that the stream works
        adsInfoDStream.print()

        ssc.start()
        ssc.awaitTermination()
    }
}
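
The AdsInfo bean imported above is not listed in this section. A minimal sketch, assuming one field per column of the mock record (the class in the full course project may carry additional derived fields, e.g. a pre-formatted day or hour string):

package com.atguigu.sparkmall.realtime.app

/**
  * One ad-click record parsed from a Kafka message of the form
  * "timestamp,area,city,userId,adsId"
  */
case class AdsInfo(ts: Long,
                   area: String,
                   city: String,
                   userId: String,
                   adsId: String)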