springboot優雅重試


為什么要重試:

  1. 遠程調用超時、網絡突然中斷可以重試。
  2. 外部 RPC 調用,或者數據入庫等操作,如果一次操作失敗,可以進行多次重試。
  3. 通過重試,可以提交調用成功的可能性。

優雅的重試機制要具備幾點:

  • 無侵入:這個好理解,不改動當前的業務邏輯,對於需要重試的地方,可以很簡單的實現
  • 可配置:包括重試次數,重試的間隔時間,是否使用異步方式等
  • 通用性:最好是無改動(或者很小改動)的支持絕大部分的場景,拿過來直接可用

優雅重試共性和原理:

  • 正常和重試優雅解耦,重試斷言條件實例或邏輯異常實例是兩者溝通的媒介。
  • 約定重試間隔,差異性重試策略,設置重試超時時間,進一步保證重試有效性以及重試流程穩定性。
  • 都使用了命令設計模式,通過委托重試對象完成相應的邏輯操作,同時內部封裝實現重試邏輯。
  • Spring-tryer和guava-tryer工具都是線程安全的重試,能夠支持並發業務場景的重試邏輯正確性。

優雅重試適用場景:

  • 功能邏輯中存在不穩定依賴場景,需要使用重試獲取預期結果或者嘗試重新執行邏輯不立即結束。比如遠程接口訪問,數據加載訪問,數據上傳校驗等等。
  • 對於異常場景存在需要重試場景,同時希望把正常邏輯和重試邏輯解耦。
  • 對於需要基於數據媒介交互,希望通過重試輪詢檢測執行邏輯場景也可以考慮重試方案。 

優雅重試解決思路:

切面方式

這個思路比較清晰,在需要添加重試的方法上添加一個用於重試的自定義注解,然后在切面中實現重試的邏輯,主要的配置參數則根據注解中的選項來初始化

優點:

    • 真正的無侵入

缺點:

    • 某些方法無法被切面攔截的場景無法覆蓋(如spring-aop無法切私有方法,final方法)
    • 直接使用aspecj則有些小復雜;如果用spring-aop,則只能切被spring容器管理的bean

消息總線方式

這個也比較容易理解,在需要重試的方法中,發送一個消息,並將業務邏輯作為回調方法傳入;由一個訂閱了重試消息的consumer來執行重試的業務邏輯

優點:

    • 重試機制不受任何限制,即在任何地方你都可以使用
    • 利用EventBus框架,可以非常容易把框架搭起來

缺點:

    • 業務侵入,需要在重試的業務處,主動發起一條重試消息
    • 調試理解復雜(消息總線方式的最大優點和缺點,就是過於靈活了,你可能都不知道什么地方處理這個消息,特別是新的童鞋來維護這段代碼時)
    • 如果要獲取返回結果,不太好處理, 上下文參數不好處理

模板方式

優點:

    • 簡單(依賴簡單:引入一個類就可以了; 使用簡單:實現抽象類,講業務邏輯填充即可;)
    • 靈活(這個是真正的靈活了,你想怎么干都可以,完全由你控制)

缺點:

    • 強侵入
    • 代碼臃腫

把這個單獨撈出來,主要是某些時候我就一兩個地方要用到重試,簡單的實現下就好了,也沒有必用用到上面這么重的方式;而且我希望可以針對代碼快進行重試

這個的設計還是非常簡單的,基本上代碼都可以直接貼出來,一目了然:

public abstract class RetryTemplate {

    private static final int DEFAULT_RETRY_TIME = 1;
    private int retryTime = DEFAULT_RETRY_TIME; 
    private int sleepTime = 0;// 重試的睡眠時間

    public int getSleepTime() {
        return sleepTime;
    }

    public RetryTemplate setSleepTime(int sleepTime) {
        if(sleepTime < 0) {
            throw new IllegalArgumentException("sleepTime should equal or bigger than 0");
        }
        this.sleepTime = sleepTime;
        return this;
    }

