Springboot 指定重發的次數和延遲時間,定時異步執行 重發任務


以前寫過一篇基於Springboot使用retry框架進行實現重試業務場景的文章:

https://blog.csdn.net/qq_35387940/article/details/99676114

 

前言:
重試這個需求場景在我們做項目的時候非常常見,實現這個場景的方法也是非常多,

定期輪詢

ScheduledExecutorService 周期性線程池

消息隊列

redis有序集合

Quartz,job等定時任務框架

Timer

delayQueue

等等,我們該篇介紹的是 異步Async+延遲隊列delayQueue 。

 

進入正題:
 

一個簡單的重試需求場景

我們服務端是個中間平台,
用戶調用我們服務端下單成功,我們需要通知第三方平台發貨。
但是這個通知發貨有可能通知失敗,我們允許最大失敗次數是N次;
也就是說除了第一次通知發出后,我們需要進行額外的N次發貨通知;
而且后面額外進行的N次發貨通知是有延遲時間的, 每個之間的間隔都是動態設置的;
期間只要有一次通知成功了,那么我們就不再重新發送通知;
如果通知沒發成功,就會根據我們設置的N次以及延遲時間,繼續發送通知。

 

先創建一個異步線程池的配置類(如果你還不了解springboot使用異步線程的,可以先去看看我這篇文章:https://blog.csdn.net/qq_35387940/article/details/83991594),AsyncThreadConfig.class:

ps: 這里用的是spring提供的線程池

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;


@Configuration
@ComponentScan("com.jc.mytest.async.service")
@EnableAsync
public class AsyncThreadConfig {
/**
* 執行需要依賴線程池,這里就來配置一個線程池
* @return
*/

// 當池子大小小於corePoolSize,就新建線程,並處理請求
// 當池子大小等於corePoolSize,把請求放入workQueue(QueueCapacity)中,池子里的空閑線程就去workQueue中取任務並處理
// 當workQueue放不下任務時,就新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
// 當池子的線程數大於corePoolSize時,多余的線程會等待keepAliveTime長時間,如果無請求可處理就自行銷毀

@Bean("getExecutor")
public Executor getExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//設置核心線程數
executor.setCorePoolSize(10);
//設置最大線程數
executor.setMaxPoolSize(100);
//線程池所使用的緩沖隊列
executor.setQueueCapacity(250);
//設置線程名
executor.setThreadNamePrefix("JcTest-Async");
//設置多余線程等待的時間,單位:秒
//executor.setKeepAliveSeconds();
// 初始化線程
executor.initialize();
return executor;
}
}
然后是異步執行方法的service,TestAsyncService.class:

import java.io.IOException;

/**
* @Author : JCccc
* @CreateTime : 2020/4/16
* @Description :
**/
public interface TestAsyncService {


String testNotice(int[] taskDelayMill) throws InterruptedException, IOException;
}
對應的實現類impl,TestAsyncServiceImpl.class:

import com.jc.mytest.async.service.TestAsyncService;
import com.jc.mytest.util.DelayElement;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;

/**
* @Author : JCccc
* @CreateTime : 2020/4/16
* @Description :
**/

@Service
public class TestAsyncServiceImpl implements TestAsyncService {


@Async("getExecutor")
@Override
public String testNotice(int[] taskDelayMill) throws InterruptedException, IOException {

System.out.println(Thread.currentThread().getName() + " -------正在異步執行任務------" + new Date());

DelayQueue delayQueue = new DelayQueue();

//數組的length大小就是額外需要發送的通知數
int taskSum=taskDelayMill.length;

//將每一次發送通知的間隔時間都對應創建一個延遲設置類,放入延遲隊列delayQueue里
for (int i=0;i<taskSum;i++){
delayQueue.put(new DelayElement(taskDelayMill[i]));
}

System.out.println("開始時間:" + DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()){
// 執行延遲任務
System.out.println("現在執行延遲任務,調用業務接口");

//模擬調用API,通知發貨,得到發貨結果 成功或失敗

String result = getNoticeResult();

System.out.println("通知發貨的結果是:"+result);
if (!result.equals("success")){

System.out.println("任務執行中:"+delayQueue.take());
}else {

break;
}
}
//查詢訂單結果

System.out.println("通知任務不需要再發,訂單結果已經確定");

System.out.println("結束時間:" + DateFormat.getDateTimeInstance().format(new Date()));




return "success";
}


//模擬發貨通知的結果
public String getNoticeResult() throws IOException {


//模擬調用通知發貨API接口,獲取返回結果
String[] strs={"success", "-error-", "--error--","-error--"};

return RandomStr(strs);

}

//隨機返回字符串數組中的字符串
public static String RandomStr(String[] strs){
int random_index = (int) (Math.random()*strs.length);
return strs[random_index];
}


}
延遲隊列需要的參數類,DelayElement.class:

import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
* @Author : JCccc
* @CreateTime : 2020/4/17
* @Description :
**/
public class DelayElement implements Delayed {
// 延遲截止時間(單面:毫秒)
long delayTime = System.currentTimeMillis();
public DelayElement(long delayTime) {
this.delayTime = (this.delayTime + delayTime);
}
@Override
// 獲取剩余時間
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
// 隊列里元素的排序依據
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return DateFormat.getDateTimeInstance().format(new Date(delayTime));
}
}

 

最后寫個小接口來觸發一下這個場景,TestController.class:

/**
* @Author : JCccc
* @CreateTime : 2020/4/8
* @Description :
**/

@RestController
public class TestController {


@Autowired
TestAsyncService testAsyncService;

@GetMapping("/testAsyncNotice")
public void testAsyncNotice() throws Exception {
System.out.println("發貨通知調用開始!");

int[] taskArrays = new int[]{2000, 5000, 10000};
testAsyncService.testNotice(taskArrays);

System.out.println("已經開始通知,異步執行通知");

}

}
 

整個流程實現簡單介紹
可以看到一直傳遞的接收參數是一個數組 taskArrays,

數組的元素就是每個通知任務發出的延遲時間, 可以看到我弄得是 2000,5000,10000 ;

那就是額外發3次,
結合我們的impl代碼,

先判斷隊列里面的任務還有沒有,有的話就回去執行。

第一次是延遲2秒發一次, 然后調用發貨通知接口,得到返回狀態;

如果是success,那么就是通知發貨成功,可以直接結束;

如果不是success,我們繼續調用 delayQueue.take() ,直到隊列里面的任務都被執行完畢,也就是3次都發完。

 

測試效果
 

三次發送通知都是得到失敗的結果

 

 

 

 

第二次發送通知得到成功的結果

 

 

 

 

好了,該篇簡單的應用介紹就到此。

 

 

 

最后,大家可以深入一下這個延遲隊列,它不是個簡單貨,可以看看里面的實現代碼哦(重入鎖ReentrantLock,阻塞和通知的Condition等)

 
————————————————
版權聲明:本文為CSDN博主「小目標青年」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_35387940/article/details/105578433


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM