分布式事務解決方案


分布式事務解決方案

分布式事務

什么是分布式事務

指一次大的操作由不同的小操作組成的,這些小的操作分布在不同的服務器上,分布式事務需要保證這些小操作要么全部成功,要么全部失敗。從本質上來說,分布式事務就是為了保證不同數據庫的數據一致性。

典型的分布式事務場景:跨銀行轉操作就涉及調用兩個異地銀行服務

分布式產生的原因

1、數據庫分表

當數據庫單表數據達到千萬級別,就要考慮分庫分表,那么就會從原來的一個數據庫變成多個數據庫。例如如果一個操作即操作了01庫,又操作了02庫,而且又要保證數據的一致性,那么就要用到分布式事務。
20211122095726
2、應用SOA化

所謂的SOA化,就是業務的服務化。例如電商平台下單操作就會產生調用庫存服務扣減庫存和訂單服務更新訂單數據,那么就會設計到訂單數據庫和庫存數據庫,為了保證數據的一致性,就需要用到分布式事務。
20211122095745

總結:其實上面兩種場景,歸根到底是要操作多數據庫,並且要保證數據的一致性,而產生的分布式事務的。

分布式事務的CAP理論

CAP理論:CAP原則又稱CAP定理,指的是在一個分布式系統中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分區容錯性),三者不可得兼。

CAP原則是NOSQL數據庫的基石。

分布式系統的CAP理論:理論首先把分布式系統中的三個特性進行了如下歸納:

一致性(C):數據在多個副本之間是否能夠保持一致的特性。

可用性(A):是指系統提供的服務必須一致處於可用狀態,對於每一個用戶的請求總是在有限的時間內返回結果,超過時間就認為系統是不可用的

分區容錯性(P):分布式系統在遇到任何網絡分區故障的時候,仍然需要能夠保證對外提供滿足一致性和可用性的服務,除非整個網絡環境都發生故障。
20211122100225

BASE理論:BASE是基本可用,軟狀態,最終一致性。是對CAP中一致性和可用性權限的結果,是基於CAP定理演化而來的,核心思想是即使無法做到強一致性,但每個應用都可以根據自身的業務特定,采用適當的方式來使系統達到最終一致性

分布式4種常見解決方案

2PC提交

二階段提交,是指將事務提交分成兩個部分:准備階段和提交階段。事務的發起者稱之為協調者,事務的執行者稱為參與者。

階段一:准備階段

由協調者發起並傳遞帶有事務信息的請求給各個參與者,詢問是否可以提交事務,並等待返回結果。

個 參與者執行事務操作,將Undo和Redo放入事務日志中(但是不提交)

如果參與者執行成功就返回YES(可以提交事務),失敗NO(不能提交事務)

階段二:提交階段

此階段分兩種情況:所有參與者均返回YES,有任何一個參與者返回NO

所有參與者均反饋YES時,即提交事務。

任何一個參與者反饋NO時,即中斷事務。

3PC提交

3PC,三階段提交協議,是2PC的改進版本,即將事務的提交過程分為CanCommit、PreCommit、do Commit三個階段來進行處理。

階段一:CanCommit

1、協調者向所有參與者發出包含事務內容的CanCommit請求,詢問是否可以提交事務,並等待所有參與者答復。

2、參與者收到CanCommit請求后,如果認為可以執行事務操作,則反饋YES並進入預備狀態,否則反饋NO。
階段二:PreCommit

此階段分為兩種情況:

1.所有參與者均受到請求並返回YES。

2.有任何一個參與者返回NO,或者有任何一個參與者超時,協調者無法收到反饋,則事務中斷

事務預提交:(所有參與者均反饋YES時)

1、協調者向所有參與者發出PreCommit請求,進入准備階段。

2、參與者收到PreCommit請求后,執行事務操作,將Undo和Redo信息記入事務日志中(但不提交事務)。

3、各參與者向協調者反饋Ack響應或No響應,並等待最終指令。

