10.1 从 Canal 读取数据
创建子项目: gmall-canal
10.1.1 添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>gmall</artifactId>
<groupId>com.atguigu.dw</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>gmall-canal</artifactId>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client -->
<!--canal 客户端, 从 canal 服务器读取数据-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<!-- kafka 客户端 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<dependency>
<groupId>com.atguigu.dw</groupId>
<artifactId>gmall-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
10.1.2 从 Canal 服务读取数据的客户端
package com.atguigu.dw.gamallcanal
import java.net.InetSocketAddress
import com.alibaba.otter.canal.client.{CanalConnector, CanalConnectors}
import com.alibaba.otter.canal.protocol.CanalEntry.{EntryType, RowChange}
import com.alibaba.otter.canal.protocol.{CanalEntry, Message}
import com.atguigu.dw.gamallcanal.util.CanalHandler
import com.google.protobuf.ByteString
/**
* Author lzc
* Date 2019/5/17 3:22 PM
*/
object CanalClient {
def main(args: Array[String]): Unit = {
// 1. 创建能连接到 Canal 的连接器对象
val connector: CanalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop201", 11111), "example", "", "")
// 2. 连接到 Canal
connector.connect()
// 3. 监控指定的表的数据的变化
connector.subscribe("gmall.order_info")
while (true) {
// 4. 获取消息 (一个消息对应 多条sql 语句的执行)
val msg: Message = connector.get(100) // 一次最多获取 100 条 sql
// 5. 个消息对应多行数据发生了变化, 一个 entry 表示一条 sql 语句的执行
val entries: java.util.List[CanalEntry.Entry] = msg.getEntries
import scala.collection.JavaConversions._
if (entries.size() > 0) {
// 6. 遍历每行数据
for (entry <- entries) {
// 7. EntryType.ROWDATA 只对这样的 EntryType 做处理
if (entry.getEntryType == EntryType.ROWDATA) {
// 8. 获取到这行数据, 但是这种数据不是字符串, 所以要解析
val value: ByteString = entry.getStoreValue
val rowChange: RowChange = RowChange.parseFrom(value)
// 9.定义专门处理的工具类: 参数 1 表名, 参数 2 事件类型(插入, 删除等), 参数 3: 具体的数据
CanalHandler.handle(entry.getHeader.getTableName, rowChange.getEventType, rowChange.getRowDatasList)
}
}
} else {
println("没有抓取到数据...., 2s 之后重新抓取")
Thread.sleep(2000)
}
}
}
}
10.1.3 专门处理数据的工具类CanalHandler
package com.atguigu.dw.gamallcanal.util
import java.util
import com.alibaba.otter.canal.protocol.CanalEntry
import com.alibaba.otter.canal.protocol.CanalEntry.{EventType, RowData}
/**
* Author lzc
* Date 2019/5/17 4:09 PM
*/
object CanalHandler {
/**
* 处理从 canal 取来的数据
*
* @param tableName 表名
* @param eventType 事件类型
* @param rowDataList 数据类别
*/
def handle(tableName: String, eventType: EventType, rowDataList: util.List[RowData]) = {
import scala.collection.JavaConversions._
if ("order_info" == tableName && eventType == EventType.INSERT && rowDataList.size() > 0) {
// 1. rowData 表示一行数据, 通过他得到每一列. 首先遍历每一行数据
for (rowData <- rowDataList) {
// 2. 得到每行中, 所有列组成的列表
val columnList: util.List[CanalEntry.Column] = rowData.getAfterColumnsList
for (column <- columnList) {
// 3. 得到列名和列值
println(column.getName + ":" + column.getValue)
}
}
}
}
}