兩階段提交協議(Two-phase Commit,2PC)經常被用來實現分布式事務。一般分為協調器C和若干事務執行者Si兩種角色,這里的事務執行者就是具體的數據庫,協調器可以和事務執行器在一台機器上。

- 我們的應用程序(client)發起一個開始請求到TC;
- TC先將<prepare>消息寫到本地日志,之后向所有的Si發起<prepare>消息。以支付寶轉賬到余額寶為例,TC給A的prepare消息是通知支付寶數據庫相應賬目扣款1萬,TC給B的prepare消息是通知余額寶數據庫相應賬目增加1w。為什么在執行任務前需要先寫本地日志,主要是為了故障后恢復用,本地日志起到現實生活中憑證 的效果,如果沒有本地日志(憑證),出問題容易死無對證;
- Si收到<prepare>消息后,執行具體本機事務,但不會進行commit,如果成功返回<yes>,不成功返回<no>。同理,返回前都應把要返回的消息寫到日志里,當作憑證。
- TC收集所有執行器返回的消息,如果所有執行器都返回yes,那么給所有執行器發生送commit消息,執行器收到commit后執行本地事務的commit操作;如果有任一個執行器返回no,那么給所有執行器發送abort消息,執行器收到abort消息后執行事務abort操作。
注:TC或Si把發送或接收到的消息先寫到日志里,主要是為了故障后恢復用。如某一Si從故障中恢復后,先檢查本機的日志,如果已收到<commit >,則提交,如果<abort >則回滾。如果是<yes>,則再向TC詢問一下,確定下一步。如果什么都沒有,則很可能在<prepare>階段Si就崩潰了,因此需要回滾。
現如今實現基於兩階段提交的分布式事務也沒那么困難了,如果使用Java,那么可以使用開源軟件atomikos(http://www.atomikos.com/)來快速實現。
不過但凡使用過的上述兩階段提交的同學都可以發現性能實在是太差,根本不適合高並發的系統。為什么?
- 兩階段提交涉及多次節點間的網絡通信,通信時間太長!
- 事務時間相對於變長了,鎖定的資源的時間也變長了,造成資源等待時間也增加好多!
正是由於分布式事務存在很嚴重的性能問題,大部分高並發服務都在避免使用,往往通過其他途徑來解決數據一致性問題。
使用事件和消息隊列實現分布式事務
不同於單一架構應用(Monolith), 分布式環境下, 進行事務操作將變得困難, 因為分布式環境通常會有多個數據源, 只用本地數據庫事務難以保證多個數據源數據的一致性. 這種情況下, 可以使用兩階段或者三階段提交協議來完成分布式事務.但是使用這種方式一般來說性能較差, 因為事務管理器需要在多個數據源之間進行多次等待. 有一種方法同樣可以解決分布式事務問題, 並且性能較好, 這就是我這篇文章要介紹的使用事件,本地事務以及消息隊列來實現分布式事務.
我們從一個簡單的實例入手. 基本所有互聯網應用都會有用戶注冊的功能. 在這個例子中, 我們對於用戶注冊有兩步操作:
- 注冊成功, 保存用戶信息.
- 需要給用戶發放一張代金券, 目的是鼓勵用戶進行消費.
如果是一個單一架構應用, 實現這個功能非常簡單: 在一個本地事務里, 往用戶表插一條記錄, 並且在代金券表里插一條記錄, 提交事務就完成了. 但是如果我們的應用是用微服務實現的, 可能用戶和代金券是兩個獨立的服務, 他們有各自的應用和數據庫, 那么就沒有辦法簡單的使用本地事務來保證操作的原子性了. 現在來看看如何使用事件機制和消息隊列來實現這個需求.(我在這里使用的消息隊列是kafka, 原理同樣適用於ActiveMQ/RabbitMQ等其他隊列)
我們會為用戶注冊這個操作創建一個事件, 該事件就叫做用戶創建事件(USER_CREATED). 用戶服務成功保存用戶記錄后, 會發送用戶創建事件到消息隊列, 代金券服務會監聽用戶創建事件, 一旦接收到該事件, 代金券服務就會在自己的數據庫中為該用戶創建一張代金券. 好了, 這些步驟看起來都相當的簡單直觀, 但是怎么保證事務的原子性呢? 考慮下面這兩個場景:
- 用戶服務在保存用戶記錄, 還沒來得及向消息隊列發送消息之前就宕機了. 怎么保證用戶創建事件一定發送到消息隊列了?
- 代金券服務接收到用戶創建事件, 還沒來得及處理事件就宕機了. 重新啟動之后如何消費之前的用戶創建事件?
這兩個問題的本質是: 如何讓操作數據庫和操作消息隊列這兩個操作成為一個原子操作. 不考慮2PC, 這里我們可以通過事件表來解決這個問題. 下面是類圖.

EventPublish是記錄待發布事件的表. 其中:
- id: 每個事件在創建的時候都會生成一個全局唯一ID, 例如UUID.
- status: 事件狀態, 枚舉類型. 現在只有兩個狀態: 待發布(NEW), 已發布(PUBLISHED).
- payload: 事件內容. 這里我們會將事件內容轉成json存到這個字段里.
- eventType: 事件類型, 枚舉類型. 每個事件都會有一個類型, 比如我們之前提到的創建用戶USER_CREATED就是一個事件類型.
EventProcess是用來記錄待處理的事件. 字段與EventPublish基本相同.
我們首先看看事件的發布過程. 下面是用戶服務發布用戶創建事件的順序圖.

- 用戶服務在接收到用戶請求后開啟事務, 在用戶表創建一條用戶記錄, 並且在EventPublish表創建一條status為NEW的記錄, payload記錄的是事件內容, 提交事務.
- 用戶服務中的定時器首先開啟事務, 然后查詢EventPublish是否有status為NEW的記錄, 查詢到記錄之后, 拿到payload信息, 將消息發布到kafka中對應的topic.發送成功之后, 修改數據庫中EventPublish的status為PUBLISHED, 提交事務.
下面是代金券服務處理用戶創建事件的順序圖.

- 代金券服務接收到kafka傳來的用戶創建事件(實際上是代金券服務主動拉取的消息, 先忽略消息隊列的實現), 在EventProcess表創建一條status為NEW的記錄, payload記錄的是事件內容, 如果保存成功, 向kafka返回接收成功的消息.
- 代金券服務中的定時器首先開啟事務, 然后查詢EventProcess是否有status為NEW的記錄, 查詢到記錄之后, 拿到payload信息, 交給事件回調處理器處理, 這里是直接創建代金券記錄. 處理成功之后修改數據庫中EventProcess的status為PROCESSED, 最后提交事務.
回過頭來看我們之前提出的兩個問題:
- 用戶服務在保存用戶記錄, 還沒來得及向消息隊列發送消息之前就宕機了. 怎么保證用戶創建事件一定發送到消息隊列了?
根據事件發布的順序圖, 我們把創建事件和發布事件分成了兩步操作. 如果事件創建成功, 但是在發布的時候宕機了. 啟動之后定時器會重新對之前沒有發布成功的事件進行發布. 如果事件在創建的時候就宕機了, 因為事件創建和業務操作在一個數據庫事務里, 所以對應的業務操作也失敗了, 數據庫狀態的一致性得到了保證. - 代金券服務接收到用戶創建事件, 還沒來得及處理事件就宕機了. 重新啟動之后如何消費之前的用戶創建事件?
根據事件處理的順序圖, 我們把接收事件和處理事件分成了兩步操作. 如果事件接收成功, 但是在處理的時候宕機了. 啟動之后定時器會重新對之前沒有處理成功的事件進行處理. 如果事件在接收的時候就宕機了, kafka會重新將事件發送給對應服務.
通過這種方式, 我們不用2PC, 也保證了多個數據源之間狀態的最終一致性. 和2PC/3PC這種同步事務處理的方式相比, 這種異步事務處理方式具有異步系統通常都有的優點:
- 事務吞吐量大. 因為不需要等待其他數據源響應.
- 容錯性好. A服務在發布事件的時候, B服務甚至可以不在線.
缺點:
- 編程與調試較復雜.
- 容易出現較多的中間狀態. 比如上面的例子, 在用戶服務已經保存了用戶並發布了事件, 但是代金券服務還沒來得及處理之前, 用戶如果登錄系統, 會發現自己是沒有代金券的. 這種情況可能在有些業務中是能夠容忍的, 但是有些業務卻不行. 所以開發之前要考慮好.
另外, 上面的流程在實現的過程中還有一些可以改進的地方:
- 定時器在更新EventPublish狀態為PUBLISHED的時候, 可以一次批量更新多個EventProcess的狀態.
- 定時器查詢EventProcess並交給事件回調處理器處理的時候, 可以使用線程池異步處理, 加快EventProcess處理周期.
- 在保存EventPublish和EventProcess的時候同時保存到Redis, 之后的操作可以對Redis中的數據進行, 但是要小心處理緩存和數據庫可能狀態不一致問題.
- 針對Kafka, 因為Kafka的特點是可能重發消息, 所以在接收事件並且保存到EventProcess的時候可能報主鍵沖突的錯誤(因為重復消息id是相同的), 這個時候可以直接丟棄該消息.