0.前言
客戶端用法:
kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
// 決定一個topic啟動幾個線程去拉取數據,即生成幾個KafkaStream;
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(threads));
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(topic);
// 本質是調用了 ZookeeperConsumerConnector
val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
- 一個Topic啟動幾個消費者線程,會生成幾個KafkaStream。
- 一個KafkaStream對應的是一個Queue(有界的LinkedBlockingQueue),有界的參數控制:
queued.max.message.chunks
。消費者線程數量決定阻塞隊列的個數。 - Fetcher線程是對應topic所在的broker的個數。
因此,分析Consumer,主要是分析ZookeeperConsumerConnector
。代碼里面,有兩個類,它們是什么關系呢?
- kafka.consumer.ZookeeperConsumerConnector:核心類
- kafka.javaapi.consumer.ZookeeperConsumerConnector:對上面那個類的scala數據結構封裝,方便Java程序員使用。
0.8.0 和 0.8.2.1
ZookeeperConsumerConnector
的源碼不一樣,下面以0.8.2.1源碼為主來分析,也就是從這個版本開始,可以將Offset存在Kafka的Broker中。(關注實現思想,忽略細節。)
1.ZookeeperConsumerConnector 架構
一個Consumer會創建一個ZookeeperConsumerConnector,代表一個消費者進程.
- fetcher: 消費者獲取數據, 使用ConsumerFetcherManager fetcher線程抓取數據
- zkClient: 消費者要和ZK通信, 除了注冊自己,還有其他信息也會寫到ZK中
- topicThreadIdAndQueues: 消費者會指定自己消費哪些topic,並指定線程數, 所以topicThreadId都對應一個隊列
- messageStreamCreated: 消費者會創建消息流, 每個隊列都對應一個消息流
- offsetsChannel: offset可以存儲在ZK或者kafka中,如果存在kafka里,像其他請求一樣,需要和Broker通信。可以理解成OffsetManager的一部分。
- scheduler: 后台調度autoCommit
- 還有其他幾個Listener監聽器,分別用於topicPartition的更新,負載均衡,消費者重新負載等
簡述獲取數據的流程
- 初始化上面的幾個組件,包括與ZK的連接,創建ConsumerFetcherManager,確保連接上OffsetManager(為該ConsumerGroup建立一個OffsetChannel)。
createMessageStreams
創建消息流,反序列化message- 通過Fetcher線程拉取數據,放入BlockingQueue來給客戶端。
- 客戶端啟動ZKRebalancerListener,ZKRebalancerListener實例會在內部創建一個線程,這個線程定時檢查監聽的事件有沒有執行(消費者發生變化),如果沒有變化則wait 1秒鍾,當發生了變化就調用
syncedRebalance
方法,去rebalance
消費者。
1.1 消費者線程(consumer thread),隊列(LinkedBlockingQueue),拉取線程(fetch thread)三者之間關系
以一段代碼來說明,消費的topic 12 partition,分配在3台broker機器上。
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test-string-topic", new Integer(2)); //value表示consumer thread線程數量
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
- consumer thread數量與BlockingQueue一一對應。所以上述的代碼只有2個BlockQueue。(它們連接的橋梁是KafkaStream)
- fetcher線程數和topic所在多少台broker有關。因此,共有3個fetcher線程與broker建立一個連接。(3個fetch thread線程去拉取消息數據,最終放到2個BlockingQueue中,等待consumer thread來消費。)
下面是分配的情況:
- 消費者線程,緩沖隊列,partitions分布列表如下
consumer線程 | Blocking Queue | partitions |
---|---|---|
consumer thread1 | blockingQueue1 | 0,1,2,3,4,5 |
consumer thread2 | blockingQueue2 | 6,7,8,9,10,11 |
- fetch thread與partitions分布列表如下
fetch線程 | partitions |
---|---|
fetch thread1 | 0,3,6,9 |
fetch thread2 | 1,4,7,10 |
fetch thread3 | 2,5,8,11 |
用戶的consumer thread就使用2個BlockingQueue的數據進行處理;所以一般會使用2個consumer thread去消費這2個BlockingQueue數據。
1.2 rebalance的流程
代碼上調用:syncedRebalance方法在內部會調用def rebalance(cluster: Cluster): Boolean方法,去執行操作。
- // 關閉所有的數據獲取者 closeFetchers
- // 解除分區的所有者 releasePartitionOwnership
- // 按規則得到當前消費者擁有的分區信息並保存到topicRegistry中
topicRegistry=getCurrentConsumerPartitionInfo
- // 修改並重啟Fetchers updateFetchers
最后,對每個broker創建一個FetcherRunnable線程,並啟動它。這個fetcher線程負責從Broker上不斷獲取數據,對每個partition分別創建FetchRequest,最后把數據插入BlockingQueue的操作。
KafkaStream
對ConsumerIterator
做了進一步的封裝,我們調用stream的next
方法就可以取到數據了(內部通過調用ConsumerIterator
的next
方法實現)
1.3 注意
ConsumerIterator
的實現可能會造成數據的重復發送(這要看生產者如何生產數據),FetchedDataChunk
是一個數據集合,它內部會包含很多數據塊,一個數據塊可能包含多條消息,但同一個數據塊中的消息只有一個offset,所以當一個消息塊有多條數據,處理完部分數據發生異常時,消費者重新去取數據,就會再次取得這個數據塊,然后消費過的數據就會被重新消費。
- 沒想到里面,里面是這個樣子的,給一個數據塊,導致了數據消費的重復。
3.美團遇到的一個問題
問題: Kafka中由Consumer維護消費狀態,當Consumer消費消息時,支持2種模式commit消費狀態,分別為立即commit和周期commit。前者會導致性能低下,做到消息投遞恰好一次,但很少使用,后者性能高,通常用於實際應用,但極端條件下無法保證消息不丟失。
解決方案(這個問題太極端情況,不推薦,長個知識)
- 將本來的結果改成下面的處理流程:等待“執行業務邏輯”成功完成后更新緩存消費狀態,就可以保證消息不會丟失。
變成下面的: