注意:從kafka-0.9版本及以后,kafka的消費者組和offset信息就不存zookeeper了,而是存到broker服務器上,所以,如果你為某個消費者指定了一個消費者組名稱(group.id),那么,一旦這個消費者啟動,這個消費者組名和它要消費的那個topic的offset信息就會被記錄在broker服務器上
1.概述
Kafka版本[0.10.1.1],已默認將消費的 offset 遷入到了 Kafka 一個名為 __consumer_offsets 的Topic中。其實,早在 0.8.2.2 版本,已支持存入消費的 offset 到Topic中,只是那時候默認是將消費的 offset 存放在 Zookeeper 集群中。那現在,官方默認將消費的offset存儲在 Kafka 的Topic中,同時,也保留了存儲在 Zookeeper 的接口,通過 offsets.storage 屬性來進行設置。
2.內容
其實,官方這樣推薦,也是有其道理的。之前版本,Kafka其實存在一個比較大的隱患,就是利用 Zookeeper 來存儲記錄每個消費者/組的消費進度。雖然,在使用過程當中,JVM幫助我們完成了一些優化,但是消費者需要頻繁的去與 Zookeeper 進行交互,而利用ZKClient的API操作Zookeeper頻繁的Write其本身就是一個比較低效的Action,對於后期水平擴展也是一個比較頭疼的問題。如果期間 Zookeeper 集群發生變化,那 Kafka 集群的吞吐量也跟着受影響。
在此之后,官方其實很早就提出了遷移到 Kafka 的概念,只是,之前是一直默認存儲在 Zookeeper集群中,需要手動的設置,如果,對 Kafka 的使用不是很熟悉的話,一般我們就接受了默認的存儲(即:存在 ZK 中)。在新版 Kafka 以及之后的版本,Kafka 消費的offset都會默認存放在 Kafka 集群中的一個叫 __consumer_offsets 的topic中。
當然,其實她實現的原理也讓我們很熟悉,利用 Kafka 自身的 Topic,以消費的Group,Topic,以及Partition做為組合 Key。所有的消費offset都提交寫入到上述的Topic中。因為這部分消息是非常重要,以至於是不能容忍丟數據的,所以消息的 acking 級別設置為了 -1,生產者等到所有的 ISR 都收到消息后才會得到 ack(數據安全性極好,當然,其速度會有所影響)。所以 Kafka 又在內存中維護了一個關於 Group,Topic 和 Partition 的三元組來維護最新的 offset 信息,消費者獲取最新的offset的時候會直接從內存中獲取。
kafka 提供三種語義的傳遞:
1至少一次
2至多一次
3精確一次
首先在 producer 端保證1和2的語義是非常簡單的,至少一次只需要同步確認即可(確認方式分為只需要 leader 確認以及所有副本都確認,第二種更加具有容錯性),至多一次最簡單只需要異步不斷的發送即可,效率也比較高。目前在 producer 端還不能保證精確一次,在未來有可能實現,實現方式如下:在同步確認的基礎上為每一條消息加一個主鍵,如果發現主鍵曾經接受過,則丟棄
在 consumer 端,大家都知道可以控制 offset,所以可以控制消費,其實 offset 只有在重啟的時候才會用到。在機器正常運行時我們用的是 position,我們實時消費的位置也是 position 而不是 offset。我們可以得到每一條消息的 position。如果我們在處理消息之前就將當前消息的 position 保存到 zk 上即 offset,這就是只多一次消費,因為我們可能保存成功后,消息還沒有消費機器就掛了,當機器再打開時此消息就丟失了;或者我們可以先消費消息然后保存 position 到 zk 上即 offset,此時我們就是至少一次,因為我們可能在消費完消息后offset 沒有保存成功。而精確一次的做法就是讓 position的保存和消息的消費成為原子性操作,比如將消息和 position 同時保存到 hdfs 上 ,此時保存的 position 就稱為 offset,當機器重啟后,從 hdfs重新讀入offset,這就是精確一次。
- consumer可以先讀取消息,然后將offset寫入日志文件中,然后再處理消息。這存在一種可能就是在存儲offset后還沒處理消息就crash了,新的consumer繼續從這個offset處理,那么就會有些消息永遠不會被處理,這就是上面說的“最多一次”。
- consumer可以先讀取消息,處理消息,最后記錄offset,當然如果在記錄offset之前就crash了,新的consumer會重復的消費一些消息,這就是上面說的“最少一次”。
- “精確一次”可以通過將提交分為兩個階段來解決:保存了offset后提交一次,消息處理成功之后再提交一次。但是還有個更簡單的做法:將消息的offset和消息被處理后的結果保存在一起。比如用Hadoop ETL處理消息時,將處理后的結果和offset同時保存在HDFS中,這樣就能保證消息和offser同時被處理了。