我們知道Kafka 的消息通過topic進行分類。topic可以被分為若干個partition來存儲消息。消息以追加的方式寫入partition,然后以先入先出的順序讀取。
下面是topic和partition的關系圖:
我們一般會在server.conf中通過num.partitions參數指定創建topic時包含多少個partition。默認是num.partitions=1。
既然一個topic有多個partition,那么消息是怎么樣分配到partition的呢?
生產者生產一個消息send到topic分區器,分區器會根據消息里面的分區參數key值把消息分到對應的partition。這里就像我們快遞代發網點一樣,快遞代發網點可以代理很多種快遞公司,如果要寄快遞者P(生產者)指定用什么快遞公司,代發網點人員C(分區器)就會把該物品M(消息)歸類到指定的快遞公司區域存放。如果P不要求具體的快遞公司寄件,那么就由C隨意分配快遞公司(哈哈,那就要看這個家伙的心情了,心情好點給你一個順豐比較快到達,心情不好時就GG吧)。
下面是Kafka對消息分配分區 DefaultPartitioner.java 類的核心代碼:
1 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { 2 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); 3 int numPartitions = partitions.size(); 4 if (keyBytes == null) { 5 int nextValue = counter.getAndIncrement(); 6 List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); 7 if (availablePartitions.size() > 0) { 8 int part = Utils.toPositive(nextValue) % availablePartitions.size(); 9 return availablePartitions.get(part).partition(); 10 } else { 11 // no partitions are available, give a non-available partition 12 return Utils.toPositive(nextValue) % numPartitions; 13 } 14 } else { 15 // hash the keyBytes to choose a partition 16 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; 17 } 18 }
第4、7行:如果沒有指定key值並且可用分區個數大於0時,在就可用分區中做輪詢決定改消息分配到哪個partition。
第4、10行:如果沒有指定key值並且沒有可用分區時,在所有分區中輪詢決定改消息分配到哪個partition。
第14行:如果指定key值,對key做hash分配到指定的partition。
所以當同一個key的消息會被分配到同一個partition中。消息在同一個partition處理的順序是FIFO,這就保證了消息的順序性。