最開始,先描述一下交易場景:
- 商戶發起請求到交易系統,等待交易系統的返回;
- 交易系統接收請求並按渠道要求組裝報文發送到銀行渠道;
- 渠道接收信息,通過mq的方式異步返回通知結果到交易系統;
- 交易系統通過監聽mq隊列,接收到渠道返回的交易結果后,把mq的異步消息轉化為同步的消息返回給商戶;
- 商戶獲取到交易結果,完成交易;
一般情況下,我們是用 HttpClient 的方式,將商戶上送的交易信息組裝報文后,通過 post 方式發送到渠道,然后 HttpClient 返回的流信息就是對應的交易結果。這個就是一個同步的交易請求。
現在要解決的問題是,把請求發送到渠道后,渠道不是立即返回信息,而是通過 mq 的方式異步返回結果,而在這個請求過程中,商戶還在頁面上等待返回交易結果,這就需要交易系統做一個異步轉同步的處理,把結果同步的返回給頁面等待的商戶。
解決思路:
- 通過使用 Object 的 wait 方式,讓交易系統的請求線程等待指定的時間
- 在 mq 隊列監聽接收到渠道返回的交易結果后,根據交易信息中的 sequenceNumber 字段,關聯找到正在 wait 阻塞狀態的線程,並調用 notifyAll 方法喚醒線程
- 交易請求線程被喚醒后,就按照同步的邏輯組裝報文返回給商戶端
實現過程
- 建立一個類,來封裝交易對象信息,后面對這個交易對象調用 wait 方法
public class TransactionInfo {
// 流水號, 在異步結果中關聯找到原請求的信息
private String sequenceNumber;
// 原交易對象
private Object originalTrxObj;
// 異步交易結果
private Object trxResultObj;
public TransactionInfo(String sequenceNumber, Object originalTrxObj) {
this.sequenceNumber = sequenceNumber;
this.originalTrxObj = originalTrxObj;
}
- 建立一個同步管理類,來管理交易請求封裝的信息對象,這里使用的是一個 Map 進行封裝
public class SyncFactory {
/**
* Key : 交易的請求流水號
* Value :交易的請求信息的封裝
*/
private Map<String, TransactionInfo> map = new ConcurrentHashMap<String, TransactionInfo>();
public synchronized TransactionInfo put(String seqNum, Object req){
TransactionInfo info = new TransactionInfo(seqNum, req);
map.put(seqNum, info);
return info;
}
}
- 交易系統接收到商戶發起的交易請求時,阻塞線程,調用 SyncFactory 緩存交易信息,當然要先獲取流水號,要填充那個 Map 對象的嘛
// 這里自定義實現獲取交易流水號的邏輯
String seqNum = getSeqNum();
// 調用 SyncFactory 的方法,緩存交易信息
TransactionInfo info = syncFactory.put(seqNum, trxReqObj);
// 調用 wait 方法,自定義自己合適的 timeout, 不可能等交易結果等上幾小時的嘛
synchronized (info) {
info.wait(syncTimeOut * 1000);
}
- 在 mq 監聽消費者的邏輯中, 獲取異步結果,根據流水號 seqNum 來調用 notifyAll 喚醒阻塞的線程
public class TransactionResultHttpHandler implements HttpHandler {
@Override
public void handle(HttpExchange he) throws IOException {
// 自定義從異步結果中獲取交易流水號的邏輯
String seqNum = getNumFromResult();
// 根據上面獲取到的流水號,取到緩存的交易信息對象
TransactionInfo info = syncFactory.put(seqNum, trxReqObj);
// 將異步結果的交易結果信息
synchronized (info) {
info .setTrxResultObj(res);
info .notifyAll();
}
}
}
整個異步轉同步的過程大概就是這個樣子。
然后補充兩點這個設計的問題。
- 在 TransactionInfo 類中,可以增加一個表示這個信息緩存的時間信息。用來判斷當結果返回或者是等待結果超時之后,從 SyncFactory 中把對應的信息給移除掉。
// 未返回結果, 判斷是否超過自定義的 syncTimeOut, 從 Map 中移除
public boolean expired(int timeout){
return (System.currentTimeMillis()- info.getLiveTime() > syncTimeOut);
}
// 有回結果后, 判斷如果在自定義的 syncTimeOut 時間內就返回的話,把 seqNum 對應的交易信息從 Map 中移除
if(System.currentTimeMillis()-info.getLiveTime()< syncTimeOut){
syncFactory.remove(seqNum)
}
- 第二個問題就是這個設計有個很明顯的缺陷,在調用 wait 方法的時候,線程阻塞等待請求結果的返回,如果在同一時間出現並發數較大的情況,線程池一下盛滿,直接把應用搞掛了。當然這個問題就是這整個設計實現方式的問題了。暫時沒有找到其他的異步轉同步的解決辦法,所以第二個問題沒有解決。
大概就是這些。有其他辦法的,可以留個鏈接哇,我也再找其他解決思路~~~