Kafka Producer 的一些总结

总结一些 Kafka Producer 端的概念。

ISR

对于每一个 Topic,我们都可以设置这个 Topic 包含几个 Partition,每个 Partition 负责存储这个 Topic 一部分的数据。在 Kafka 的 Broker 集群中,每台机器上都存储了一些 Partition,也就存放了 Topic 的一部分数据,这样就实现了 Topic 的数据分布式存储在一个 Broker 集群上。但是有一个问题,万一一个 Kafka Broker 宕机了,此时上面存储的数据不就丢失了吗?

几乎所有分布式系统 hdfs/codis/hbase/tidb 内部都有一套多副本冗余的机制,在 kafka 集群中,每个 Partition 都有多个副本,其中一个副本叫做 leader,其他的副本叫做 follower。任何一个 Partition,只有 Leader 是对外提供读写服务的。也就是说,如果有一个客户端往一个 Partition 写入数据,此时一般就是写入这个 Partition 的 Leader 副本。然后 Leader 副本接收到数据之后,Follower 副本会不停的给他发送请求尝试去拉取最新的数据,拉取到自己本地后,写入磁盘中。

了解了 Partiton 的多副本同步数据的机制后,我们再来看一下 ISR。ISR 全称是 “In-Sync Replicas”,也就是保持同步的副本,通俗来说就是跟 Leader 始终保持同步的 Follower 有哪些。如果说某个 Follower 所在的 Broker 因为 JVM FullGC 之类的问题导致无法及时从 Leader 拉取同步数据,那么是不是会导致 Follower 的数据比 Leader 要落后很多?

所以这个时候,就意味着 Follower 已经跟 Leader 不再处于同步的关系了。但是只要 Follower 一直及时从 Leader 同步数据,就可以保证他们是处于同步的关系的。每个 Partition 都有一个 ISR,这个 ISR 里一定会有 Leader 自己,因为 Leader 肯定数据是最新的,然后就是那些跟 Leader 保持同步的 Follower,也会在 ISR 里。

在补充几个基础概念:

  • AR:assigned replicas。通常情况下,每个分区都会被分配多个副本。具体的副本数量由参数。offsets.topic.replication.factor 指定。分区的AR数据保存在Zookeeper的/brokers/topics/节点中
  • ISR:in-sync replicas。与leader副本保持同步状态的副本集合(leader副本本身也在ISR中)。ISR数据保存在Zookeeper的/brokers/topics//partitions//state节点中
  • High Watermark:副本高水位值,简称HW,它表示该分区最新一条已提交消息(committed message)的位移
  • LEO:log end offset。从名字上来看似乎是日志结束位移,但其实是下一条消息的位移,即追加写下一条消息的位移。

HW表示的是最新一条已提交消息的位移。注意这里是已提交的,说明这条消息已经完全备份过了(fully replicated),而LEO可能会比HW值大——因为对于分区的leader副本而言,它的日志随时会被追加写入新消息,而这些新消息很可能还没有被完全复制到其他follower副本上,所以LEO值可能会比HW值大。两者的关系可参考下图:

img

消费者只能消费到HW线以下的消息,即上图中绿色的部分;而紫色的消息就是未完全备份的消息,因而不能被消费者消费。

ACK

在理解上面的预备知识后,就可以正式看一下 ack 参数了。这个参数实际上有三种常见的值可以设置,分别是:0、1 和 all。

第一种选择是把acks参数设置为 0,表示 producer 客户端只要把消息发送出去就行,哪怕在 Partition Leader 上没有落到磁盘都不 care,直接就认为这个消息发送成功了。如果采用这种设置的话,很明显就会存在 Partition Leader 所在 Broker 直接挂了,结果客户端还认为消息发送成功了,那么这条消息就丢失了。

第二种选择是设置 acks = 1,表示 Partition Leader 接收到消息而且写入本地磁盘了就认为成功了,不管他其他的 Follower 有没有同步过去这条消息了。这种设置其实是kafka默认的设置,也就是如果不设置 acks 这个参数,只要 Partition Leader 写成功就算成功。这样存在的一个问题就是,Partition Leader 刚刚接收到消息,Follower 还没来得及同步过去,结果 Leader 所在的 broker 宕机了,此时也会导致这条消息丢失,但是客户端会已经认为发送成功了。

最后一种情况,就是设置 acks=all,表示 Partition Leader 接收到消息之后,还必须要求 ISR 列表里跟 Leader 保持同步的那些 Follower 都要把消息同步过去,才能认为这条消息是写入成功了。如果说 Partition Leader 刚接收到了消息,但是结果 Follower 没有收到消息,此时 Leader 宕机了,那么客户端会感知到这个消息没发送成功,他会重试再次发送消息过去。此时可能 Partition 2 的 Follower 变成 Leader 了,此时 ISR 列表里只有最新的这个 Follower 转变成的 Leader 了,那么只要这个新的 Leader 接收消息就算成功了。关于 isr 可以通过配置 min.insync.replicas 来决定一个 isr 中至少要有多少个 replia。

缓冲区

Kafka 的客户端发送数据到服务器,一般都是要经过缓冲的,也就是说通过 KafkaProducer 发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集成一个一个的 Batch,再发送到 Broker 上去的。所以这个 buffer.memory 的本质就是用来约束 KafkaProducer 能够使用的内存缓冲的大小的默认值是 32MB。

在内存缓冲里大量的消息会缓冲在里面,形成一个一个的 Batch,每个 Batch 里包含多条消息。然后 KafkaProducer 有一个 Sender 线程会把多个 Batch 打包成一个 Request 发送到 Broker 上去。

那么如果要是内存设置的太小,可能导致一个问题:消息快速的写入内存缓冲里面,但是 Sender 线程来不及把 Request 发送到 Kafka 服务器。这样是不是会造成内存缓冲很快就被写满?一旦被写满,就会阻塞用户线程,不让继续往 Kafka 写消息了。

batch.size 是另外一个值得关注的参数。这个东西决定了每个 Batch 要存放多少数据就可以发送出去了。比如说要是给一个 Batch 设置成是 16KB 的大小,那么里面凑够 16KB 的数据就可以发送了。这个参数的默认值是 16KB。

提升 batch 的大小,可以允许更多的数据缓冲在里面,那么一次 Request 发送出去的数据量就更多了,这样吞吐量可能会有所提升。但是也不能无限的大,过于大了之后,要是数据老是缓冲在 Batch 里迟迟不发送出去,那么发送消息的延迟就会很高。比如说,一条消息进入了 Batch,但是要等待5秒钟 Batch 才凑满了64KB,才能发送出去。那这条消息的延迟就是5秒钟。

要是一个 Batch 迟迟无法凑满,此时就需要引入另外一个参数了,linger.ms。这个参数表示一个 Batch 被创建之后,最多过多久,不管这个 Batch 有没有写满,都必须发送出去了。linger.ms 决定了你的消息一旦写入一个 Batch,最多等待这么多时间,他一定会跟着 Batch 一起发送出去。避免一个 Batch 迟迟凑不满,导致消息一直积压在内存里发送不出去的情况。

max.request.size 这个参数决定了每次发送给 Broker 请求的最大大小。如果发送的消息都是那种大的报文消息,每条消息都是很多的数据,一条消息可能都要 10KB。此时 batch.size 和 buffer.memory 以及 max.request.size 都可以调大点。

重试

retriesretries.backoff.ms 决定了重试机制,也就是如果一个请求失败了可以重试几次,每次重试的间隔是多少毫秒。通过两个参数结合 ack 机制保证不丢失消息非常关键。