來源:blog.csdn.net/qq330983778/article/details/99341671
業務流程
首先我們分析下這個流程
- 用戶提交任務。首先將任務推送至延遲隊列中。
- 延遲隊列接收到任務后,首先將任務推送至job pool中,然后計算其執行時間。
- 然后生成延遲任務(僅僅包含任務id)放入某個桶中
- 時間組件時刻輪詢各個桶,當時間到達的時候從job pool中獲得任務元信息。
- 監測任務的合法性如果已經刪除則pass。繼續輪詢。如果任務合法則再次計算時間
- 如果合法則計算時間,如果時間合法:根據topic將任務放入對應的ready queue,然后從bucket中移除。如果時間不合法,則重新計算時間再次放入bucket,並移除之前的bucket中的內容
- 消費端輪詢對應topic的ready queue。獲取job后做自己的業務邏輯。與此同時,服務端將已經被消費端獲取的job按照其設定的TTR,重新計算執行時間,並將其放入bucket。
- 完成消費后,發送finish消息,服務端根據job id刪除對應信息。
對象
我們現在可以了解到中間存在的幾個組件
- 延遲隊列,為Redis延遲隊列。實現消息傳遞
- Job pool 任務池保存job元信息。根據文章描述使用K/V的數據結構,key為ID,value為job
- Delay Bucket 用來保存業務的延遲任務。文章中描述使用輪詢方式放入某一個Bucket可以知道其並沒有使用topic來區分,個人這里默認使用順序插入
- Timer 時間組件,負責掃描各個Bucket。根據文章描述存在多個Timer,但是同一個Timer同一時間只能掃描一個Bucket
- Ready Queue 負責存放需要被完成的任務,但是根據描述根據Topic的不同存在多個Ready Queue。
其中Timer負責輪詢,Job pool、Delay Bucket、Ready Queue都是不同職責的集合。
任務狀態
- ready:可執行狀態,
- delay:不可執行狀態,等待時鍾周期。
- reserved:已被消費者讀取,但沒有完成消費。
- deleted:已被消費完成或者已被刪除。
對外提供的接口
額外的內容
- 首先根據狀態狀態描述,finish和delete操作都是將任務設置成deleted狀態。
- 根據文章描述的操作,在執行finish或者delete的操作的時候任務已經從元數據中移除,此時deleted狀態可能只存在極短時間,所以實際實現中就直接刪除了。
- 文章中並沒有說明響應超時后如何處理,所以個人現在將其重新投入了待處理隊列。
- 文章中因為使用了集群,所以使用redis的setnx鎖來保證多個時間循環處理多個桶的時候不會出現重復循環。這里因為是簡單的實現,所以就很簡單的每個桶設置一個時間隊列處理。也是為了方便簡單處理。關於分布式鎖可以看我之前的文章里面有描述。
實現
現在我們根據設計內容完成設計。這一塊設計我們分四步完成
任務及相關對象
目前需要兩個對象,一個是任務對象(job)一個負責保存任務引用的對象(delay job),Spring Boot 基礎就不介紹了,推薦下這個實戰教程:
https://github.com/javastacks/spring-boot-best-practice
任務對象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {
/**
* 延遲任務的唯一標識,用於檢索任務
*/
@JsonSerialize(using = ToStringSerializer.class)
private Long id;
/**
* 任務類型(具體業務類型)
*/
private String topic;
/**
* 任務的延遲時間
*/
private long delayTime;
/**
* 任務的執行超時時間
*/
private long ttrTime;
/**
* 任務具體的消息內容,用於處理具體業務邏輯用
*/
private String message;
/**
* 重試次數
*/
private int retryCount;
/**
* 任務狀態
*/
private JobStatus status;
}
任務引用對象
@Data
@AllArgsConstructor
public class DelayJob implements Serializable {
/**
* 延遲任務的唯一標識
*/
private long jodId;
/**
* 任務的執行時間
*/
private long delayDate;
/**
* 任務類型(具體業務類型)
*/
private String topic;
public DelayJob(Job job) {
this.jodId = job.getId();
this.delayDate = System.currentTimeMillis() + job.getDelayTime();
this.topic = job.getTopic();
}
public DelayJob(Object value, Double score) {
this.jodId = Long.parseLong(String.valueOf(value));
this.delayDate = System.currentTimeMillis() + score.longValue();
}
}
容器
目前我們需要完成三個容器的創建,Job任務池、延遲任務容器、待完成任務容器
job任務池,為普通的K/V結構,提供基礎的操作
@Component
@Slf4j
public class JobPool {
@Autowired
private RedisTemplate redisTemplate;
private String NAME = "job.pool";
private BoundHashOperations getPool () {
BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
return ops;
}
/**
* 添加任務
* @param job
*/
public void addJob (Job job) {
log.info("任務池添加任務:{}", JSON.toJSONString(job));
getPool().put(job.getId(),job);
return ;
}
/**
* 獲得任務
* @param jobId
* @return
*/
public Job getJob(Long jobId) {
Object o = getPool().get(jobId);
if (o instanceof Job) {
return (Job) o;
}
return null;
}
/**
* 移除任務
* @param jobId
*/
public void removeDelayJob (Long jobId) {
log.info("任務池移除任務:{}",jobId);
// 移除任務
getPool().delete(jobId);
}
}
延遲任務,使用可排序的ZSet保存數據,提供取出最小值等操作
@Slf4j
@Component
public class DelayBucket {
@Autowired
private RedisTemplate redisTemplate;
private static AtomicInteger index = new AtomicInteger(0);
@Value("${thread.size}")
private int bucketsSize;
private List <String> bucketNames = new ArrayList <>();
@Bean
public List <String> createBuckets() {
for (int i = 0; i < bucketsSize; i++) {
bucketNames.add("bucket" + i);
}
return bucketNames;
}
/**
* 獲得桶的名稱
* @return
*/
private String getThisBucketName() {
int thisIndex = index.addAndGet(1);
int i1 = thisIndex % bucketsSize;
return bucketNames.get(i1);
}
/**
* 獲得桶集合
* @param bucketName
* @return
*/
private BoundZSetOperations getBucket(String bucketName) {
return redisTemplate.boundZSetOps(bucketName);
}
/**
* 放入延時任務
* @param job
*/
public void addDelayJob(DelayJob job) {
log.info("添加延遲任務:{}", JSON.toJSONString(job));
String thisBucketName = getThisBucketName();
BoundZSetOperations bucket = getBucket(thisBucketName);
bucket.add(job,job.getDelayDate());
}
/**
* 獲得最新的延期任務
* @return
*/
public DelayJob getFirstDelayTime(Integer index) {
String name = bucketNames.get(index);
BoundZSetOperations bucket = getBucket(name);
Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);
if (CollectionUtils.isEmpty(set)) {
return null;
}
ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
Object value = typedTuple.getValue();
if (value instanceof DelayJob) {
return (DelayJob) value;
}
return null;
}
/**
* 移除延時任務
* @param index
* @param delayJob
*/
public void removeDelayTime(Integer index,DelayJob delayJob) {
String name = bucketNames.get(index);
BoundZSetOperations bucket = getBucket(name);
bucket.remove(delayJob);
}
}
待完成任務,內部使用topic進行細分,每個topic對應一個list集合
@Component
@Slf4j
public class ReadyQueue {
@Autowired
private RedisTemplate redisTemplate;
private String NAME = "process.queue";
private String getKey(String topic) {
return NAME + topic;
}
/**
* 獲得隊列
* @param topic
* @return
*/
private BoundListOperations getQueue (String topic) {
BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
return ops;
}
/**
* 設置任務
* @param delayJob
*/
public void pushJob(DelayJob delayJob) {
log.info("執行隊列添加任務:{}",delayJob);
BoundListOperations listOperations = getQueue(delayJob.getTopic());
listOperations.leftPush(delayJob);
}
/**
* 移除並獲得任務
* @param topic
* @return
*/
public DelayJob popJob(String topic) {
BoundListOperations listOperations = getQueue(topic);
Object o = listOperations.leftPop();
if (o instanceof DelayJob) {
log.info("執行隊列取出任務:{}", JSON.toJSONString((DelayJob) o));
return (DelayJob) o;
}
return null;
}
}
輪詢處理
設置了線程池為每個bucket設置一個輪詢操作
@Component
public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {
@Autowired
private DelayBucket delayBucket;
@Autowired
private JobPool jobPool;
@Autowired
private ReadyQueue readyQueue;
@Value("${thread.size}")
private int length;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
ExecutorService executorService = new ThreadPoolExecutor(
length,
length,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue <Runnable>());
for (int i = 0; i < length; i++) {
executorService.execute(
new DelayJobHandler(
delayBucket,
jobPool,
readyQueue,
i));
}
}
}
測試請求
/**
* 測試用請求
* @author daify
**/
@RestController
@RequestMapping("delay")
public class DelayController {
@Autowired
private JobService jobService;
/**
* 添加
* @param request
* @return
*/
@RequestMapping(value = "add",method = RequestMethod.POST)
public String addDefJob(Job request) {
DelayJob delayJob = jobService.addDefJob(request);
return JSON.toJSONString(delayJob);
}
/**
* 獲取
* @return
*/
@RequestMapping(value = "pop",method = RequestMethod.GET)
public String getProcessJob(String topic) {
Job process = jobService.getProcessJob(topic);
return JSON.toJSONString(process);
}
/**
* 完成一個執行的任務
* @param jobId
* @return
*/
@RequestMapping(value = "finish",method = RequestMethod.DELETE)
public String finishJob(Long jobId) {
jobService.finishJob(jobId);
return "success";
}
@RequestMapping(value = "delete",method = RequestMethod.DELETE)
public String deleteJob(Long jobId) {
jobService.deleteJob(jobId);
return "success";
}
}
測試
添加延遲任務
通過postman請求:localhost:8000/delay/add
此時這條延時任務被添加進了線程池中
2019-08-12 21:21:36.589 INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool : 任務池添加任務:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
2019-08-12 21:21:36.609 INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket : 添加延遲任務:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
根據設置10秒鍾之后任務會被添加至ReadyQueue中
2019-08-12 21:21:46.744 INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue : 執行隊列添加任務:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
獲得任務
這時候我們請求localhost:8000/delay/pop
這個時候任務被響應,修改狀態的同時設置其超時時間,然后放置在DelayBucket中
2019-08-09 19:36:02.342 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue : 執行隊列取出任務:{"delayDate":1565321728704,"jodId":1,"topic":"測試"}
2019-08-09 19:36:02.364 INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool : 任務池添加任務:{"delayTime":10000,"id":1,"message":"延遲10秒,超時30秒","retryCount":0,"status":"RESERVED","topic":"測試","ttrTime":30000}
2019-08-09 19:36:02.384 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket : 添加延遲任務:{"delayDate":1565321792364,"jodId":1,"topic":"測試"}
按照設計在30秒后,任務假如沒有被消費將會重新放置在ReadyQueue中
2019-08-12 21:21:48.239 INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue : 執行隊列取出任務:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:48.261 INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool : 任務池添加任務:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}
任務的刪除/消費
現在我們請求:localhost:8000/delay/delete
此時在Job pool中此任務將會被移除,此時元數據已經不存在,但任務還在DelayBucket中循環,然而在循環中當檢測到元數據已經不存的話此延時任務會被移除。
2019-08-12 21:21:54.880 INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool : 任務池移除任務:3
2019-08-12 21:21:59.104 INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler : 移除不存在任務:{"delayDate":1565616118261,"jodId":3,"topic":"test"}
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2021最新版)
2.別在再滿屏的 if/ else 了,試試策略模式,真香!!
3.卧槽!Java 中的 xx ≠ null 是什么新語法?
4.Spring Boot 2.5 重磅發布,黑暗模式太炸了!
覺得不錯,別忘了隨手點贊+轉發哦!