初識kafka 之 分區策略


kafka 分區策略

1.指明partition的情況下,直接將指明的值作為partition值;
   例如partition=0,所有數據寫入分區0
2.沒有指明partition值但有key的情況下,將key的hash值與topic的partition數進行取余得到partition值;
   例如:key1的hash值=5, key2的hash值=6 ,topic的partition數=2,那么key1對應的value1寫入1號分區,key2對應的value2寫入0號分區。
   例如:要把一張表的數據入到同一個分區,可以指定key的值為該表名

3.既沒有partition值又沒有key值的情況下,Kafka采用Sticky Partition(黏性分區器),會隨機選擇一個分區,並盡可能一直使用該分區,待該分區的batch已滿或者已完成,Kafka再隨機一個分區進行使用(和上一次的分區不同)。
   例如:第一次隨機選擇0號分區,等0號分區當前批次滿了(默認16k)或者linger.ms設置的時間到, Kafka再隨機一個分區進行使用(如果還是0會繼續隨機)。

代碼實現

package com.lzh.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/* kafka 分區策略
1.指明partition的情況下,直接將指明的值作為partition值;
    例如partition=0,所有數據寫入分區0
2.沒有指明partition值但有key的情況下,將key的hash值與topic的partition數進行取余得到partition值;
    例如:key1的hash值=5, key2的hash值=6 ,topic的partition數=2,那么key1對應的value1寫入1號分區,key2對應的value2寫入0號分區。
    例如:要把一張表的數據入到同一個分區,可以指定key的值為該表名

3.既沒有partition值又沒有key值的情況下,Kafka采用Sticky Partition(黏性分區器),會隨機選擇一個分區,並盡可能一直使用該分區,
  待該分區的batch已滿或者已完成,Kafka再隨機一個分區進行使用(和上一次的分區不同)。
    例如:第一次隨機選擇0號分區,等0號分區當前批次滿了(默認16k)或者linger.ms設置的時間到, Kafka再隨機一個分區進行使用(如果還是0會繼續隨機)。
*/

public class CustomProducerPartition {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.配置
        Properties properties = new Properties();

        // 連接集群
        // 給kafka對象添加配置信息 bootstrap.servers
        // 生產者連接集群所需的 broker 地址清單
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"bigdata01:9092,bigdata02:9092");


        // 指定發送消息的key和value的序列化類型。一定要寫全類名。
        // key,value序列化 key.serializer,value.serializer

        // key序列化
        // 全類名與下等價: properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // value序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 2.創建 kafka 生產者對象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 3.發送數據
        // 調用 send 方法,發送消息

        // 指定分區 partition
        for (int i = 1; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("Mytopic",3,"","" + i), new Callback() { // 有回調函數
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主題"+ recordMetadata.topic() +"已異步發送消息到指定分區"+ recordMetadata.partition());
                    }
                }
            });
        }

        // 指定 key,把同一張的數據入到同一個分區 key就是表名
        for (int i = 1; i < 10; i++) {
            final int n;
            n=i;
            kafkaProducer.send(new ProducerRecord<String, String>("Mytopic","tab_name","記錄"+i), new Callback() { // 有回調函數
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("表tab_name的第"+ n +"條記錄已發送到主題" + recordMetadata.topic() + "的指定分區" + recordMetadata.partition());
                    }
                }
            });
        }

        // 4.關閉資源
        kafkaProducer.close();

    }
}

結果

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM