基於redisson的延遲隊列


通常在一個jvm進程中,若想實現延遲邏輯,可以使用jdk自帶的延遲隊列DelayQueue來實現。DelayQueue中的元素PriorityQueue來實現的,DelayQueue中的元素會實現

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

即可在DelayQueue進行poll操作時候獲取最近需要的元素。但是這種延時隊列是保存在內存中,所以一旦進程關閉或崩潰,隊列中的數據都會丟失,所以只有配合持久化才可以保證數據不丟失。

 

那么如果在多進程條件下,如果要實現延遲隊列,則需要一個統一的地方保存延遲元素,這個元素可以被稱為任務,redis是一個不錯的選擇。Redisson實現了集群環境下延遲隊列的實現。

引入reddison依賴

<dependency>
     <groupId>org.redisson</groupId>
     <artifactId>redisson</artifactId>
     <version>3.10.3</version>
</dependency>

 

redis基本配置

    private Config initRedissonConfig() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://" + host + ":" + port)
                .setTimeout(timeout)
                .setConnectionPoolSize(maxIdle)
                .setConnectionMinimumIdleSize(minIdle);
        return config;
    }

    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() {
        Config config = initRedissonConfig();
        return Redisson.create(config);
    }

定義redisson阻塞隊列,注冊相關bean

public class QueueConfig {

private final String queueName = "queue";

@Bean
public RBlockingQueue<String> rBlockingQueue(@Qualifier("redissonClient") RedissonClient redissonClient) {
return redissonClient.getBlockingQueue(queueName);
}

@Bean
public RDelayedQueue<String> rDelayedQueue(@Qualifier("redissonClient") RedissonClient redissonClient,
@Qualifier("rBlockingQueue") RBlockingQueue<String> blockQueue) {
return redissonClient.getDelayedQueue(blockQueue);
}
}

 

下面進行測試,TakeTask負責消費隊列中的任務

public class TakeTask {

    @Resource(name = "rBlockingQueue")
    private RBlockingQueue<String> rBlockingQueue;

    @PostConstruct
    public void take() {

        new Thread(() -> {
            while (true) {
                try {
                    String s = rBlockingQueue.take();
                    System.out.println(s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

 

在延時隊列rDelayQueue中延遲添加任務,這里需要調用帶參數的offerAsync的方法,延時添加。

@RestController
@RequestMapping("/")
public class TestController {

    @Resource(name = "rDelayedQueue")
    private RDelayedQueue<String> rDelayedQueue;

    @GetMapping("/offer")
    public void offer() {
        for (int i = 1; i <= 2; i++) {
            rDelayedQueue.offerAsync("task: " + i, 1, TimeUnit.SECONDS);
        }
    }
}

由於延時隊列持久化在redis中,所以機器宕機數據不會異常丟失,機器重啟后,會正常消費隊列中積累的任務。

 

對於jdk中的DelayQueue延時隊列是采用zset來實現,每次add,會立即將元素添加到隊列中,zset會根據指定的字段進行排序,維護一個優先隊列,當進行take操作時候,取到頭節點的數據一定是最大或者最小的,但是此時頭節點不一定能取出來,需要多一步判斷,這一步其實就是  public long getDelay(TimeUnit unit);要實現的方法,只有返回值大於0才會真正被取出來。redission的延時隊列是異步延時加入的,也就是說並沒有立刻加入隊列中,而是在指定的延時時間delay之后才會加入,所以在take的時候是一定可以直接取出來隊列中的元素。

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM