為什么要重試:
- 遠程調用超時、網絡突然中斷可以重試。
- 外部 RPC 調用,或者數據入庫等操作,如果一次操作失敗,可以進行多次重試。
- 通過重試,可以提交調用成功的可能性。
優雅的重試機制要具備幾點:
- 無侵入:這個好理解,不改動當前的業務邏輯,對於需要重試的地方,可以很簡單的實現
- 可配置:包括重試次數,重試的間隔時間,是否使用異步方式等
- 通用性:最好是無改動(或者很小改動)的支持絕大部分的場景,拿過來直接可用
優雅重試共性和原理:
- 正常和重試優雅解耦,重試斷言條件實例或邏輯異常實例是兩者溝通的媒介。
- 約定重試間隔,差異性重試策略,設置重試超時時間,進一步保證重試有效性以及重試流程穩定性。
- 都使用了命令設計模式,通過委托重試對象完成相應的邏輯操作,同時內部封裝實現重試邏輯。
- Spring-tryer和guava-tryer工具都是線程安全的重試,能夠支持並發業務場景的重試邏輯正確性。
優雅重試適用場景:
- 功能邏輯中存在不穩定依賴場景,需要使用重試獲取預期結果或者嘗試重新執行邏輯不立即結束。比如遠程接口訪問,數據加載訪問,數據上傳校驗等等。
- 對於異常場景存在需要重試場景,同時希望把正常邏輯和重試邏輯解耦。
- 對於需要基於數據媒介交互,希望通過重試輪詢檢測執行邏輯場景也可以考慮重試方案。
優雅重試解決思路:
切面方式
這個思路比較清晰,在需要添加重試的方法上添加一個用於重試的自定義注解,然后在切面中實現重試的邏輯,主要的配置參數則根據注解中的選項來初始化
優點:
-
- 真正的無侵入
缺點:
-
- 某些方法無法被切面攔截的場景無法覆蓋(如spring-aop無法切私有方法,final方法)
- 直接使用aspecj則有些小復雜;如果用spring-aop,則只能切被spring容器管理的bean
消息總線方式
這個也比較容易理解,在需要重試的方法中,發送一個消息,並將業務邏輯作為回調方法傳入;由一個訂閱了重試消息的consumer來執行重試的業務邏輯
優點:
-
- 重試機制不受任何限制,即在任何地方你都可以使用
- 利用
EventBus
框架,可以非常容易把框架搭起來
缺點:
-
- 業務侵入,需要在重試的業務處,主動發起一條重試消息
- 調試理解復雜(消息總線方式的最大優點和缺點,就是過於靈活了,你可能都不知道什么地方處理這個消息,特別是新的童鞋來維護這段代碼時)
- 如果要獲取返回結果,不太好處理, 上下文參數不好處理
模板方式
優點:
-
- 簡單(依賴簡單:引入一個類就可以了; 使用簡單:實現抽象類,講業務邏輯填充即可;)
- 靈活(這個是真正的靈活了,你想怎么干都可以,完全由你控制)
缺點:
-
- 強侵入
- 代碼臃腫
把這個單獨撈出來,主要是某些時候我就一兩個地方要用到重試,簡單的實現下就好了,也沒有必用用到上面這么重的方式;而且我希望可以針對代碼快進行重試
這個的設計還是非常簡單的,基本上代碼都可以直接貼出來,一目了然:
public abstract class RetryTemplate { private static final int DEFAULT_RETRY_TIME = 1; private int retryTime = DEFAULT_RETRY_TIME; private int sleepTime = 0;// 重試的睡眠時間 public int getSleepTime() { return sleepTime; } public RetryTemplate setSleepTime(int sleepTime) { if(sleepTime < 0) { throw new IllegalArgumentException("sleepTime should equal or bigger than 0"); } this.sleepTime = sleepTime; return this; } public int getRetryTime() { return retryTime; } public RetryTemplate setRetryTime(int retryTime) { if (retryTime <= 0) { throw new IllegalArgumentException("retryTime should bigger than 0"); } this.retryTime = retryTime; return this; } /** * 重試的業務執行代碼 * 失敗時請拋出一個異常 * * todo 確定返回的封裝類,根據返回結果的狀態來判定是否需要重試 * * @return */ protected abstract Object doBiz() throws Exception; //預留一個doBiz方法由業務方來實現,在其中書寫需要重試的業務代碼,然后執行即可 public Object execute() throws InterruptedException { for (int i = 0; i < retryTime; i++) { try { return doBiz(); } catch (Exception e) { log.error("業務執行出現異常,e: {}", e); Thread.sleep(sleepTime); } } return null; } public Object submit(ExecutorService executorService) { if (executorService == null) { throw new IllegalArgumentException("please choose executorService!"); } return executorService.submit((Callable) () -> execute()); } }
使用示例:
public void retryDemo() throws InterruptedException { Object ans = new RetryTemplate() { @Override protected Object doBiz() throws Exception { int temp = (int) (Math.random() * 10); System.out.println(temp); if (temp > 3) { throw new Exception("generate value bigger then 3! need retry"); } return temp; } }.setRetryTime(10).setSleepTime(10).execute(); System.out.println(ans); }
spring-retry
Spring Retry 為 Spring 應用程序提供了聲明性重試支持。 它用於Spring批處理、Spring集成、Apache Hadoop(等等)的Spring。
在分布式系統中,為了保證數據分布式事務的強一致性,在調用RPC接口或者發送MQ時,針對可能會出現網絡抖動請求超時情況采取一下重試操作。 用的最多的重試方式就是MQ了,但是如果你的項目中沒有引入MQ,就不方便了。
還有一種方式,是開發者自己編寫重試機制,但是大多不夠優雅。
缺陷
spring-retry 工具雖能優雅實現重試,但是存在兩個不友好設計:
- 一個是重試實體限定為
Throwable
子類,說明重試針對的是可捕捉的功能異常為設計前提的,但是我們希望依賴某個數據對象實體作為重試實體, 但 sping-retry框架必須強制轉換為Throwable子類。 - 另一個是重試根源的斷言對象使用的是 doWithRetry 的 Exception 異常實例,不符合正常內部斷言的返回設計。
Spring Retry 提倡以注解的方式對方法進行重試,重試邏輯是同步執行的,當拋出相關異常后執行重試, 如果你要以返回值的某個狀態來判定是否需要重試,可能只能通過自己判斷返回值然后顯式拋出異常了。只讀操作可以重試,冪等寫操作可以重試,但是非冪等寫操作不能重試,重試可能導致臟寫,或產生重復數據。
@Recover
注解在使用時無法指定方法,如果一個類中多個重試方法,就會很麻煩。
spring-retry 結構
- BackOff:補償值,一般指失敗后多久進行重試的延遲值。
- Sleeper:暫停應用的工具,通常用來應用補償值。
- RetryState:重試狀態,通常包含一個重試的鍵值。
-
RetryCallback:封裝你需要重試的業務邏輯(上文中的doSth)
-
RecoverCallback:封裝了多次重試都失敗后你需要執行的業務邏輯(上文中的doSthWhenStillFail)
-
RetryContext:重試語境下的上下文,代表了能被重試動作使用的資源。可用於在多次Retry或者Retry 和Recover之間傳遞參數或狀態(在多次doSth或者doSth與doSthWhenStillFail之間傳遞參數)
-
RetryOperations: 定義了“重試”的模板(重試的API),要求傳入RetryCallback,可選傳入RecoveryCallback;
- RetryTemplate :RetryOperations的具體實現,組合了RetryListener[],BackOffPolicy,RetryPolicy。
-
RetryListener:用來監控Retry的執行情況,並生成統計信息。
-
RetryPolicy:重試的策略或條件,可以簡單的進行多次重試,可以是指定超時時間進行重試(上文中的someCondition),決定失敗能否重試。
-
BackOffPolicy: 重試的回退策略,在業務邏輯執行發生異常時。如果需要重試,我們可能需要等一段時間(可能服務器過於繁忙,如果一直不間隔重試可能拖垮服務器),當然這段時間可以是0,也可以是固定的,可以是隨機的(參見tcp的擁塞控制算法中的回退策略)。回退策略在上文中體現為wait();
RetryPolicy提供了如下策略實現:
-
NeverRetryPolicy:只允許調用RetryCallback一次,不允許重試;
-
AlwaysRetryPolicy:允許無限重試,直到成功,此方式邏輯不當會導致死循環;
-
SimpleRetryPolicy:固定次數重試策略,默認重試最大次數為3次,RetryTemplate默認使用的策略;
-
TimeoutRetryPolicy:超時時間重試策略,默認超時時間為1秒,在指定的超時時間內允許重試;
-
CircuitBreakerRetryPolicy:有熔斷功能的重試策略,需設置3個參數openTimeout、resetTimeout和delegate
-
-
delegate:是真正判斷是否重試的策略,當重試失敗時,則執行熔斷策略;應該配置基於次數的SimpleRetryPolicy或者基於超時的TimeoutRetryPolicy策略,且策略都是全局模式,而非局部模式,所以要注意次數或超時的配置合理性。
-
openTimeout:openWindow,配置熔斷器電路打開的超時時間,當超過openTimeout之后熔斷器電路變成半打開狀態(主要有一次重試成功,則閉合電路);
-
resetTimeout:timeout,配置重置熔斷器重新閉合的超時時間
-
-
CompositeRetryPolicy:組合重試策略,有兩種組合方式,樂觀組合重試策略是指只要有一個策略允許重試即可以,悲觀組合重試策略是指只要有一個策略不允許重試即可以,但不管哪種組合方式,組合中的每一個策略都會執行。
BackOffPolicy 提供了如下策略實現:
-
NoBackOffPolicy:無退避算法策略,即當重試時是立即重試;
-
FixedBackOffPolicy:固定時間的退避策略,需設置參數sleeper(指定等待策略,默認是Thread.sleep,即線程休眠)、backOffPeriod(休眠時間,默認1秒);
-
UniformRandomBackOffPolicy:隨機時間退避策略,需設置sleeper、minBackOffPeriod、maxBackOffPeriod,該策略在[minBackOffPeriod,maxBackOffPeriod之間取一個隨機休眠時間,minBackOffPeriod默認500毫秒,maxBackOffPeriod默認1500毫秒;
-
ExponentialBackOffPolicy:指數退避策略,需設置參數sleeper、initialInterval、maxInterval和multiplier。initialInterval指定初始休眠時間,默認100毫秒,maxInterval指定最大休眠時間,默認30秒,multiplier指定乘數,即下一次休眠時間為當前休眠時間*multiplier;
-
ExponentialRandomBackOffPolicy:隨機指數退避策略,引入隨機乘數,固定乘數可能會引起很多服務同時重試導致DDos,使用隨機休眠時間來避免這種情況。
RetryTemplate主要流程實現:
//示例一 public void upload(final Map<String, Object> map) throws Exception { // 構建重試模板實例 RetryTemplate retryTemplate = new RetryTemplate(); // 設置重試策略,主要設置重試次數 SimpleRetryPolicy policy =
new SimpleRetryPolicy(3, Collections.<Class<? extends Throwable>, Boolean> singletonMap(Exception.class, true)); // 設置重試回退操作策略,主要設置重試間隔時間 FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy(); fixedBackOffPolicy.setBackOffPeriod(100); retryTemplate.setRetryPolicy(policy); retryTemplate.setBackOffPolicy(fixedBackOffPolicy); // 通過RetryCallback 重試回調實例包裝正常邏輯邏輯,第一次執行和重試執行執行的都是這段邏輯 final RetryCallback<Object, Exception> retryCallback = new RetryCallback<Object, Exception>() { //RetryContext 重試操作上下文約定,統一spring-try包裝 public Object doWithRetry(RetryContext context) throws Exception { System.out.println("do some thing"); Exception e = uploadToOdps(map); System.out.println(context.getRetryCount()); throw e;//這個點特別注意,重試的根源通過Exception返回 } }; // 通過RecoveryCallback 重試流程正常結束或者達到重試上限后的退出恢復操作實例 final RecoveryCallback<Object> recoveryCallback = new RecoveryCallback<Object>() { public Object recover(RetryContext context) throws Exception { System.out.println("do recory operation"); return null; } }; try { // 由retryTemplate 執行execute方法開始邏輯執行 retryTemplate.execute(retryCallback, recoveryCallback); } catch (Exception e) { e.printStackTrace(); } } //示例二 protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,RecoveryCallback<T> recoveryCallback,
RetryState state) throws E, ExhaustedRetryException { //重試策略 RetryPolicy retryPolicy = this.retryPolicy; //退避策略 BackOffPolicy backOffPolicy = this.backOffPolicy; //重試上下文,當前重試次數等都記錄在上下文中 RetryContext context = open(retryPolicy, state); try { //攔截器模式,執行RetryListener#open boolean running = doOpenInterceptors(retryCallback, context); //判斷是否可以重試執行 while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { try {//執行RetryCallback回調 return retryCallback.doWithRetry(context); } catch (Throwable e) {//異常時,要進行下一次重試准備 //遇到異常后,注冊該異常的失敗次數 registerThrowable(retryPolicy, state, context, e); //執行RetryListener#onError doOnErrorInterceptors(retryCallback, context, e); //如果可以重試,執行退避算法,比如休眠一小段時間后再重試 if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) { backOffPolicy.backOff(backOffContext); } //state != null && state.rollbackFor(context.getLastThrowable()) //在有狀態重試時,如果是需要執行回滾操作的異常,則立即拋出異常 if (shouldRethrow(retryPolicy, context, state)) { throw RetryTemplate.<E>wrapIfNecessary(e); } } //如果是有狀態重試,且有GLOBAL_STATE屬性,則立即跳出重試終止;
//當拋出的異常是非需要執行回滾操作的異常時,才會執行到此處,CircuitBreakerRetryPolicy會在此跳出循環; if (state != null && context.hasAttribute(GLOBAL_STATE)) { break; } } //重試失敗后,如果有RecoveryCallback,則執行此回調,否則拋出異常 return handleRetryExhausted(recoveryCallback, context, state); } catch (Throwable e) { throw RetryTemplate.<E>wrapIfNecessary(e); } finally { //清理環境 close(retryPolicy, context, state, lastException == null || exhausted); //執行RetryListener#close,比如統計重試信息 doCloseInterceptors(retryCallback, context, lastException); } }
有狀態or無狀態
- 無狀態重試,是在一個循環中執行完重試策略,即重試上下文保持在一個線程上下文中,在一次調用中進行完整的重試策略判斷。如遠程調用某個查詢方法時是最常見的無狀態重試:
RetryTemplate template = new RetryTemplate(); //重試策略:次數重試策略 RetryPolicy retryPolicy = new SimpleRetryPolicy(3); template.setRetryPolicy(retryPolicy); //退避策略:指數退避策略 ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy(); backOffPolicy.setInitialInterval(100); backOffPolicy.setMaxInterval(3000); backOffPolicy.setMultiplier(2); backOffPolicy.setSleeper(new ThreadWaitSleeper()); template.setBackOffPolicy(backOffPolicy); //當重試失敗后,拋出異常 String result = template.execute(new RetryCallback<String, RuntimeException>() { @Override public String doWithRetry(RetryContext context) throws RuntimeException { throw new RuntimeException("timeout"); } }); //當重試失敗后,執行RecoveryCallback String result = template.execute(new RetryCallback<String, RuntimeException>() { @Override public String doWithRetry(RetryContext context) throws RuntimeException { System.out.println("retry count:" + context.getRetryCount()); throw new RuntimeException("timeout"); } }, new RecoveryCallback<String>() { @Override public String recover(RetryContext context) throws Exception { return "default"; } });
- 有狀態重試,有兩種情況需要使用有狀態重試,事務操作需要回滾、熔斷器模式。
事務操作需要回滾場景時,當整個操作中拋出的是數據庫異常DataAccessException,則不能進行重試需要回滾,而拋出其他異常則可以進行重試,可以通過RetryState實現:
//當前狀態的名稱,當把狀態放入緩存時,通過該key查詢獲取 Object key = "mykey"; //是否每次都重新生成上下文還是從緩存中查詢,即全局模式(如熔斷器策略時從緩存中查詢) boolean isForceRefresh = true; //對DataAccessException進行回滾 BinaryExceptionClassifier rollbackClassifier = new BinaryExceptionClassifier(Collections.<Class<? extends Throwable>>singleton(DataAccessException.class)); RetryState state = new DefaultRetryState(key, isForceRefresh, rollbackClassifier); String result = template.execute(new RetryCallback<String, RuntimeException>() { @Override public String doWithRetry(RetryContext context) throws RuntimeException { System.out.println("retry count:" + context.getRetryCount()); throw new TypeMismatchDataAccessException(""); } }, new RecoveryCallback<String>() { @Override public String recover(RetryContext context) throws Exception { return "default"; } }, state);
RetryTemplate中在有狀態重試時,回滾場景時直接拋出異常處理代碼:
//state != null && state.rollbackFor(context.getLastThrowable()) //在有狀態重試時,如果是需要執行回滾操作的異常,則立即拋出異常 if (shouldRethrow(retryPolicy,context, state)) { throw RetryTemplate.<E>wrapIfNecessary(e); }
熔斷器場景。在有狀態重試時,且是全局模式,不在當前循環中處理重試,而是全局重試模式(不是線程上下文),如熔斷器策略時測試代碼如下所示。
RetryTemplate template = new RetryTemplate(); CircuitBreakerRetryPolicy retryPolicy = new CircuitBreakerRetryPolicy(new SimpleRetryPolicy(3)); retryPolicy.setOpenTimeout(5000); retryPolicy.setResetTimeout(20000); template.setRetryPolicy(retryPolicy); for (int i = 0; i < 10; i++) { try { Object key = "circuit"; boolean isForceRefresh = false; RetryState state = new DefaultRetryState(key, isForceRefresh); String result = template.execute(new RetryCallback<String, RuntimeException>() { @Override public String doWithRetry(RetryContext context) throws RuntimeException { System.out.println("retry count:" + context.getRetryCount()); throw new RuntimeException("timeout"); } }, new RecoveryCallback<String>() { @Override public String recover(RetryContext context) throws Exception { return "default"; } }, state); System.out.println(result); } catch (Exception e) { System.out.println(e); } }
為什么說是全局模式呢?我們配置了isForceRefresh為false,則在獲取上下文時是根據key “circuit”從緩存中獲取,從而拿到同一個上下文。
Object key = "circuit"; boolean isForceRefresh = false; RetryState state = new DefaultRetryState(key,isForceRefresh);
如下RetryTemplate代碼說明在有狀態模式下,不會在循環中進行重試。
if (state != null && context.hasAttribute(GLOBAL_STATE)) { break; }
判斷熔斷器電路是否打開的代碼:
public boolean isOpen() { long time = System.currentTimeMillis() - this.start; boolean retryable = this.policy.canRetry(this.context); if (!retryable) {//重試失敗 //在重置熔斷器超時后,熔斷器器電路閉合,重置上下文 if (time > this.timeout) { this.context = createDelegateContext(policy, getParent()); this.start = System.currentTimeMillis(); retryable = this.policy.canRetry(this.context); } else if (time < this.openWindow) { //當在熔斷器打開狀態時,熔斷器電路打開,立即熔斷 if ((Boolean) getAttribute(CIRCUIT_OPEN) == false) { setAttribute(CIRCUIT_OPEN, true); } this.start = System.currentTimeMillis(); return true; } } else {//重試成功 //在熔斷器電路半打開狀態時,斷路器電路閉合,重置上下文 if (time > this.openWindow) { this.start = System.currentTimeMillis(); this.context = createDelegateContext(policy, getParent()); } } setAttribute(CIRCUIT_OPEN, !retryable); return !retryable; }
從如上代碼可看出spring-retry的熔斷策略相對簡單:
-
-
當重試失敗,且在熔斷器打開時間窗口[0,openWindow) 內,立即熔斷;
-
當重試失敗,且在指定超時時間后(>timeout),熔斷器電路重新閉合;
-
在熔斷器半打開狀態[openWindow, timeout] 時,只要重試成功則重置上下文,斷路器閉合。
-
注解介紹
@EnableRetry
表示是否開始重試。
序號 | 屬性 | 類型 | 默認值 | 說明 |
---|---|---|---|---|
1 | proxyTargetClass | boolean | false | 指示是否要創建基於子類的(CGLIB)代理,而不是創建標准的基於Java接口的代理。當proxyTargetClass屬性為true時,使用CGLIB代理。默認使用標准JAVA注解 |
@Retryable
標注此注解的方法在發生異常時會進行重試
序號 | 屬性 | 類型 | 默認值 | 說明 |
---|---|---|---|---|
1 | interceptor | String | ”” | 將 interceptor 的 bean 名稱應用到 retryable() |
2 | value | class[] | {} | 可重試的異常類型 |
3 | include | class[] | {} | 和value一樣,默認空,當exclude也為空時,所有異常都重試 |
4 | exclude | class[] | {} | 指定異常不重試,默認空,當include也為空時,所有異常都重試 |
5 | label | String | ”” | 統計報告的唯一標簽。如果沒有提供,調用者可以選擇忽略它,或者提供默認值。 |
6 | maxAttempts | int | 3 | 嘗試的最大次數(包括第一次失敗),默認為3次。 |
7 | backoff | @Backoff | @Backoff() | 重試補償機制,指定用於重試此操作的backoff屬性。默認為空 |
@Backoff
不設置參數時,默認使用FixedBackOffPolicy(指定等待時間),重試等待1000ms
序號 | 屬性 | 類型 | 默認值 | 說明 |
---|---|---|---|---|
1 | delay | long | 0 | 指定延遲后重試 ,如果不設置則默認使用 1000 milliseconds |
2 | maxDelay | long | 0 | 最大重試等待時間 |
3 | multiplier | long | 0 | 指定延遲的倍數,比如delay=5000l,multiplier=2時,第一次重試為5秒后,第二次為10秒,第三次為20秒(大於0生效) |
4 | random | boolean | false | 隨機重試等待時間 |
@Recover
用於恢復處理程序的方法調用的注釋。返回類型必須與@retryable方法匹配。 可拋出的第一個參數是可選的(但是沒有它的方法只會被調用)。 從失敗方法的參數列表按順序填充后續的參數。
用於@Retryable重試失敗后處理方法,此注解注釋的方法參數一定要是@Retryable拋出的異常,否則無法識別,可以在該方法中進行日志處理。
說明:
- 使用了@Retryable的方法不能在本類被調用,不然重試機制不會生效。也就是要標記為@Service,然后在其它類使用@Autowired注入或者@Bean去實例才能生效。
- 要觸發@Recover方法,那么在@Retryable方法上不能有返回值,只能是void才能生效。
- 使用了@Retryable的方法里面不能使用try...catch包裹,要在發放上拋出異常,不然不會觸發。
- 在重試期間這個方法是同步的,如果使用類似Spring Cloud這種框架的熔斷機制時,可以結合重試機制來重試后返回結果。
- Spring Retry不只能注入方式去實現,還可以通過API的方式實現,類似熔斷處理的機制就基於API方式實現會比較寬松。