    public int getRetryTime() {
        return retryTime;
    }

    public RetryTemplate setRetryTime(int retryTime) {
        if (retryTime <= 0) {
            throw new IllegalArgumentException("retryTime should bigger than 0");
        }
        this.retryTime = retryTime;
        return this;
    }

    /**
     * 重試的業務執行代碼
     * 失敗時請拋出一個異常
     *
     * todo 確定返回的封裝類,根據返回結果的狀態來判定是否需要重試
     *
     * @return
     */
    protected abstract Object doBiz() throws Exception; //預留一個doBiz方法由業務方來實現,在其中書寫需要重試的業務代碼,然后執行即可

    public Object execute() throws InterruptedException {
        for (int i = 0; i < retryTime; i++) {
            try {
                return doBiz();
            } catch (Exception e) {
                log.error("業務執行出現異常,e: {}", e);
                Thread.sleep(sleepTime);
            }
        }
        return null;
    }

    public Object submit(ExecutorService executorService) {
        if (executorService == null) {
            throw new IllegalArgumentException("please choose executorService!");
        }
        return executorService.submit((Callable) () -> execute());
    }
}

使用示例:

public void retryDemo() throws InterruptedException {
    Object ans = new RetryTemplate() {
        @Override
        protected Object doBiz() throws Exception {
            int temp = (int) (Math.random() * 10);
            System.out.println(temp);
            if (temp > 3) {
                throw new Exception("generate value bigger then 3! need retry");
            }
            return temp;
        }
    }.setRetryTime(10).setSleepTime(10).execute();
    System.out.println(ans);
}

spring-retry

Spring Retry 為 Spring 應用程序提供了聲明性重試支持。 它用於Spring批處理、Spring集成、Apache Hadoop(等等)的Spring。

在分布式系統中,為了保證數據分布式事務的強一致性,在調用RPC接口或者發送MQ時,針對可能會出現網絡抖動請求超時情況采取一下重試操作。 用的最多的重試方式就是MQ了,但是如果你的項目中沒有引入MQ,就不方便了。

還有一種方式,是開發者自己編寫重試機制,但是大多不夠優雅。

缺陷

spring-retry 工具雖能優雅實現重試,但是存在兩個不友好設計:

  • 一個是重試實體限定為 Throwable 子類,說明重試針對的是可捕捉的功能異常為設計前提的,但是我們希望依賴某個數據對象實體作為重試實體, 但 sping-retry框架必須強制轉換為Throwable子類。
  • 另一個是重試根源的斷言對象使用的是 doWithRetry 的 Exception 異常實例,不符合正常內部斷言的返回設計。

Spring Retry 提倡以注解的方式對方法進行重試,重試邏輯是同步執行的,當拋出相關異常后執行重試, 如果你要以返回值的某個狀態來判定是否需要重試,可能只能通過自己判斷返回值然后顯式拋出異常了。只讀操作可以重試,冪等寫操作可以重試,但是非冪等寫操作不能重試,重試可能導致臟寫,或產生重復數據。

@Recover 注解在使用時無法指定方法,如果一個類中多個重試方法,就會很麻煩。

spring-retry 結構

  • BackOff:補償值,一般指失敗后多久進行重試的延遲值。
  • Sleeper:暫停應用的工具,通常用來應用補償值。
  • RetryState:重試狀態,通常包含一個重試的鍵值。
  • RetryCallback:封裝你需要重試的業務邏輯(上文中的doSth)

  • RecoverCallback:封裝了多次重試都失敗后你需要執行的業務邏輯(上文中的doSthWhenStillFail)

  • RetryContext:重試語境下的上下文,代表了能被重試動作使用的資源。可用於在多次Retry或者Retry 和Recover之間傳遞參數或狀態(在多次doSth或者doSth與doSthWhenStillFail之間傳遞參數)

  • RetryOperations: 定義了“重試”的模板(重試的API),要求傳入RetryCallback,可選傳入RecoveryCallback;

  • RetryTemplate :RetryOperations的具體實現,組合了RetryListener[],BackOffPolicy,RetryPolicy。
  • RetryListener:用來監控Retry的執行情況,並生成統計信息。

  • RetryPolicy:重試的策略或條件,可以簡單的進行多次重試,可以是指定超時時間進行重試(上文中的someCondition),決定失敗能否重試。

  • BackOffPolicy: 重試的回退策略,在業務邏輯執行發生異常時。如果需要重試,我們可能需要等一段時間(可能服務器過於繁忙,如果一直不間隔重試可能拖垮服務器),當然這段時間可以是0,也可以是固定的,可以是隨機的(參見tcp的擁塞控制算法中的回退策略)。回退策略在上文中體現為wait();

RetryPolicy提供了如下策略實現:

  • NeverRetryPolicy:只允許調用RetryCallback一次,不允許重試;

  • AlwaysRetryPolicy:允許無限重試,直到成功,此方式邏輯不當會導致死循環;

  • SimpleRetryPolicy:固定次數重試策略,默認重試最大次數為3次,RetryTemplate默認使用的策略;

  • TimeoutRetryPolicy:超時時間重試策略,默認超時時間為1秒,在指定的超時時間內允許重試;

  • CircuitBreakerRetryPolicy:有熔斷功能的重試策略,需設置3個參數openTimeout、resetTimeout和delegate

    • delegate:是真正判斷是否重試的策略,當重試失敗時,則執行熔斷策略;應該配置基於次數的SimpleRetryPolicy或者基於超時的TimeoutRetryPolicy策略,且策略都是全局模式,而非局部模式,所以要注意次數或超時的配置合理性。

    • openTimeout:openWindow,配置熔斷器電路打開的超時時間,當超過openTimeout之后熔斷器電路變成半打開狀態(主要有一次重試成功,則閉合電路);

    • resetTimeout:timeout,配置重置熔斷器重新閉合的超時時間

  • CompositeRetryPolicy:組合重試策略,有兩種組合方式,樂觀組合重試策略是指只要有一個策略允許重試即可以,悲觀組合重試策略是指只要有一個策略不允許重試即可以,但不管哪種組合方式,組合中的每一個策略都會執行。

BackOffPolicy 提供了如下策略實現:

  • NoBackOffPolicy:無退避算法策略,即當重試時是立即重試;

  • FixedBackOffPolicy:固定時間的退避策略,需設置參數sleeper(指定等待策略,默認是Thread.sleep,即線程休眠)、backOffPeriod(休眠時間,默認1秒);

  • UniformRandomBackOffPolicy:隨機時間退避策略,需設置sleeper、minBackOffPeriod、maxBackOffPeriod,該策略在[minBackOffPeriod,maxBackOffPeriod之間取一個隨機休眠時間,minBackOffPeriod默認500毫秒,maxBackOffPeriod默認1500毫秒;

  • ExponentialBackOffPolicy:指數退避策略,需設置參數sleeper、initialInterval、maxInterval和multiplier。initialInterval指定初始休眠時間,默認100毫秒,maxInterval指定最大休眠時間,默認30秒,multiplier指定乘數,即下一次休眠時間為當前休眠時間*multiplier;

  • ExponentialRandomBackOffPolicy:隨機指數退避策略,引入隨機乘數,固定乘數可能會引起很多服務同時重試導致DDos,使用隨機休眠時間來避免這種情況。

RetryTemplate主要流程實現:

//示例一
public void upload(final Map<String, Object> map) throws Exception {
        // 構建重試模板實例
        RetryTemplate retryTemplate = new RetryTemplate();
        // 設置重試策略,主要設置重試次數
        SimpleRetryPolicy policy = 
        new SimpleRetryPolicy(3, Collections.<Class<? extends Throwable>, Boolean> singletonMap(Exception.class, true)); // 設置重試回退操作策略,主要設置重試間隔時間 FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(100); retryTemplate.setRetryPolicy(policy); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); // 通過RetryCallback 重試回調實例包裝正常邏輯邏輯,第一次執行和重試執行執行的都是這段邏輯 final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() { //RetryContext 重試操作上下文約定,統一spring-try包裝 public Object doWithRetry(RetryContext context) throws Exception { System.out.println("do some thing"); Exception e = uploadToOdps(map); System.out.println(context.getRetryCount()); throw e;//這個點特別注意,重試的根源通過Exception返回 } }; // 通過RecoveryCallback 重試流程正常結束或者達到重試上限后的退出恢復操作實例 final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() { public Object recover(RetryContext context) throws Exception { System.out.println("do recory operation"); return null; } }; try { // 由retryTemplate 執行execute方法開始邏輯執行 retryTemplate.execute(retryCallback, recoveryCallback); } catch (Exception e) { e.printStackTrace(); } } //示例二 protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,RecoveryCallback<T> recoveryCallback,
  RetryState state) throws E, ExhaustedRetryException { //重試策略 RetryPolicy retryPolicy = this.retryPolicy; //退避策略 BackOffPolicy backOffPolicy = this.backOffPolicy; //重試上下文,當前重試次數等都記錄在上下文中 RetryContext context = open(retryPolicy, state); try { //攔截器模式,執行RetryListener#open boolean running = doOpenInterceptors(retryCallback, context); //判斷是否可以重試執行 while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { try {//執行RetryCallback回調 return retryCallback.doWithRetry(context); } catch (Throwable e) {//異常時,要進行下一次重試准備 //遇到異常后,注冊該異常的失敗次數 registerThrowable(retryPolicy, state, context, e); //執行RetryListener#onError doOnErrorInterceptors(retryCallback, context, e); //如果可以重試,執行退避算法,比如休眠一小段時間后再重試 if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { backOffPolicy.backOff(backOffContext); } //state != null && state.rollbackFor(context.getLastThrowable()) //在有狀態重試時,如果是需要執行回滾操作的異常,則立即拋出異常 if (shouldRethrow(retryPolicy, context, state)) { throw RetryTemplate.<E>wrapIfNecessary(e); } } //如果是有狀態重試,且有GLOBAL_STATE屬性,則立即跳出重試終止;
      //當拋出的異常是非需要執行回滾操作的異常時,才會執行到此處,CircuitBreakerRetryPolicy會在此跳出循環; if (state != null && context.hasAttribute(GLOBAL_STATE)) { break; } } //重試失敗后,如果有RecoveryCallback,則執行此回調,否則拋出異常 return handleRetryExhausted(recoveryCallback, context, state); } catch (Throwable e) { throw RetryTemplate.<E>wrapIfNecessary(e); } finally { //清理環境 close(retryPolicy, context, state, lastException == null || exhausted); //執行RetryListener#close,比如統計重試信息 doCloseInterceptors(retryCallback, context, lastException); } }

有狀態or無狀態

  • 無狀態重試,是在一個循環中執行完重試策略,即重試上下文保持在一個線程上下文中,在一次調用中進行完整的重試策略判斷。如遠程調用某個查詢方法時是最常見的無狀態重試:
RetryTemplate template = new RetryTemplate();
//重試策略:次數重試策略
RetryPolicy retryPolicy = new SimpleRetryPolicy(3);
template.setRetryPolicy(retryPolicy);
//退避策略:指數退避策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(100);
backOffPolicy.setMaxInterval(3000);
backOffPolicy.setMultiplier(2);
backOffPolicy.setSleeper(new ThreadWaitSleeper());
template.setBackOffPolicy(backOffPolicy);

//當重試失敗后,拋出異常
String result = template.execute(new RetryCallback<String, RuntimeException>() {
    @Override
    public String doWithRetry(RetryContext context) throws RuntimeException {
        throw new RuntimeException("timeout");
    }
});
//當重試失敗后,執行RecoveryCallback
String result = template.execute(new RetryCallback<String, RuntimeException>() {
    @Override
    public String doWithRetry(RetryContext context) throws RuntimeException {
        System.out.println("retry count:" + context.getRetryCount());
        throw new RuntimeException("timeout");
    }
}, new RecoveryCallback<String>() {
    @Override
    public String recover(RetryContext context) throws Exception {
        return "default";
    }
});
  • 有狀態重試,有兩種情況需要使用有狀態重試,事務操作需要回滾、熔斷器模式。

事務操作需要回滾場景時,當整個操作中拋出的是數據庫異常DataAccessException,則不能進行重試需要回滾,而拋出其他異常則可以進行重試,可以通過RetryState實現:

//當前狀態的名稱,當把狀態放入緩存時,通過該key查詢獲取
Object key = "mykey";
//是否每次都重新生成上下文還是從緩存中查詢,即全局模式(如熔斷器策略時從緩存中查詢)
boolean isForceRefresh = true;
//對DataAccessException進行回滾
BinaryExceptionClassifier rollbackClassifier =
        new BinaryExceptionClassifier(Collections.<Class<? extends Throwable>>singleton(DataAccessException.class));
RetryState state = new DefaultRetryState(key, isForceRefresh, rollbackClassifier);

String result = template.execute(new RetryCallback<String, RuntimeException>() {
    @Override
    public String doWithRetry(RetryContext context) throws RuntimeException {
        System.out.println("retry count:" + context.getRetryCount());
        throw new TypeMismatchDataAccessException("");
    }
}, new RecoveryCallback<String>() {
    @Override
    public String recover(RetryContext context) throws Exception {
        return "default";
    }
}, state);

RetryTemplate中在有狀態重試時,回滾場景時直接拋出異常處理代碼:

//state != null && state.rollbackFor(context.getLastThrowable())
//在有狀態重試時,如果是需要執行回滾操作的異常,則立即拋出異常
if (shouldRethrow(retryPolicy,context, state)) {
    throw RetryTemplate.<E>wrapIfNecessary(e);
}

熔斷器場景。在有狀態重試時,且是全局模式,不在當前循環中處理重試,而是全局重試模式(不是線程上下文),如熔斷器策略時測試代碼如下所示。

RetryTemplate template = new RetryTemplate();
CircuitBreakerRetryPolicy retryPolicy =
        new CircuitBreakerRetryPolicy(new SimpleRetryPolicy(3));
retryPolicy.setOpenTimeout(5000);
retryPolicy.setResetTimeout(20000);
template.setRetryPolicy(retryPolicy);

for (int i = 0; i < 10; i++) {
    try {
        Object key = "circuit";
        boolean isForceRefresh = false;
        RetryState state = new DefaultRetryState(key, isForceRefresh);
        String result = template.execute(new RetryCallback<String, RuntimeException>() {
            @Override
            public String doWithRetry(RetryContext context) throws RuntimeException {
                System.out.println("retry count:" + context.getRetryCount());
                throw new RuntimeException("timeout");
            }
        }, new RecoveryCallback<String>() {
            @Override
            public String recover(RetryContext context) throws Exception {
                return "default";
            }
        }, state);
        System.out.println(result);
    } catch (Exception e) {
        System.out.println(e);
    }
}

為什么說是全局模式呢?我們配置了isForceRefresh為false,則在獲取上下文時是根據key “circuit”從緩存中獲取,從而拿到同一個上下文。

Object key = "circuit";
boolean isForceRefresh = false;
RetryState state = new DefaultRetryState(key,isForceRefresh); 

如下RetryTemplate代碼說明在有狀態模式下,不會在循環中進行重試。

if (state != null && context.hasAttribute(GLOBAL_STATE)) {
   break;
}

判斷熔斷器電路是否打開的代碼:

public boolean isOpen() {
   long time = System.currentTimeMillis() - this.start;
   boolean retryable = this.policy.canRetry(this.context);
   if (!retryable) {//重試失敗
      //在重置熔斷器超時后,熔斷器器電路閉合,重置上下文
      if (time > this.timeout) {
         this.context = createDelegateContext(policy, getParent());
         this.start = System.currentTimeMillis();
         retryable = this.policy.canRetry(this.context);
      } else if (time < this.openWindow) {
         //當在熔斷器打開狀態時,熔斷器電路打開,立即熔斷
         if ((Boolean) getAttribute(CIRCUIT_OPEN) == false) {
            setAttribute(CIRCUIT_OPEN, true);
         }
         this.start = System.currentTimeMillis();
         return true;
      }
   } else {//重試成功
      //在熔斷器電路半打開狀態時,斷路器電路閉合,重置上下文
      if (time > this.openWindow) {
         this.start = System.currentTimeMillis();
         this.context = createDelegateContext(policy, getParent());
      }
   }
   setAttribute(CIRCUIT_OPEN, !retryable);
   return !retryable;
}

從如上代碼可看出spring-retry的熔斷策略相對簡單:

    • 當重試失敗,且在熔斷器打開時間窗口[0,openWindow) 內,立即熔斷;

    • 當重試失敗,且在指定超時時間后(>timeout),熔斷器電路重新閉合;

    • 在熔斷器半打開狀態[openWindow, timeout] 時,只要重試成功則重置上下文,斷路器閉合。

注解介紹

@EnableRetry

表示是否開始重試。

序號 屬性 類型 默認值 說明
1 proxyTargetClass boolean false 指示是否要創建基於子類的(CGLIB)代理,而不是創建標准的基於Java接口的代理。當proxyTargetClass屬性為true時,使用CGLIB代理。默認使用標准JAVA注解

@Retryable

標注此注解的方法在發生異常時會進行重試

序號 屬性 類型 默認值 說明
1 interceptor String ”” 將 interceptor 的 bean 名稱應用到 retryable()
2 value class[] {} 可重試的異常類型
3 include class[] {} 和value一樣,默認空,當exclude也為空時,所有異常都重試
4 exclude class[] {} 指定異常不重試,默認空,當include也為空時,所有異常都重試 
5 label String ”” 統計報告的唯一標簽。如果沒有提供,調用者可以選擇忽略它,或者提供默認值。
6 maxAttempts int 3 嘗試的最大次數(包括第一次失敗),默認為3次。
7 backoff @Backoff @Backoff() 重試補償機制,指定用於重試此操作的backoff屬性。默認為空

@Backoff

不設置參數時,默認使用FixedBackOffPolicy(指定等待時間),重試等待1000ms

序號 屬性 類型 默認值 說明
1 delay long 0 指定延遲后重試 ,如果不設置則默認使用 1000 milliseconds
2 maxDelay long 0 最大重試等待時間
3 multiplier long 0 指定延遲的倍數,比如delay=5000l,multiplier=2時,第一次重試為5秒后,第二次為10秒,第三次為20秒(大於0生效)
4 random boolean false 隨機重試等待時間

@Recover

用於恢復處理程序的方法調用的注釋。返回類型必須與@retryable方法匹配。 可拋出的第一個參數是可選的(但是沒有它的方法只會被調用)。 從失敗方法的參數列表按順序填充后續的參數。

用於@Retryable重試失敗后處理方法,此注解注釋的方法參數一定要是@Retryable拋出的異常,否則無法識別,可以在該方法中進行日志處理。

說明

  1. 使用了@Retryable的方法不能在本類被調用,不然重試機制不會生效。也就是要標記為@Service,然后在其它類使用@Autowired注入或者@Bean去實例才能生效。
  2. 要觸發@Recover方法,那么在@Retryable方法上不能有返回值,只能是void才能生效。
  3. 使用了@Retryable的方法里面不能使用try...catch包裹,要在發放上拋出異常,不然不會觸發。
  4. 在重試期間這個方法是同步的,如果使用類似Spring Cloud這種框架的熔斷機制時,可以結合重試機制來重試后返回結果。
  5. Spring Retry不只能注入方式去實現,還可以通過API的方式實現,類似熔斷處理的機制就基於API方式實現會比較寬松。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM