為什么分區?
kafka有topic的概念,它是承載真實數據的邏輯容器,而在主題之下還分為若干個分區,也就是說kafka的消息組織方式實際上是三級結構:主題-分區-消息。主題下的每條消息只會保存在某個分區中,而不會在多個分區中被保存多份。
其實分區的作用就是提供負載均衡的能力,不同的分區能夠被放置在不同節點的機器上,而數據的讀寫操作也都是針對分區這個粒度進行的,這樣每個節點的機器都能夠獨立地執行各自分區的讀寫請求處理。並且,我們還可以通過添加新的節點機器來增加整體系統的吞吐量。
分區策略
所謂的分區策略是決定生產者將消息發送到哪個分區的算法。
如果要自定義分區策略,你需要顯示地配置生產者端的參數partitioner.class。
package kafka; import org.apache.kafka.clients.producer.KafkaProducer; import java.util.Properties; public class KafkaProduce { public void kafkaProducer() throws Exception { Properties pro = new Properties();
......// 其他配置 pro.put("partitioner.class", "kafka.KafkaPartitioner"); KafkaProducer config = new KafkaProducer(pro); } }
輪詢策略
也稱Round-robin策略,即順序分配。比如一個topic下有3個分區,那么第一條消息被發送到分區0,第二條被發送到分區1,第三條被發送到分區2,以此類推。當生產第四條消息時又會重新開始。
這就是所謂的輪詢策略。輪詢策略是kafka java生產者API默認提供的分區策略。輪詢策略有非常優秀的負載均衡表現,它總是能保證消息最大限度地被平均分配到所有分區上,故默認情況下它是最合理的分區策略,也是平時最常用的分區策略之一。
隨機策略
也稱Randomness策略。所謂隨機就是我們隨意地將消息放置在任意一個分區上,如下圖
實現方法:
package kafka; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import java.util.List; import java.util.Map; public class KafkaPartitioner implements Partitioner { @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(s); // 隨機策略 return ThreadLocalRandom.current().nextInt(partitions.size()); } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
本質上看隨機策略也是力求將數據均勻地打撒到各個分區,但從實際表現來看,它要遜於輪詢策略,所以如果追求數據的均勻分布,還是使用輪詢策略比較好。
按key保存策略
kafka允許為每條消息定義消息鍵,簡稱為key。一旦消息被定義了key,那么你就可以保證同一個key的所有消息都進入到相同的分區里面,由於每個分區下的消息處理都是有順序的,如下圖所示
package kafka; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import java.util.List; import java.util.Map; public class KafkaPartitioner implements Partitioner { @Override public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(s); // 隨機策略 //return ThreadLocalRandom.current().nextInt(partitions.size()); // 按key分配分區 return Math.abs(o.hashCode()) % partitions.size(); } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }
實際上kafka默認分區策略實際上同時實現了兩種策略:如果指定了key那么默認按key保存策略;如果沒有指定key,則使用輪詢策略。
切記分區是實現負載均衡以及高吞吐量的關鍵,故在生產者這一端就要仔細盤算合適的分區策略,避免造成消息數據的“傾斜”,使得某些分區成為性能瓶頸,這樣極易引發下游數據消費的性能下降。