kafka消息分發策略分析



當我們使用kafka向指定Topic發送消息時,如果該Topic具有多個partition,無論消費者有多少,最終都會保證一個partition內的消息只會被一個Consumer group中的一個Consumer消費,也就是說同一Consumer group中的多個Consumer自動會起到負載均衡的效果。

1、消息構造

下面我們就針對調用kafka API發送消息到Topic時partition的分配策略,分析下其內部具體的源碼碼實現。

首先看下kafka API中消息體ProducerRecord類的構造函數,可以看到構造消息時可指定該消息要發送的Topic、partition、key、value等關鍵信息。

    /**
     * Creates a record to be sent to a specified topic and partition
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param key The key that will be included in the record
     * @param value The record contents
     * @param headers The headers that will be included in the record
     */
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }
    
    /**
     * Creates a record to be sent to a specified topic and partition
     *
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }
    
    /**
     * Create a record to be sent to Kafka
     * 
     * @param topic The topic the record will be appended to
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }

2、分發策略 

在實際使用中,我們一般不會指定消息發送的具體partition,最多只會傳入key值,類似下面這種方式:

producer.send(new ProducerRecord<Object, Object>(topic, key, data));

而kafka也會根據你傳入key的hash值,通過取余的方法,盡可能保證消息能夠相對均勻的分攤到每個可用的partition上;

下面是kafka內部默認的分發策略:

public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public void configure(Map<String, ?> configs) {}

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //獲取該topic的分區列表
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //如果key值為null
        if (keyBytes == null) {
            //維護一個key為topic的ConcurrentHashMap,並通過CAS操作的方式對value值執行遞增+1操作
            int nextValue = nextValue(topic);
            //獲取該topic的可用分區列表
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {//如果可用分區大於0
                //執行求余操作,保證消息落在可用分區上
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // 沒有可用分區的話,就給出一個不可用分區
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // 通過計算key的hash,確定消息分區
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        //獲取一個AtomicInteger對象
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {//如果為空
            //生成一個隨機數
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            //維護到topicCounterMap中
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        //返回值並執行遞增
        return counter.getAndIncrement();
    }

    public void close() {}

}

3、自定義負載策略

我們也可以通過實現Partitioner接口,自定義分發策略,看下具體實現

自定義實現Partitioner接口

/**
 * 自定義實現Partitioner接口
 *
 */
public class KeyPartitioner implements Partitioner {

    /**
     * 實現具體分發策略
     */
    @Override
    public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);//拉取可用的partition
        if (key == null||key.equals("")) {
            int random =  (int) (Math.random() * 10);
            int part = random % availablePartitions.size();
            return availablePartitions.get(part).partition();
        }
        return  Math.abs(key.toString().hashCode() % 6);
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // TODO Auto-generated method stub

    }

    @Override
    public void close() {
        // TODO Auto-generated method stub

    }

}

同時在初始化kafka生產者時,增加自定義配置

Properties properties = new Properties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,KeyPartitioner.class); //加入自定義的配置
producer = new KafkaProducer<Object, Object>(properties);

 4、總結

以上是對kafka消息分發的策略進行一定的分析與自定義擴展,希望對大家在使用kafka時有所幫助,其中如有不足與不正確的地方還望指出與海涵。

 

關注微信公眾號,查看更多技術文章。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM