關於kafka的消費組模式,差了點相關資料,其中有一點提到:
一個主題下的分區不能小於消費者數量,即一個主題下消費者數量不能大於分區屬,大了就浪費了
那么,如果我的消費者進程數大於分區數的話,會有什么現象了,接下來就做個實驗試試
1、首先,創建一個3分區,主題名為test3
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test3
2、編寫生產者和消費者腳本,本人用的開發語言是php,demo例子可以參考文檔,只要做一些小修改就可以了
附上文檔地址:https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples-high-level-consumer.html
生產者腳本
1 <?php 2 3 $objRdKafka = new RdKafka\Producer(); 4 $objRdKafka->setLogLevel(LOG_DEBUG); 5 $objRdKafka->addBrokers("192.168.78.139:9092"); 6 7 $oObjTopic = $objRdKafka->newTopic("test3"); 8 9 // 從終端接收輸入 10 $oInputHandler = fopen('php://stdin', 'r'); 11 12 while (true) { 13 echo "\nEnter messages:\n"; 14 $sMsg = trim(fgets($oInputHandler)); 15 16 // 空消息意味着退出 17 if (empty($sMsg)) { 18 break; 19 } 20 21 // 發送消息 22 $oObjTopic->produce(RD_KAFKA_PARTITION_UA, 0, $sMsg); 23 } 24 25 echo "done\n"; 26 ?>
消費者腳本
<?php $conf = new RdKafka\Conf(); // Set a rebalance callback to log partition assignments (optional) // 當有新的消費進程加入或者退出消費組時,kafka 會自動重新分配分區給消費者進程,這里注冊了一個回調函數,當分區被重新分配時觸發 $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); // 配置groud.id 具有相同 group.id 的consumer 將會處理不同分區的消息,所以同一個組內的消費者數量如果訂閱了一個topic, 那么消費者進程的數量多於 多於這個topic 分區的數量是沒有意義的。 $conf->set('group.id', 'myConsumerGroup1'); //添加 kafka集群服務器地址(ip地址和端口替換成自己本地測試環境) $conf->set('metadata.broker.list', '192.168.78.139:9092'); $topicConf = new RdKafka\TopicConf(); // Set where to start consuming messages when there is no initial offset in // offset store or the desired offset is out of range. // 'smallest': start from the beginning //當沒有初始偏移量時,從哪里開始讀取 $topicConf->set('auto.offset.reset', 'smallest'); // Set the configuration to use for subscribed/assigned topics $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafka\KafkaConsumer($conf); // 讓消費者訂閱log 主題(topic替換成自己生成的) $consumer->subscribe(['test3']); while (true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } ?>
3、開始試驗
啟4個消費者腳本,然后用生產者生成多個消息,進行觀察
如上圖所示。消費者2,3,4都能進行消費,而消費者1就只是掛着啥事都不干
接下來,試試kill掉其中一個消費者,比如消費者4,然后繼續產生消息
這是,消費者1頂替了消費者4的位置,進行消息的消費
結論:當消費者總數大於分區數的話,多余的消費者進程會一直掛着,但是當某個消費者進程down掉的話,之前那些多余的消費者進程會頂替上來