Kafka 生產者 自定義分區策略


實現一個用於審計功能的分區策略:假設我們有兩類消息,其中一類消息的key為audit,用於審計,放在最后一個分區中,其他消息在剩下的分區中隨機分配。

先創建一個三個分區三個副本的主題audit-test:

bin/kafka-topics.sh --create --zookeeper localhost:2181,localhost:2812,localhost:2183 --topic audit-test 
--partitions 3 --replication-factor 3 //查看 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic audit-test

然后實現Kafka客戶端提供的Partitioner接口:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
import java.util.Random;


/***
 *
 * 實現一個自定義分區策略:
 *
 * key含有audit的一部分消息發送到最后一個分區上,其他消息在其他分區隨機分配
 *
 *
 *
 */
public class PartitionerImpl implements Partitioner {


    private Random random;

    public void configure(Map<String, ?> configs) {
        //做必要的初始化工作
        random = new Random();
    }

    //分區策略
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        String keyObj = (String) key;
        List<PartitionInfo> partitionInfoList = cluster.availablePartitionsForTopic(topic);
        int partitionCount = partitionInfoList.size();
        System.out.println("partition size: "  + partitionCount);
        int auditPartition = partitionCount - 1;
        return keyObj == null || "".equals(keyObj) || !keyObj.contains("audit") ? random.nextInt(partitionCount - 1) : auditPartition;
    }

    public void close() {
        //清理工作
    }
}

接下來設定啟動類參數:

//實現類
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"cn.org.fubin.PartitionerImpl");

String topic = "audit-test";
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
ProducerRecord nonKeyRecord = new ProducerRecord(topic,"non-key record");
//這類消息需要放在最后一個分區
ProducerRecord auditRecord = new ProducerRecord(topic,"audit","audit record");
ProducerRecord nonAuditRecord = new ProducerRecord(topic,"other","non-audit record");

try {
    producer.send(nonAuditRecord).get();
    producer.send(nonAuditRecord).get();
    producer.send(auditRecord).get();
    producer.send(nonAuditRecord).get();
    producer.send(nonAuditRecord).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

最后驗證:多推送幾次消息,查看每個分區的消息數

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic audit-test

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM