以前寫過一篇基於Springboot使用retry框架進行實現重試業務場景的文章:
https://blog.csdn.net/qq_35387940/article/details/99676114
前言:
重試這個需求場景在我們做項目的時候非常常見,實現這個場景的方法也是非常多,
定期輪詢
ScheduledExecutorService 周期性線程池
消息隊列
redis有序集合
Quartz,job等定時任務框架
Timer
delayQueue
等等,我們該篇介紹的是 異步Async+延遲隊列delayQueue 。
進入正題:
一個簡單的重試需求場景
我們服務端是個中間平台,
用戶調用我們服務端下單成功,我們需要通知第三方平台發貨。
但是這個通知發貨有可能通知失敗,我們允許最大失敗次數是N次;
也就是說除了第一次通知發出后,我們需要進行額外的N次發貨通知;
而且后面額外進行的N次發貨通知是有延遲時間的, 每個之間的間隔都是動態設置的;
期間只要有一次通知成功了,那么我們就不再重新發送通知;
如果通知沒發成功,就會根據我們設置的N次以及延遲時間,繼續發送通知。
先創建一個異步線程池的配置類(如果你還不了解springboot使用異步線程的,可以先去看看我這篇文章:https://blog.csdn.net/qq_35387940/article/details/83991594),AsyncThreadConfig.class:
ps: 這里用的是spring提供的線程池
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@ComponentScan("com.jc.mytest.async.service")
@EnableAsync
public class AsyncThreadConfig {
/**
* 執行需要依賴線程池,這里就來配置一個線程池
* @return
*/
// 當池子大小小於corePoolSize,就新建線程,並處理請求
// 當池子大小等於corePoolSize,把請求放入workQueue(QueueCapacity)中,池子里的空閑線程就去workQueue中取任務並處理
// 當workQueue放不下任務時,就新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
// 當池子的線程數大於corePoolSize時,多余的線程會等待keepAliveTime長時間,如果無請求可處理就自行銷毀
@Bean("getExecutor")
public Executor getExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//設置核心線程數
executor.setCorePoolSize(10);
//設置最大線程數
executor.setMaxPoolSize(100);
//線程池所使用的緩沖隊列
executor.setQueueCapacity(250);
//設置線程名
executor.setThreadNamePrefix("JcTest-Async");
//設置多余線程等待的時間,單位:秒
//executor.setKeepAliveSeconds();
// 初始化線程
executor.initialize();
return executor;
}
}
然后是異步執行方法的service,TestAsyncService.class:
import java.io.IOException;
/**
* @Author : JCccc
* @CreateTime : 2020/4/16
* @Description :
**/
public interface TestAsyncService {
String testNotice(int[] taskDelayMill) throws InterruptedException, IOException;
}
對應的實現類impl,TestAsyncServiceImpl.class:
import com.jc.mytest.async.service.TestAsyncService;
import com.jc.mytest.util.DelayElement;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
/**
* @Author : JCccc
* @CreateTime : 2020/4/16
* @Description :
**/
@Service
public class TestAsyncServiceImpl implements TestAsyncService {
@Async("getExecutor")
@Override
public String testNotice(int[] taskDelayMill) throws InterruptedException, IOException {
System.out.println(Thread.currentThread().getName() + " -------正在異步執行任務------" + new Date());
DelayQueue delayQueue = new DelayQueue();
//數組的length大小就是額外需要發送的通知數
int taskSum=taskDelayMill.length;
//將每一次發送通知的間隔時間都對應創建一個延遲設置類,放入延遲隊列delayQueue里
for (int i=0;i<taskSum;i++){
delayQueue.put(new DelayElement(taskDelayMill[i]));
}
System.out.println("開始時間:" + DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()){
// 執行延遲任務
System.out.println("現在執行延遲任務,調用業務接口");
//模擬調用API,通知發貨,得到發貨結果 成功或失敗
String result = getNoticeResult();
System.out.println("通知發貨的結果是:"+result);
if (!result.equals("success")){
System.out.println("任務執行中:"+delayQueue.take());
}else {
break;
}
}
//查詢訂單結果
System.out.println("通知任務不需要再發,訂單結果已經確定");
System.out.println("結束時間:" + DateFormat.getDateTimeInstance().format(new Date()));
return "success";
}
//模擬發貨通知的結果
public String getNoticeResult() throws IOException {
//模擬調用通知發貨API接口,獲取返回結果
String[] strs={"success", "-error-", "--error--","-error--"};
return RandomStr(strs);
}
//隨機返回字符串數組中的字符串
public static String RandomStr(String[] strs){
int random_index = (int) (Math.random()*strs.length);
return strs[random_index];
}
}
延遲隊列需要的參數類,DelayElement.class:
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @Author : JCccc
* @CreateTime : 2020/4/17
* @Description :
**/
public class DelayElement implements Delayed {
// 延遲截止時間(單面:毫秒)
long delayTime = System.currentTimeMillis();
public DelayElement(long delayTime) {
this.delayTime = (this.delayTime + delayTime);
}
@Override
// 獲取剩余時間
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
// 隊列里元素的排序依據
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return DateFormat.getDateTimeInstance().format(new Date(delayTime));
}
}
最后寫個小接口來觸發一下這個場景,TestController.class:
/**
* @Author : JCccc
* @CreateTime : 2020/4/8
* @Description :
**/
@RestController
public class TestController {
@Autowired
TestAsyncService testAsyncService;
@GetMapping("/testAsyncNotice")
public void testAsyncNotice() throws Exception {
System.out.println("發貨通知調用開始!");
int[] taskArrays = new int[]{2000, 5000, 10000};
testAsyncService.testNotice(taskArrays);
System.out.println("已經開始通知,異步執行通知");
}
}
整個流程實現簡單介紹
可以看到一直傳遞的接收參數是一個數組 taskArrays,
數組的元素就是每個通知任務發出的延遲時間, 可以看到我弄得是 2000,5000,10000 ;
那就是額外發3次,
結合我們的impl代碼,
先判斷隊列里面的任務還有沒有,有的話就回去執行。
第一次是延遲2秒發一次, 然后調用發貨通知接口,得到返回狀態;
如果是success,那么就是通知發貨成功,可以直接結束;
如果不是success,我們繼續調用 delayQueue.take() ,直到隊列里面的任務都被執行完畢,也就是3次都發完。
測試效果
三次發送通知都是得到失敗的結果
第二次發送通知得到成功的結果
好了,該篇簡單的應用介紹就到此。
最后,大家可以深入一下這個延遲隊列,它不是個簡單貨,可以看看里面的實現代碼哦(重入鎖ReentrantLock,阻塞和通知的Condition等)
————————————————
版權聲明:本文為CSDN博主「小目標青年」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_35387940/article/details/105578433