Java按時間梯度實現異步回調接口


1. 背景

  在業務處理完之后,需要調用其他系統的接口,將相應的處理結果通知給對方,若是同步請求,假如調用的系統出現異常或是宕機等事件,會導致自身業務受到影響,事務會一直阻塞,數據庫連接不夠用等異常現象,可以通過異步回調來防止阻塞,但異步的情況還存在一個問題,若調用一次不成功的話接下來怎么處理?這個地方就需要按時間梯度回調,比如前期按10s間隔回調,回調3次,若不成功按30s回調,回調2次,再不成功按分鍾回調,依次類推……相當於給了對方系統恢復的時間,不可能一直處於異常或宕機等異常狀態,若是再不成功可以再通過人工干預的手段去處理了,具體業務具體實現。

2. 技術實現

  大體實現思路如下圖,此過程用到兩個隊列,當前隊列和Next隊列,當前隊列用來存放第一次需要回調的數據對象,如果調用不成功則放入Next隊列,按照制定的時間策略再繼續回調,直到成功或最終持久化后人工接入處理。

 

  用到的技術如下:

  • http請求庫,retrofit2
  • 隊列,LinkedBlockingQueue
  • 調度線程池,ScheduledExecutorService

 

3. 主要代碼說明

3.1 回調時間梯度的策略設計

采用枚舉來對策略規則進行處理,便於代碼上的維護,該枚舉設計三個參數,級別、回調間隔、回調次數;

/**
 * 回調策略
 */
public enum CallbackType {

    //等級1,10s執行3次
    SECONDS_10(1, 10, 3),
    //等級2,30s執行2次
    SECONDS_30(2, 30, 2),
    //等級3,60s執行2次
    MINUTE_1(3, 60, 2),
    //等級4,5min執行1次
    MINUTE_5(4, 300, 1),
    //等級5,30min執行1次
    MINUTE_30(5, 30*60, 1),
    //等級6,1h執行2次
    HOUR_1(6, 60*60, 1),
    //等級7,3h執行2次
    HOUR_3(7, 60*60*3, 1),
    //等級8,6h執行2次
    HOUR_6(8, 60*60*6, 1);

    //級別
    private int level;
    //回調間隔時間 秒
    private int intervalTime;
    //回調次數
    private int count;
}

3.2 數據傳輸對象設計

聲明抽象父類,便於其他對象調用傳輸繼承。

/**
 * 消息對象父類
 */
public abstract class MessageInfo {

    //開始時間
    private long startTime;
    //更新時間
    private long updateTime;
    //是否回調成功
    private boolean isSuccess=false;
    //回調次數
    private int count=0;
    //回調策略
    private CallbackType callbackType;
}

要傳輸的對象,繼承消息父類;

/**
 * 工單回調信息
 */
public class WorkOrderMessage extends MessageInfo {

    //車架號
    private String vin;
    //工單號
    private String workorderno;
    //工單狀態
    private Integer status;
    //工單原因
    private String reason;
    //操作用戶
    private Integer userid;
}

3.3 調度線程池的使用

//聲明線程池,大小為16
private ScheduledExecutorService pool = Executors.newScheduledThreadPool(16);

