第 3 章 实时采集数据

更新 CanalHandler, 增加对其他两个表的监控: order_detail, user_info

package com.atguigu.gmall0225.canal.myutil

import java.util

import com.alibaba.fastjson.JSONObject
import com.alibaba.google.common.base.CaseFormat
import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.EventType

/**
  * Author lzc
  * Date 2019-07-26 09:12
  */
object CanalHandler2 {
    def handle(tableName: String, rowDataList: util.List[CanalEntry.RowData], eventType: EventType) = {

        if (tableName == "order_info" && eventType == EventType.INSERT && !rowDataList.isEmpty) {
            sendToKafka(rowDataList, "gmall_order_info")
        }else if(tableName == "order_detail" && eventType == EventType.INSERT && !rowDataList.isEmpty){
            sendToKafka(rowDataList, "gmall_order_detail")
        }else if(tableName == "user_info" && (eventType == EventType.INSERT || eventType == EventType.UPDATE) && !rowDataList.isEmpty){
            sendToKafka(rowDataList, "gmall_user_info")
        }
    }

    /**
      * 发送数据到kafka
      * @param rowDataList
      * @param topic
      */
    private def sendToKafka(rowDataList: util.List[CanalEntry.RowData], topic: String): Unit = {
        import scala.collection.JavaConversions._
        for (rowData <- rowDataList) { // 一个rowData表示一行数据
            // 取出每行的每列, 写成json格式的字符串存入kafka  {name: "李四", age: 20,....}
            val obj = new JSONObject()
            val columns: util.List[CanalEntry.Column] = rowData.getAfterColumnsList
            for (column <- columns) {
                val key: String = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName) // 列名
                val value: String = column.getValue
                obj.put(key, value)

            }
            // 写入到kafka
            MyKafkaSenderUtil.send(topic, obj.toJSONString)
        }
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-10-08 23:56:19

results matching ""

    No results matching ""