从源码理解 Kafka 的分区选择策略

  1. 1. 源码分析
  2. 2. 结论

Kafka 中将 Topic 分为 partition,消费者从 partition 中消费消息。消息是怎么确定发住哪个 partition 呢?其实默认有两种分区选择策略:

  1. 消息 key 为空时随机选择
  2. 消息 key 不为空时,对 key 进行 HASH,然后对分区数取模

源码分析

KafkaProducerdoSend 方法中调用了以下方法进行分区选择,如果指定了分区,则直接使用指定的分区。如果没有指定则通过默认的 partitioner 来计算出分区。

1
2
3
4
5
6
7
8
// org.apache.kafka.clients.producer.KafkaProducer#partition
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

默认使用的分区处理类是 DefaultPartitioner,其内的实现如下:

1
2
3
4
5
6
7
8
9
10
// org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

如果提供了 key 值,则用 murmur2 的方法对 key 进行 HASH 并对 numPartitions(Topic 的分区数)取模。

当 Key 为空时,通过 stickyPartitionCachepartition 方法计算出分区。StickyPartitionCache 是 Kafka Client 内部的一个类,用于管理 Topic 的分区选择的逻辑和缓存。

1
2
3
4
5
6
7
8
// org.apache.kafka.clients.producer.internals.StickyPartitionCache#partition
public int partition(String topic, Cluster cluster) {
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}

indexCache 是一个 ConcurrentHashMap 对象,对应的是 Topic -> Partition 的映射,如果该值不存在则调用 nextPartition 方法选择一个分区并缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// org.apache.kafka.clients.producer.internals.StickyPartitionCache#nextPartition
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
// Check that the current sticky partition for the topic is either not set or that the partition that
// triggered the new batch matches the sticky partition that needs to be changed.
if (oldPart == null || oldPart == prevPartition) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
newPart = availablePartitions.get(0).partition();
} else {
while (newPart == null || newPart.equals(oldPart)) {
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
indexCache.replace(topic, prevPartition, newPart);
}
return indexCache.get(topic);
}
return indexCache.get(topic);
}

第一个分支条件 oldPart == null || oldPart == prevPartition 中:

  1. oldPart == null 表示没有分区缓存,对应着新增 topic 或第一次调用的情况
  2. oldPart == prevPartition 当创建了新的 Batch 时触发了此方法的情况,对分区缓存进行更新

满足以上条件之一就进入真正选择分区的逻辑。

关于 Batch:

  1. 每个 batch 的数据属于同一个 partition
  2. Sticker Partitioner 是 Kafka 对空 key 的分区选择进行的优化,尽量在一个 Batch 中提交多几条数据。当 Batch 满或 linger.ms 时间到 才触发选择新的分区,在这之前,所有消息都会发到缓存的分区。原来的逻辑会每条消息都选择新的分区,可能造成很多 batch 太小,客户端请求过多,降低呑吐

后面的逻辑根据可用分区数进行处理,决定新的分区:

  1. 如果无可用分区,从所有分区里随机选择一个分区
  2. 只有一个可用分区,直接选用该分区
  3. 多个可用分区,随机取一个不等于当前分区的可用分区

结论

  1. 消息 key 为空时,如果有缓存分区,使用缓存分区,没有缓存则随机选择
  2. 消息 key 不为空时,对 key 进行 HASH,然后对分区数取模

参考:

kafka生产者分区优化
Apache Kafka Producer Improvements with the Sticky Partitioner