交易訂單請求異步轉同步的一種解決辦法


最開始,先描述一下交易場景:

  1. 商戶發起請求到交易系統,等待交易系統的返回;
  2. 交易系統接收請求並按渠道要求組裝報文發送到銀行渠道;
  3. 渠道接收信息,通過mq的方式異步返回通知結果到交易系統;
  4. 交易系統通過監聽mq隊列,接收到渠道返回的交易結果后,把mq的異步消息轉化為同步的消息返回給商戶;
  5. 商戶獲取到交易結果,完成交易;

一般情況下,我們是用 HttpClient 的方式,將商戶上送的交易信息組裝報文后,通過 post 方式發送到渠道,然后 HttpClient 返回的流信息就是對應的交易結果。這個就是一個同步的交易請求。
現在要解決的問題是,把請求發送到渠道后,渠道不是立即返回信息,而是通過 mq 的方式異步返回結果,而在這個請求過程中,商戶還在頁面上等待返回交易結果,這就需要交易系統做一個異步轉同步的處理,把結果同步的返回給頁面等待的商戶。

解決思路:

  1. 通過使用 Object 的 wait 方式,讓交易系統的請求線程等待指定的時間
  2. 在 mq 隊列監聽接收到渠道返回的交易結果后,根據交易信息中的 sequenceNumber 字段,關聯找到正在 wait 阻塞狀態的線程,並調用 notifyAll 方法喚醒線程
  3. 交易請求線程被喚醒后,就按照同步的邏輯組裝報文返回給商戶端

實現過程

  1. 建立一個類,來封裝交易對象信息,后面對這個交易對象調用 wait 方法
public class TransactionInfo {

    // 流水號, 在異步結果中關聯找到原請求的信息
    private String sequenceNumber;
    // 原交易對象
    private Object originalTrxObj;
    // 異步交易結果
    private Object trxResultObj;

    public TransactionInfo(String sequenceNumber, Object originalTrxObj) {
        this.sequenceNumber = sequenceNumber;
        this.originalTrxObj = originalTrxObj;
    }

  1. 建立一個同步管理類,來管理交易請求封裝的信息對象,這里使用的是一個 Map 進行封裝
public class SyncFactory {

    /**
     * Key : 交易的請求流水號
     * Value :交易的請求信息的封裝
     */
    private Map<String, TransactionInfo> map = new ConcurrentHashMap<String, TransactionInfo>();

    public synchronized TransactionInfo put(String seqNum, Object req){
        TransactionInfo info = new TransactionInfo(seqNum, req);
        map.put(seqNum, info);
        return info;
    }

}
  1. 交易系統接收到商戶發起的交易請求時,阻塞線程,調用 SyncFactory 緩存交易信息,當然要先獲取流水號,要填充那個 Map 對象的嘛
      // 這里自定義實現獲取交易流水號的邏輯
      String seqNum = getSeqNum();
      
      // 調用 SyncFactory 的方法,緩存交易信息
      TransactionInfo info = syncFactory.put(seqNum, trxReqObj);

      // 調用 wait 方法,自定義自己合適的 timeout, 不可能等交易結果等上幾小時的嘛
      synchronized (info) {
            info.wait(syncTimeOut * 1000);							
      }
  1. 在 mq 監聽消費者的邏輯中, 獲取異步結果,根據流水號 seqNum 來調用 notifyAll 喚醒阻塞的線程
public class TransactionResultHttpHandler implements HttpHandler {
      
      @Override
      public void handle(HttpExchange he) throws IOException {
            // 自定義從異步結果中獲取交易流水號的邏輯
            String seqNum = getNumFromResult();
            
            // 根據上面獲取到的流水號,取到緩存的交易信息對象
            TransactionInfo info = syncFactory.put(seqNum, trxReqObj);

            // 將異步結果的交易結果信息
            synchronized (info) {
                  info .setTrxResultObj(res);
                  info .notifyAll();
            }
      }
}

整個異步轉同步的過程大概就是這個樣子。
然后補充兩點這個設計的問題。

  1. 在 TransactionInfo 類中,可以增加一個表示這個信息緩存的時間信息。用來判斷當結果返回或者是等待結果超時之后,從 SyncFactory 中把對應的信息給移除掉。
            // 未返回結果, 判斷是否超過自定義的 syncTimeOut, 從 Map 中移除
            public boolean expired(int timeout){
		return (System.currentTimeMillis()- info.getLiveTime() > syncTimeOut);
	    }
            // 有回結果后, 判斷如果在自定義的 syncTimeOut 時間內就返回的話,把 seqNum 對應的交易信息從 Map 中移除
            if(System.currentTimeMillis()-info.getLiveTime()< syncTimeOut){
                  syncFactory.remove(seqNum)
            }
  1. 第二個問題就是這個設計有個很明顯的缺陷,在調用 wait 方法的時候,線程阻塞等待請求結果的返回,如果在同一時間出現並發數較大的情況,線程池一下盛滿,直接把應用搞掛了。當然這個問題就是這整個設計實現方式的問題了。暫時沒有找到其他的異步轉同步的解決辦法,所以第二個問題沒有解決。

大概就是這些。有其他辦法的,可以留個鏈接哇,我也再找其他解決思路~~~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM