Java生鮮電商平台-SpringCloud微服務架構中分布式事務解決方案


Java生鮮電商平台-SpringCloud微服務架構中分布式事務解決方案

 

說明:Java生鮮電商平台中由於采用了微服務架構進行業務的處理,買家,賣家,配送,銷售,供應商等進行服務化,但是不可避免存在分布式事務的問題

業界有很多的解決方案,對此我相信大家都百度一下子就有很多,但是我巨人大哥想說的是:微服務架構中應當盡量避免分布式事務。

 

下面就是來討論下,分布式事務中主要聚焦於強一致性和最終一致性的解決方案。

微服務的發展

微服務倡導將復雜的單體應用拆分為若干個功能簡單、松耦合的服務,這樣可以降低開發難度、增強擴展性、便於敏捷開發。當前被越來越多的開發者推崇,很多互聯網行業巨頭、開源社區等都開始了微服務的討論和實踐。

微服務落地存在的問題

雖然微服務現在如火如荼,但對其實踐其實仍處於探索階段。很多中小型互聯網公司,鑒於經驗、技術實力等問題,微服務落地比較困難。

如著名架構師Chris Richardson所言,目前存在的主要困難有如下幾方面:

  • 單體應用拆分為分布式系統后,進程間的通訊機制和故障處理措施變的更加復雜。
  • 系統微服務化后,一個看似簡單的功能,內部可能需要調用多個服務並操作多個數據庫實現,服務調用的分布式事務問題變的非常突出。
  • 微服務數量眾多,其測試、部署、監控等都變的更加困難。

隨着RPC框架的成熟,第一個問題已經逐漸得到解決。例如springcloud可以非常好的支持restful調用,dubbo可以支持多種通訊協議。

對於第三個問題,隨着docker、devops技術的發展以及各公有雲paas平台自動化運維工具的推出,微服務的測試、部署與運維會變得越來越容易。

而對於第二個問題,現在還沒有通用方案很好的解決微服務產生的事務問題。分布式事務已經成為微服務落地最大的阻礙,也是最具挑戰性的一個技術難題。

ACID

  • 原子性(Atomicity): 一個事務的所有系列操作步驟被看成是一個動作,所有的步驟要么全部完成要么一個也不會完成,如果事務過程中任何一點失敗,將要被改變的數據庫記錄就不會被真正被改變。

  • 一致性(Consistency): 數據庫的約束 級聯和觸發機制Trigger都必須滿足事務的一致性。也就是說,通過各種途徑包括外鍵約束等任何寫入數據庫的數據都是有效的,不能發生表與表之間存在外鍵約束,但是有數據卻違背這種約束性。所有改變數據庫數據的動作事務必須完成,沒有事務會創建一個無效數據狀態,這是不同於CAP理論的一致性"consistency".

  • 隔離性(Isolation): 主要用於實現並發控制, 隔離能夠確保並發執行的事務能夠順序一個接一個執行,通過隔離,一個未完成事務不會影響另外一個未完成事務。

  • 持久性(Durability): 一旦一個事務被提交,它應該持久保存,不會因為和其他操作沖突而取消這個事務。很多人認為這意味着事務是持久在磁盤上,但是規范沒有特別定義這點。

一致性理論

分布式事務的目的是保障分庫數據一致性,而跨庫事務會遇到各種不可控制的問題,如個別節點永久性宕機,像單機事務一樣的 ACID 是無法奢望的。

另外,業界著名的 CAP 理論也告訴我們,對分布式系統,需要將數據一致性和系統可用性、分區容忍性放在天平上一起考慮。

兩階段提交協議(簡稱2PC)是實現分布式事務較為經典的方案,但 2PC 的可擴展性很差,在分布式架構下應用代價較大,eBay 架構師 Dan Pritchett 提出了 BASE 理論,用於解決大規模分布式系統下的數據一致性問題。

BASE 理論告訴我們:可以通過放棄系統在每個時刻的強一致性來換取系統的可擴展性。

CAP 理論

