8.2 Setting Up the Real-Time Environment
Step 1: Start the cluster
Start ZooKeeper and Kafka.
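For example, assuming a standard ZooKeeper/Kafka installation (the paths and any cluster start scripts depend on your own environment):
# run on each node
zkServer.sh start
kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties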
Step 2: Create the topic
Create a topic named ads_log in Kafka.
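For example, assuming ZooKeeper listens on hadoop201:2181 (the partition and replication-factor values are only illustrative; newer Kafka versions use --bootstrap-server instead of --zookeeper):
kafka-topics.sh --create --zookeeper hadoop201:2181 --partitions 3 --replication-factor 2 --topic ads_log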
Step 3: Continuously produce data to the topic
Create a MockRealTime object in the sparkmall-mock module; once started, it keeps producing data.
Code:
package com.atguigu.sparkmall.mock
import java.util.Properties
import com.atguigu.sparkmall.common.bean.CityInfo
import com.atguigu.sparkmall.mock.util.{RandomNumUtil, RandomOptions}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.collection.mutable.ArrayBuffer
/**
 * Generates simulated real-time data.
 */
object MockRealTime {
  /*
   Data format:
   timestamp area city userid adid
   i.e. a point in time, an area, a city, a user id, an ad id
   */
  def mockRealTimeData(): ArrayBuffer[String] = {
    // Buffer holding the simulated real-time records
    val array = ArrayBuffer[String]()
    // City info with selection weights
    val randomOpts = RandomOptions(
      (CityInfo(1, "北京", "华北"), 30),
      (CityInfo(2, "上海", "华东"), 30),
      (CityInfo(3, "广州", "华南"), 10),
      (CityInfo(4, "深圳", "华南"), 20),
      (CityInfo(5, "杭州", "华中"), 10))
    (1 to 50).foreach {
      i => {
        val timestamp = System.currentTimeMillis()
        val cityInfo = randomOpts.getRandomOption()
        val area = cityInfo.area
        val city = cityInfo.city_name
        val userid = RandomNumUtil.randomInt(100, 105)
        val adid = RandomNumUtil.randomInt(1, 5)
        array += s"$timestamp,$area,$city,$userid,$adid"
        Thread.sleep(10)
      }
    }
    array
  }
  def createKafkaProducer: KafkaProducer[String, String] = {
    val props = new Properties
    // Host names and ports of the Kafka brokers
    props.put("bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
    // Acknowledgement level: 1 means only the partition leader acknowledges the write
    props.put("acks", "1")
    // Maximum number of retries
    props.put("retries", "0")
    // Batch size in bytes
    props.put("batch.size", "16384")
    // Linger time (ms) before sending a batch
    props.put("linger.ms", "1")
    // Size of the send buffer in bytes
    props.put("buffer.memory", "33554432")
    // Key serializer
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // Value serializer
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new KafkaProducer[String, String](props)
  }
  def main(args: Array[String]): Unit = {
    val topic = "ads_log"
    val producer: KafkaProducer[String, String] = createKafkaProducer
    while (true) {
      mockRealTimeData().foreach {
        msg => {
          producer.send(new ProducerRecord(topic, msg))
          Thread.sleep(100)
        }
      }
      Thread.sleep(1000)
    }
  }
}
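MockRealTime relies on CityInfo, RandomNumUtil, and RandomOptions, which are defined elsewhere in the sparkmall-common and sparkmall-mock modules. For reference, a minimal sketch consistent with the usage above (the field names and behavior here are inferred from that usage, not the project's actual code):
// Sketch only: inferred from how MockRealTime uses these helpers
case class CityInfo(city_id: Long, city_name: String, area: String)

object RandomNumUtil {
  private val random = new scala.util.Random
  // Random Int in the closed interval [from, to]
  def randomInt(from: Int, to: Int): Int = from + random.nextInt(to - from + 1)
}

// Weighted random choice: RandomOptions((value, weight), ...).getRandomOption()
class RandomOptions[T](opts: (T, Int)*) {
  private val pool: Seq[T] = opts.flatMap { case (value, weight) => Seq.fill(weight)(value) }
  private val random = new scala.util.Random
  def getRandomOption(): T = pool(random.nextInt(pool.length))
}
object RandomOptions {
  def apply[T](opts: (T, Int)*): RandomOptions[T] = new RandomOptions(opts: _*)
}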
Step 4: Check in a console consumer that the data is being produced successfully
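For example, with the console consumer shipped with Kafka (broker addresses taken from the producer configuration above):
kafka-console-consumer.sh --bootstrap-server hadoop201:9092,hadoop202:9092,hadoop203:9092 --topic ads_log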
Step 5: Read data from Kafka into a DStream
Define MyKafkaUtil in the sparkmall-common module; it reads data from Kafka and returns a DStream.
package com.atguigu.sparkmall.common.util
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object MyKafkaUtil {
  val config = ConfigurationUtil("config.properties")
  val broker_list = config.getString("kafka.broker.list")
  // Kafka consumer configuration
  val kafkaParam = Map(
    "bootstrap.servers" -> broker_list, // addresses used to bootstrap the connection to the cluster
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    // identifies which consumer group this consumer belongs to
    "group.id" -> "commerce-consumer-group",
    // used when there is no initial offset, or the current offset no longer exists on any server;
    // "latest" resets the offset to the latest offset
    "auto.offset.reset" -> "latest",
    // if true, the consumer's offsets are committed automatically in the background,
    //   but data can be lost if Kafka goes down;
    // if false, offsets have to be maintained manually. Here we keep automatic commits.
    "enable.auto.commit" -> (true: java.lang.Boolean)
  )
  /*
   Creates a DStream that returns the received input data.
   LocationStrategies: where to schedule consumers for the given topics and cluster addresses.
   LocationStrategies.PreferConsistent: distribute partitions consistently across all executors.
   ConsumerStrategies: how to create and configure Kafka consumers on the driver and executors.
   ConsumerStrategies.Subscribe: subscribe to a list of topics.
   */
  def getDStream(ssc: StreamingContext, topic: String) = {
    KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent, // the standard choice whenever Kafka and Spark are not deployed on the same machines
      ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParam))
  }
}
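MyKafkaUtil reads the broker list through ConfigurationUtil from a config.properties file in the sparkmall-common resources (ConfigurationUtil itself is defined elsewhere in the project). A minimal entry, using the broker addresses from the producer above, might look like:
# config.properties
kafka.broker.list=hadoop201:9092,hadoop202:9092,hadoop203:9092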
Step 6: Create the real-time sub-module
Module name: sparkmall-realtime
1. Add the dependencies
<dependencies>
    <dependency>
        <groupId>com.atguigu</groupId>
        <artifactId>sparkmall-common</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
    </dependency>
</dependencies>
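Note that RealTimeApp below uses StreamingContext and the kafka010 KafkaUtils. If the Spark Streaming artifacts are not already pulled in transitively through sparkmall-common (and assuming their versions are managed by the parent POM, like the dependencies above), you may also need:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
</dependency>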
2. Write RealTimeApp
package com.atguigu.sparkmall.realtime
import com.atguigu.sparkmall.realtime.app.AdsInfo
import com.atguigu.sparkmall.common.util.MyKafkaUtil
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
object RealTimeApp {
  def main(args: Array[String]): Unit = {
    // 1. Create the SparkConf object
    val conf: SparkConf = new SparkConf()
      .setAppName("RealTimeApp")
      .setMaster("local[*]")
    // 2. Create the SparkContext object
    val sc = new SparkContext(conf)
    // 3. Create the StreamingContext
    val ssc = new StreamingContext(sc, Seconds(1))
    // 4. Get the DStream
    val recordDStream: InputDStream[ConsumerRecord[String, String]] = MyKafkaUtil.getDStream(ssc, "ads_log")
    // 5. To simplify later computations, wrap each consumed string in an object
    val adsInfoDStream: DStream[AdsInfo] = recordDStream.map {
      record =>
        val split: Array[String] = record.value.split(",")
        AdsInfo(split(0).toLong, split(1), split(2), split(3), split(4))
    }
    // Register at least one output operation so the streaming job can start;
    // printing a sample also verifies the stream (later requirements will replace this)
    adsInfoDStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
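The AdsInfo case class used above lives in the com.atguigu.sparkmall.realtime.app package and is not shown here; a minimal sketch consistent with this usage (field names are inferred from the mock data format) might be:
// Sketch only: one field per comma-separated value of the mock record
case class AdsInfo(ts: Long, area: String, city: String, userId: String, adsId: String)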