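5.2 Interceptor Example

Requirements

Implement a simple interceptor chain made up of two interceptors.

  • The first interceptor prepends a timestamp to the message value before the message is sent;

  • The second interceptor updates the count of successfully sent or failed messages once each send completes.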
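
To make the behaviour of the chain concrete before turning to the Kafka classes, here is a minimal sketch in plain Java. It deliberately does not use Kafka's API: `SimpleInterceptor`, `ChainSketch`, `PrefixInterceptor`, `TallyInterceptor`, and the fixed timestamp `1000` are all hypothetical names and values chosen for illustration. It models only the two ideas in the requirements: each interceptor's `onSend` result is handed to the next interceptor in order, and every interceptor is told whether the send succeeded.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an interceptor; this is NOT Kafka's ProducerInterceptor.
interface SimpleInterceptor {
    String onSend(String value);   // may return a modified value
    void onAck(boolean success);   // notified of the send result
}

// Hypothetical chain: applies interceptors in the order they were added.
class ChainSketch {
    private final List<SimpleInterceptor> chain = new ArrayList<>();

    ChainSketch add(SimpleInterceptor interceptor) {
        chain.add(interceptor);
        return this;
    }

    String send(String value) {
        String current = value;
        for (SimpleInterceptor interceptor : chain) {
            current = interceptor.onSend(current); // output feeds the next interceptor
        }
        return current;
    }

    void ack(boolean success) {
        for (SimpleInterceptor interceptor : chain) {
            interceptor.onAck(success);
        }
    }
}

// Prepends a timestamp; a fixed value is injected so the result is predictable.
class PrefixInterceptor implements SimpleInterceptor {
    private final long timestamp;

    PrefixInterceptor(long timestamp) {
        this.timestamp = timestamp;
    }

    public String onSend(String value) {
        return timestamp + " : " + value;
    }

    public void onAck(boolean success) {
        // nothing to do
    }
}

// Leaves the value untouched and tallies send results.
class TallyInterceptor implements SimpleInterceptor {
    int successCount = 0;
    int failCount = 0;

    public String onSend(String value) {
        return value;
    }

    public void onAck(boolean success) {
        if (success) {
            successCount++;
        } else {
            failCount++;
        }
    }
}

class ChainSketchDemo {
    public static void main(String[] args) {
        TallyInterceptor tally = new TallyInterceptor();
        ChainSketch chain = new ChainSketch().add(new PrefixInterceptor(1000L)).add(tally);
        System.out.println(chain.send("hadoop0"));
        chain.ack(true);
        chain.ack(false);
        System.out.println("success=" + tally.successCount + ", fail=" + tally.failCount);
    }
}
```

The real interceptors below follow the same shape, with Kafka supplying the chain and the acknowledgement callbacks.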


Implementation

Timestamp interceptor

package com.atguigu.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class TimeInterceptor implements ProducerInterceptor<String, String> {
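    /**
     * Called before the key and value are serialized and before a partition is assigned.
     * Mainly used to modify the ProducerRecord:
     * build a new ProducerRecord with the changes and return it.
     *
     * @param record the record from the client, or from the previous interceptor in the chain
     * @return a new record whose value is prefixed with the current timestamp
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Copy the record, prefixing the value with the current time in milliseconds.
        // The record timestamp and headers are passed through so they are not lost.
        return new ProducerRecord<>(
                record.topic(),
                record.partition(),
                record.timestamp(),
                record.key(),
                System.currentTimeMillis() + " : " + record.value(),
                record.headers()
        );
    }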

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }

    @Override
    public void close() {

    }

    /**
     * @param configs the producer configuration; not used by this interceptor
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }
}
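
Because `TimeInterceptor` prepends the timestamp as plain text, a consumer that wants the original payload has to split the prefix off again. The following is a minimal sketch of that step, assuming the exact `" : "` separator used above; `StampedValue` and its `parse` method are hypothetical helpers for illustration, not part of Kafka.

```java
// Hypothetical helper: splits "<millis> : <payload>" back into its two parts.
class StampedValue {
    static final String SEPARATOR = " : ";

    final long timestamp;
    final String payload;

    StampedValue(long timestamp, String payload) {
        this.timestamp = timestamp;
        this.payload = payload;
    }

    static StampedValue parse(String value) {
        // Split at the FIRST separator so a payload containing " : " stays intact.
        int idx = value.indexOf(SEPARATOR);
        if (idx < 0) {
            throw new IllegalArgumentException("no timestamp prefix: " + value);
        }
        long ts = Long.parseLong(value.substring(0, idx));
        return new StampedValue(ts, value.substring(idx + SEPARATOR.length()));
    }
}

class StampedValueDemo {
    public static void main(String[] args) {
        StampedValue v = StampedValue.parse("1548387000000 : hadoop0");
        System.out.println(v.timestamp + " -> " + v.payload);
    }
}
```

Embedding metadata in the value keeps the example simple, but it couples every consumer to this text format.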

Message-counting interceptor

package com.atguigu.kafka.interceptor;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class CountInterceptor implements ProducerInterceptor<String, String> {

    private int successCount = 0;
    private int failCount = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Nothing to modify, so return the original record; returning null would pass a null record on to the producer
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {  // send succeeded
            successCount++;
        } else {  // send failed
            failCount++;
        }
    }

    @Override
    public void close() {
        System.out.println("successCount = " + successCount);
        System.out.println("failCount = " + failCount);
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
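
The Javadoc for `ProducerInterceptor` notes that callbacks may be invoked from multiple threads and that implementations must provide thread safety where needed. The plain `int` counters above are adequate for this single-producer demo, but if that assumption does not hold, the counting logic could be moved into atomic counters. A minimal sketch, where `AckCounter` is a hypothetical helper that `onAcknowledgement` would delegate to:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical helper holding the two counters in thread-safe form.
class AckCounter {
    private final AtomicInteger success = new AtomicInteger();
    private final AtomicInteger fail = new AtomicInteger();

    // Same rule as CountInterceptor: a null exception means the send succeeded.
    void record(Exception exception) {
        if (exception == null) {
            success.incrementAndGet();
        } else {
            fail.incrementAndGet();
        }
    }

    int successCount() {
        return success.get();
    }

    int failCount() {
        return fail.get();
    }
}

class AckCounterDemo {
    public static void main(String[] args) {
        AckCounter counter = new AckCounter();
        // Simulate acknowledgements arriving on several threads; every 10th one fails.
        IntStream.range(0, 2000).parallel().forEach(i ->
                counter.record(i % 10 == 0 ? new RuntimeException("simulated failure") : null));
        System.out.println("successCount = " + counter.successCount());
        System.out.println("failCount = " + counter.failCount());
    }
}
```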

Producer main program

package com.atguigu.kafka.interceptor;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class InterceptorProducer {
    public static void main(String[] args) {
        // 1. Set the configuration
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop201:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Build the interceptor chain (interceptors run in the order listed)

        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.atguigu.kafka.interceptor.TimeInterceptor");
        interceptors.add("com.atguigu.kafka.interceptor.CountInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        // 3. Create the producer

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 4. Send the messages

        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("first", "hadoop" + i);
            producer.send(record);
        }
        // 5. Close the producer. This is required; otherwise the interceptors' close() methods are never called
        producer.close();

    }
}
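
The main program passes the interceptor class names as a `List`. Since `interceptor.classes` is a list-type setting, it can also be given as a single comma-separated string, which is convenient when the configuration comes from a properties file. A minimal sketch of building that value with only the JDK; `InterceptorConfigSketch` and `withInterceptors` are hypothetical names for illustration:

```java
import java.util.Properties;

// Hypothetical helper: copies a base configuration and adds the interceptor list.
class InterceptorConfigSketch {
    static Properties withInterceptors(Properties base, String... classNames) {
        Properties props = new Properties();
        props.putAll(base);
        // Order matters: the first name listed is the first interceptor in the chain.
        props.put("interceptor.classes", String.join(",", classNames));
        return props;
    }
}

class InterceptorConfigDemo {
    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("bootstrap.servers", "hadoop201:9092");
        Properties props = InterceptorConfigSketch.withInterceptors(
                base,
                "com.atguigu.kafka.interceptor.TimeInterceptor",
                "com.atguigu.kafka.interceptor.CountInterceptor");
        System.out.println(props.getProperty("interceptor.classes"));
    }
}
```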

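Testing

Start a consumer on Kafka, then run the Java client program. Each consumed value should begin with the timestamp added by TimeInterceptor.

Check the output on the Java console: it should show the successCount and failCount lines printed by CountInterceptor.close().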

Copyright © 尚硅谷大数据 2019. All rights reserved. Powered by Gitbook
Last revised: 2019-01-25 03:29:58
