3.3 代码开发

代码添加到实时模块中

3.3.1 事件日志样例类

// One user-behaviour event consumed from Kafka.
// Parsed via fastjson's JSON.parseObject, so field names presumably must match
// the JSON keys of the incoming message — confirm against the producer.
case class EventLog(mid: String,          // device (machine) id; grouping key for alerting
                    uid: String,          // user id
                    appId: String,        // application id
                    area: String,         // region / area code
                    os: String,           // operating system
                    logType: String,      // log category
                    eventId: String,      // event type, e.g. "coupon" or "clickItem"
                    pageId: String,       // current page id
                    nextPageId: String,   // next page id
                    itemId: String,       // item involved in the event
                    ts: Long,             // event timestamp in epoch milliseconds
                    var logDate: String,  // "yyyy-MM-dd" derived from ts; var because it is assigned after JSON parsing
                    var logHour: String)  // "HH" derived from ts; var for the same reason

3.3.2 预警样例类


// Alert record produced for a suspicious device; intended to be written to
// Elasticsearch downstream. Java collection types are used here — presumably
// for serializer compatibility; confirm against the ES writing utility.
case class AlertInfo(mid: String,                        // device id that triggered the alert
                     uids: java.util.HashSet[String],    // distinct user ids that claimed coupons on this device
                     itemIds: java.util.HashSet[String], // distinct item ids the coupons were claimed for
                     events: java.util.List[String],     // all event ids seen on this device within the window
                     ts: Long)                           // alert creation time (epoch milliseconds)

3.3.3 预警具体业务

package com.atguigu.gmall0225.realtime.app


import java.text.SimpleDateFormat
import java.util
import java.util.Date

import com.alibaba.fastjson.JSON
import com.atguigu.gmall0225.common.util.{GmallConstant, MyESUtil}
import com.atguigu.gmall0225.realtime.bean.{AlertInfo, EventLog}
import com.atguigu.gmall0225.realtime.util.MyKafkaUtil
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.control.Breaks._

/**
  * Author lzc
  * Date 2019-07-23 08:50
  *
  * Coupon-abuse alert job.
  *
  * Over a 5-minute sliding window (sliding every 5-second batch), a device is
  * flagged when at least 3 distinct users claimed coupons on it and nobody on
  * that device clicked (browsed) any item — a pattern suggesting automated
  * coupon harvesting rather than real shopping.
  */
object AlertApp {
    def main(args: Array[String]): Unit = {
        // 1. Consume event logs from Kafka.
        //    FIX: app name was "DAUApp" (copy-paste from another job); renamed to match this object.
        val conf: SparkConf = new SparkConf().setAppName("AlertApp").setMaster("local[1]")
        val ssc = new StreamingContext(conf, Seconds(5))
        val sourceDStream: InputDStream[(String, String)] = MyKafkaUtil.getKafkaStream(ssc, GmallConstant.TOPIC_EVENT)

        // 2. Apply a 5-minute window sliding every batch, parse each JSON message
        //    into an EventLog, and derive logDate/logHour from the event timestamp.
        val eventLogDStream: DStream[(String, EventLog)] = sourceDStream
            .window(Seconds(5 * 60), Seconds(5))
            .map {
                case (_, jsonString) => {
                    val log = JSON.parseObject(jsonString, classOf[EventLog])
                    val date = new Date(log.ts)
                    log.logDate = new SimpleDateFormat("yyyy-MM-dd").format(date)
                    log.logHour = new SimpleDateFormat("HH").format(date)
                    (log.mid, log)
                }
            }

        // 3. Group by device id (mid) — the alert decision is per device.
        //    (The original comment said "group by uid", which did not match the key.)
        val groupedEventLogDStream: DStream[(String, Iterable[EventLog])] = eventLogDStream.groupByKey

        // 4. Alert rule evaluation per device.
        val checkCouponAlertDStream: DStream[(Boolean, AlertInfo)] = groupedEventLogDStream.map {
            case (mid, logIt) => {
                val uids: util.HashSet[String] = new util.HashSet[String]()       // distinct coupon-claiming users
                val itemIds: util.HashSet[String] = new util.HashSet[String]()    // distinct items coupons were claimed for
                val eventIds: util.ArrayList[String] = new util.ArrayList[String]() // every event id seen, in order

                var isBrowserProduct: Boolean = false // whether any item was browsed on this device
                // Walk all events of this device within the 5-minute window.
                breakable {
                    logIt.foreach(log => {
                        eventIds.add(log.eventId)
                        if (log.eventId == "coupon") {
                            // Record who claimed a coupon and for which item.
                            uids.add(log.uid)
                            itemIds.add(log.itemId)
                        } else if (log.eventId == "clickItem") {
                            // A single product click disqualifies the device — stop scanning early.
                            isBrowserProduct = true
                            break
                        }
                    })
                }
                // (shouldAlert, alert payload): alert when no browsing happened
                // AND at least 3 distinct users claimed coupons.
                (!isBrowserProduct && uids.size() >= 3, AlertInfo(mid, uids, itemIds, eventIds, System.currentTimeMillis()))
            }
        }

        // 5. Keep only the records that should actually trigger an alert.
        val filteredDStream: DStream[AlertInfo] = checkCouponAlertDStream.filter(_._1).map(_._2)

        // 6. Output action.
        //    FIX: the original registered no output operation at all, so
        //    ssc.start() would fail with "No output operations registered,
        //    so nothing to execute". Print alerts for now;
        //    TODO: write them to Elasticsearch via MyESUtil (API not shown here).
        filteredDStream.print(100)

        ssc.start()
        ssc.awaitTermination()

    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-10-08 23:56:19

results matching ""

    No results matching ""