Kafka問題排查(消費者自動關閉)


問題描述:
           在消費端能夠正常消費到Kafka數據並成功生產到producer topic 中,當將kafka的一台機器關機之后,正常情況下應該是 消費端是不受影響的。因為有還有兩台的負載機器。問題就是一台機器停止運行之后,消費端酒 shutdown  而無法重新starting
解決方式 : 
在如下代碼中。
 1      public void run(){
 2              try{
 3                   System. out.println( "Consumer....");
 4                   Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
 5                    topicCountMap.put( topic, partitionNum);
 6                   Map<String,List<KafkaStream< byte[], byte[]>>> consumerMap = consumer.createMessageStreams( topicCountMap);
 7                   List<KafkaStream< byte[], byte[]>> partitions = consumerMap.get( topic);
 8 
 9                    threadPool = Executors. newCachedThreadPool();
10                    for(KafkaStream< byte[], byte[]> partition : partitions){
11                          threadPool.execute( new MessageFetcher(partition,producer ));
12                   }
13             } catch(Exception ex){
14                    logger.info( "KafkaConsumer-> Run -> ErrInfo : " +ex.getMessage());
15                   close();
16             }
17       }

有一個 partitionNum,在代碼中的可配置值為 private int partitionNum = 3;  

把partitionNum 改為 1 即可解決此問題。

問題跟蹤源碼分析:

partitionNUm 改為 1 , 此處的Num 為ThreadNum ,因為kafka內部實現中,都為多線程, partition為1時,此時有一個backingQueue1,三個fetch thread 線程,該topic分布在幾個node上就有幾個 fetch thread 每個fetch thread 會於kafka broker建立一個連接,3個fetch thread線程去拉去消息數據,最終防盜blockingQueue中,等到consumer thread來消費。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM