4.3.2 Kafka 消费者之低级 API

实现使用低级 API 读取指定 topic,指定 partition,指定 offset 的数据。

消费者使用低级API 的主要步骤

步骤 主要工作
1 根据指定的分区从主题元数据中找到分区领导 ✅
2 获取分区最新的消费进度
3 从主副本拉取分区的消息 ✅
4 识别主副本的变化,重试

代码

官方实例代码: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

我们精简代码:

package com.atguigu.kafka.consume;


import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LowerConsumer {


    public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
        // 定义 3 个 brokers

        ArrayList<String> brokers = new ArrayList<>(Arrays.asList("hadoop201", "hadoop202", "hadoop303"));
        // 定义 brokers 的端口号
        int port = 9092;
        // 定义 topic
        String topic = "first";
        // 定义 partition
        int partition = 0;
        int offset = 0;
        LowerConsumer lowerConsumer = new LowerConsumer();
        lowerConsumer.getData(brokers, port, topic, partition, offset);

    }

    /**
     * 查找指定 brokers 指定 topic 指定 partition 的 leader
     *
     * @param brokers   brokers
     * @param port
     * @param topic
     * @param partition
     * @return
     */
    public PartitionMetadata findLeader(List<String> brokers, int port, String topic, int partition) {
        for (String broker : brokers) {

            SimpleConsumer consumer
                    = new SimpleConsumer(broker, port, 100000, 64 * 1024, "lookup_leader");

            // 创建 TopicMetadataRequest 对象, 使用 consumer 发送请求的时候使用
            TopicMetadataRequest request = new TopicMetadataRequest(Arrays.asList(topic));
            // 发送请求, 返回值为 Topic 元数据信息
            TopicMetadataResponse topicMetadataResponse = consumer.send(request);
            // 拿到每个 topic 的元数据信息
            List<TopicMetadata> topicMetadataList = topicMetadataResponse.topicsMetadata();
            for (TopicMetadata topicMetadata : topicMetadataList) {
                // 拿到每个 topic 的所有分区元数据信息
                List<PartitionMetadata> partitionMetadataList = topicMetadata.partitionsMetadata();
                for (PartitionMetadata partitionMetadata : partitionMetadataList) {
                    if (partition == partitionMetadata.partitionId()) {

                        return partitionMetadata;

                    }
                }
            }
        }
        return null;
    }

    /**
     * 获取指定brokers 中指定topic的指定分区的指定offset开始的数据
     *
     * @param brokers
     * @param port
     * @param topic
     * @param partition
     * @param offset
     */
    public void getData(List<String> brokers, int port, String topic, int partition, long offset) throws UnsupportedEncodingException, InterruptedException {
        PartitionMetadata leader = findLeader(brokers, port, topic, partition);
        String host = leader.leader().host();
        SimpleConsumer c1 = new SimpleConsumer(host, port, 10000, 64 * 1024, "c1");

        while (true) {
            FetchRequest request = new FetchRequestBuilder()
                    .clientId("c1")
                    // fetchSize 表示一次读取到的最多的字节数. 如果这个值设置过小, 则至少保证一次读取一条记录.
                    // 为了读取效率的提高, 可以把这个值设置的大一些.
                    .addFetch(topic, partition, offset, 100000)
                    .build();
            FetchResponse response = c1.fetch(request);
            ByteBufferMessageSet messageAndOffsets = response.messageSet(topic, partition);
            for (MessageAndOffset messageAndOffset : messageAndOffsets) {
                ByteBuffer buffer = messageAndOffset.message().payload();

                byte[] data = new byte[buffer.limit()];
                buffer.get(data);
                System.out.println(messageAndOffset.offset() + " : " + new String(data));
                // 每读完一条数据, 把offset 的值置为最新的offset
                offset = messageAndOffset.offset();
                Thread.sleep(1000);
            }
            if (messageAndOffsets.iterator().hasNext()) {
                // 如果这次确实读到数据了, 所以把offset的值 ++, 下一次读的时候就从新的位置开始读了
                // 如果这次没有读到数据, 表示数据已经读完了, 所以就不需要再 ++ 了
                // 如果这个offset 只是针对当前分区的.
                // 如果想下次启动的时候从上次位置开始读取, 则应该把 offset 的值持久化,, 比如存到Zookeeper 或者 mysql 数据库中.
                offset++;
                System.out.println(offset);
            }
        }
    }
}
Copyright © 尚硅谷大数据 2019 all right reserved,powered by Gitbook
该文件最后修订时间: 2019-01-25 03:29:58

results matching ""

    No results matching ""