在分布式系統中,一致性(Consistency)、可用性(Availability)和分區容忍性(Partition Tolerance)3 個要素最多只能同時滿足兩個,不可兼得。其中,分區容忍性又是不可或缺的。

 
 
  • 一致性:分布式環境下,多個節點的數據是否強一致。
  • 可用性:分布式服務能一直保證可用狀態。當用戶發出一個請求后,服務能在有限時間內返回結果。
  • 分區容忍性:特指對網絡分區的容忍性。

舉例:Cassandra、Dynamo 等,默認優先選擇 AP,弱化 C;HBase、MongoDB 等,默認優先選擇 CP,弱化 A。

BASE 理論

核心思想:

  • 基本可用(Basically Available):指分布式系統在出現故障時,允許損失部分的可用性來保證核心可用;
  • 軟狀態(Soft state):指允許分布式系統存在中間狀態,該中間狀態不會影響到系統的整體可用性;
  • 最終一致性(Eventual consistency):指分布式系統中的所有副本數據經過一定時間后,最終能夠達到一致的狀態;
  • 原子性(A)與持久性(D)必須根本保障;
  • 為了可用性、性能與降級服務的需要,只有降低一致性( C ) 與 隔離性( I ) 的要求;
  • 酸鹼平衡(ACID-BASE Balance);

BASE 是對 CAP 中 AP 的一個擴展

一致性模型

數據的一致性模型可以分成以下三類:

  • 強一致性:數據更新成功后,任意時刻所有副本中的數據都是一致的,一般采用同步的方式實現。
  • 弱一致性:數據更新成功后,系統不承諾立即可以讀到最新寫入的值,也不承諾具體多久之后可以讀到。
  • 最終一致性:弱一致性的一種形式,數據更新成功后,系統不承諾立即可以返回最新寫入的值,但是保證最終會返回上一次更新操作的值。

分布式系統數據的強一致性、弱一致性和最終一致性可以通過 Quorum NRW 算法分析。

本地事務

 
 
  • 在單個數據庫的本地並且限制在單個進程內的事務
  • 本地事務不涉及多個數據來源

分布式事務典型方案

  • 兩階段提交(2PC, Two Phase Commit)方案;
  • 本地消息表 (eBay 事件隊列方案);
  • TCC 補償模式;

分類:

  • 兩階段型
  • 補償型
  • 異步確保型
  • 最大努力通知型

服務模式:

  • 可查詢操作
  • 冪等操作
  • TCC操作
  • 可補償操作

兩階段提交2PC(強一致性)

基於XA協議的兩階段提交:

  • 第一階段是表決階段,所有參與者都將本事務能否成功的信息反饋發給協調者;
  • 第二階段是執行階段,協調者根據所有參與者的反饋,通知所有參與者,步調一致地在所有分支上提交或者回滾;
 
 

缺點:

  • 單點問題:事務管理器在整個流程中扮演的角色很關鍵,如果其宕機,比如在第一階段已經完成,在第二階段正准備提交的時候事務管理器宕機,資源管理器就會一直阻塞,導致數據庫無法使用。
  • 同步阻塞:在准備就緒之后,資源管理器中的資源一直處於阻塞,直到提交完成,釋放資源。
  • 數據不一致:兩階段提交協議雖然為分布式數據強一致性所設計,但仍然存在數據不一致性的可能。比如:在第二階段中,假設協調者發出了事務 Commit 的通知,但是因為網絡問題該通知僅被一部分參與者所收到並執行了 Commit 操作,其余的參與者則因為沒有收到通知一直處於阻塞狀態,這時候就產生了數據的不一致性。

總的來說,XA 協議比較簡單,成本較低,但是其單點問題,以及不能支持高並發(由於同步阻塞)依然是其最大的弱點。

本地消息表(最終一致性)

eBay 的架構師 Dan Pritchett,曾在一篇解釋 BASE 原理的論文《Base:An Acid Alternative》中提到一個 eBay 分布式系統一致性問題的解決方案。

 

 

 

它的核心思想是將需要分布式處理的任務通過消息或者日志的方式來異步執行,消息或日志可以存到本地文件、數據庫或消息隊列,再通過業務規則進行失敗重試,它要求各服務的接口是冪等的。

本地消息表與業務數據表處於同一個數據庫中,這樣就能利用本地事務來保證在對這兩個表的操作滿足事務特性,並且使用了消息隊列來保證最終一致性。

  • 在分布式事務操作的一方完成寫業務數據的操作之后向本地消息表發送一個消息,本地事務能保證這個消息一定會被寫入本地消息表中;
  • 之后將本地消息表中的消息轉發到 Kafka 等消息隊列中,如果轉發成功則將消息從本地消息表中刪除,否則繼續重新轉發;
  • 消息消費方處理這個消息,並完成自己的業務邏輯。此時如果本地事務處理成功,表明已經處理成功了,如果處理失敗,那么就會重試執行。如果是業務上面的失敗,可以給生產方發送一個業務補償消息,通知生產方進行回滾等操作;

優點: 一種非常經典的實現,避免了分布式事務,實現了最終一致性。

缺點: 消息表會耦合到業務系統中,如果沒有封裝好的解決方案,會有很多雜活需要處理。

這個方案的核心在於第二階段的重試和冪等執行。失敗后重試,這是一種補償機制,它是能保證系統最終一致的關鍵流程。

可靠消息的最終一致性代碼示例

表結構

DROP TABLE IF EXISTS `rp_transaction_message`;

CREATE TABLE `rp_transaction_message` (
    `id` VARCHAR (50) NOT NULL DEFAULT '' COMMENT '主鍵ID',
    `version` INT (11) NOT NULL DEFAULT '0' COMMENT '版本號',
    `editor` VARCHAR (100) DEFAULT NULL COMMENT '修改者',
    `creater` VARCHAR (100) DEFAULT NULL COMMENT '創建者',
    `edit_time` datetime DEFAULT NULL COMMENT '最后修改時間',
    `create_time` datetime NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '創建時間',
    `message_id` VARCHAR (50) NOT NULL DEFAULT '' COMMENT '消息ID',
    `message_body` LONGTEXT NOT NULL COMMENT '消息內容',
    `message_data_type` VARCHAR (50) DEFAULT NULL COMMENT '消息數據類型',
    `consumer_queue` VARCHAR (100) NOT NULL DEFAULT '' COMMENT '消費隊列',
    `message_send_times` SMALLINT (6) NOT NULL DEFAULT '0' COMMENT '消息重發次數',
    `areadly_dead` VARCHAR (20) NOT NULL DEFAULT '' COMMENT '是否死亡',
    `status` VARCHAR (20) NOT NULL DEFAULT '' COMMENT '狀態',
    `remark` VARCHAR (200) DEFAULT NULL COMMENT '備注',
    `field1` VARCHAR (200) DEFAULT NULL COMMENT '擴展字段1',
    `field2` VARCHAR (200) DEFAULT NULL COMMENT '擴展字段2',
    `field3` VARCHAR (200) DEFAULT NULL COMMENT '擴展字段3',
    PRIMARY KEY (`id`),
    KEY `AK_Key_2` (`message_id`)
) ENGINE = INNODB DEFAULT CHARSET = utf8;

public interface RpTransactionMessageService {
    
    /**
     * 預存儲消息.
     */
    public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

    /**
     * 確認並發送消息.
     */
    public void confirmAndSendMessage(String messageId) throws MessageBizException;

    /**
     * 存儲並發送消息.
     */
    public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

    /**
     * 直接發送消息.
     */
    public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

    /**
     * 重發消息.
     */
    public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

    /**
     * 根據messageId重發某條消息.
     */
    public void reSendMessageByMessageId(String messageId) throws MessageBizException;

    /**
     * 將消息標記為死亡消息.
     */
    public void setMessageToAreadlyDead(String messageId) throws MessageBizException;

    /**
     * 根據消息ID獲取消息
     */
    public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;

    /**
     * 根據消息ID刪除消息
     */
    public void deleteMessageByMessageId(String messageId) throws MessageBizException;

