最近項目開發過程使用kafka作為項目模塊間負載轉發器,實現實時接收不同產品線消息,分布式准實時消費產品線消息。通過kafka作為模塊間的轉換器,不僅有MQ的幾大好處:異步、 解耦、 削峰等幾大好處,而且開始考慮最大的好處,可以實現架構的水平擴展,下游系統出現性能瓶頸,容器平台伸縮增加一些實例消費能力很快就提上來了,整體系統架構上不用任何變動。理論上,我們項目數據量再大整體架構上高可用都沒有問題。在使用kafka過程中也遇到一些問題:
1. 消息逐漸積壓,消費能力跟不上;
2.某個消費者實例因為某些異常原因掛掉,造成少量數據丟失的問題。
針對消費積壓的問題,通過研究kafka多線程消費的原理,解決了消費積壓的問題。所以,理解多線程的Consumer模型是非常有必要,對於我們正確處理kafka多線程消費很重要。
kafka多線程消費模式
說kafka多線程消費模式前,我們先來說下kafka本身設計的線程模型和ConcurrentmodificationException異常的原因。見官方文檔:
The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.
ConcurrentmodificationException異常的出處見以下代碼:
該方法acquire 會在KafkaConsumer的大部分公有方法調用第一句就判斷是否正在同一個KafkaConsumer被多個線程調用。
"正在"怎么理解呢?我們順便看下KafkaConsumer的commitAsync 這個方法就知道了。
我們看KafkaConsumer的release方法就是釋放正在操作KafkaConsumer實例的引用。
通過以上的代碼理解,我們可以總結出來kafka多線程的要點:
kafka的KafkaConsumer必須保證只能被一個線程操作。
下面就來說說,我理解的Kafka能支持的兩種多線程模型,首先,我們必須保證操作KafkaConsumer實例的只能是一個線程,那我們要想多線程只能用在消費ConsumerRecord List上動心思了。下面列舉我理解的kafka多線程消費模式。
模式一1個Consumer模型對應一個線程消費,最多可以有topic對應的partition個線程同時消費Topic。

模式二1個Consumer和多個線程消費模型,保證只有一個線程操作KafkaConsumer,其它線程消費ConsumerRecord列表。

注意
第二種模式其實也可以支持多個Consumer,用戶最多可以啟用partition總數個Consumer實例,然后,模式二跟模式一唯一的差別就是模式二在單個Consuemr里面是多線程消費,而模式一單個Consumer里面是單線程消費。
以上兩種kafka多線程消費模式優缺點對比:

其實,模式二就是模式一情況在消費單個Consumer內部的特殊情況。
kafka多線程消費模式實現
關於多線程消費模式具體實現都是選擇基於spring-kafka實現,畢竟站在巨人肩膀上,站的高望的遠少加班,以下就是模式二的具體實現,模式一的話就是對模式二的簡化,具體實現如下。
具體業務代碼在BaseConsumer:
其中,closeConsumeExecutorService方法就是為了服務實例異常退出或者多機房上線kill的情況下,盡最大可能保證本次拉下來的任務被消費掉。最后,附上closeConsumeExecutorService實現,覺得RocketMQ源碼這個實現的不錯,就借用過來了,在此表示感謝。
下面回到使用kafka遇到的第二個問題,怎么解決消費者實例因為某些原因掛掉,造成少量數據丟失的問題。其實,通過我們上面的寫法,已經不會出現因為某些原因服務實例(docker、物理機)掛掉,丟數據的情況。因為我們是先拉取后消費,消費完才手動提交kafka確認offset。實在還存在萬一退出時候調用的closeConsumeExecutorService方法還沒有消費完數據,表示這個時候offset肯定沒有手動提交,這一部分數據也不會丟失,會在服務實例恢復了重新拉取消費。
以上的代碼存在極小的可能瑕疵,比如,我們雙機房切換上線,某機房實例有一部分數據沒有消費,下次會重復消費的問題。其實,這個問題我們在業務上通過在配置中心配置一個標識符來控制,當改變標識符控制某些機房停止拉取kafka消息,這個時候我們就可以安全操作,不擔心kafka沒有消費完,下次重復消費的問題了。
以上自己使用kafka過程中一些心得體會,難免有所遺漏,感謝指出,知錯能改,每天進步。