redis(五)---- 簡單消息隊列


消息隊列一個消息的鏈表,是一個異步處理的數據處理引擎。不僅能夠提高系統的負荷,還能夠改善因網絡阻塞導致的數據缺失。一般用於郵件發送、手機短信發送,數據表單提交、圖片生成、視頻轉換、日志儲存等。

redis的list類型天生支持用作消息隊列。由於redis的list是使用雙向鏈表實現的,保存了頭尾節點,所以在列表頭尾兩邊插取元素都是非常快的。所以可以直接使用redis的list實現消息隊列,只需簡單的兩個指令lpush和rpop或者rpush和lpop。簡單示例如下:

public class MessageQueue {

    private final String redisChanel1 = "redisChanel1";
    private final String redisChanel2 = "redisChanel2";

    private String redisHost = "10.5.31.155";
    private int redisPort = 6379;
    private Jedis redis;

    @Before
    public void before() {
        redis = new Jedis(redisHost, redisPort);
    }

    @Test
    public void pubChanel1() throws InterruptedException {
        for (int i = 0; i < 1000; i++) {
            redis.lpush(redisChanel1, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "" + i + "條消息");
            Thread.sleep(Math.round(Math.floor(Math.random() * 2000)));
        }
    }

    @Test
    public void pubChanel2() throws InterruptedException {
        for (int i = 0; i < 1000; i++) {
            redis.lpush(redisChanel2, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + "" + i + "條消息");
            Thread.sleep(Math.round(Math.floor(Math.random() * 2000)));
        }
    }

    @Test
    public void sub() throws InterruptedException {
        while (true) {
            String message1 = redis.rpop(redisChanel1);
            System.out.println(redisChanel1 + "-->" + message1);
            Thread.sleep(200);
            String message2 = redis.rpop(redisChanel2);
            System.out.println(redisChanel2 + "-->" + message2);
            Thread.sleep(200);
        }
    }

    @After
    public void after() {
        redis.close();
    }
}

先后運行pubChanel1、pubChanel2、sub方法,可以看到類似如下輸出:

redisChanel1-->2018-09-28 10:23:44 第0條消息
redisChanel2-->2018-09-28 10:23:46 第0條消息
redisChanel1-->2018-09-28 10:23:44 第1條消息
redisChanel2-->2018-09-28 10:23:47 第1條消息
redisChanel1-->2018-09-28 10:23:46 第2條消息
redisChanel2-->null
redisChanel1-->2018-09-28 10:23:47 第3條消息
redisChanel2-->2018-09-28 10:23:49 第2條消息
redisChanel1-->2018-09-28 10:23:48 第4條消息
redisChanel2-->null
redisChanel1-->2018-09-28 10:23:49 第5條消息

以上實現原理非常簡單,但是由於消費者取消息是循環+sleep實現的,所以會出現如下問題:

  1. 需要考慮消費者拿到空消息的情況(輸出結果中出現了null)。
  2. 如果生產者速度大於消費者消費速度,消息隊列長度會一直增大,時間久了會占用大量內存空間。
  3. 消費者存在資源浪費的情況。

索性redis還支持另外一個命令:brpop,這個命令只有當list中有元素的時候才會返回,沒有元素時會阻塞,直到阻塞超時返回null。那么改變下sub方法的寫法如下:

@Test
public void sub() {
    while (true) {
        List<String> messages = redis.brpop(0, redisChanel1, redisChanel2);
        for (String message : messages) {
            System.out.println(message);
        }
    }
}

其中,brpop中的第一個參數值為0表示無超時時間,沒有取到消息的時候一直阻塞下去。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM