幾種實現延時任務的方式(二)


在上一節中,我們講了三種方式來實現延時任務,其實,將三種方式結合起來用,對於一些中小型公司已經足夠了,但是在中大型互聯網公司還是遠遠不夠的。

想必大家對Redis起碼有一個初步的概念:基於內存的非關系型數據庫。在平時的業務開發中,Redis經常會被用做緩存,來提高網站的性能,減少數據庫的訪問,所以一想到Redis,腦海中第一個浮現出來的就是緩存。

沒辦法,對於搬磚業務開發,所使用到的Redis基本也僅僅局限於緩存/代替Session了。但是Redis的應用場景很多很多。下面我就用Redis來實現延時任務。

我在這里假設大家已經對Redis有了一個基本的了解,並且使用過Redis,所以我不再過多的講述Redis的基本知識了。

開門見山,我們會使用到Redis的ZSet數據結構。

ZSet可以理解為Set的升級版本,Set是無序的,ZSet是有序的,而且會實時排序,所以ZSet也被稱為 Sort Set,我是比較傾向於后面的稱呼的,因為從字面上的意思就可以知道這個Set擁有一個特點:排序。

ZSet有兩個元素:member ,score,ZSet會根據score進行排序,member就是成員的意思。

要想完成這個需求,我們第一個想到的應該就是怎么把數據推送給Redis,第二個想到的是怎么把Redis數據給讀取出來。

我所用的是Jedis

通過IDEA的自動提示功能,我們很容易找到zadd這方法,看起來推送數據給Redis應該是這個方法:
image.png
讓我們試一下吧。

    private static JedisPoolConfig jedisPoolConfig = null;

    private static JedisPool jedisPool;

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");

    static {
        jedisPoolConfig = new JedisPoolConfig();
        jedisPool = new JedisPool(jedisPoolConfig, "", 0, 1500, "", 0);
    }

    private static void zadd(String key, String member, double score) {
        Jedis jedis = jedisPool.getResource();
        jedis.zadd(key, score, member);
        jedis.close();
    }

    private static Set<Tuple> zrangeWithScores(String key, int start, int end) {
        Jedis jedis = jedisPool.getResource();
        Set<Tuple> set = jedis.zrangeWithScores(key, start, end);
        jedis.close();
        return set;
    }

    private static void zrem(String key, String member) {
        Jedis jedis = jedisPool.getResource();
        jedis.zrem(key, member);
        jedis.close();
    }

在這里我把ip和端口號,密碼,DBNum都給隱藏了 ,大家應該可以看懂。

我並沒有用Spring或者是Spring Boot來管理Redis,因為如果加上這些東西,還需要有Spring或者Spring Boot的知識,而且這里我僅僅是想演示下用Redis實現延遲任務的一個思路而已,基於此原因,大家也別糾結 沒有分不同的類,沒有異常處理 等等。

我們在main方法調用下zadd方法:

  zadd("haha","order1",2018005130);
  zadd("haha","order2",2017005130);
  zadd("haha","order3",2016005130);
  zadd("haha","order4",2021115130);

執行完成后,打開Redis管理工具,找到這個key,看一下結果:
image.png
score小的排前面,這也符合orderTime小的,越先過期。但是 score參數類型是double,所以我們不能把Date型的orderTime直接放進去了,但是我們可以把Date經過轉換,再放進去。

數據已經放進去了,怎么拿出來呢?這個方法比較難找,只能借助於搜索引擎了,最后確定了方法:
image.png
不管是參數,還是返回類型,都有點奇怪,難怪找不到。
我們調用下這個方法試一下。 根據參數名稱,猜測start是開始,end是結束,我們要讀取第一個數據,應該是傳0,1,或者是1,2。但是 其實應該是傳0,0。。。真讓人沮喪。。返回的數據也很奇怪,看不懂啊:
image.png

這個先放一下,我們發現 這個數據雖然已經被讀取出來了,但是Redis並么有把這個數據刪掉。

讓我們想想,這個ZSet並不像DelayQueue一樣,這個沒有延遲的功能。我們把數據推到Redis,下一秒就可以讀出來。所以我們需要先把數據讀取出來,然后判斷這個訂單有沒有超時,如果沒有超時的話,Sleep一段時間,再次判斷是否超時。超時了,則修改數據庫狀態,如果還是沒超時,繼續Sleep,再判斷。所以上面讀取方法沒有刪除數據,是符合我們要求的。

我們需要找到刪除的方法,最后確定了方法:
image.png
這個方法比較簡單,就是一個key,一個可變的member。
我們調用下這個方法:

 zrem("haha", "order3");

再看下Redis的管理工具:
image.png
order3 被刪除了。

接下來的問題就是讀取數據方法那個奇怪的返回值怎么使用了,也沒有什么好的辦法,就是在運行的時候,ALT+F8 調出窗口,各種嘗試唄。

下面,直接把最終代碼貼出來吧:

   private static final int expireTime = 15000;
   //其他略,上面有
    public static void main(String[] args) {
        Thread productThread = new Thread(() -> {
            for (int i = 0; i < 15; i++) {
                try {
                    Thread.sleep(1200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                produce(i);
            }
        });
        productThread.start();

        Thread consumThread = new Thread(() -> {
            consum();
        });
        consumThread.start();
    }

    private static void produce(int orderId) {
        Date date = new Date();
        String dateStr = simpleDateFormat.format(date);
        System.out.printf("現在時間是%s,訂單%s加入隊列%n", dateStr, orderId);
        zadd("order", String.valueOf(orderId), date.getTime());
    }

    private static void consum() {
        while (true) {
            Set<Tuple> set = zrangeWithScores("order", 0, 0);
            for (Tuple item : set) {
                Date date = new Date();
                if (date.getTime() - item.getScore() > expireTime) {
                    String dateStr = simpleDateFormat.format(date);
                    System.out.printf("現在時間是%s,訂單%s已過期%n", dateStr, item.getElement());
                    zrem("order", item.getElement());
                } else {
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

最后讓我們執行一下:
image.png
沒有問題。

在這里有兩個細節需要說明下:

  • 如果取出來的訂單沒有超時,會睡300毫秒,然后繼續判斷。這個數值直接影響着訂單過期的延遲情況,如果把數值設置的很大,比如20秒,顯而易見,訂單真正過期的時間遠遠不止15秒了。如果把數據設置的很小,那么服務器壓力也會增大。

  • 取出數據——》刪除數據 會有一定的時間差,如果開幾個線程同時消費,或者部署幾個應用同時消費,會有重復執行的情況,如果僅僅是修改下訂單的狀態,沒什么問題,因為這是一個冪等性操作,不管執行幾次,都是同樣的結果,但是如何還有其他的非冪等性操作,比如 用戶信譽-10,就要用其他的手段來避免 重復執行了,這里就不展開了。

這種實現方式其實也比較簡單,因為大家都或多或少的使用過Redis,只要知道這幾個方法,很容易就可以實現。
但是相比上一節的三個方法來說,這個方法就高端很多了,支持多線程消費,支持多應用(部署)消費,應用服務器宕機也沒事。

好了,到這里,延時任務的第四種方式——使用Redis 就講完了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM