通常在一個jvm進程中,若想實現延遲邏輯,可以使用jdk自帶的延遲隊列DelayQueue來實現。DelayQueue中的元素PriorityQueue來實現的,DelayQueue中的元素會實現
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
即可在DelayQueue進行poll操作時候獲取最近需要的元素。但是這種延時隊列是保存在內存中,所以一旦進程關閉或崩潰,隊列中的數據都會丟失,所以只有配合持久化才可以保證數據不丟失。
那么如果在多進程條件下,如果要實現延遲隊列,則需要一個統一的地方保存延遲元素,這個元素可以被稱為任務,redis是一個不錯的選擇。Redisson實現了集群環境下延遲隊列的實現。
引入reddison依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.10.3</version> </dependency>
redis基本配置
private Config initRedissonConfig() { Config config = new Config(); config.useSingleServer() .setAddress("redis://" + host + ":" + port) .setTimeout(timeout) .setConnectionPoolSize(maxIdle) .setConnectionMinimumIdleSize(minIdle); return config; } @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { Config config = initRedissonConfig(); return Redisson.create(config); }
定義redisson阻塞隊列,注冊相關bean
public class QueueConfig {
private final String queueName = "queue";
@Bean
public RBlockingQueue<String> rBlockingQueue(@Qualifier("redissonClient") RedissonClient redissonClient) {
return redissonClient.getBlockingQueue(queueName);
}
@Bean
public RDelayedQueue<String> rDelayedQueue(@Qualifier("redissonClient") RedissonClient redissonClient,
@Qualifier("rBlockingQueue") RBlockingQueue<String> blockQueue) {
return redissonClient.getDelayedQueue(blockQueue);
}
}
下面進行測試,TakeTask負責消費隊列中的任務
public class TakeTask { @Resource(name = "rBlockingQueue") private RBlockingQueue<String> rBlockingQueue; @PostConstruct public void take() { new Thread(() -> { while (true) { try { String s = rBlockingQueue.take(); System.out.println(s); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
在延時隊列rDelayQueue中延遲添加任務,這里需要調用帶參數的offerAsync的方法,延時添加。
@RestController @RequestMapping("/") public class TestController { @Resource(name = "rDelayedQueue") private RDelayedQueue<String> rDelayedQueue; @GetMapping("/offer") public void offer() { for (int i = 1; i <= 2; i++) { rDelayedQueue.offerAsync("task: " + i, 1, TimeUnit.SECONDS); } } }
由於延時隊列持久化在redis中,所以機器宕機數據不會異常丟失,機器重啟后,會正常消費隊列中積累的任務。
對於jdk中的DelayQueue延時隊列是采用zset來實現,每次add,會立即將元素添加到隊列中,zset會根據指定的字段進行排序,維護一個優先隊列,當進行take操作時候,取到頭節點的數據一定是最大或者最小的,但是此時頭節點不一定能取出來,需要多一步判斷,這一步其實就是 public long getDelay(TimeUnit unit);要實現的方法,只有返回值大於0才會真正被取出來。redission的延時隊列是異步延時加入的,也就是說並沒有立刻加入隊列中,而是在指定的延時時間delay之后才會加入,所以在take的時候是一定可以直接取出來隊列中的元素。
