3.2.1 Data Collection
The data-collection workflow:
Configure Kafka, then start the Zookeeper and Kafka clusters;
Create the Kafka topic;
Start a Kafka console consumer (used for testing only);
Configure Flume to monitor the log file;
Start the Flume monitoring job;
Run the log-producing script;
Observe and verify the output.
Step 1: Start the Zookeeper and Kafka clusters
# Custom script that starts the 3 Zookeeper nodes
zkstart
# Start the Kafka cluster (run this on each of the 3 nodes)
kafka-server-start.sh config/server.properties
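The zkstart script above is project-specific and not shown in the source. A minimal sketch, assuming passwordless SSH to the three nodes and a Zookeeper install at /opt/module/zookeeper (both of which are assumptions), might look like:

```shell
#!/bin/bash
# zkstart (hypothetical sketch): start Zookeeper on every cluster node.
# Assumes passwordless SSH and the install path /opt/module/zookeeper.
for host in hadoop201 hadoop202 hadoop203; do
  ssh -o BatchMode=yes -o ConnectTimeout=3 "$host" \
    "/opt/module/zookeeper/bin/zkServer.sh start" \
    || echo "could not reach $host"
done
```

A matching zkServer.sh status loop over the same hosts is a convenient way to confirm that one node became leader and the others followers.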
Step 2: Create the Kafka topic
kafka-topics.sh --create --zookeeper hadoop201:2181 --topic calllog --replication-factor 1 --partitions 3
Verify that the topic was created successfully:
kafka-topics.sh --zookeeper hadoop201:2181 --list
Step 3: Start the consumer to begin consuming data
kafka-console-consumer.sh --zookeeper hadoop201:2181 --topic calllog --from-beginning
Step 4: Configure Flume
Create the agent configuration file flume2kafka.conf in the /opt/module/telecom directory:
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/telecom/calls.csv
a1.sources.r1.shell = /bin/bash -c
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop201:9092,hadoop202:9092,hadoop203:9092
a1.sinks.k1.kafka.topic = calllog
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Step 5: Start Flume
flume-ng agent -c /opt/module/flume/conf -f flume2kafka.conf -n a1
Step 6: Run the log-producing script and check that the Kafka console consumer displays the generated data
./producer.sh
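The contents of producer.sh are project-specific and not shown here. A hypothetical sketch that appends fake call records to the file tailed by the Flume exec source (the phone numbers, field layout, record count, and output path default are all assumptions) could be:

```shell
#!/bin/bash
# Hypothetical call-log producer: appends COUNT fake records to LOG.
# In this project LOG would be /opt/module/telecom/calls.csv.
COUNT=${1:-10}
LOG=${2:-calls.csv}
NUMS=(13800000001 13800000002 13800000003)
for ((i = 0; i < COUNT; i++)); do
  caller=${NUMS[RANDOM % ${#NUMS[@]}]}   # random caller number
  callee=${NUMS[RANDOM % ${#NUMS[@]}]}   # random callee number
  start=$(date +%Y%m%d%H%M%S)           # call start timestamp
  duration=$((RANDOM % 600))            # call length in seconds
  echo "$caller,$callee,$start,$duration" >> "$LOG"
  sleep 0.2                             # throttle so the consumer output is readable
done
```

Because the Flume source runs tail -F on the same file, each appended line should appear in the Kafka console consumer within a few seconds if the whole pipeline is wired correctly.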