第 2 章 广告黑名单实时统计
2.1 需求简介
实现实时的动态黑名单检测机制:将每天对某个广告点击超过阈值(比如:100次)的用户拉入黑名单。
- 黑名单应该是每天更新一次. 如果昨天进入黑名单, 今天应该重新再统计
- 把黑名单写入到 redis 中, 以供其他应用查看
- 已经进入黑名单的用户不再进行检测(提高效率)
2.2 思路
2.2.1 写入到黑名单
黑名单存放在 redis 中, 使用 set, set 中的每个元素表示一个用户.
通过 sql 查询过滤出来每天每广告点击数超过阈值的用户, 然后使用 foreach
写入到 redis 即可.
2.2.2 过滤黑名单的用户点击记录
先从 redis 读取到所有黑名单数据, 然后过滤, 只保留非黑名单用户的点击记录.
2.3 具体实现代码
2.3.1 RedisUtil
工具类
package com.atguigu.realtime.util
import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
object RedisUtil {
private val jedisPoolConfig: JedisPoolConfig = new JedisPoolConfig()
jedisPoolConfig.setMaxTotal(100) //最大连接数
jedisPoolConfig.setMaxIdle(20) //最大空闲
jedisPoolConfig.setMinIdle(20) //最小空闲
jedisPoolConfig.setBlockWhenExhausted(true) //忙碌时是否等待
jedisPoolConfig.setMaxWaitMillis(500) //忙碌时等待时长 毫秒
jedisPoolConfig.setTestOnBorrow(true) //每次获得连接的进行测试
private val jedisPool: JedisPool = new JedisPool(jedisPoolConfig, "hadoop201", 6379)
// 直接得到一个 Redis 的连接
def getJedisClient: Jedis = {
jedisPool.getResource
}
}
2.3.2 BlackListApp
类具体实现:
package com.atguigu.realtime.app
import com.atguigu.realtime.bean.AdsInfo
import com.atguigu.realtime.util.RedisUtil
import org.apache.spark.sql._
import org.apache.spark.sql.streaming.Trigger
import redis.clients.jedis.Jedis
/**
* Author lzc
* Date 2019-08-19 09:37
*
* 需求1: 统计黑名单
*
* 其他需求直接使用过滤后的数据就可以了
*/
object BlackListApp {
def statBlackList(spark: SparkSession, adsInfoDS: Dataset[AdsInfo]): Dataset[AdsInfo] = {
import spark.implicits._
// 1. 过滤黑名单的数据: 如果有用户已经进入黑名单, 则不再统计这个用户的广告点击记录
val filteredAdsInfoDS: Dataset[AdsInfo] = adsInfoDS.mapPartitions(adsInfoIt => { // 每个分区连接一次到redis读取黑名单, 然后把进入黑名单用户点击记录过滤掉
val adsInfoList: List[AdsInfo] = adsInfoIt.toList
if (adsInfoList.isEmpty) {
adsInfoList.toIterator
} else {
// 1. 先读取到黑名单
val client: Jedis = RedisUtil.getJedisClient
val blackList: java.util.Set[String] = client.smembers(s"day:blcklist:${adsInfoList(0).dayString}")
// 2. 过滤
adsInfoList.filter(adsInfo => {
!blackList.contains(adsInfo.userId)
}).toIterator
}
})
// 创建临时表: tb_ads_info
filteredAdsInfoDS.createOrReplaceTempView("tb_ads_info")
// 需求1: 黑名单 每天每用户每广告的点击量
// 2. 按照每天每用户每id分组, 然后计数, 计数超过阈值(100)的查询出来
val result: DataFrame = spark.sql(
"""
|select
| dayString,
| userId
|from tb_ads_info
|group by dayString, userId, adsId
|having count(1) >= 100000
""".stripMargin)
// 3. 把点击量超过 100 的写入到redis中.
result.writeStream
.outputMode("update")
.trigger(Trigger.ProcessingTime("2 seconds"))
.foreach(new ForeachWriter[Row] {
var client: Jedis = _
override def open(partitionId: Long, epochId: Long): Boolean = {
// 打开到redis的连接
client = RedisUtil.getJedisClient
client != null
}
override def process(value: Row): Unit = {
// 写入到redis 把每天的黑名单写入到set中 key: "day:blacklist" value: 黑名单用户
val dayString: String = value.getString(0)
val userId: String = value.getString(1)
client.sadd(s"day:blcklist:$dayString", userId)
}
override def close(errorOrNull: Throwable): Unit = {
// 关闭到redis的连接
if (client != null) client.close()
}
})
.option("checkpointLocation", "C:/blacklist")
.start()
// 4. 把过滤后的数据返回 (在其他地方也可以使用临时表: tb_ads_info)
filteredAdsInfoDS
}
}
2.3.3 RealtimeApp
package com.atguigu.realtime.app
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Date
import com.atguigu.realtime.bean.AdsInfo
import org.apache.spark.sql._
/**
* Author lzc
* Date 2019-08-17 17:52
*/
object RealtimeApp {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.appName("RealtimeApp")
.master("local[*]")
.getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("warn")
// 从 kafka 读取数据, 为了方便后续处理, 封装数据到 AdsInfo 样例类中
val dayStringFormatter: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val hmStringFormatter: SimpleDateFormat = new SimpleDateFormat("HH:mm")
val adsInfoDS: Dataset[AdsInfo] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
.option("subscribe", "ads_log")
.load
.select("value")
.as[String]
.map(v => {
val split: Array[String] = v.split(",")
val date: Date = new Date(split(0).toLong)
AdsInfo(split(0).toLong, new Timestamp(split(0).toLong), dayStringFormatter.format(date), hmStringFormatter.format(date), split(1), split(2), split(3), split(4))
})
.withWatermark("timestamp", "24 hours") // 都是统计每天的数据, 对迟到24小时的数据废弃不用
// 需求1: 黑名单
val filteredAdsInfoDS: Dataset[AdsInfo] = BlackListApp.statBlackList(spark, adsInfoDS)
// 需求2:
AdsClickCountApp.statAdsClickCount(spark, filteredAdsInfoDS)
}
}