redis延遲隊列


異步消息隊列
Redis 的 list(列表) 數據結構常用來作為異步消息隊列使用,使用rpush/lpush操作入隊列,
使用 lpop 和 rpop 來出隊列。

 

 

> rpush notify-queue apple banana pear
(integer) 3
> llen notify-queue
(integer) 3
> lpop notify-queue
"apple"
> llen notify-queue
(integer) 2
> lpop notify-queue
"banana"
> llen notify-queue
(integer) 1
> lpop notify-queue
"pear"
> llen notify-queue
(integer) 0
> lpop notify-queue
(nil)
上面是 rpush 和 lpop 結合使用的例子。還可以使用 lpush 和 rpop 結合使用,效果是一
樣的。
 
隊列空了怎么辦?
通常我們使用 sleep 來解決這個問題,讓線程睡一會,睡個 1s 鍾就可以了。不但客戶端
的 CPU 能降下來,Redis 的 QPS 也降下來了。
Thread.sleep(1000) # java 睡 1s
 
隊列延遲
阻塞讀在隊列沒有數據的時候,會立即進入休眠狀態,一旦數據到來,則立刻醒過來。消
息的延遲幾乎為零。用 blpop/brpop 替代前面的 lpop/rpop,就完美解決了上面的問題。
但是有問題就是如果線程一直阻塞在哪里,Redis 的客戶端連接就成了閑置連接,閑置過久,服務器一般
會主動斷開連接,減少閑置資源占用。這個時候 blpop/brpop 會拋出異常來。
所以編寫客戶端消費者的時候要小心,注意捕獲異常,還要重試。
 
延時隊列的實現
import java.lang.reflect.Type; 
import java.util.Set; 
import java.util.UUID;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference; 
import redis.clients.jedis.Jedis; 
public class RedisDelayingQueue<T> { 
static class TaskItem<T> { 
public String id; 
public T msg; 
} 
// fastjson 序列化對象中存在 generic 類型時,需要使用 TypeReference 
private Type TaskType = new TypeReference<TaskItem<T>>() { }.getType(); 
private Jedis jedis; 
private String queueKey; 
public RedisDelayingQueue(Jedis jedis, String queueKey) { 
this.jedis = jedis; 
this.queueKey = queueKey; 
} 
public void delay(T msg) { 
TaskItem task = new TaskItem(); 
task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid 
task.msg = msg; 
String s = JSON.toJSONString(task); // fastjson 序列化
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時隊列 ,5s 后再試
} 
public void loop() { 
while (!Thread.interrupted()) {
// 只取一條
Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); 
if (values.isEmpty()) { 
try { 
Thread.sleep(500); // 歇會繼續
} 
catch (InterruptedException e) { 
break;
} 
continue; 
} 
String s = values.iterator().next(); 
if (jedis.zrem(queueKey, s) > 0) { // 搶到了
TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化
this.handleMsg(task.msg); 
} 
} 
} 
public void handleMsg(T msg) { 
System.out.println(msg); 
} 
public static void main(String[] args) { 
Jedis jedis = new Jedis(); 
RedisDelayingQueue queue = new RedisDelayingQueue<>(jedis, "q-demo"); 
Thread producer = new Thread() { 
public void run() { 
for (int i = 0; i < 10; i++) { 
queue.delay("codehole" + i); 
} 
} 
}; 
Thread consumer = new Thread() { 
public void run() { 
queue.loop(); 
} 
}; 
producer.start(); 
consumer.start(); 
try { 
producer.join(); 
Thread.sleep(6000); 
consumer.interrupt(); 
consumer.join(); 
} 
catch (InterruptedException e) { 
} 
} 
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM