5.2 从 Kafka 读取数据
5.2.1 配置文件config.properties
# Kafka配置
kafka.broker.list=hadoop201:9092,hadoop202:9092,hadoop203:9092
kafka.group=bigdata
# Redis配置
redis.host=hadoop201
redis.port=6379
5.2.2 配置文件log4j.properties
log4j.appender.atguigu.MyConsole=org.apache.log4j.ConsoleAppender
log4j.appender.atguigu.MyConsole.target=System.err
log4j.appender.atguigu.MyConsole.layout=org.apache.log4j.PatternLayout
log4j.appender.atguigu.MyConsole.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %10p (%c:%M) - %m%n
log4j.rootLogger=error,atguigu.MyConsole
5.2.3 读取配置文件PropertiesUtil
package com.atguigu.dw.gmall.realtime.util
import java.io.InputStream
import java.util.Properties
/**
 * Loads `config.properties` from the classpath once (on first access of this object)
 * and exposes its entries by key.
 *
 * Author lzc
 * Date 2019/5/15 11:31 AM
 */
object PropertiesUtil {
  private val properties: Properties = {
    // Use the loader that loaded this class rather than the system class loader: on Spark
    // executors the application jar (which carries config.properties) is typically not on the
    // system class path, so getSystemResourceAsStream would return null there.
    val is: InputStream =
      Option(getClass.getClassLoader.getResourceAsStream("config.properties"))
        .getOrElse(throw new IllegalStateException("config.properties not found on the classpath"))
    try {
      val loaded = new Properties()
      loaded.load(is)
      loaded
    } finally {
      // The stream is only needed while loading; close it so the handle is not leaked.
      is.close()
    }
  }

  /**
   * Looks up a configuration value.
   *
   * @param propertyName key in config.properties
   * @return the configured value, or null when the key is absent (java.util.Properties semantics)
   */
  def getProperty(propertyName: String): String = properties.getProperty(propertyName)

  /** Manual smoke test: prints the configured Kafka broker list. */
  def main(args: Array[String]): Unit = {
    println(getProperty("kafka.broker.list"))
  }
}
5.2.4 MyKafkaUtil
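可以返回一个 Kafka DStream。这里使用的是 Direct 方式 (`KafkaUtils.createDirectStream`), 它不使用 Receiver, 基于 Kafka 的低级 (Simple) 消费者 API 按分区直接读取数据, 而不是使用高级消费者。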
package com.atguigu.dw.gmall.realtime.util
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
/**
 * Factory for Kafka input streams used by the realtime apps.
 *
 * Author lzc
 * Date 2019/5/15 11:19 AM
 */
object MyKafkaUtil {
  /**
   * Creates a direct (receiver-less) Kafka DStream for one topic, configured with the broker
   * list and consumer group read from config.properties.
   *
   * @param ssc   streaming context the stream is attached to
   * @param topic Kafka topic to subscribe to
   * @return stream of (key, message) pairs, both decoded as Strings by StringDecoder
   */
  def getKafkaStream(ssc: StreamingContext, topic: String): InputDStream[(String, String)] = {
    val brokerList = PropertiesUtil.getProperty("kafka.broker.list")
    val groupId = PropertiesUtil.getProperty("kafka.group")
    val kafkaParams = Map[String, String](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList,
      ConsumerConfig.GROUP_ID_CONFIG -> groupId
    )
    val topics: Set[String] = Set(topic)
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
  }
}
5.2.5 样例类:StartupLog
为了数据访问方便, 把用户访问记录封装在样例类中.
由于 ES 要建索引, 为了以后从 ES 中查询数据的方便, 把日期拆成多个需要的格式.
/**
 * One app-startup record, deserialized from a JSON message on the startup topic.
 *
 * `logDate` and `logHour` are mutable so they can be filled in after parsing.
 * NOTE(review): presumably derived from `ts` (split into query-friendly formats for ES);
 * confirm the expected format and time zone.
 */
case class StartupLog(
  mid: String,
  uid: String,
  appId: String,
  area: String,
  os: String,
  channel: String,
  logType: String,
  version: String,
  ts: Long,
  var logDate: String,
  var logHour: String
)
5.2.6 RedisUtil
package com.atguigu.dw.gmall.realtime.util
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
/**
 * Holds a shared Jedis connection pool for the Redis server configured by
 * `redis.host` / `redis.port` in config.properties.
 */
object RedisUtil {
  val host: String = PropertiesUtil.getProperty("redis.host")
  val port: Int = PropertiesUtil.getProperty("redis.port").toInt

  private val jedisPoolConfig: JedisPoolConfig = {
    val poolConfig = new JedisPoolConfig()
    poolConfig.setMaxTotal(100)            // at most 100 connections overall
    poolConfig.setMaxIdle(20)              // keep no more than 20 idle connections
    poolConfig.setMinIdle(20)              // ...and no fewer than 20
    poolConfig.setBlockWhenExhausted(true) // when the pool is busy, wait for a connection
    poolConfig.setMaxWaitMillis(500)       // ...for at most 500 ms
    poolConfig.setTestOnBorrow(false)      // do NOT validate connections when borrowing
    poolConfig
  }

  private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, host, port)

  /** Borrows a Redis connection from the pool; callers should close() it when done. */
  def getJedisClient: Jedis = jedisPool.getResource
}
5.2.7 DauApp
在保存到 Redis 的时候, key 为 `dau:日期`, value 使用 set 结构, set 中存放的值为 uid (因此同一天内同一个 uid 只会被记录一次).
package com.atguigu.dw.gmall.realtime.app
import com.alibaba.fastjson.JSON
import com.atguigu.dw.gmall.common.constant.GmallConstant
import com.atguigu.dw.gmall.realtime.bean.StartupLog
import com.atguigu.dw.gmall.realtime.util.{MyKafkaUtil, RedisUtil}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis
import redis.clients.util.JedisClusterHashTagUtil
object DauApp {
  /**
   * Entry point: consumes startup logs from Kafka in 5-second micro-batches and records each
   * day's users in Redis.
   *
   * Each record is added to the Redis SET under key `dau:<logDate>` with its `uid` as the member,
   * so repeated startups by the same uid on the same day are stored only once.
   */
  def main(args: Array[String]): Unit = {
    import java.text.SimpleDateFormat
    import java.util.Date

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DauApp")
    val ssc = new StreamingContext(conf, Seconds(5))
    val sourceStream: InputDStream[(String, String)] =
      MyKafkaUtil.getKafkaStream(ssc, GmallConstant.TOPIC_STARTUP)

    // 1. Parse each JSON message into a StartupLog and derive logDate/logHour from ts.
    //    Parsing alone leaves logDate unset, which would put every uid under "dau:null".
    val startupLogDStream = sourceStream.mapPartitions { records =>
      // SimpleDateFormat is not thread-safe, so each partition gets its own instances.
      val dateFormat = new SimpleDateFormat("yyyy-MM-dd")
      val hourFormat = new SimpleDateFormat("HH")
      records.map { case (_, json) =>
        val startupLog = JSON.parseObject(json, classOf[StartupLog])
        // NOTE(review): assumes ts is epoch milliseconds and that days should be bucketed in the
        // JVM's default time zone — confirm against the log producer.
        val eventTime = new Date(startupLog.ts)
        startupLog.logDate = dateFormat.format(eventTime)
        startupLog.logHour = hourFormat.format(eventTime)
        startupLog
      }
    }

    // 2. Save to Redis: SET keyed by day, members are uids.
    startupLogDStream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val client: Jedis = RedisUtil.getJedisClient
        try {
          partition.foreach { startupLog =>
            val key = "dau:" + startupLog.logDate
            client.sadd(key, startupLog.uid)
          }
        } finally {
          // Release the connection even if a write fails, so the pool is not drained.
          client.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}