使用redis的zset實現簡單的延時隊列


一、需求切入點
   在公司做的一個系統業務需要有個定時提醒的功能(數據在mysql中),要求提醒的時間差精准到分鍾
解決方案有:

  1. 使用定時器,每分鍾執行一次,查符合提醒的數據,發起提醒(數據庫連接與系統的負載都承受不住的!!)
  2. 將待提醒數據提前查出存進redis中,根據提醒時間設置過期時間,做redis的過期監聽,監聽到過期的數據再做業務處理(優點 : 不用實時查數據庫,一定程度上減少系統壓力 缺點: 一旦系統重啟或者系統出現異常,可能導致一些過期的數據沒有監聽到,造成數據沒有推送)
  3. 使用一個延時隊列,利用redis的zset(sort set,有序不重復集合,關聯分數score進行排序),將提醒時間作為分數,提取符合條件的score對應的集合發起提醒(本文所述也是圍繞這個方案)
    二、延時隊列的基本操作流程
    1. 基本流程圖
    2. 代碼實現
      生產者,只關心數據進隊列
public class MessageProvider {
    // 延時隊列的服務 通過這個bean來統一管理數據
    private final DelayingQueueService delayingQueueService;

    private static String APPOINTMENT_REMIND = "APPOINTMENT_REMIND";

    /**
     * 往隊列中添加消息
     * @param messageContent
     */
    public void sendMessage(String messageContent, long delay) {
        try {
            //業務代碼 ……
              //  將分裝好的數據寫進隊列
                delayingQueueService.push(queueMessage);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 撤回消息 業務延伸
     * @param members
     */
    public  void withdrawMessage(Long members){
        delayingQueueService.removeByMembersId(members);
    }


}

消費者,只關心需要消費的數據

// 從延伸隊列拉取符合消費的數據
 List<QueueMessage> msgList = delayingQueueService.pull();

   ```
msgList.stream().forEach(msg -> {
                        // 拿出已經到期的預約提示 發起提醒
                        if (current >= msg.getDelayTime()) {
                            try {
                            //  進行業務消費  ……   
                            //成功消費后移除消息
                            delayingQueueService.remove(msg);
                        }
```
                

延時隊列實現

public class DelayingQueueService {

    private static ObjectMapper mapper = Jackson2ObjectMapperBuilder.json().build();
    //key:membersId value:message
    private static ConcurrentHashMap <Long,String> membersMap= new ConcurrentHashMap<>();

    private final StringRedisTemplate redisTemplate;

    /**
     * 可以不同業務用不同的key
     */
    public static final String QUEUE_NAME = "message:queue";
    /**
     * 鎖key
     */
    public static final String LOCK_KEY="message_lock_key";


    /**
     * 插入消息
     *
     * @param queueMessage
     * @return
     */
    @SneakyThrows
    public Boolean push(QueueMessage queueMessage) {
 
        String messageStr = mapper.writeValueAsString(queueMessage);
        Boolean addFlag = redisTemplate.opsForZSet().add(QUEUE_NAME, messageStr, queueMessage.getDelayTime());
        membersMap.put(membersId,messageStr);
        return addFlag;
    }

    /**
     * 移除消息
     *
     * @param queueMessage
     * @return
     */
    @SneakyThrows
    public Boolean remove(QueueMessage queueMessage) {
  
        Long remove = redisTemplate.opsForZSet().remove(QUEUE_NAME, mapper.writeValueAsString(queueMessage));
        if(remove>0){
            membersMap.remove(membersId);
        }
        return remove > 0 ? true : false;
    }




    /**
     * 拉取最新需要
     * 被消費的消息
     * rangeByScore 根據score范圍獲取 0-當前時間戳可以拉取當前時間及以前的需要被消費的消息
     *
     * @return
     */
    public List<QueueMessage> pull() {

        List<QueueMessage> msgList  =new ArrayList<>();
            try {
                Set<String> strings = redisTemplate.opsForZSet().rangeByScore(QUEUE_NAME, 0, System.currentTimeMillis());
                if (strings == null) {
                    return null;
                }
                msgList = strings.stream().map(msg -> {
                    QueueMessage message = null;
                    try {
                        message = mapper.readValue(msg, QueueMessage.class);
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                    return message;
                }).collect(Collectors.toList());
            } catch (Exception e) {
                log.error(e.toString());
            }
        return msgList;

    }

    //獲得鎖
    public Boolean getLock(){
         boolean lock = false;
        //獲得鎖
        lock = redisTemplate.opsForValue().setIfAbsent(LOCK_KEY,QUEUE_NAME+"is locking !",30, TimeUnit.SECONDS);
        return lock;
    }

    public void releaseLock(){
        redisTemplate.delete(LOCK_KEY);
    }


}

三、結束語

本文所述的方法也是存在一些小的缺點,比如,數據的正常操作依賴於第三方組件,如果redis掛掉了,這個服務就down掉了,實現延時隊列的方法有很多種,基於業務與系統本身的情況,兼容利弊去做一些取舍,以達到最好的效果


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM