考慮一個分布式場景中一個常見的場景:服務A執行某個數據庫操作成功后,會發送一條消息到消息隊列,現在希望只有數據庫操作執行成功才發送這條消息。下面是一些常見的作法:
1. 先執行數據庫操作,再發送消息
public void purchaseOrder() { orderDao.save(order); messageQueue.send(message); }
有可能order新增成功,發送消息失敗。最終形成不一致狀態。
2. 先發送消息,再執行數據庫操作
public void purchaseOrder() { messageQueue.send(message); orderDao.save(order); }
有可能消息發送成功,而order新增失敗,從而形成不一致狀態。
3. 在數據庫事務中,先發送消息,再執行數據庫操作
@Transactional public void purchaseOrder() { messageQueue.send(message); orderDao.save(order); }
這里同樣無法保證一致性。如果數據庫操作成功,然而消息已經發送了,無法進行回滾。
4. 在數據庫事務中,先執行數據庫操作,再發送消息
@Transactional public void purchaseOrder() { orderDao.save(order); messageQueue.send(message); }
這種方案成功與否,取決於消息隊列是否擁有應答機制和事務機制。
應答機制表示producer發送消息后,消息隊列能夠返回response從而證明消息是否插入成功。
如果消息隊列擁有應答機制,將上面的代碼改寫為:
@Transactional public void purchaseOrder() { orderDao.save(order); try{ kafkaProducer.send(message).get(); } catch(Exception e) throw new RuntimeException("Fail to send message"); }
這段代碼表示如果發送發收到消息隊列錯誤的response,就拋出一個RuntimeException。那么消息發送失敗,能夠造成數據庫操作的回滾。這個方案看似可行,然而存在這樣一種情況,如果消息發送成功,而消息隊列由於網絡原因沒有即時返回response,此時消息發送方由於沒有及時收到應答從而認為消息發送失敗了,因此消息發送方的數據庫事務回滾了,然而消息的確已經插入成功,從而造成了最終不一致性。
上面的不一致性可以通過消息的事務機制解決。
事務機制表示消息隊列中的消息是否擁有狀態,從而決定消費者是否消費該條消息。
Alibaba旗下的開源消息隊列RocketMQ以高可用性聞名,它是最早支持事務消息的消息隊列。Kafka從版本0.11開始也支持了事務機制。
RoketMQ的事務機制是將消息標記為Prepared狀態或者Confirmed狀態。處於Prepared狀態的消息對consumer不可見。
而Kafka通過Transaction Marker將消息標記為Uncommited或Commited狀態。Consumer通過配置isolation-level
為read_committed
或read_uncommitted
來決定對哪種類型的消息可見。
5. 消息隊列不支持事務消息
如果消息隊列不支持事務消息,那么我們的解決方案是,新增一張message表,並開啟一個定時任務掃描這張message表,將所有狀態為prepared的message發送給消息隊列,發送成功后,將message狀態置為confirmed。
代碼如下:
@Transactional public void purchaseOrder() { orderDao.save(order); messageService.save(message); }
此時插入order和插入message的邏輯處於同一個數據庫事務,通過后台的定時程序不斷掃描message表,因此一定能夠保證消息被成功投遞到消息消費方。
這個方案存在的一個問題是,有可能后台任務發送消息成功后宕機了,從而沒有來得及將已發送的message狀態置為confirmed。因此下一次掃描message表時,會重復發送該條消息。這就是at least once delivery。
由於at least once delivery的特性,consumer有可能收到重復的數據。此時可以在consumer端建立一張message_consume表,來判斷消息是否已經消費過,如果已經消費過,那么就直接丟棄該消息。