1、 consumer API
kafka 提供了兩套 consumer API:
1. The high-level Consumer API 2. The SimpleConsumer API
其中 high-level consumer API 提供了一個從 kafka 消費數據的高層抽象,而 SimpleConsumer API 則需要開發人員更多地關注細節。
1.1 The high-level consumer API
high-level consumer API 提供了 consumer group 的語義,一個消息只能被 group 內的一個 consumer 所消費,且 consumer 消費消息時不關注 offset,最后一個 offset 由 zookeeper 保存。
使用 high-level consumer API 可以是多線程的應用,應當注意:
1. 如果消費線程大於 patition 數量,則有些線程將收不到消息 2. 如果 patition 數量大於線程數,則有些線程多收到多個 patition 的消息 3. 如果一個線程消費多個 patition,則無法保證你收到的消息的順序,而一個 patition 內的消息是有序的
1.2 The SimpleConsumer API
如果你想要對 patition 有更多的控制權,那就應該使用 SimpleConsumer API,比如:
1. 多次讀取一個消息 2. 只消費一個 patition 中的部分消息 3. 使用事務來保證一個消息僅被消費一次
但是使用此 API 時,partition、offset、broker、leader 等對你不再透明,需要自己去管理。你需要做大量的額外工作:
1. 必須在應用程序中跟蹤 offset,從而確定下一條應該消費哪條消息 2. 應用程序需要通過程序獲知每個 Partition 的 leader 是誰 3. 需要處理 leader 的變更
使用 SimpleConsumer API 的一般流程如下:
1. 查找到一個“活着”的 broker,並且找出每個 partition 的 leader 2. 找出每個 partition 的 follower 3. 定義好請求,該請求應該能描述應用程序需要哪些數據 4. fetch 數據 5. 識別 leader 的變化,並對之作出必要的響應
以下針對 high-level Consumer API 進行說明。
2、 consumer group
如 2.2 節所說, kafka 的分配單位是 patition。每個 consumer 都屬於一個 group,一個 partition 只能被同一個 group 內的一個 consumer 所消費(也就保障了一個消息只能被 group 內的一個 consuemr 所消費),但是多個 group 可以同時消費這個 partition。
kafka 的設計目標之一就是同時實現離線處理和實時處理,根據這一特性,可以使用 spark/Storm 這些實時處理系統對消息在線處理,同時使用 Hadoop 批處理系統進行離線處理,還可以將數據備份到另一個數據中心,只需要保證這三者屬於不同的 consumer group。如下圖所示:
圖.8
3、 消費方式
consumer 采用 pull 模式從 broker 中讀取數據。
push 模式很難適應消費速率不同的消費者,因為消息發送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息,典型的表現就是拒絕服務以及網絡擁塞。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費消息。
對於 Kafka 而言,pull 模式更合適,它可簡化 broker 的設計,consumer 可自主控制消費消息的速率,同時 consumer 可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。
4 、consumer delivery guarantee
如果將 consumer 設置為 autocommit,consumer 一旦讀到數據立即自動 commit。如果只討論這一讀取消息的過程,那 Kafka 確保了 Exactly once。
但實際使用中應用程序並非在 consumer 讀取完數據就結束了,而是要進行進一步處理,而數據處理與 commit 的順序在很大程度上決定了consumer delivery guarantee1.讀完消息先 commit 再處理消息。 這種模式下,如果 consumer 在 commit 后還沒來得及處理消息就 crash 了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應於 At most once
2.讀完消息先處理再 commit。 這種模式下,如果在處理完消息之后 commit 之前 consumer crash 了,下次重新開始工作時還會處理剛剛未 commit 的消息,實際上該消息已經被處理過了。這就對應於 At least once。 3.如果一定要做到 Exactly once,就需要協調 offset 和實際操作的輸出。 精典的做法是引入兩階段提交。如果能讓 offset 和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統可能不支持兩階段提交。比如,consumer 拿到數據后可能
把數據放到 HDFS,如果把最新的 offset 和數據本身一起寫到 HDFS,那就可以保證數據的輸出和 offset 的更新要么都完成,要么都不完成,間接實現 Exactly once。(目前就 high-level
API而言,
offset 是存於Zookeeper 中的,無法存於HDFS,而SimpleConsuemr API的 offset 是由自己去維護的,可以將之存於 HDFS 中)
總之,Kafka 默認保證 At least once,並且允許通過設置 producer 異步提交來實現 At most once(見文章《kafka consumer防止數據丟失》)。而 Exactly once 要求與外部存儲系統協作,幸運的是 kafka 提供的 offset 可以非常直接非常容易得使用這種方式。
更多關於 kafka 傳輸語義的信息請參考《Message Delivery Semantics》。
5 、consumer rebalance
當有 consumer 加入或退出、以及 partition 的改變(如 broker 加入或退出)時會觸發 rebalance。consumer rebalance算法如下:
1. 將目標 topic 下的所有 partirtion 排序,存於PT 2. 對某 consumer group 下所有 consumer 排序,存於 CG,第 i 個consumer 記為 Ci 3. N=size(PT)/size(CG),向上取整 4. 解除 Ci 對原來分配的 partition 的消費權(i從0開始) 5. 將第i*N到(i+1)*N-1個 partition 分配給 Ci
在 0.8.*版本,每個 consumer 都只負責調整自己所消費的 partition,為了保證整個consumer group 的一致性,當一個 consumer 觸發了 rebalance 時,該 consumer group 內的其它所有其它 consumer 也應該同時觸發 rebalance。這會導致以下幾個問題:
1.Herd effect 任何 broker 或者 consumer 的增減都會觸發所有的 consumer 的 rebalance 2.Split Brain 每個 consumer 分別單獨通過 zookeeper 判斷哪些 broker 和 consumer 宕機了,那么不同 consumer 在同一時刻從
zookeeper 看到的 view 就可能不一樣,這是由 zookeeper 的特性決定的,這就會造成不正確的 reblance 嘗試。 3. 調整結果不可控 所有的 consumer 都並不知道其它 consumer 的 rebalance 是否成功,這可能會導致 kafka 工作在一個不正確的狀態。
基於以上問題,kafka 設計者考慮在0.9.*版本開始使用中心 coordinator 來控制 consumer rebalance,然后又從簡便性和驗證要求兩方面考慮,計划在 consumer 客戶端實現分配方案。(見文章《Kafka Detailed Consumer Coordinator Design》和《Kafka Client-side Assignment Proposal》),此處不再贅述。