3.1 Kafka 生产过程分析
3.1.1 写入方式
发送消息的主要步骤:
我们从创建一个 ProducerRecord 对象开始, ProducerRecord 对象需要包含目标主题和要发送的内容。
我们还可以指定键或分区。 在发送 ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
然后数据被传给分区器。如果之前在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。 如果没有指定分区,那么分区器会根据 ProducerRecord 对象的键来选择一个分区。
选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。 紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区 broker 上。
服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个RecordMetaData 对象, 它包含了主题和分区信息, 以及记录在分区里的偏移量。 如果写入失败, 则会返回一个错误。 生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。
producer 采用推(push)模式将消息发布到 broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率。
3.1.2 分区(partition)
消息发送时都被发送到一个 topic,其本质就是一个目录,而 topic 是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:
我们可以看到,每个 Partition 中的消息都是有序的,生产的消息被不断追加到Partition log 上,其中的每一个消息都被赋予了一个唯一的 offset 值。
3.1.2.1 分区的原因
方便在集群中扩展,每个 Partition 可以通过调整以适应它所在的机器,而一个topic 又可以有多个 Partition 组成,因此整个集群就可以适应任意大小的数据了;
可以提高并发,因为可以以 Partition 为单位读写了。
3.1.2.2 分区的原则
指定了 patition,则直接使用;
未指定 patition 但指定 key,通过对 key 的 value 进行 hash 出一个patition;
patition 和 key 都未指定,使用轮询选出一个 patition。
KafakProducer
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
org.apache.kafka.clients.producer.internals.DefaultPartitioner
类:/** * Compute the partition for the given record. * * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } }
3.1.3 replication
同一个 partition 可能会有多个 replication(对应 server.properties 配置中的 default.replication.factor=N)。
没有 replication 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。
引入 replication 之后,同一个 partition 可能会有多个 replication,而这时需要在这些 replication 之间选出一个leader,producer 和 consumer 只与这个leade r交互,其它 replication 作为 follower 从 leader 中复制数据。
一段 leader 挂了, 则会从其他的 follower 中选举出新的 leader, 新的 leader 就会承担起读写的职责.
复本数不能大于 broker 的数量.
3.1.4 写入流程
producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该partition 的 leader
producer 将消息发送给该 leader
leader 将消息写入本地 log
followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
leader 收到所有 ISR 中的 replication 的ACK后,增加 HW(high watermark,最后commit 的offset)并向 producer 发送 ACK
3.1.5 acks
ack 是消息确认模式.
主要有 3 个值:
0
意味着如果生产者能够通过网络把消息发送出去,那么就认为消息已成功写入 Kafka。 在这种情况下还 是有可能发生错误,比如发送的对象无法被序列化或者网卡发生故障,如果是分区离线或整个集群长时间不可用,那就不会收到任何错误.在
acks=0
模式下的运行速 度是非常快的. 你可以得到惊人的吞吐量和带宽利用率,不过如果选择了这种模式,一定会丢失一些消息。1
意味若首领在收到消息并把它写入到分区数据文件时会返回确认或错误响应. 不过在这个模式 下仍然有可能丢失数据,比如消息已经成功写入首领,但在消息被复制 到跟随者副本之前首领发生崩愤。all
意味着首领在返回确认或错误响应之前,会等待所有同步副本都收到悄息。 这是最保险的做一一生产者也会一直重试直到消息被成功提交。 不过这也是最慢的做法,生产者在继续发送其他消息之前需要等待所有副本都收到当前的消息。 可以通过使用异步模式和更大的批次来加快速度,但这样做通常会降低吞吐量。