中斷事務:(任何一個參與者反饋NO,或者等待超時后協調者尚無法收到所有參與者的反饋時)

1、協調者向所有參與者發出abort請求。

2、無論收到協調者發出的abort請求,或者在等待協調者請求過程中出現超時,參與者均會中斷事務。

階段3:do Commit

此階段也存在兩種情況:

1、所有參與者均反饋Ack響應,即執行真正的事務提交。

2、任何一個參與者反饋NO,或者等待超時后協調者尚無法收到所有參與者的反饋,即中斷事務。

本地消息表(阿里Seata)

Seata的分布式事務解決方案是業務層面的解決方案,只依賴於單台數據庫的事務能力。Seata框架中一個分布式事務包含3中角色:

Transaction Coordinator (TC): 事務協調器,維護全局事務的運行狀態,負責協調並驅動全局事務的提交或回滾。

Transaction Manager (TM): 控制全局事務的邊界,負責開啟一個全局事務,並最終發起全局提交或全局回滾的決議。

Resource Manager (RM): 控制分支事務,負責分支注冊、狀態匯報,並接收事務協調器的指令,驅動分支(本地)事務的提交和回滾。

其中,TM是一個分布式事務的發起者和終結者,TC負責維護分布式事務的運行狀態,而RM則負責本地事務的運行。如下圖所示:

20211122102337

mq的ack手動補償事務(rabbitmq)

RabbitMQ中與事務有關的主要有三個方法:

txSelect()

txCommit()

txRollback()

try {
            // 開啟事務
            channel.txSelect();
            // 往隊列中發出一條消息,使用rabbitmq默認交換機
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            // 提交事務
            channel.txCommit();
        } catch (Exception e) {
            e.printStackTrace();
            // 事務回滾
            channel.txRollback();
        }

消息生產者發送消息主要執行了四個步驟:

1:Client發送Tx.Select

2:Broker發送Tx.Select-Ok(在它之后,發送消息)

3:Client發送Tx.Commit

4:Broker發送Tx.Commit-Ok

消費者手動ack確認

// 關閉自動確認
  channel.basicConsume(QUEUE_NAME, false, queueingConsumer1);
 //手動確認ack  是否批量處理.true:將一次性ack所有小於deliveryTag的消息
 channel.basicAck(delivery1.getEnvelope().getDeliveryTag(), false);

  public static void main(String[] args)  throws Exception {
       Connection connection = RabbitConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest");
       Channel channel = connection.createChannel();
       //3.聲明隊列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       //設置每次從隊列獲取消息的數量 能者多勞的原則不至於做的快的做完了就歇着了!
       channel.basicQos(1);
       //4.定義隊列的消費者
       QueueingConsumer queueingConsumer1 = new QueueingConsumer(channel);
        /*
    true:表示自動確認,只要消息從隊列中獲取,無論消費者獲取到消息后是否成功消費,都會認為消息成功消費.
    false:表示手動確認,消費者獲取消息后,服務器會將該消息標記為不可用狀態,等待消費者的反饋,
    如果消費者一直沒有反饋,那么該消息將一直處於不可用狀態,並且服務器會認為該消費者已經掛掉,不會再給其發送消息,
    直到該消費者反饋.例如:channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
     */
       //b:ack
       channel.basicConsume(QUEUE_NAME, false, queueingConsumer1);
       //6.獲取消息
       while (true) {
           anInt += 1;
           QueueingConsumer.Delivery delivery1 = queueingConsumer1.nextDelivery(); 
           String message1 = String.valueOf(delivery1.getBody());          
           System.out.println("[" + String.valueOf(anInt) + "]:receve msg:" + message1);
           //手動確認ack  是否批量處理.true:將一次性ack所有小於deliveryTag的消息
           channel.basicAck(delivery1.getEnvelope().getDeliveryTag(), false);
           // System.out.println("[x] Received '" + message2 + "'");
           Thread.sleep(100);
       }
   }

福祿·研發中心 福小雄


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM