分布式事務解決方案之最大努力通知


最大努力通知也是一種解決分布式事務的方案,下邊是一個是充值的例子:

 

交互流程:
1、賬戶系統調用充值系統接口
2、充值系統完成支付處理向賬戶系統發起充值結果通知

若通知失敗,則充值系統按策略進行重復通知
3、賬戶系統接收到充值結果通知修改充值狀態。
4、賬戶系統未接收到通知會主動調用充值系統的接口查詢充值結果。
通過上邊的例子我們總結最大努力通知方案的目標:
目標:發起通知方通過一定的機制最大努力將業務處理結果通知到接收方。
具體包括:
1、有一定的消息重復通知機制。
因為接收通知方可能沒有接收到通知,此時要有一定的機制對消息重復通知。
2、消息校對機制。
如果盡最大努力也沒有通知到接收方,或者接收方消費消息后要再次消費,此時可由接收方主動向通知方查詢消息
信息來滿足需求。
最大努力通知與可靠消息一致性有什么不同?
1、解決方案思想不同
可靠消息一致性,發起通知方需要保證將消息發出去,並且將消息發到接收通知方,消息的可靠性關鍵由發起通知
方來保證。
最大努力通知,發起通知方盡最大的努力將業務處理結果通知為接收通知方,但是可能消息接收不到,此時需要接
收通知方主動調用發起通知方的接口查詢業務處理結果,通知的可靠性關鍵在接收通知方。
2、兩者的業務應用場景不同
可靠消息一致性關注的是交易過程的事務一致,以異步的方式完成交易。
最大努力通知關注的是交易后的通知事務,即將交易結果可靠的通知出去。
3、技術解決方向不同
可靠消息一致性要解決消息從發出到接收的一致性,即消息發出並且被接收到。
最大努力通知無法保證消息從發出到接收的一致性,只提供消息接收的可靠性機制。可靠機制是,最大努力的將消
息通知給接收方,當消息無法被接收方接收時,由接收方主動查詢消息(業務處理結果)。

解決方案
通過對最大努力通知的理解,采用MQ的ack機制就可以實現最大努力通知。
方案1:

 

 

 

本方案是利用MQ的ack機制由MQ向接收通知方發送通知,流程如下:
1、發起通知方將通知發給MQ。
使用普通消息機制將通知發給MQ。
注意:如果消息沒有發出去可由接收通知方主動請求發起通知方查詢業務執行結果。(后邊會講)
2、接收通知方監聽 MQ。
3、接收通知方接收消息,業務處理完成回應ack。
4、接收通知方若沒有回應ack則MQ會重復通知。
MQ會按照間隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知間隔 (如果MQ采用
rocketMq,在broker中可進行配置),直到達到通知要求的時間窗口上限。
5、接收通知方可通過消息校對接口來校對消息的一致性。
方案2:
本方案也是利用MQ的ack機制,與方案1不同的是應用程序向接收通知方發送通知,如下圖:

交互流程如下:
1、發起通知方將通知發給MQ。
使用可靠消息一致方案中的事務消息保證本地事務與消息的原子性,最終將通知先發給MQ。
2、通知程序監聽 MQ,接收MQ的消息。
方案1中接收通知方直接監聽MQ,方案2中由通知程序監聽MQ。

通知程序若沒有回應ack則MQ會重復通知。
3、通知程序通過互聯網接口協議(如http、webservice)調用接收通知方案接口,完成通知。
通知程序調用接收通知方案接口成功就表示通知成功,即消費MQ消息成功,MQ將不再向通知程序投遞通知消
息。
4、接收通知方可通過消息校對接口來校對消息的一致性。
方案1和方案2的不同點:
1、方案1中接收通知方與MQ接口,即接收通知方案監聽 MQ,此方案主要應用與內部應用之間的通知。
2、方案2中由通知程序與MQ接口,通知程序監聽MQ,收到MQ的消息后由通知程序通過互聯網接口協議調用接收
通知方。此方案主要應用於外部應用之間的通知,例如支付寶、微信的支付結果通知。

RocketMQ實現最大努力通知型事務
業務說明
本實例通過RocketMq中間件實現最大努力通知型分布式事務,模擬充值過程。
本案例有賬戶系統和充值系統兩個微服務,其中賬戶系統的數據庫是bank1數據庫,其中有張三賬戶。充值系統的
數據庫使用bank1_pay數據庫,記錄了賬戶的充值記錄。
業務流程如下圖:

 

交互流程如下:
1、用戶請求充值系統進行充值。
2、充值系統完成充值將充值結果發給MQ。
3、賬戶系統監聽MQ,接收充值結果通知,如果接收不到消息,MQ會重復發送通知。接收到充值結果通知賬戶系
統增加充值金額。
4、賬戶系統也可以主動查詢充值系統的充值結果查詢接口,增加金額。

pay實現如下功能:
1、充值接口
2、充值完成要通知
3、充值結果查詢接口
2)Dao

@Mapper
@Component
public interface AccountPayDao {
    @Insert("insert into account_pay(id,account_no,pay_amount,result) values(#{id},#{accountNo},#{payAmount},#{result})")
    int insertAccountPay(@Param("id") String id, @Param("accountNo") String accountNo, @Param("payAmount") Double pay_amount, @Param("result") String result);

    @Select("select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo}")
    AccountPay findByIdTxNo(@Param("txNo") String txNo);



}

3)Service

@Service
@Slf4j
public class AccountPayServiceImpl implements AccountPayService {

    @Autowired
    AccountPayDao accountPayDao;

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    //插入充值記錄
    @Override
    public AccountPay insertAccountPay(AccountPay accountPay) {
        int success = accountPayDao.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success");
        if(success>0){
            //發送通知,使用普通消息發送通知
            accountPay.setResult("success");
            rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay);
            return accountPay;
        }
        return null;
    }

    //查詢充值記錄,接收通知方調用此方法來查詢充值結果
    @Override
    public AccountPay getAccountPay(String txNo) {
        AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);
        return accountPay;
    }
}

4)Controller

@RestController
public class AccountPayController {

    @Autowired
    AccountPayService accountPayService;

    //充值
    @GetMapping(value = "/paydo")
    public AccountPay pay(AccountPay accountPay){
        //生成事務編號
        String txNo = UUID.randomUUID().toString();
        accountPay.setId(txNo);
        return accountPayService.insertAccountPay(accountPay);
    }

    //查詢充值結果
    @GetMapping(value = "/payresult/{txNo}")
    public AccountPay payresult(@PathVariable("txNo") String txNo){
        return accountPayService.getAccountPay(txNo);
    }
}

bank1實現如下功能:
1、監聽MQ,接收充值結果,根據充值結果完成賬戶金額修改。
2、主動查詢充值系統,根據充值結果完成賬戶金額修改。
1)Dao

@Mapper
@Component
public interface AccountInfoDao {
    //修改賬戶金額
    @Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);


   //查詢冪等記錄,用於冪等控制
    @Select("select count(1) from de_duplication where tx_no = #{txNo}")
    int isExistTx(String txNo);

    //添加事務記錄,用於冪等控制
    @Insert("insert into de_duplication values(#{txNo},now());")
    int addTx(String txNo);

}

2)AccountInfoService

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

    @Autowired
    AccountInfoDao accountInfoDao;

    @Autowired
    PayClient payClient;

    //更新賬戶金額
    @Override
    @Transactional
    public void updateAccountBalance(AccountChangeEvent accountChange) {
        //冪等校驗
        if(accountInfoDao.isExistTx(accountChange.getTxNo())>0){
            return ;
        }
        int i = accountInfoDao.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount());
        //插入事務記錄,用於冪等控制
        accountInfoDao.addTx(accountChange.getTxNo());
    }

    //遠程調用查詢充值結果
    @Override
    public AccountPay queryPayResult(String tx_no) {

        //遠程調用
        AccountPay payresult = payClient.payresult(tx_no);
        if("success".equals(payresult.getResult())){
            //更新賬戶金額
            AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
            accountChangeEvent.setAccountNo(payresult.getAccountNo());//賬號
            accountChangeEvent.setAmount(payresult.getPayAmount());//金額
            accountChangeEvent.setTxNo(payresult.getId());//充值事務號
            updateAccountBalance(accountChangeEvent);
        }
        return payresult;
    }
}
@FeignClient(value = "dtx-notifymsg-demo-pay",fallback = PayFallback.class)
public interface PayClient {

    //遠程調用充值系統的接口查詢充值結果
    @GetMapping(value = "/pay/payresult/{txNo}")
    public AccountPay payresult(@PathVariable("txNo") String txNo);
}

3)監聽MQ

@Component
@Slf4j
@RocketMQMessageListener(topic = "topic_notifymsg",consumerGroup = "consumer_group_notifymsg_bank1")
public class NotifyMsgListener implements RocketMQListener<AccountPay> {

    @Autowired
    AccountInfoService accountInfoService;

    //接收消息
    @Override
    public void onMessage(AccountPay accountPay) {
        log.info("接收到消息:{}", JSON.toJSONString(accountPay));
        if("success".equals(accountPay.getResult())){
            //更新賬戶金額
            AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
            accountChangeEvent.setAccountNo(accountPay.getAccountNo());
            accountChangeEvent.setAmount(accountPay.getPayAmount());
            accountChangeEvent.setTxNo(accountPay.getId());
            accountInfoService.updateAccountBalance(accountChangeEvent);
        }
        log.info("處理消息完成:{}", JSON.toJSONString(accountPay));
    }
}

 

 4)Controller

@RestController
@Slf4j
public class AccountInfoController {

    @Autowired
    private AccountInfoService accountInfoService;

    //主動查詢充值結果
    @GetMapping(value = "/payresult/{txNo}")
    public AccountPay result(@PathVariable("txNo") String txNo){
        AccountPay accountPay = accountInfoService.queryPayResult(txNo);
        return accountPay;
    }
}

測試場景

2020-03-12 18:54:21.442 DEBUG 8108 --- [io-9901-exec-10] c.t.m.d.AccountPayDao.insertAccountPay   : ==>  Preparing: insert into account_pay(id,account_no,pay_amount,result) values(?,?,?,?) 
2020-03-12 18:54:21.442 DEBUG 8108 --- [io-9901-exec-10] c.t.m.d.AccountPayDao.insertAccountPay   : ==> Parameters: 4db8776f-fc55-4190-b014-7caa21b77ec0(String), 1(String), 2000.0(Double), success(String)
2020-03-12 18:54:21.455 DEBUG 8108 --- [io-9901-exec-10] c.t.m.d.AccountPayDao.insertAccountPay   : <==    Updates: 1
2020-03-12 18:54:21.464  INFO 20964 --- [MessageThread_3] c.topcheer.eq.message.NotifyMsgListener  : 接收到消息:{"accountNo":"1","id":"4db8776f-fc55-4190-b014-7caa21b77ec0","payAmount":2000.0,"result":"success"}
2020-03-12 18:54:21.466 DEBUG 20964 --- [MessageThread_3] c.t.eq.dao.AccountInfoDao.isExistTx      : ==>  Preparing: select count(1) from de_duplication where tx_no = ? 
2020-03-12 18:54:21.466 DEBUG 20964 --- [MessageThread_3] c.t.eq.dao.AccountInfoDao.isExistTx      : ==> Parameters: 4db8776f-fc55-4190-b014-7caa21b77ec0(String)
2020-03-12 18:54:21.467 DEBUG 20964 --- [MessageThread_3] c.t.eq.dao.AccountInfoDao.isExistTx      : <==      Total: 1
2020-03-12 18:54:21.468 DEBUG 20964 --- [MessageThread_3] c.t.e.d.A.updateAccountBalance           : ==>  Preparing: update account_info set account_balance=account_balance+? where account_no=? 
2020-03-12 18:54:21.468 DEBUG 20964 --- [MessageThread_3] c.t.e.d.A.updateAccountBalance           : ==> Parameters: 2000.0(Double), 1(String)
2020-03-12 18:54:21.470 DEBUG 20964 --- [MessageThread_3] c.t.e.d.A.updateAccountBalance           : <==    Updates: 1
2020-03-12 18:54:21.470 DEBUG 20964 --- [MessageThread_3] c.topcheer.eq.dao.AccountInfoDao.addTx   : ==>  Preparing: insert into de_duplication values(?,now()); 
2020-03-12 18:54:21.470 DEBUG 20964 --- [MessageThread_3] c.topcheer.eq.dao.AccountInfoDao.addTx   : ==> Parameters: 4db8776f-fc55-4190-b014-7caa21b77ec0(String)
2020-03-12 18:54:21.473 DEBUG 20964 --- [MessageThread_3] c.topcheer.eq.dao.AccountInfoDao.addTx   : <==    Updates: 1
2020-03-12 18:54:21.486  INFO 20964 --- [MessageThread_3] c.topcheer.eq.message.NotifyMsgListener  : 處理消息完成:{"accountNo":"1","id":"4db8776f-fc55-4190-b014-7caa21b77ec0","payAmount":2000.0,"result":"success"}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM