kafka分區策略


為什么分區?

kafka有topic的概念,它是承載真實數據的邏輯容器,而在主題之下還分為若干個分區,也就是說kafka的消息組織方式實際上是三級結構:主題-分區-消息。主題下的每條消息只會保存在某個分區中,而不會在多個分區中被保存多份。

其實分區的作用就是提供負載均衡的能力,不同的分區能夠被放置在不同節點的機器上,而數據的讀寫操作也都是針對分區這個粒度進行的,這樣每個節點的機器都能夠獨立地執行各自分區的讀寫請求處理。並且,我們還可以通過添加新的節點機器來增加整體系統的吞吐量。

分區策略

所謂的分區策略是決定生產者將消息發送到哪個分區的算法。

如果要自定義分區策略,你需要顯示地配置生產者端的參數partitioner.class。

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Properties;

public class KafkaProduce {

    public void kafkaProducer() throws Exception {
     
        Properties pro = new Properties();
     ......// 其他配置 pro.put(
"partitioner.class", "kafka.KafkaPartitioner"); KafkaProducer config = new KafkaProducer(pro); } }

輪詢策略

也稱Round-robin策略,即順序分配。比如一個topic下有3個分區,那么第一條消息被發送到分區0,第二條被發送到分區1,第三條被發送到分區2,以此類推。當生產第四條消息時又會重新開始。

這就是所謂的輪詢策略。輪詢策略是kafka java生產者API默認提供的分區策略。輪詢策略有非常優秀的負載均衡表現,它總是能保證消息最大限度地被平均分配到所有分區上,故默認情況下它是最合理的分區策略,也是平時最常用的分區策略之一。

隨機策略

也稱Randomness策略。所謂隨機就是我們隨意地將消息放置在任意一個分區上,如下圖

實現方法:

package kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class KafkaPartitioner implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {

        List<PartitionInfo> partitions = cluster.partitionsForTopic(s);
        // 隨機策略
        return ThreadLocalRandom.current().nextInt(partitions.size());
       
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

本質上看隨機策略也是力求將數據均勻地打撒到各個分區,但從實際表現來看,它要遜於輪詢策略,所以如果追求數據的均勻分布,還是使用輪詢策略比較好。

按key保存策略

kafka允許為每條消息定義消息鍵,簡稱為key。一旦消息被定義了key,那么你就可以保證同一個key的所有消息都進入到相同的分區里面,由於每個分區下的消息處理都是有順序的,如下圖所示

package kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import java.util.List;
import java.util.Map;

public class KafkaPartitioner implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {

        List<PartitionInfo> partitions = cluster.partitionsForTopic(s);
        // 隨機策略
        //return ThreadLocalRandom.current().nextInt(partitions.size());
        // 按key分配分區
        return Math.abs(o.hashCode()) % partitions.size();
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

實際上kafka默認分區策略實際上同時實現了兩種策略:如果指定了key那么默認按key保存策略;如果沒有指定key,則使用輪詢策略。

切記分區是實現負載均衡以及高吞吐量的關鍵,故在生產者這一端就要仔細盤算合適的分區策略,避免造成消息數據的“傾斜”,使得某些分區成為性能瓶頸,這樣極易引發下游數據消費的性能下降。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM