問題描述:
在消費端能夠正常消費到Kafka數據並成功生產到producer topic 中,當將kafka的一台機器關機之后,正常情況下應該是 消費端是不受影響的。因為有還有兩台的負載機器。問題就是一台機器停止運行之后,消費端酒 shutdown 而無法重新starting
解決方式 :
在如下代碼中。
1 public void run(){ 2 try{ 3 System. out.println( "Consumer...."); 4 Map<String,Integer> topicCountMap = new HashMap<String,Integer>(); 5 topicCountMap.put( topic, partitionNum); 6 Map<String,List<KafkaStream< byte[], byte[]>>> consumerMap = consumer.createMessageStreams( topicCountMap); 7 List<KafkaStream< byte[], byte[]>> partitions = consumerMap.get( topic); 8 9 threadPool = Executors. newCachedThreadPool(); 10 for(KafkaStream< byte[], byte[]> partition : partitions){ 11 threadPool.execute( new MessageFetcher(partition,producer )); 12 } 13 } catch(Exception ex){ 14 logger.info( "KafkaConsumer-> Run -> ErrInfo : " +ex.getMessage()); 15 close(); 16 } 17 }
有一個 partitionNum,在代碼中的可配置值為 private int partitionNum = 3;
把partitionNum 改為 1 即可解決此問題。
問題跟蹤源碼分析:
partitionNUm 改為 1 , 此處的Num 為ThreadNum ,因為kafka內部實現中,都為多線程, partition為1時,此時有一個backingQueue1,三個fetch thread 線程,該topic分布在幾個node上就有幾個 fetch thread 每個fetch thread 會於kafka broker建立一個連接,3個fetch thread線程去拉去消息數據,最終防盜blockingQueue中,等到consumer thread來消費。