異步消息隊列
Redis 的 list(列表) 數據結構常用來作為異步消息隊列使用,使用rpush/lpush操作入隊列,
使用 lpop 和 rpop 來出隊列。

> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)
上面是 rpush 和 lpop 結合使用的例子。還可以使用 lpush 和 rpop 結合使用,效果是一
樣的。
隊列空了怎么辦?
通常我們使用 sleep 來解決這個問題,讓線程睡一會,睡個 1s 鍾就可以了。不但客戶端
的 CPU 能降下來,Redis 的 QPS 也降下來了。
Thread.sleep(1000) # java 睡 1s
隊列延遲
阻塞讀在隊列沒有數據的時候,會立即進入休眠狀態,一旦數據到來,則立刻醒過來。消
息的延遲幾乎為零。用 blpop/brpop 替代前面的 lpop/rpop,就完美解決了上面的問題。
但是有問題就是如果線程一直阻塞在哪里,Redis 的客戶端連接就成了閑置連接,閑置過久,服務器一般
會主動斷開連接,減少閑置資源占用。這個時候 blpop/brpop 會拋出異常來。
所以編寫客戶端消費者的時候要小心,注意捕獲異常,還要重試。
延時隊列的實現
import java.lang.reflect.Type; import java.util.Set; import java.util.UUID; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import redis.clients.jedis.Jedis; public class RedisDelayingQueue<T> { static class TaskItem<T> { public String id; public T msg; } // fastjson 序列化對象中存在 generic 類型時,需要使用 TypeReference private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem task = new TaskItem(); task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid task.msg = msg; String s = JSON.toJSONString(task); // fastjson 序列化 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 后再試 } public void loop() { while (!Thread.interrupted()) { // 只取一條 Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); if (values.isEmpty()) { try { Thread.sleep(500); // 歇會繼續 } catch (InterruptedException e) { break; } continue; } String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // 搶到了 TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化 this.handleMsg(task.msg); } } } public void handleMsg(T msg) { System.out.println(msg); } public static void main(String[] args) { Jedis jedis = new Jedis(); RedisDelayingQueue queue = new RedisDelayingQueue<>(jedis, "q-demo"); Thread producer = new Thread() { public void run() { for (int i = 0; i < 10; i++) { queue.delay("codehole" + i); } } }; Thread consumer = new Thread() { public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException e) { } } }