...略
while (true){ //從隊列獲取數據,交給定時器執行 try { WorkOrderMessage message = MessageQueue.getMessageFromNext(); long excueTime = message.getUpdateTime()+message.getCallbackType().getIntervalTime()* 1000; long t = excueTime - System.currentTimeMillis(); if (t/1000 < 5) {//5s之內將要執行的數據提交給調度線程池 System.out.println("MessageHandleNext-滿足定時器執行條件"+JSONObject.toJSONString(message)); pool.schedule(new Callable<Boolean>() { @Override public Boolean call() throws Exception { remoteCallback(message); return true; } }, t, TimeUnit.MILLISECONDS); }else { MessageQueue.putMessageToNext(message); } } catch (InterruptedException e) { System.out.println(e); } }

 

3.4 retrofit2的使用,方便好用。

具體可查看官網相關文檔進行了解,用起來還是比較方便的。http://square.github.io/retrofit/

retrofit初始化:

import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;

public class RetrofitHelper {

    private static final String HTTP_URL = "http://baidu.com/";
    private static Retrofit retrofit;

    public static Retrofit instance(){
        if (retrofit == null){
            retrofit = new Retrofit.Builder()
                    .baseUrl(HTTP_URL)
                    .addConverterFactory(GsonConverterFactory.create())
                    .build();
        }
        return retrofit;
    }
}

如果需要修改超時時間,連接時間等可以這樣初始話,Retrofit采用OkHttpClient

import okhttp3.OkHttpClient;
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;

import java.util.concurrent.TimeUnit;

public class RetrofitHelper {

    private static final String HTTP_URL = "http://baidu.com/";
    private static Retrofit retrofit;

    public static Retrofit instance(){
        if (retrofit == null){
            retrofit = new Retrofit.Builder()
                    .baseUrl(HTTP_URL)
                    .client(new OkHttpClient.Builder()
                            .connectTimeout(30, TimeUnit.SECONDS)//連接時間
                            .readTimeout(30, TimeUnit.SECONDS)//讀時間
                            .writeTimeout(30, TimeUnit.SECONDS)//寫時間
                            .build())
                    .addConverterFactory(GsonConverterFactory.create())
                    .build();
        }
        return retrofit;
    }
}

Retrofit使用通過接口調用,要先聲明一個接口;

import com.alibaba.fastjson.JSONObject;
import com.woasis.callbackdemo.bean.WorkOrderMessage;
import retrofit2.Call;
import retrofit2.http.Body;
import retrofit2.http.POST;

public interface WorkOrderMessageInterface {

    @POST("/api")
    Call<JSONObject> updateBatteryInfo(@Body WorkOrderMessage message);

}

接口和實例對象准備好了,接下來就是調用;

private void remoteCallback(WorkOrderMessage message){
        //實例接口對象
        WorkOrderMessageInterface workOrderMessageInterface = RetrofitHelper.instance().create(WorkOrderMessageInterface.class);
        
        //調用接口方法
        Call<JSONObject> objectCall = workOrderMessageInterface.updateBatteryInfo(message);
        System.out.println("遠程調用執行:"+new Date());

        //異步調用執行
        objectCall.enqueue(new Callback<JSONObject>() {
            @Override
            public void onResponse(Call<JSONObject> call, Response<JSONObject> response) {
                System.out.println("MessageHandleNext****調用成功"+Thread.currentThread().getId());
                message.setSuccess(true);
                System.out.println("MessageHandleNext-回調成功"+JSONObject.toJSONString(message));
            }

            @Override
            public void onFailure(Call<JSONObject> call, Throwable throwable) {
                System.out.println("MessageHandleNext++++調用失敗"+Thread.currentThread().getId());
                //失敗后再將數據放入隊列
                try {
                    //對回調策略初始化
                    long currentTime = System.currentTimeMillis();
                    message.setUpdateTime(currentTime);
                    message.setSuccess(false);
                    CallbackType callbackType = message.getCallbackType();
                    //獲取等級
                    int level = CallbackType.getLevel(callbackType);
                    //獲取次數
                    int count = CallbackType.getCount(callbackType);
                    //如果等級已經最高,則不再回調
                    if (CallbackType.HOUR_6.getLevel() == callbackType.getLevel() && count == message.getCount()){
                        System.out.println("MessageHandleNext-等級最高,不再回調, 線下處理:"+JSONObject.toJSONString(message));
                    }else {
                        //看count是否最大,count次數最大則增加level
                        if (message.getCount()<callbackType.getCount()){
                            message.setCount(message.getCount()+1);
                        }else {//如果不小,則增加level
                            message.setCount(1);
                            level += 1;
                            message.setCallbackType(CallbackType.getTypeByLevel(level));
                        }
                        MessageQueue.putMessageToNext(message);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    System.out.println("MessageHandleNext-放入隊列數據失敗");
                }
            }
        });
    }

3.5結果實現

4.總結

本次實現了按照時間梯度去相應其他系統的接口,不再導致本身業務因其他系統的異常而阻塞。請大佬們看實現有沒有不合理的地方,歡迎批評指正。

源碼:https://github.com/liuzwei/callback-demo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM