kafka - 為CQRS而生


  前段時間跟一個朋友聊起kafka,flint,spark這些是不是某種分布式運算框架。我自認為的分布式運算框架最基礎條件是能夠把多個集群節點當作一個完整的系統,然后程序好像是在同一台機器的內存里運行一樣。當然,這種集成實現方式有賴於底層的一套消息系統。這套消息系統可以把消息隨意在集群各節點之間自由傳遞。所以如果能夠通過消息來驅動某段程序的運行,那么這段程序就有可能在集群中任何一個節點上運行了。好了,akka-cluster是通過對每個集群節點上的中介發送消息使之調動該節點上某段程序運行來實現分布式運算的。那么,kafka也可以實現消息在集群節點間的自由流通,是不是也是一個分布式運算框架呢?實際上,kafka設計強調的重點是消息的接收,或者叫消息消費機制。至於接收消息后怎么去應對,用什么方式處理,都是kafka用戶自己的事了。與分布式運算框架像akka-cluster對比,kafka還缺了個在每個集群節點上的”運算調度中介“,所以kafka應該不算我所指的分布式運算框架,充其量是一種分布式的消息傳遞系統。實際上kafka是一種高吞吐量、高可用性、安全穩定、有良好口碑的分布式消息系統。

  kafka的本質是一種commit-log,或者“事件記錄系統”:上游產生的數據(即事件)會按發生時間順序存入kafka,然后下游可以對任何時間段內事件按序進行讀取,重演運算產生那段時間內的某種狀態。這不就是妥妥的CQRS模式嗎?當然kafka也可以使用在其它一些場景如:消息隊列,數據存儲等,不過這些都是commit-log的具體應用。

  常常看到網上有朋友抱怨akka-cluster的一些處理方式太底層或太基礎了。用戶往往需要自己來增加一些方法來確保使用安全。我想作為一種消息驅動系統,如何保證akka消息的正確產生和安全使用應該是最基本的要求。而恰恰akka是沒有提供對消息遺漏和重復消息的保障機制。我想這也是造成akka用戶擔心的主要原因。上面提到kafka是一種高吞吐量、高可用性、安全穩定的分布式消息系統,特別是它提供了對exactly-once,“保證一次”的消息使用支持。那么通過kafka實現一套CQRS模式的實時交易處理系統應該是可行的。這也是我使用kafka的主要目的。

  上面提到,希望能充分利用kafka commit-log特性來開發一個基於CQRS的實時交易系統,比如支付系統、庫存管理系統,從實踐中了解kafka。kafka支持多種語言終端,怪異的是沒有scala終端。kafka是用scala開發的,不提供scala終端實在是說不通啊。不過akka在alpakka社區提供了alpakka-kafka:這個東西是個基於akka-streams的kafka scala終端編程工具,稍微過了一下,感覺功能比較全面,那就是它了。不過在開始前先把kafka的原理和基本情況做個介紹:

從表面上看kafka就是一個簡單的消息存儲和傳遞工具。不過因為其特殊分布式的消息發布、存儲、讀取處理機制,使其成為一種高吞吐量、高可用性、安全穩定的分布式消息處理工具。從應用角度來講,kafka應用包括三個方面,kafka本身,就叫kafka引擎吧,發布終端、訂閱終端,即:kafka,writer,reader三部分,其中:所有復雜的功能實現是包嵌在kafka內部的,writer,reader應該整合到用戶應用里。kafka的作業是圍繞着消息的發布訂閱/讀寫進行的。所謂消息即CQRS模式里的事件。那么kafka的工作原理直白點就是writer向kafka寫事件,kafka把事件按發生時間順序保存,reader再按順序從kafka讀取事件並進行處理以產生新的業務狀態,如在某個庫位的一個商品數量得到了更新。當然原理看似簡單,但具體的實現才是真正復雜的地方。

首先,writer和reader是以事件關聯的,即:write發布某種類型的事件,而reader則是訂閱相同類型的事件。 這里的事件也就是topic,或一項業務,如:圖書類當前庫存。為了提高數據吞吐量,每個topic又可以細分為多個partition。每個partition分擔所屬topic消息類型下的一些指定的細分類消息或者事件,如"圖書庫房101"。如果把這些partition再分布到一個集群的節點上,就可以實現高吞吐量的分布式讀寫,然后通過集群partition的復本同步又可以達到數據安全及系統高可用性的目的。這些集群節點就是所謂的broker了。發布消息內容由topic,key,value所組成。其中key值指定該消息應該寫入那個partition,即通過對key進行hash計算得出partition id。hash算法可以保證相同的key值永遠指定同一個partition。值得注意的是kafka保證每個partition上的事件肯定按照發生時間排序,所以要保證一種事件只能寫入同一個partition。當然,一個partition可以承載多種事件。要注意的是創建topic和partition都是嚴格的管理工作admin,不是在某些程序中任意進行增減的。一般來講,在創建一個新topic時就要確定它下面的partition數量了。這個partition數量要按照對數據吞吐量需求設定。但一般是集群節點的倍數,這樣partition可以均勻分布在各broker上。

好了,該到reader這頭了:reader作業從訂閱某個topic開始。上面提過:一個topic下面可能有多個partition,但每個partition都會包含topic的其中幾個子業務的全部事件,而且這些事件是嚴格按發生時間排序的。kafka有個reader group這么個概念:針對同一個topic,容許有一組多個reader對這個topic下的partition進行讀取。但每個partition只容許組內一個reader讀取。至於goup內reader是如何分配partition的完全由kafka內部解決。如果發現新partition或者組內reader有增減變化,kafka會自動進行再分配rebalance。所以總的來說訂閱某個topic的一個組內reader應該負責那個partition是不確定的,加上隨時可能發生動態再分配的情況,比如組內某個reader出問題倒了。換言之組內所有reader都必須具備處理整個topic所有類型業務的能力,如此才能解決組內reader-partition關系不確定的難題。kafka最重要的特點就是可以容許不同的應用通過不同的reader-group對同一個partition上的事件進行任意讀取,本意應該是不同的應用可以利用同一個業務事件序列進行不同的業務處理。具體實現方式應該是每個組對某個partition上事件最后讀取的位置分別進行了登記,offset-commit。這樣,即使發生了重新分配rebalance組內任何一個reader對分配到的partition應從那個位置開始讀還是確定的。這個offset-commit方式描述了幾種事件讀取模式:

1、at-most-once, 最多一次:如果剛讀取事件,在進行業務處理之前就登記位置commit-offset,那么commit-offset后位置已經登記,即使業務處理失敗也再也不可能二次讀取了。 

2、at-least-once,最少一次:讀取事件、完成業務處理后才commit-offset。如果處理業務中系統故障,只能從上次登記的位置重新讀取了,那么就會出現重復讀取的情況。

3、exactly-once, 保證只一次:控制commit-offset的時間節點是取得at-most-once, at-least-once之間安全系數的一種方式。但exactly-once不容許有模糊地帶。具體做法是把業務處理和commit-offset作為一個完整事物單元來處理(atomic-transaction)。兩樣操作同時成功或失敗。

我覺着kafka的exactly-once能力最值得推介。因為在akka或者其它消息隊列工具里不容易得到保證。而在一個消息驅動的實時交易系統里,保證事件重演能正確反映當時狀態是關鍵。任何事件遺失或重復都會造成不可逆轉的誤差。那么下面的一系列討論我就會嘗試用alpakka-kafka來構建一個基於CQRS模式的實時交易系統,並和大家進行交流分享。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM