分布式事務(1)-理論基礎
分布式事務(2)---強一致性分布式事務解決方案
分布式事務(3)---強一致性分布式事務Atomikos實戰
分布式事務(4)---最終一致性方案之TCC
可靠消息最終一致性是解決分布式事務中一種典型的柔性方案。通常有兩種實現方式,一種是基於本地消息表,一種是基於RocketMQ的事務消息。需要注意發送消息的一致性和消息的可靠性。
基本原理:
事務發起方執行本地事務成功后發出一條消息,事務參與方也就是消息的消費者,接收到消息並執行成功本地事務。這樣來達到數據的最終一致性。
需要注意發起方一定能夠將消息發送出去,參與方一定能成功接收到消息。這樣來確保消息的可靠性。否則同樣會出現分布式事務問題。
本地消息表:
為了防止在使用消息一致性方案時,出現消息丟失,可以使用本地消息表來保證消息的發送。通過本地事務將業務數據和消息寫入本地數據庫,這一步操作是本地事務可以保證消息表必然會寫入數據。然后通過定時任務讀物本地消息表中的數據,將消息發送給消息中間件。如果發送失敗,進行重試,因此還涉及到冪等操作。消費方接收到消息之后,執行業務(本地事務)成功,則完成分布式事務,若失敗則進行重試。如果多次任然失敗,則通知事務發起方進行事務回滾。

方案存在如下缺點:
1.消息表耦合在業務庫中,需要額外的處理發送消息的操作,不利於消息的擴展,同事如果消息表堆積了大量消息數據,會對業務操作產生一定的性能影響。
2.消息發送失敗需要重試,需要保證操作的相關操作的冪等
3.多次重試依然失敗需要人工干預
4.消息服務與業務耦合,不利於消息服務的擴展。
RocketMQ事務消息
RocketMQ在4.3版本后引入了完整的事務消息機制,其內部實現了本地消息表的邏輯,使用其事務消息極大的減輕了開發的工作量。
在RocketMQ中,producer和broker具有雙向通信能力,使得broker自然的具備了事務協調者的能力。
RocketMQ事務消息分布式事務解決方案原理圖:

roketMQ事務消息案例,官方復制粘貼:
事務消息共有三種狀態,提交狀態、回滾狀態、中間狀態:
- TransactionStatus.CommitTransaction: 提交事務,它允許消費者消費此消息。
- TransactionStatus.RollbackTransaction: 回滾事務,它代表該消息將被刪除,不允許被消費。
- TransactionStatus.Unknown: 中間狀態,它代表需要檢查消息隊列來確定狀態。
1、創建事務性生產者 使用 TransactionMQProducer類創建生產者,並指定唯一的 ProducerGroup,就可以設置自定義線程池來處理這些檢查請求。執行本地事務后、需要根據執行結果對消息隊列進行回復。 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class TransactionProducer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionListener transactionListener = new TransactionListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name"); ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest1234", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", sendResult); Thread.sleep(10); } catch (MQClientException | UnsupportedEncodingException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } } 2、實現事務的監聽接口 當發送半消息成功時,我們使用 executeLocalTransaction 方法來執行本地事務。它返回前一節中提到的三個事務狀態之一。checkLocalTransaction 方法用於檢查本地事務狀態,並回應消息隊列的檢查請求。它也是返回前一節中提到的三個事務狀態之一。 public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.UNKNOW; } @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Integer status = localTrans.get(msg.getTransactionId()); if (null != status) { switch (status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } } 3. 事務消息使用上的限制 1.事務消息不支持延時消息和批量消息。 2.為了避免單個消息被檢查太多次而導致半隊列消息累積,我們默認將單個消息的檢查次數限制為 15 次,但是用戶可以通過 Broker 配置文件的 transactionCheckMax參數來修改此限制。如果已經檢查某條消息超過 N 次的話( N = transactionCheckMax ) 則 Broker 將丟棄此消息,並在默認情況下同時打印錯誤日志。用戶可以通過重寫 AbstractTransactionalMessageCheckListener 類來修改這個行為。 3.事務消息將在 Broker 配置文件中的參數 transactionTimeout 這樣的特定時間長度之后被檢查。當發送事務消息時,用戶還可以通過設置用戶屬性 CHECK_IMMUNITY_TIME_IN_SECONDS 來改變這個限制,該參數優先於 transactionTimeout 參數。 4.事務性消息可能不止一次被檢查或消費。 5.提交給用戶的目標主題消息可能會失敗,目前這依日志的記錄而定。它的高可用性通過 RocketMQ 本身的高可用性機制來保證,如果希望確保事務消息不丟失、並且事務完整性得到保證,建議使用同步的雙重寫入機制。 6.事務消息的生產者 ID 不能與其他類型消息的生產者 ID 共享。與其他類型的消息不同,事務消息允許反向查詢、MQ服務器能通過它們的生產者 ID 查詢到消費者。
消息發送的一致性
消息發送的一致性指的事務發起方執行本地事務成功則一定能把其產生的消息發送出去。這里涉及到消息發送與確認機制,消息發送的不可靠性,如何保證消息發送的一致性。
消息發送與確認機制:
常規中間的消息發送與確認機制如下:
1.生產者執行本地事務,然后將消息發送到MQ,這里可以是同步或者異步
2.MQ接收到消息后,將消息數據持久化到磁盤。這個MQ都會提供相應的配置
3.MQ向生產者返回發送結果(消息狀態或者異常)
4.消費者監聽消費消息
5.消費者執行本地事務
6.消費者向消息MQ確認消費消息
這種流程一般來說無法保證消息發送的一致性。
消息發送如何不一致:
1.先操作數據庫,再發送消息。數據庫寫入了,但消息可能沒有發送出去,事務參與方就沒有消息可消費
public void tx() {
//1.執行業務
//2.發送消息
}
2.先發消息,在操作庫。消息發出去,但是本地事務執行失敗,參與方可以執行業務,但是發起方沒有執行業務
public void tx() {
//1.發送消息
//2.執行業務
}
3.同一事務中,先發消息,再操作庫。和第二點一樣,事務回滾無法控制消息的回滾
@Transactional
public void tx() {
//1.發送消息
//2.執行業務
}
4.同一事務中,先操作庫,再發送消息。這種看似正常,數據保存成功,消息發送失敗,事務會回滾。但是如果事務執行成功,消息發送成功,由於網絡原因,導致發送消息相應超時,拋出異常回滾了事務,這個時候消息可能已經被事務參與方消費了,並執行了業務。所以還是需要發送確認機制。流程參考上面RocketMQ事務消息流程圖
@Transactional
public void tx() {
//1.執行業務
//2.發送消息
}
消息接收的一致性:
消息接收與確認:
消息接收的一致性在一定程度上需要滿足消息的接收與確認機制:
1.MQ向消費方投遞消息
2.消費方收到消息,執行本地事務,執行成功/失敗,將結果發送給MQ
3.中間件處理消費者發來的結果,成功則清除消息記錄,失敗則根據不同的情況處理,比如rabbitMQ,可以設置重回隊列
4.MQ投遞消息失敗會進行重試,多次投遞失敗,將消息轉入死信隊列,以便后面人工處理
5.消費方執行完業務,如果如法將結果發送給MQ,同樣應該引入重試機制,比如另起線程,掃表數據狀態,將結果發送給MQ
需要注意:1.消息接收接口需要保證冪等;2.涉及到重試,最好設置重試次數,以免進入死循環。
消息接收不一致:
1.接收消息的接口沒有冪等,如果消息重復投遞則會導致數據不一致。
2.消費者可能無法接收消息,此時MQ並沒有重試投遞,導致事務參與方業務沒有執行,引起數據不一致
3.消費者執行完本地業務后,無法將結果反饋給MQ,MQ無法正確的處理消息,進行了重試,消費接口又沒有冪等導致數據一不一致
如何保證消息接收的一致性:
1.限制MQ消息投遞重試的最大次數
2.消息接收接口保證冪等
3.事務參與方與MQ之間需要確認機制
4.失敗的消息轉入私信隊列,后續人工干預處理
