基於Redisson的延遲隊列實現


package com.dong.mytest.demo.client;

import cn.hutool.extra.spring.SpringUtil;
import com.dong.mytest.demo.common.dto.DelayMessage;
import com.dong.mytest.demo.common.util.DateUtil;
import com.dong.mytest.demo.service.delayqueue.DelayQueueConsumer;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * @author dong
 */
@Slf4j
@Component
public class RedissonDelayQueueClient implements InitializingBean {

    @Resource
    private RedissonClient redissonClient;

    private final Map<String, RDelayedQueue<DelayMessage>> delayQueueMap = new ConcurrentHashMap<>(16);

    public void addDelayMessage(DelayMessage delayMessage) {
        log.info("delayMessage={}", delayMessage);
        if (delayQueueMap.get(delayMessage.getQueueName()) == null) {
            log.warn("queueName={},該延遲隊列不存在,請確認后再試...", delayMessage.getQueueName());
            return;
        }
        delayMessage.setCreateTime(DateUtil.getNowFormatStr());
        RDelayedQueue<DelayMessage> rDelayedQueue = delayQueueMap.get(delayMessage.getQueueName());
        rDelayedQueue.offer(delayMessage, delayMessage.getDelayTime(), delayMessage.getTimeUnit() == null ? TimeUnit.SECONDS : delayMessage.getTimeUnit());
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // 有新的延遲隊列在這里添加,隊列消費類需要繼承DelayQueueConsumer,並且service名稱為 ${queueName}Consumer
        List<String> queueNameList = Lists.newArrayList("orderAutoCancelDelayQueue");

        // 加載延遲隊列
        for (String queueName : queueNameList) {
            DelayQueueConsumer delayQueueConsumer = SpringUtil.getBean(queueName + "Consumer");
            if (delayQueueConsumer == null) {
                throw new RuntimeException("queueName=" + queueName + ",delayQueueConsumer=null,請檢查配置...");
            }
            // Redisson的延時隊列是對另一個隊列的再包裝,使用時要先將延時消息添加到延時隊列中,當延時隊列中的消息達到設定的延時時間后,
            // 該延時消息才會進行進入到被包裝隊列中,因此,我們只需要對被包裝隊列進行監聽即可。
            RBlockingQueue<DelayMessage> rBlockingQueue = redissonClient.getBlockingDeque(queueName);
            RDelayedQueue<DelayMessage> rDelayedQueue = redissonClient.getDelayedQueue(rBlockingQueue);
            delayQueueMap.put(queueName, rDelayedQueue);

            // 訂閱新元素的到來,調用的是takeAsync(),異步執行
            rBlockingQueue.subscribeOnElements(delayQueueConsumer::execute);
        }
    }
}

 

package com.dong.mytest.demo.service.delayqueue;

import com.dong.mytest.demo.common.dto.DelayMessage;

/**
 * Callback contract for consuming messages from a delay queue.
 *
 * @author dong
 */
public interface DelayQueueConsumer {

    /**
     * Processes a delay message.
     *
     * @param delayMessage the message to process
     */
    void execute(DelayMessage delayMessage);

}

 

package com.dong.mytest.demo.service.delayqueue.impl;

import com.dong.mytest.demo.common.dto.DelayMessage;
import com.dong.mytest.demo.service.delayqueue.DelayQueueConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

/**
 * {@link DelayQueueConsumer} registered under the bean name
 * {@code orderAutoCancelDelayQueueConsumer}; it currently only logs each message it receives.
 *
 * @author dong
 */
@Slf4j
@Service("orderAutoCancelDelayQueueConsumer")
public class OrderAutoCancelDelayQueueConsumer implements DelayQueueConsumer {

    /**
     * Logs the received delay message at INFO level.
     *
     * @param delayMessage the message handed to this consumer
     */
    @Override
    public void execute(final DelayMessage delayMessage) {
        log.info("====OrderAutoCancelConsumer=====delayMessage={}", delayMessage);
    }
}

 

package com.dong.mytest.demo.common.dto;

import com.alibaba.fastjson.JSON;
import lombok.Data;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
 * Payload carried through a delay queue.
 *
 * <p>Getters, setters, {@code equals} and {@code hashCode} are generated by Lombok's
 * {@code @Data}; {@link #toString()} is overridden to produce JSON.
 *
 * @author dong
 */
@Data
public class DelayMessage implements Serializable {

    /** Name of the delay queue this message is addressed to. */
    private String queueName;

    /** Length of the delay, expressed in {@link #timeUnit}. */
    private Long delayTime;

    /** Unit of {@link #delayTime}; may be left null. */
    private TimeUnit timeUnit;

    /** Message body; presumably an application-defined string payload - confirm with producers. */
    private String msgBody;

    /** Creation timestamp as a formatted string. */
    private String createTime;

    /**
     * Renders this message as a JSON string via fastjson.
     *
     * @return the JSON representation of this message
     */
    @Override
    public String toString() {
        return JSON.toJSONString(this);
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM