在上一節中,我們講了三種方式來實現延時任務,其實,將三種方式結合起來用,對於一些中小型公司已經足夠了,但是在中大型互聯網公司還是遠遠不夠的。
想必大家對Redis起碼有一個初步的概念:基於內存的非關系型數據庫。在平時的業務開發中,Redis經常會被用做緩存,來提高網站的性能,減少數據庫的訪問,所以一想到Redis,腦海中第一個浮現出來的就是緩存。
沒辦法,對於搬磚業務開發,所使用到的Redis基本也僅僅局限於緩存/代替Session了。但是Redis的應用場景很多很多。下面我就用Redis來實現延時任務。
我在這里假設大家已經對Redis有了一個基本的了解,並且使用過Redis,所以我不再過多的講述Redis的基本知識了。
開門見山,我們會使用到Redis的ZSet數據結構。
ZSet可以理解為Set的升級版本,Set是無序的,ZSet是有序的,而且會實時排序,所以ZSet也被稱為 Sort Set,我是比較傾向於后面的稱呼的,因為從字面上的意思就可以知道這個Set擁有一個特點:排序。
ZSet有兩個元素:member ,score,ZSet會根據score進行排序,member就是成員的意思。
要想完成這個需求,我們第一個想到的應該就是怎么把數據推送給Redis,第二個想到的是怎么把Redis數據給讀取出來。
我所用的是Jedis
通過IDEA的自動提示功能,我們很容易找到zadd這方法,看起來推送數據給Redis應該是這個方法:
讓我們試一下吧。
private static JedisPoolConfig jedisPoolConfig = null;
private static JedisPool jedisPool;
private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
static {
jedisPoolConfig = new JedisPoolConfig();
jedisPool = new JedisPool(jedisPoolConfig, "", 0, 1500, "", 0);
}
private static void zadd(String key, String member, double score) {
Jedis jedis = jedisPool.getResource();
jedis.zadd(key, score, member);
jedis.close();
}
private static Set<Tuple> zrangeWithScores(String key, int start, int end) {
Jedis jedis = jedisPool.getResource();
Set<Tuple> set = jedis.zrangeWithScores(key, start, end);
jedis.close();
return set;
}
private static void zrem(String key, String member) {
Jedis jedis = jedisPool.getResource();
jedis.zrem(key, member);
jedis.close();
}
在這里我把ip和端口號,密碼,DBNum都給隱藏了 ,大家應該可以看懂。
我並沒有用Spring或者是Spring Boot來管理Redis,因為如果加上這些東西,還需要有Spring或者Spring Boot的知識,而且這里我僅僅是想演示下用Redis實現延遲任務的一個思路而已,基於此原因,大家也別糾結 沒有分不同的類,沒有異常處理 等等。
我們在main方法調用下zadd方法:
zadd("haha","order1",2018005130);
zadd("haha","order2",2017005130);
zadd("haha","order3",2016005130);
zadd("haha","order4",2021115130);
執行完成后,打開Redis管理工具,找到這個key,看一下結果:
score小的排前面,這也符合orderTime小的,越先過期。但是 score參數類型是double,所以我們不能把Date型的orderTime直接放進去了,但是我們可以把Date經過轉換,再放進去。
數據已經放進去了,怎么拿出來呢?這個方法比較難找,只能借助於搜索引擎了,最后確定了方法:
不管是參數,還是返回類型,都有點奇怪,難怪找不到。
我們調用下這個方法試一下。 根據參數名稱,猜測start是開始,end是結束,我們要讀取第一個數據,應該是傳0,1,或者是1,2。但是 其實應該是傳0,0。。。真讓人沮喪。。返回的數據也很奇怪,看不懂啊:
這個先放一下,我們發現 這個數據雖然已經被讀取出來了,但是Redis並么有把這個數據刪掉。
讓我們想想,這個ZSet並不像DelayQueue一樣,這個沒有延遲的功能。我們把數據推到Redis,下一秒就可以讀出來。所以我們需要先把數據讀取出來,然后判斷這個訂單有沒有超時,如果沒有超時的話,Sleep一段時間,再次判斷是否超時。超時了,則修改數據庫狀態,如果還是沒超時,繼續Sleep,再判斷。所以上面讀取方法沒有刪除數據,是符合我們要求的。
我們需要找到刪除的方法,最后確定了方法:
這個方法比較簡單,就是一個key,一個可變的member。
我們調用下這個方法:
zrem("haha", "order3");
再看下Redis的管理工具:
order3 被刪除了。
接下來的問題就是讀取數據方法那個奇怪的返回值怎么使用了,也沒有什么好的辦法,就是在運行的時候,ALT+F8 調出窗口,各種嘗試唄。
下面,直接把最終代碼貼出來吧:
private static final int expireTime = 15000;
//其他略,上面有
public static void main(String[] args) {
Thread productThread = new Thread(() -> {
for (int i = 0; i < 15; i++) {
try {
Thread.sleep(1200);
} catch (InterruptedException e) {
e.printStackTrace();
}
produce(i);
}
});
productThread.start();
Thread consumThread = new Thread(() -> {
consum();
});
consumThread.start();
}
private static void produce(int orderId) {
Date date = new Date();
String dateStr = simpleDateFormat.format(date);
System.out.printf("現在時間是%s,訂單%s加入隊列%n", dateStr, orderId);
zadd("order", String.valueOf(orderId), date.getTime());
}
private static void consum() {
while (true) {
Set<Tuple> set = zrangeWithScores("order", 0, 0);
for (Tuple item : set) {
Date date = new Date();
if (date.getTime() - item.getScore() > expireTime) {
String dateStr = simpleDateFormat.format(date);
System.out.printf("現在時間是%s,訂單%s已過期%n", dateStr, item.getElement());
zrem("order", item.getElement());
} else {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
最后讓我們執行一下:
沒有問題。
在這里有兩個細節需要說明下:
-
如果取出來的訂單沒有超時,會睡300毫秒,然后繼續判斷。這個數值直接影響着訂單過期的延遲情況,如果把數值設置的很大,比如20秒,顯而易見,訂單真正過期的時間遠遠不止15秒了。如果把數據設置的很小,那么服務器壓力也會增大。
-
取出數據——》刪除數據 會有一定的時間差,如果開幾個線程同時消費,或者部署幾個應用同時消費,會有重復執行的情況,如果僅僅是修改下訂單的狀態,沒什么問題,因為這是一個冪等性操作,不管執行幾次,都是同樣的結果,但是如何還有其他的非冪等性操作,比如 用戶信譽-10,就要用其他的手段來避免 重復執行了,這里就不展開了。
這種實現方式其實也比較簡單,因為大家都或多或少的使用過Redis,只要知道這幾個方法,很容易就可以實現。
但是相比上一節的三個方法來說,這個方法就高端很多了,支持多線程消費,支持多應用(部署)消費,應用服務器宕機也沒事。
好了,到這里,延時任務的第四種方式——使用Redis 就講完了。