3.2.2.4 类: HBaseConsumer


package com.atguigu.dataconsumer;

import com.atguigu.dataconsumer.util.HBaseUtil;
import com.atguigu.dataconsumer.util.PropertyUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;

/**
 * 该类主要用于读取 Kafka中缓存的数据,然后调用 HBaseAPI,持久化数据。
 * <p>
 * 是 data-consumer 模块的入口
 */
public class HBaseConsumer {
    public static void main(String[] args) throws InterruptedException {
        final HBaseDao hBaseDao = new HBaseDao();
        HBaseConsumer hBaseConsumer = new HBaseConsumer();
        hBaseConsumer.readDataFromKafka(value -> {
            System.out.println(Bytes.toString(HBaseUtil.getRowKey(value)));
            // 把拿到的数据 put 到 HBase 表中
            hBaseDao.put(value);
        });
    }

    /**
     * 从 Kafka 读取数据
     * 并把读取到的数据的值通过回调接口传递到这个方法的调用者
     *
     * @param callback 回调接口
     * @throws InterruptedException
     */
    public void readDataFromKafka(CallBack<String> callback) throws InterruptedException {
        // 创建 Kafka 消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(PropertyUtil.properties);
        // 订阅主体
        consumer.subscribe(Collections.singletonList(PropertyUtil.getProperty("kafka.topics")));
        // 循环读取数据
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                if (callback != null) {
                    callback.call(record.value());
                }
                // 测试的时候为了防止读取速度过快, 线程休眠100ms
                Thread.sleep(100);
            }
        }
    }

    /**
     * 定义一个回调接口
     *
     * @param <T>
     */
    public static interface CallBack<T> {
        public abstract void call(T t);
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-01-14 05:27:28

results matching ""

    No results matching ""