分區結構
如下圖是官網上kafka三級結構圖,三級結構為主題---分區---消息,並且每條消息只能保存在某一個分區內;
kafka雖然是一個隊列但是不保證消息有序,但是對於分區來說消息是有序的
為什么分區
分區的作用就是提供生產消費數據負載分擔的能力;不同的分區被分配在不同的節點,數據的生產消費是基於分區粒度進行的,
這樣每個節點都能獨立的執行各自分區的數據生產消費,而且我們可以按需增加新的節點提升系統的吞吐量。
分區策略
所謂的生產者分區策略就是決定生產消息時,如何寫入到不同的分區中;kafka提供了默認的分區策略,當然我們也能自定義分區策略(通過指定生產者partitioner.class參數)
kafka提供了三種分區策略:輪詢策略、隨機策略、按消息鍵保序策略
1、輪詢策略
這是默認的分區策略,能夠保證消息最大限度的被平均分配到所有分區
2、隨機策略(已經過時了)
也就是生產的消息被隨機分配到不同的分區,實際的表現遜於輪詢策略;實際上,老的kafka版本用的是隨機策略,新的版本已經改成輪詢策略了
3、按消息鍵保序策略
生產消息時,為每條消息定義消息鍵key,消息鍵是一個有着明確含義的業務字符串,可以是業務ID之類的;通過消息鍵,相同的消息鍵的消息能被保證寫入相同的分區
注意kafka不同版本的差異:
kafka-2.4.0之前的版本提供了一個默認策略:org.apache.kafka.clients.producer.internals.DefaultPartitioner
這個分區策略的流程是:如果消息key為空,先隨機選擇一個分區,后續按照輪詢策略分配分區
kafka-2.4.0及以后的版本做了如下的變更,提供了如下三個分區策略:
1、org.apache.kafka.clients.producer.internals.DefaultPartitioner
默認策略,做的變動是:如果消息鍵為空消息發送的分區先保持粘性(也就是先向同一個分區發送);如果當前batch已滿或者linger.ms超時已經發送,那么新的消息會發給另外的分區(選擇策略還是Round-Robin)
這樣變動的原因個人理解是為了減少客戶端和服務端的交互次數,消息按照batchSize發送
2、org.apache.kafka.clients.producer.RoundRobinPartitioner
輪詢策略,沒啥可說的
3、org.apache.kafka.clients.producer.UniformStickyPartitioner
默認策略,消息key為空場景
自定義分區策略
kafka java api提供了一個接口,用於自定義分區策略:org.apache.kafka.clients.producer.Partitioner
public interface Partitioner extends Configurable, Closeable { int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6); void close(); default void onNewBatch(String topic, Cluster cluster, int prevPartition) { } }
RoundRobinPartitioner(輪詢策略)源碼:
public class RoundRobinPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap(); public RoundRobinPartitioner() { } public void configure(Map<String, ?> configs) { } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); int nextValue = this.nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (!availablePartitions.isEmpty()) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { return Utils.toPositive(nextValue) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> { return new AtomicInteger(0); }); return counter.getAndIncrement(); } public void close() { } }