    /**
     * 重發某個消息隊列中的全部已死亡的消息.
     */
    public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;

    /**
     * 獲取分頁數據
     */
    PageBean listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException;
    
}
@Service("rpTransactionMessageService")
public class RpTransactionMessageServiceImpl implements RpTransactionMessageService {
    
    private static final Log log = LogFactory.getLog(RpTransactionMessageServiceImpl.class);
    
    @Autowired
    private RpTransactionMessageDao rpTransactionMessageDao;
    
    @Autowired
    private JmsTemplate notifyJmsTemplate;

    public int saveMessageWaitingConfirm(RpTransactionMessage message) {
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息為空");
        }
        if (StringUtil.isEmpty(message.getConsumerQueue())) {
            throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能為空 ");
        }
        message.setEditTime(new Date());
        message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
        message.setAreadlyDead(PublicEnum.NO.name());
        message.setMessageSendTimes(0);
        return rpTransactionMessageDao.insert(message);
    }

    public void confirmAndSendMessage(String messageId) {
        final RpTransactionMessage message = getMessageByMessageId(messageId);
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據消息id查找的消息為空");
        }
        message.setStatus(MessageStatusEnum.SENDING.name());
        message.setEditTime(new Date());
        rpTransactionMessageDao.update(message);
        notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
        notifyJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.getMessageBody());
            }
        });
    }

    public int saveAndSendMessage(final RpTransactionMessage message) {
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息為空");
        }
        if (StringUtil.isEmpty(message.getConsumerQueue())) {
            throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能為空 ");
        }
        message.setStatus(MessageStatusEnum.SENDING.name());
        message.setAreadlyDead(PublicEnum.NO.name());
        message.setMessageSendTimes(0);
        message.setEditTime(new Date());
        int result = rpTransactionMessageDao.insert(message);
        notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
        notifyJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.getMessageBody());
            }
        });
        return result;
    }

    public void directSendMessage(final RpTransactionMessage message) {
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息為空");
        }
        if (StringUtil.isEmpty(message.getConsumerQueue())) {
            throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能為空 ");
        }
        notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
        notifyJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.getMessageBody());
            }
        });
    }

    public void reSendMessage(final RpTransactionMessage message) {
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息為空");
        }
        if (StringUtil.isEmpty(message.getConsumerQueue())) {
            throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消費隊列不能為空 ");
        }
        message.addSendTimes();
        message.setEditTime(new Date());
        rpTransactionMessageDao.update(message);
        notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
        notifyJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.getMessageBody());
            }
        });
    }

    public void reSendMessageByMessageId(String messageId) {
        final RpTransactionMessage message = getMessageByMessageId(messageId);
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據消息id查找的消息為空");
        }
        int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
        if (message.getMessageSendTimes() >= maxTimes) {
            message.setAreadlyDead(PublicEnum.YES.name());
        }
        message.setEditTime(new Date());
        message.setMessageSendTimes(message.getMessageSendTimes() + 1);
        rpTransactionMessageDao.update(message);
        notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
        notifyJmsTemplate.send(new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message.getMessageBody());
            }
        });
    }

    public void setMessageToAreadlyDead(String messageId) {
        RpTransactionMessage message = getMessageByMessageId(messageId);
        if (message == null) {
            throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根據消息id查找的消息為空");
        }
        message.setAreadlyDead(PublicEnum.YES.name());
        message.setEditTime(new Date());
        rpTransactionMessageDao.update(message);
    }

    public RpTransactionMessage getMessageByMessageId(String messageId) {
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("messageId", messageId);
        return rpTransactionMessageDao.getBy(paramMap);
    }

    public void deleteMessageByMessageId(String messageId) {
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("messageId", messageId);
        rpTransactionMessageDao.delete(paramMap);
    }

    @SuppressWarnings("unchecked")
    public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) {
        log.info("==>reSendAllDeadMessageByQueueName");
        int numPerPage = 1000;
        if (batchSize > 0 && batchSize < 100) {
            numPerPage = 100;
        } else if (batchSize > 100 && batchSize < 5000) {
            numPerPage = batchSize;
        } else if (batchSize > 5000) {
            numPerPage = 5000;
        } else {
            numPerPage = 1000;
        }
        int pageNum = 1;
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("consumerQueue", queueName);
        paramMap.put("areadlyDead", PublicEnum.YES.name());
        paramMap.put("listPageSortType", "ASC");
        Map<String, RpTransactionMessage> messageMap = new HashMap<String, RpTransactionMessage>();
        List<Object> recordList = new ArrayList<Object>();
        int pageCount = 1;
        PageBean pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap);
        recordList = pageBean.getRecordList();
        if (recordList == null || recordList.isEmpty()) {
            log.info("==>recordList is empty");
            return;
        }
        pageCount = pageBean.getTotalPage();
        for (final Object obj : recordList) {
            final RpTransactionMessage message = (RpTransactionMessage) obj;
            messageMap.put(message.getMessageId(), message);
        }
        for (pageNum = 2; pageNum <= pageCount; pageNum++) {
            pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap);
            recordList = pageBean.getRecordList();
            if (recordList == null || recordList.isEmpty()) {
                break;
            }
            for (final Object obj : recordList) {
                final RpTransactionMessage message = (RpTransactionMessage) obj;
                messageMap.put(message.getMessageId(), message);
            }
        }
        recordList = null;
        pageBean = null;
        for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
            final RpTransactionMessage message = entry.getValue();
            message.setEditTime(new Date());
            message.setMessageSendTimes(message.getMessageSendTimes() + 1);
            rpTransactionMessageDao.update(message);
            notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
            notifyJmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(message.getMessageBody());
                }
            });
        }
    }

    @SuppressWarnings("unchecked")
    public PageBean<RpTransactionMessage> listPage(PageParam pageParam, Map<String, Object> paramMap) {
        return rpTransactionMessageDao.listPage(pageParam, paramMap);
    }
    
}
@Component("messageBiz")
public class MessageBiz {

    private static final Log log = LogFactory.getLog(MessageBiz.class);

    @Autowired
    private RpTradePaymentQueryService rpTradePaymentQueryService;

    @Autowired
    private RpTransactionMessageService rpTransactionMessageService;

    /**
     * 處理[waiting_confirm]狀態的消息
     * @param messages
     */
    public void handleWaitingConfirmTimeOutMessages(Map<String, RpTransactionMessage> messageMap) {
        log.debug("開始處理[waiting_confirm]狀態的消息,總條數[" + messageMap.size() + "]");
        // 單條消息處理(目前該狀態的消息,消費隊列全部是accounting,如果后期有業務擴充,需做隊列判斷,做對應的業務處理。)
        for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
            RpTransactionMessage message = entry.getValue();
            try {
                log.debug("開始處理[waiting_confirm]消息ID為[" + message.getMessageId() + "]的消息");
                String bankOrderNo = message.getField1();
                RpTradePaymentRecord record = rpTradePaymentQueryService.getRecordByBankOrderNo(bankOrderNo);
                // 如果訂單成功,把消息改為待處理,並發送消息
                if (TradeStatusEnum.SUCCESS.name().equals(record.getStatus())) {
                    // 確認並發送消息
                    rpTransactionMessageService.confirmAndSendMessage(message.getMessageId());
                } else if (TradeStatusEnum.WAITING_PAYMENT.name().equals(record.getStatus())) {
                    // 訂單狀態是等到支付,可以直接刪除數據
                    log.debug("訂單沒有支付成功,刪除[waiting_confirm]消息id[" + message.getMessageId() + "]的消息");
                    rpTransactionMessageService.deleteMessageByMessageId(message.getMessageId());
                }
                log.debug("結束處理[waiting_confirm]消息ID為[" + message.getMessageId() + "]的消息");
            } catch (Exception e) {
                log.error("處理[waiting_confirm]消息ID為[" + message.getMessageId() + "]的消息異常:", e);
            }
        }
    }

    /**
     * 處理[SENDING]狀態的消息
     * @param messages
     */
    public void handleSendingTimeOutMessage(Map<String, RpTransactionMessage> messageMap) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        log.debug("開始處理[SENDING]狀態的消息,總條數[" + messageMap.size() + "]");
        // 根據配置獲取通知間隔時間
        Map<Integer, Integer> notifyParam = getSendTime();
        // 單條消息處理
        for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
            RpTransactionMessage message = entry.getValue();
            try {
                log.debug("開始處理[SENDING]消息ID為[" + message.getMessageId() + "]的消息");
                // 判斷發送次數
                int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
                log.debug("[SENDING]消息ID為[" + message.getMessageId() + "]的消息,已經重新發送的次數[" 
                        + message.getMessageSendTimes() + "]");
                // 如果超過最大發送次數直接退出
                if (maxTimes < message.getMessageSendTimes()) {
                    // 標記為死亡
                    rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId());
                    continue;
                }
                // 判斷是否達到發送消息的時間間隔條件
                int reSendTimes = message.getMessageSendTimes();
                int times = notifyParam.get(reSendTimes == 0 ? 1 : reSendTimes);
                long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();
                long needTime = currentTimeInMillis - times * 60 * 1000;
                long hasTime = message.getEditTime().getTime();
                // 判斷是否達到了可以再次發送的時間條件
                if (hasTime > needTime) {
                    log.debug("currentTime[" + sdf.format(new Date()) + "],[SENDING]消息上次發送時間[" 
                            + sdf.format(message.getEditTime()) + "],必須過了[" + times + "]分鍾才可以再發送。");
                    continue;
                }
                // 重新發送消息
                rpTransactionMessageService.reSendMessage(message);
                log.debug("結束處理[SENDING]消息ID為[" + message.getMessageId() + "]的消息");
            } catch (Exception e) {
                log.error("處理[SENDING]消息ID為[" + message.getMessageId() + "]的消息異常:", e);
            }
        }
    }

    /**
     * 根據配置獲取通知間隔時間
     * @return
     */
    private Map<Integer, Integer> getSendTime() {
        Map<Integer, Integer> notifyParam = new HashMap<Integer, Integer>();
        notifyParam.put(1, Integer.valueOf(PublicConfigUtil.readConfig("message.send.1.time")));
        notifyParam.put(2, Integer.valueOf(PublicConfigUtil.readConfig("message.send.2.time")));
        notifyParam.put(3, Integer.valueOf(PublicConfigUtil.readConfig("message.send.3.time")));
        notifyParam.put(4, Integer.valueOf(PublicConfigUtil.readConfig("message.send.4.time")));
        notifyParam.put(5, Integer.valueOf(PublicConfigUtil.readConfig("message.send.5.time")));
        return notifyParam;
    }

}
public class AccountingMessageListener implements SessionAwareMessageListener<Message> {

    private static final Log LOG = LogFactory.getLog(AccountingMessageListener.class);

    /**
     * 會計隊列模板(由Spring創建並注入進來)
     */
    @Autowired
    private JmsTemplate notifyJmsTemplate;

    @Autowired
    private RpAccountingVoucherService rpAccountingVoucherService;

    @Autowired
    private RpTransactionMessageService rpTransactionMessageService;

    public synchronized void onMessage(Message message, Session session) {
        RpAccountingVoucher param = null;
        String strMessage = null;
        try {
            ActiveMQTextMessage objectMessage = (ActiveMQTextMessage) message;
            strMessage = objectMessage.getText();
            LOG.info("strMessage1 accounting:" + strMessage);
            param = JSONObject.parseObject(strMessage, RpAccountingVoucher.class);
            // 這里轉換成相應的對象還有問題
            if (param == null) {
                LOG.info("param參數為空");
                return;
            }
            int entryType = param.getEntryType();
            double payerChangeAmount = param.getPayerChangeAmount();
            String voucherNo = param.getVoucherNo();
            String payerAccountNo = param.getPayerAccountNo();
            int fromSystem = param.getFromSystem();
            int payerAccountType = 0;
            if (param.getPayerAccountType() != null && !param.getPayerAccountType().equals("")) {
                payerAccountType = param.getPayerAccountType();
            }
            double payerFee = param.getPayerFee();
            String requestNo = param.getRequestNo();
            double bankChangeAmount = param.getBankChangeAmount();
            double receiverChangeAmount = param.getReceiverChangeAmount();
            String receiverAccountNo = param.getReceiverAccountNo();
            String bankAccount = param.getBankAccount();
            String bankChannelCode = param.getBankChannelCode();
            double profit = param.getProfit();
            double income = param.getIncome();
            double cost = param.getCost();
            String bankOrderNo = param.getBankOrderNo();
            int receiverAccountType = 0;
            double payAmount = param.getPayAmount();
            if (param.getReceiverAccountType() != null && !param.getReceiverAccountType().equals("")) {
                receiverAccountType = param.getReceiverAccountType();
            }
            double receiverFee = param.getReceiverFee();
            String remark = param.getRemark();
            rpAccountingVoucherService.createAccountingVoucher(entryType, voucherNo, payerAccountNo, receiverAccountNo, 
                    payerChangeAmount, receiverChangeAmount, income, cost, profit, bankChangeAmount, requestNo, 
                    bankChannelCode, bankAccount, fromSystem, remark, bankOrderNo, payerAccountType, payAmount, 
                    receiverAccountType, payerFee, receiverFee);
            //刪除消息
            rpTransactionMessageService.deleteMessageByMessageId(param.getMessageId());
        } catch (BizException e) {
            // 業務異常,不再寫會隊列
            LOG.error("==>BizException", e);
        } catch (Exception e) {
            // 不明異常不再寫會隊列
            LOG.error("==>Exception", e);
        }
    }

    public JmsTemplate getNotifyJmsTemplate() {
        return notifyJmsTemplate;
    }

    public void setNotifyJmsTemplate(JmsTemplate notifyJmsTemplate) {
        this.notifyJmsTemplate = notifyJmsTemplate;
    }

    public RpAccountingVoucherService getRpAccountingVoucherService() {
        return rpAccountingVoucherService;
    }

    public void setRpAccountingVoucherService(RpAccountingVoucherService rpAccountingVoucherService) {
        this.rpAccountingVoucherService = rpAccountingVoucherService;
    }

}

  

與常規MQ的ACK機制對比

常規MQ確認機制:

  • Producer生成消息並發送給MQ(同步、異步);
  • MQ接收消息並將消息數據持久化到消息存儲(持久化操作為可選配置);
  • MQ向Producer返回消息的接收結果(返回值、異常);
  • Consumer監聽並消費MQ中的消息;
  • Consumer獲取到消息后執行業務處理;
  • Consumer對已成功消費的消息向MQ進行ACK確認(確認后的消息將從MQ中刪除);

常規MQ隊列消息的處理流程無法實現消息發送一致性,因此直接使用現成的MQ中間件產品無法實現可靠消息最終一致性的分布式事務解決方案

消息發送一致性:是指產生消息的業務動作與消息發送的一致。也就是說,如果業務操作成功,那么由這個業務操作所產生的消息一定要成功投遞出去(一般是發送到kafka、rocketmq、rabbitmq等消息中間件中),否則就丟消息。

下面用偽代碼進行演示消息發送和投遞的不可靠性:

先進行數據庫操作,再發送消息:

public void test1(){ //1 數據庫操作 //2 發送MQ消息 } 

這種情況下無法保證數據庫操作與發送消息的一致性,因為可能數據庫操作成功,發送消息失敗。

先發送消息,再操作數據庫:

public void test1(){ //1 發送MQ消息 //2 數據庫操作 } 

這種情況下無法保證數據庫操作與發送消息的一致性,因為可能發送消息成功,數據庫操作失敗。

在數據庫事務中,先發送消息,后操作數據庫:

@Transactional public void test1(){ //1 發送MQ消息 //2 數據庫操作 } 

這里使用spring 的@Transactional注解,方法里面的操作都在一個事務中。同樣無法保證一致性,因為發送消息成功了,數據庫操作失敗的情況下,數據庫操作是回滾了,但是MQ消息沒法進行回滾。

在數據庫事務中,先操作數據庫,后發送消息:

@Transactional public void test1(){ //1 數據庫操作 //2 發送MQ消息 } 

這種情況下,貌似沒有問題,如果發送MQ消息失敗,拋出異常,事務一定會回滾(加上了@Transactional注解后,spring方法拋出異常后,會自動進行回滾)。

這只是一個假象,因為發送MQ消息可能事實上已經成功,如果是響應超時導致的異常。這個時候,數據庫操作依然回滾,但是MQ消息實際上已經發送成功,導致不一致。

與消息發送一致性流程的對比:

  • 常規MQ隊列消息的處理流程無法實現消息發送一致性;
  • 投遞消息的流程其實就是消息的消費流程,可細化;

TCC (Try-Confirm-Cancel)補償模式(最終一致性)

TCC 其實就是采用的補償機制,其核心思想是:針對每個操作,都要注冊一個與其對應的確認和補償(撤銷)操作。

它分為三個階段:

  • Try 階段主要是對業務系統做檢測及資源預留
  • Confirm 階段主要是對業務系統做確認提交,Try階段執行成功並開始執行 Confirm階段時,默認 Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。
  • Cancel 階段主要是在業務執行錯誤,需要回滾的狀態下執行的業務取消,預留資源釋放。
 

舉例(Bob 要向 Smith 轉賬):

  • 首先在 Try 階段,要先調用遠程接口把 Smith 和 Bob 的錢給凍結起來。
  • 在 Confirm 階段,執行遠程調用的轉賬的操作,轉賬成功進行解凍。
  • 如果第2步執行成功,那么轉賬成功,如果第二步執行失敗,則調用遠程凍結接口對應的解凍方法 (Cancel)。

優點:
跟2PC比起來,實現以及流程相對簡單了一些,但數據的一致性比2PC也要差一些

缺點:
缺點還是比較明顯的,在2,3步中都有可能失敗。TCC屬於應用層的一種補償方式,所以需要程序員在實現的時候多寫很多補償的代碼,在一些場景中,一些業務流程可能用TCC不太好定義及處理。

可靠消息最終一致(常用)

不要用本地的消息表了,直接基於MQ來實現事務。比如阿里的RocketMQ就支持消息事務。

 
 
可靠消息最終一致性方案

大概流程:

  • A系統先發送一個prepared消息到mq,如果這個prepared消息發送失敗那么就直接取消操作別執行了
  • 如果這個消息發送成功過了,那么接着執行本地事務,如果成功就告訴mq發送確認消息,如果失敗就告訴mq回滾消息
  • 如果發送了確認消息,那么此時B系統會接收到確認消息,然后執行本地的事務
  • mq會自動定時輪詢所有prepared消息回調你的接口,問你,這個消息是不是本地事務處理失敗了,所有沒發送確認消息?那是繼續重試還是回滾?一般來說這里你就可以查下數據庫看之前本地事務是否執行,如果回滾了,那么這里也回滾吧。這個就是避免可能本地事務執行成功了,別確認消息發送失敗了。

這個方案里,要是系統B的事務失敗了咋辦?重試咯,自動不斷重試直到成功,如果實在是不行,要么就是針對重要的資金類業務進行回滾,比如B系統本地回滾后,想辦法通知系統A也回滾;或者是發送報警由人工來手工回滾和補償

目前國內互聯網公司大都是這么玩兒的,要不你使用RocketMQ支持的,要不你就基於其他MQ中間件自己封裝一套類似的邏輯,總之思路就是這樣的。

最大努力通知

業務發起方將協調服務的消息發送到MQ,下游服務接收此消息,如果處理失敗,將進行重試,重試N次后依然失敗,將不進行重試,放棄處理,這個應用場景要求對事物性要求不高的地方。

 
最大努力通知方案

 

最終總結:

      需要討論與學習,請加QQ群:793305035

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM