redis實現簡單延時隊列(轉)


   繼之前用rabbitMQ實現延時隊列,Redis由於其自身的Zset數據結構,也同樣可以實現延時的操作

    Zset本質就是Set結構上加了個排序的功能,除了添加數據value之外,還提供另一屬性score,這一屬性在添加修改元素時候可以指定,每次指定后,Zset會自動重新按新的值調整順序。可以理解為有兩列字段的數據表,一列存value,一列存順序編號。操作中key理解為zset的名字,那么對延時隊列又有何用呢?試想如果score代表的是想要執行時間的時間戳,在某個時間將它插入Zset集合中,它變會按照時間戳大小進行排序,也就是對執行時間前后進行排序,這樣的話,起一個死循環線程不斷地進行取第一個key值,如果當前時間戳大於等於該key值的socre就將它取出來進行消費刪除,就可以達到延時執行的目的, 注意不需要遍歷整個Zset集合,以免造成性能浪費。

    Zset的排列效果如下圖:

java代碼實現如下:

package cn.chinotan.service.delayQueueRedis;

import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @program: test
 * @description: redis實現延時隊列
 * @author: xingcheng
 * @create: 2018-08-19
 **/
public class AppTest {

    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
    private static CountDownLatch cdl = new CountDownLatch(10);

    public static Jedis getJedis() {
        return jedisPool.getResource();
    }

    /**
     * 生產者,生成5個訂單
     */
    public void productionDelayMessage() {
        for (int i = 0; i < 5; i++) {
            Calendar instance = Calendar.getInstance();
            // 3秒后執行
            instance.add(Calendar.SECOND, 3 + i);
            AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1));
            System.out.println("生產訂單: " + StringUtils.join("000000000", i + 1) + " 當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            System.out.println((3 + i) + "秒后執行");
        }
    }

    //消費者,取訂單
    public static void consumerDelayMessage() {
        Jedis jedis = AppTest.getJedis();
        while (true) {
            Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);
            if (order == null || order.isEmpty()) {
                System.out.println("當前沒有等待的任務");
                try {
                    TimeUnit.MICROSECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            Tuple tuple = (Tuple) order.toArray()[0];
            double score = tuple.getScore();
            Calendar instance = Calendar.getInstance();
            long nowTime = instance.getTimeInMillis() / 1000;
            if (nowTime >= score) {
                String element = tuple.getElement();
                Long orderId = jedis.zrem("orderId", element);
                if (orderId > 0) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消費了一個任務:消費的訂單OrderId為" + element);
                }
            }
        }
    }

    static class DelayMessage implements Runnable{
        @Override
        public void run() {
            try {
                cdl.await();
                consumerDelayMessage();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();
        for (int i = 0; i < 10; i++) {
            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}

實現效果如下:

 

生產環境使用注意:

由於這種實現方式簡單,但在生產環境下大多是多實例部署,所以存在並發問題,即緩存的查找和刪除不具有原子性(zrangeWithScores和zrem操作不是一個命令,不具有原子性),會導致消息的多次發送問題,這個問題的避免方法如下:

1.可以采用單獨一個實例部署解決(不具備高可用特性,容易單機出現故障后消息不能及時發送)

2.采用redis的lua腳本進行原子操作,即原子操作查找和刪除(實現難度大)

因此,延時隊列的實現最好采用rabbitMQ來實現,rabbitMQ天然具備分布式的特性,可以很好的用在多服務,多實例環境下,具體的實現參考https://my.oschina.net/u/3266761/blog/1926588

轉載地址:https://my.oschina.net/u/3266761/blog/1930360


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM