kafka 多線程消費


一、

   1、Kafka的消費並行度依賴Topic配置的分區數,如分區數為10,那么最多10台機器來並行消費(每台機器只能開啟一個線程),或者一台機器消費(10個線程並行消費)。即消費並行度和分區數一致。

   2、(1)如果指定了某個分區,會只講消息發到這個分區上

       (2)如果同時指定了某個分區和key,則也會將消息發送到指定分區上,key不起作用 

       (3)如果沒有指定分區和key,那么將會隨機發送到topic的分區中

       (4)如果指定了key,那么將會以hash<key>的方式發送到分區中 

二、多線程消費實例

     paritition 為3,broker為3,節點為3

1、生產者隨機分區提交數據

    這也是一個比較關鍵步驟,只有隨機提交到不同的分區才能實現多分區消費; 
自定義隨機分區:

public class MyPartition implements Partitioner{
     private static Logger LOG = LoggerFactory.getLogger(MyPartition.class); 
    @Override
    public void configure(Map<String, ?> arg0) {
        // TODO Auto-generated method stub
    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value,
            byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        int partitionNum = 0;
        try {
            partitionNum = Integer.parseInt((String) key);
        } catch (Exception e) {
            partitionNum = key.hashCode() ;
        }
//        System.out.println("kafkaMessage topic:"+ topic+" |key:"+ key+" |value:"+value);
        return Math.abs(partitionNum  % numPartitions);
    }
}  

    然后在初始化kafka生產者配置的時候修改如下配置

props.put("partitioner.class", properties.getProperty("com.mykafka.MyPartition"));

這樣就實現了kafka生產者隨機分區提交數據。

 2、消費者

最后一步就是消費者,修改單線程模式為多線程,這里的多線程實現方式有很多,本例知識用了最簡單的固定線程模式:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 3; i++) {
            fixedThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    kafkaConsumerService.getInstance();
                }
            });
        }

在消費方面得注意,這里得遍歷所有分區,否則還是只消費了一個區:

ConsumerRecords<String, String> records = consumer.poll(1000);
        for (TopicPartition partition : records.partitions()) {  
            List<ConsumerRecord<String, String>> partitionRecords = records  
                    .records(partition); 
        for (ConsumerRecord<String, String> record : partitionRecords) {
            System.out.println(
                    "message==>key:" + record.key() + " value:" + record.value() + " offset:" + record.offset()
                    + " 分區:" + record.partition());
            if (record.value() == null || record.key() == null) {
                consumer.commitSync();
            } else {
                // dealMessage
                KafkaServer.dealMessage(record.key(),record.value(),consumer);
//              consumer.commitSync();
            }
         }
        }

  注意上面的線程為啥只有3個,這里得跟上面kafka的分區個數相對應起來,否則如果線程超過分區數量,那么只會浪費線程,因為即使使用3個以上的線程也只會消費三個分區,而少了則無法消費完全。所以這里必須更上面的對應起來。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM