kafka學習總結003 --- 生產者分區策略


分區結構

如下圖是官網上kafka三級結構圖,三級結構為主題---分區---消息,並且每條消息只能保存在某一個分區內;

kafka雖然是一個隊列但是不保證消息有序,但是對於分區來說消息是有序的

為什么分區

分區的作用就是提供生產消費數據負載分擔的能力;不同的分區被分配在不同的節點,數據的生產消費是基於分區粒度進行的,

這樣每個節點都能獨立的執行各自分區的數據生產消費,而且我們可以按需增加新的節點提升系統的吞吐量。

分區策略

所謂的生產者分區策略就是決定生產消息時,如何寫入到不同的分區中;kafka提供了默認的分區策略,當然我們也能自定義分區策略(通過指定生產者partitioner.class參數)

kafka提供了三種分區策略:輪詢策略、隨機策略、按消息鍵保序策略

1、輪詢策略

這是默認的分區策略,能夠保證消息最大限度的被平均分配到所有分區

2、隨機策略(已經過時了)

也就是生產的消息被隨機分配到不同的分區,實際的表現遜於輪詢策略;實際上,老的kafka版本用的是隨機策略,新的版本已經改成輪詢策略了

3、按消息鍵保序策略

生產消息時,為每條消息定義消息鍵key,消息鍵是一個有着明確含義的業務字符串,可以是業務ID之類的;通過消息鍵,相同的消息鍵的消息能被保證寫入相同的分區

注意kafka不同版本的差異:

kafka-2.4.0之前的版本提供了一個默認策略:org.apache.kafka.clients.producer.internals.DefaultPartitioner

這個分區策略的流程是:如果消息key為空,先隨機選擇一個分區,后續按照輪詢策略分配分區

kafka-2.4.0及以后的版本做了如下的變更,提供了如下三個分區策略:

1、org.apache.kafka.clients.producer.internals.DefaultPartitioner

默認策略,做的變動是:如果消息鍵為空消息發送的分區先保持粘性(也就是先向同一個分區發送);如果當前batch已滿或者linger.ms超時已經發送,那么新的消息會發給另外的分區(選擇策略還是Round-Robin)

這樣變動的原因個人理解是為了減少客戶端和服務端的交互次數,消息按照batchSize發送

2、org.apache.kafka.clients.producer.RoundRobinPartitioner

輪詢策略,沒啥可說的

3、org.apache.kafka.clients.producer.UniformStickyPartitioner

默認策略,消息key為空場景

自定義分區策略

kafka java api提供了一個接口,用於自定義分區策略:org.apache.kafka.clients.producer.Partitioner

public interface Partitioner extends Configurable, Closeable {
    int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);

    void close();

    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

RoundRobinPartitioner(輪詢策略)源碼:

public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();

    public RoundRobinPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int nextValue = this.nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return ((PartitionInfo)availablePartitions.get(part)).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = (AtomicInteger)this.topicCounterMap.computeIfAbsent(topic, (k) -> {
            return new AtomicInteger(0);
        });
        return counter.getAndIncrement();
    }

    public void close() {
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM