關於kafka消費組模式,分區數和消費者數的一些疑惑


關於kafka的消費組模式,差了點相關資料,其中有一點提到:

一個主題下的分區不能小於消費者數量,即一個主題下消費者數量不能大於分區屬,大了就浪費了

 

那么,如果我的消費者進程數大於分區數的話,會有什么現象了,接下來就做個實驗試試

 

1、首先,創建一個3分區,主題名為test3

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test3

 

2、編寫生產者和消費者腳本,本人用的開發語言是php,demo例子可以參考文檔,只要做一些小修改就可以了

附上文檔地址:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples-high-level-consumer.html

 

生產者腳本

 1 <?php
 2 
 3 $objRdKafka = new RdKafka\Producer();
 4 $objRdKafka->setLogLevel(LOG_DEBUG);
 5 $objRdKafka->addBrokers("192.168.78.139:9092");
 6 
 7 $oObjTopic = $objRdKafka->newTopic("test3");
 8 
 9 // 從終端接收輸入 
10 $oInputHandler = fopen('php://stdin', 'r');
11 
12 while (true) {
13     echo "\nEnter  messages:\n";
14     $sMsg = trim(fgets($oInputHandler));
15 
16    // 空消息意味着退出
17     if (empty($sMsg)) {
18         break;
19     }
20 
21     // 發送消息
22     $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg);
23 }
24 
25 echo "done\n";
26 ?>

 

 

消費者腳本

<?php

$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional)
// 當有新的消費進程加入或者退出消費組時,kafka 會自動重新分配分區給消費者進程,這里注冊了一個回調函數,當分區被重新分配時觸發
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});

// 配置groud.id 具有相同 group.id 的consumer 將會處理不同分區的消息,所以同一個組內的消費者數量如果訂閱了一個topic, 那么消費者進程的數量多於 多於這個topic 分區的數量是沒有意義的。
$conf->set('group.id', 'myConsumerGroup1');

//添加 kafka集群服務器地址(ip地址和端口替換成自己本地測試環境)
$conf->set('metadata.broker.list', '192.168.78.139:9092');

$topicConf = new RdKafka\TopicConf();


// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
//當沒有初始偏移量時,從哪里開始讀取
$topicConf->set('auto.offset.reset', 'smallest');


// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// 讓消費者訂閱log 主題(topic替換成自己生成的)
$consumer->subscribe(['test3']);


while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}
?>

 

 

3、開始試驗

啟4個消費者腳本,然后用生產者生成多個消息,進行觀察

 

 

 

 

 

 

 

 如上圖所示。消費者2,3,4都能進行消費,而消費者1就只是掛着啥事都不干

 

接下來,試試kill掉其中一個消費者,比如消費者4,然后繼續產生消息

 

 這是,消費者1頂替了消費者4的位置,進行消息的消費

 

結論:當消費者總數大於分區數的話,多余的消費者進程會一直掛着,但是當某個消費者進程down掉的話,之前那些多余的消費者進程會頂替上來


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM