kafka 分區策略
1.指明partition的情況下,直接將指明的值作為partition值;
例如partition=0,所有數據寫入分區0
2.沒有指明partition值但有key的情況下,將key的hash值與topic的partition數進行取余得到partition值;
例如:key1的hash值=5, key2的hash值=6 ,topic的partition數=2,那么key1對應的value1寫入1號分區,key2對應的value2寫入0號分區。
例如:要把一張表的數據入到同一個分區,可以指定key的值為該表名
3.既沒有partition值又沒有key值的情況下,Kafka采用Sticky Partition(黏性分區器),會隨機選擇一個分區,並盡可能一直使用該分區,待該分區的batch已滿或者已完成,Kafka再隨機一個分區進行使用(和上一次的分區不同)。
例如:第一次隨機選擇0號分區,等0號分區當前批次滿了(默認16k)或者linger.ms設置的時間到, Kafka再隨機一個分區進行使用(如果還是0會繼續隨機)。
代碼實現
package com.lzh.kafka; import org.apache.kafka.clients.producer.*; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; /* kafka 分區策略 1.指明partition的情況下,直接將指明的值作為partition值; 例如partition=0,所有數據寫入分區0 2.沒有指明partition值但有key的情況下,將key的hash值與topic的partition數進行取余得到partition值; 例如:key1的hash值=5, key2的hash值=6 ,topic的partition數=2,那么key1對應的value1寫入1號分區,key2對應的value2寫入0號分區。 例如:要把一張表的數據入到同一個分區,可以指定key的值為該表名 3.既沒有partition值又沒有key值的情況下,Kafka采用Sticky Partition(黏性分區器),會隨機選擇一個分區,並盡可能一直使用該分區, 待該分區的batch已滿或者已完成,Kafka再隨機一個分區進行使用(和上一次的分區不同)。 例如:第一次隨機選擇0號分區,等0號分區當前批次滿了(默認16k)或者linger.ms設置的時間到, Kafka再隨機一個分區進行使用(如果還是0會繼續隨機)。 */ public class CustomProducerPartition { public static void main(String[] args) throws ExecutionException, InterruptedException { // 1.配置 Properties properties = new Properties(); // 連接集群 // 給kafka對象添加配置信息 bootstrap.servers // 生產者連接集群所需的 broker 地址清單 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092,bigdata02:9092"); // 指定發送消息的key和value的序列化類型。一定要寫全類名。 // key,value序列化 key.serializer,value.serializer // key序列化 // 全類名與下等價: properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName()); // 2.創建 kafka 生產者對象 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties); // 3.發送數據 // 調用 send 方法,發送消息 // 指定分區 partition for (int i = 1; i < 5; i++) { kafkaProducer.send(new ProducerRecord<String, String>("Mytopic",3,"","" + i), new Callback() { // 有回調函數 public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.println("主題"+ recordMetadata.topic() +"已異步發送消息到指定分區"+ recordMetadata.partition()); } } }); } // 指定 key,把同一張的數據入到同一個分區 key就是表名 for (int i = 1; i < 10; i++) { final int n; n=i; kafkaProducer.send(new ProducerRecord<String, String>("Mytopic","tab_name","記錄"+i), new Callback() { // 有回調函數 public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e == null) { System.out.println("表tab_name的第"+ n +"條記錄已發送到主題" + recordMetadata.topic() + "的指定分區" + recordMetadata.partition()); } } }); } // 4.關閉資源 kafkaProducer.close(); } }
結果