背景介紹
在實際的項目應用場景中,經常會需要遇到遠程服務接口的調用,時不時會出現一些接口調用超時,或者函數執行失敗需要重試的情況,例如下邊的這種場景:
某些不太穩定的接口,需要依賴於第三方的遠程調用,例如數據加載,數據上傳相關的類型。
方案整理
基於try catch機制
這種方式來做重試處理的話,會比較簡單粗暴。
public void test(){ try{ //執行遠程調用方法 doRef(); }catch(Exception e){ //重新執行遠程調用方法 doRef(); } }
當出現了異常的時候,立即執行遠程調用,此時可能忽略了幾個問題:
- 如果重試出現了問題,是否還能繼續重試
- 第一次遠程調用出現了異常,此時可能第三方服務此時負載已達到瓶頸,或許需要間隔一段時間再發送遠程調用的成功率會高些。
- 多次重試都失敗之后如何通知調用方自己。
使用Spring的Retry組件
Spring的Retry組件提供了非常豐富的功能用於請求重試。接入這款組件的方式也很簡單, 首先需要引入相關的依賴配置:
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
然后是在啟動類上加入一個@EnableRetry注解
@SpringBootApplication @EnableRetry public class Application { public static void main(String[] args) { SpringApplication.run(Application.class); } }
最后是在需要被執行的函數頭部加入這一@Retryable注解:
@RestController @RequestMapping(value = "/retry") public class RetryController { @GetMapping(value = "/test") @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000, multiplier = 1.5)) public int retryServiceOne(int code) throws Exception { System.out.println("retryServiceOne 被調用,時間" + LocalTime.now()); System.out.println("執行當前線程為:" + Thread.currentThread().getName()); if(code==0){ throw new Exception("業務執行異常!"); } System.out.println("retryServiceOne 執行成功!"); return 200; } }
測試結果:
請求url:http://localhost:8080/retry/test?code=0
控制台會輸出相關的調用信息:
從輸出記錄來看,確實是spring封裝好的retry組件幫我們在出現了異常的情況下會重復調用該方法多次,並且每次調用都會有對應的時間間隔。
好的,看到了這里,目前大概了解了Spring的這款重試組件該如何去使用,那么我們再來深入思考一下,如果需要通過我們手寫去實現一款重試組件需要考慮哪些因素呢?下邊我和大家分享下自己的一些設計思路,可能有些部分設計得並不是特別完善。
手寫一款重試組件
首先我們需要定義一個retry注解:
@Documented @Target(value = ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface Retry { int maxAttempts() default 3; int delay() default 3000; Class<? extends Throwable>[] value() default {}; Class<? extends RetryStrategy> strategy() default FastRetryStrategy.class; Class<? extends RetryListener> listener() default AbstractRetryListener.class; }
這款注解里面主要屬性有:
- 最大重試次數
- 每次重試的間隔時間
- 關注異常(僅當拋出了相應異常的條件下才會重試)
- 重試策略(默認是快速重試)
- 重試監聽器
為了減少代碼的耦合性,所以這里我將重試接口的攔截和處理都歸到了aop層面去處理,因此需要引入一個對應的依賴配置:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
重試部分的Aop模塊代碼如下所示:
@Aspect @Component public class RetryAop { @Resource private ApplicationContext applicationContext; @Pointcut("@annotation(org.idea.qiyu.framework.retry.jdk.config.Retry)") public void pointCut() { } @Around(value = "pointCut()") public Object doBiz(ProceedingJoinPoint point) { MethodSignature methodSignature = (MethodSignature) point.getSignature(); Method method = methodSignature.getMethod(); Retry retry = method.getDeclaredAnnotation(Retry.class); RetryStrategy retryStrategy = applicationContext.getBean(retry.strategy()); RetryTask retryTask = new RetryTaskImpl(point); retryStrategy.initArgs(retry, retryTask); try { Object result = point.proceed(); return result; } catch (Throwable throwable) { retryStrategy.retryTask(); } return null; } private class RetryTaskImpl implements RetryTask { private ProceedingJoinPoint proceedingJoinPoint; private Object result; private volatile Boolean asyncRetryState = null; public RetryTaskImpl(ProceedingJoinPoint proceedingJoinPoint) { this.proceedingJoinPoint = proceedingJoinPoint; } public ProceedingJoinPoint getProceedingJoinPoint() { return proceedingJoinPoint; } public void setProceedingJoinPoint(ProceedingJoinPoint proceedingJoinPoint) { this.proceedingJoinPoint = proceedingJoinPoint; } public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } public Boolean getAsyncRetryState() { return asyncRetryState; } public void setAsyncRetryState(Boolean asyncRetryState) { this.asyncRetryState = asyncRetryState; } @Override public Object getRetryResult() { return result; } @Override public Boolean getRetryStatus() { return asyncRetryState; } @Override public void setRetrySuccess() { this.setAsyncRetryState(true); } @Override public void doTask() throws Throwable { this.result = proceedingJoinPoint.proceed(); } } }
這里解釋一下,這個模塊主要是攔截帶有 @Retry 注解的方法,然后將需要執行的部分放入到一個RetryTask類型的對象當中,內部的doTask函數會觸發真正的方法調用。
RetryTask接口的代碼如下:
public interface RetryTask { Object getRetryResult(); Boolean getRetryStatus(); void setRetrySuccess(); void doTask() throws Throwable; }
首次函數執行的過程中,會有一個try catch的捕獲,如果出現了異常情況就會進入了retryTask函數內部:
在進入retryTask函數當中,則相當於進入了具體的重試策略函數執行邏輯中。
從代碼截圖可以看出,重試策略是從Spring容器中加載出來的,這是需要提前注入到Spring容器。
重試策略接口:
public interface RetryStrategy { /** * 初始化一些參數配置 * * @param retry * @param retryTask */ void initArgs(Retry retry,RetryTask retryTask); /** * 重試策略 */ void retryTask(); }
默認的重試策略為快速重試策略,相關代碼為:
public class FastRetryStrategy implements RetryStrategy, ApplicationContextAware { private Retry retry; private RetryTask retryTask; private ApplicationContext applicationContext; private ExecutorService retryThreadPool; public FastRetryStrategy() { } public ExecutorService getRetryThreadPool() { return retryThreadPool; } public void setRetryThreadPool(ExecutorService retryThreadPool) { this.retryThreadPool = retryThreadPool; } @Override public void initArgs(Retry retry, RetryTask retryTask) { this.retry = retry; this.retryTask = retryTask; } @Override public void retryTask() { if (!FastRetryStrategy.class.equals(retry.strategy())) { System.err.println("error retry strategy"); return; } //安全類型bean查找 String[] beanNames = applicationContext.getBeanNamesForType(retry.listener()); RetryListener retryListener = null; if (beanNames != null && beanNames.length > 0) { retryListener = applicationContext.getBean(retry.listener()); } Class<? extends Throwable>[] exceptionClasses = retry.value(); RetryListener finalRetryListener = retryListener; //如果沒有支持異步功能,那么在進行重試的時候就會一直占用着服務器的業務線程,導致服務器線程負載暴增 retryThreadPool.submit(new Runnable() { @Override public void run() { for (int i = 1; i <= retry.maxAttempts(); i++) { int finalI = i; try { retryTask.doTask(); retryTask.setRetrySuccess(); return; } catch (Throwable e) { for (Class<? extends Throwable> clazz : exceptionClasses) { if (e.getClass().equals(clazz) || e.getClass().isInstance(clazz)) { if (finalRetryListener != null) { finalRetryListener.notifyObserver(); } System.err.println("[FastRetryStrategy] retry again,attempt's time is " + finalI + ",tims is " + System.currentTimeMillis()); try { Thread.sleep(retry.delay()); } catch (InterruptedException ex) { ex.printStackTrace(); } continue; } } } } } }); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; ExecutorService executorService = (ExecutorService) applicationContext.getBean("retryThreadPool"); this.setRetryThreadPool(executorService); } }
重試的過程中專門采用了一個單獨的線程池來執行相應邏輯,這樣可以避免一直消耗着服務器的業務線程,導致業務線程被長時間占用影響整體吞吐率。
另外,當重試出現異常的時候,還可以通過回調對應的監聽器組件做一些記錄:例如日志記錄,操作記錄寫入等等操作。
public interface RetryListener { /** * 通知觀察者 */ void notifyObserver(); }
默認抽象類
public abstract class AbstractRetryListener implements RetryListener { @Override public abstract void notifyObserver(); }
自定義的一個監聽器對象:
public class DefaultRetryListener implements RetryListener { @Override public void notifyObserver() { System.out.println("this is a DefaultRetryListener"); } }
好了,此時基本的配置都差不多了,如果需要使用的話,則需要進行一些bean的初始化配置:
@Configuration public class RetryConfig { @Bean public FastRetryStrategy fastRetryStrategy(){ return new FastRetryStrategy(); } @Bean public RetryListener defaultRetryListener(){ return new DefaultRetryListener(); } @Bean public ExecutorService retryThreadPool(){ ExecutorService executorService = new ThreadPoolExecutor(2,4,0L, TimeUnit.SECONDS,new LinkedBlockingQueue<>()); return executorService; } }
這里面主要將重試策略,重試監聽器,重試所使用的線程池都分別進行了裝載配置到Spring容器當中。
測試方式:
通過http請求url的方式進行驗證:http://localhost:8080/do-test?code=2
@RestController public class TestController { public static int count = 0; @Retry(maxAttempts = 5, delay = 100, value = {ArithmeticException.class}, strategy = FastRetryStrategy.class, listener = DefaultRetryListener.class) @GetMapping(value = "/do-test") public String doTest(int code) { count++; System.out.println("code is :" + code + " result is :" + count % 3 + " count is :" + count); if (code == 1) { System.out.println("--this is a test"); } else { if (count % 5 != 0) { System.out.println(4 / 0); } } return "success"; } }
請求之后可以看到控制台輸出了對應的內容:
不足點:
- 需要指定完全匹配的異常才能做到相關的重試處理,這部分的處理步驟會比較繁瑣,並不是特別靈活。
- 一定要是出現了異常才能進行重試,但是往往有些時候可能會返回一些錯誤含義的DTO對象,這方面的處理並不是那么靈活。
guava-retryer的重試組件就在上述的幾個不足點中有所完善,關於其具體使用就不在本文中介紹了,感興趣的小伙伴可以去了解下這款組件的使用細節。