10.2 读取的数据发送到 Kafka

10.2.1 实现 Kafka 生产者

package com.atguigu.dw.gamallcanal.util

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

/**
  * Author lzc
  * Date 2019/5/17 4:45 PM
  */
object MyKafkaSender {
    val props = new Properties()
    // Kafka服务端的主机名和端口号
    props.put("bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9093")
    // key序列化
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    // value序列化
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    def sendToKafka(topic: String, content: String) = {
        producer.send(new ProducerRecord[String, String](topic, content))
    }
}

10.2.2 使用生产者向 Kafka 生成数据

考虑到将来存储到 ES 中的数据是 Json 格式, 所以, 我们在 Kafka 存储的的时候也存储为 Json 格式的.

改写工具类: CanalHandler

package com.atguigu.dw.gamallcanal.util

import java.util

import com.alibaba.fastjson.JSONObject
import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.{EventType, RowData}
import com.atguigu.dw.gmall.common.constant.GmallConstant

/**
  * Author lzc
  * Date 2019/5/17 4:09 PM
  */
object CanalHandler {
    /**
      * 处理从 canal 取来的数据
      *
      * @param tableName   表名
      * @param eventType   事件类型
      * @param rowDataList 数据类别
      */
    def handle(tableName: String, eventType: EventType, rowDataList: util.List[RowData]) = {
        import scala.collection.JavaConversions._
        if ("order_info" == tableName && eventType == EventType.INSERT && rowDataList.size() > 0) {
            val obj: JSONObject = new JSONObject()
            // 1. rowData 表示一行数据, 通过他得到每一列. 首先遍历每一行数据
            for (rowData <- rowDataList) {
                // 2. 得到每行中, 所有列组成的列表
                val columnList: util.List[CanalEntry.Column] = rowData.getAfterColumnsList
                for (column <- columnList) {
                    // 3. 得到列名和列值
                    // key下划线转成驼峰
                    val newColumn: String = CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, column.getName)
                    obj.put(newColumn, column.getValue)
                }
            }
            // 4. 发送到 Kafka
            MyKafkaSender.sendToKafka(GmallConstant.TOPIC_ORDER, obj.toJSONString)
        }
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-10-08 23:56:19

results matching ""

    No results matching ""