Redis(十八):Redis和隊列


概要

Redis不僅可作為緩存服務器,還可用作消息隊列。它的列表類型天生支持用作消息隊列。如下圖所示:

由於Redis的列表是使用雙向鏈表實現的,保存了頭尾節點,所以在列表頭尾兩邊插取元素都是非常快的。

普通隊列實現

所以可以直接使用Redis的List實現消息隊列,只需簡單的兩個指令lpush和rpop或者rpush和lpop。簡單示例如下:

存放消息端(消息生產者):

package org.yamikaze.redis.messsage.queue;
 
import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;
 
import java.util.concurrent.TimeUnit;
 
/**
 * 消息生產者
 * @author yamikaze
 */
public class Producer extends Thread {
 
    public static final String MESSAGE_KEY = "message:queue";
    private Jedis jedis;
    private String producerName;
    private volatile int count;
 
    public Producer(String name) {
        this.producerName = name;
        init();
    }
 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    }
 
    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(producerName + ": 當前未被處理消息條數為:" + size);
        count++;
    }
 
    public int getCount() {
        return count;
    }
 
    @Override
    public void run() {
        try {
            while (true) {
                putMessage(StringUtils.generate32Str());
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
 
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
    public static void main(String[] args) throws InterruptedException{
        Producer producer = new Producer("myProducer");
        producer.start();
 
        for(; ;) {
            System.out.println("main : 已存儲消息條數:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}

消息處理端(消息消費者):

package org.yamikaze.redis.messsage.queue;
 
import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;
 
/**
 * 消息消費者
 * @author yamikaze
 */
public class Customer extends Thread{
 
    private String customerName;
    private volatile int count;
    private Jedis jedis;
 
    public Customer(String name) {
        this.customerName = name;
        init();
    }
 
    private void init() {
        jedis = MyJedisFactory.getLocalJedis();
    }
 
    public void processMessage() {
        String message = jedis.rpop(Producer.MESSAGE_KEY);
        if(message != null) {
            count++;
            handle(message);
        }
    }
 
    public void handle(String message) {
        System.out.println(customerName + " 正在處理消息,消息內容是: " + message + " 這是第" + count + "條");
    }
 
    @Override
    public void run() {
        while (true) {
            processMessage();
        }
    }
 
    public static void main(String[] args) {
        Customer customer = new Customer("yamikaze");
        customer.start();
    }
}

貌似還不錯,但上述例子中消息消費者有一個問題存在,即需要不停的調用rpop方法查看List中是否有待處理消息。每調用一次都會發起一次連接,這會造成不必要的浪費。也許你會使用Thread.sleep()等方法讓消費者線程隔一段時間再消費,但這樣做有兩個問題:

1)、如果生產者速度大於消費者消費速度,消息隊列長度會一直增大,時間久了會占用大量內存空間。

2)、如果睡眠時間過長,這樣不能處理一些時效性的消息,睡眠時間過短,也會在連接上造成比較大的開銷。

所以可以使用brpop指令,這個指令只有在有元素時才返回,沒有則會阻塞直到超時返回null,於是消費端可以將processMessage可以改為這樣:

public void processMessage() {
    /**
     * brpop支持多個列表(隊列)
     * brpop指令是支持隊列優先級的,比如這個例子中MESSAGE_KEY的優先級大於testKey(順序決定)。
     * 如果兩個列表中都有元素,會優先返回優先級高的列表中的元素,所以這兒優先返回MESSAGE_KEY
     * 0表示不限制等待,會一直阻塞在這兒
     */
    List<String> messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey");
    if(messages.size() != 0) {
        //由於該指令可以監聽多個Key,所以返回的是一個列表
        //列表由2項組成,1) 列表名,2)數據
        String keyName = messages.get(0);
        //如果返回的是MESSAGE_KEY的消息
        if(Producer.MESSAGE_KEY.equals(keyName)) {
            String message = messages.get(1);
            handle(message);
        }
 
    }
    System.out.println("=======================");
}

然后可以運行Customer,清空控制台,可以看到程序沒有任何輸出,阻塞在了brpop這兒。然后在打開Redis的客戶端,輸入指令client list,可以查看當前有兩個連接。

一次生產多次消費的隊列

Redis除了對消息隊列提供支持外,還提供了一組命令用於支持發布/訂閱模式。利用Redis的pub/sub模式可以實現一次生產多次消費的隊列。

1)發布
    PUBLISH指令可用於發布一條消息,格式 PUBLISH channel message

    返回值表示訂閱了該消息的數量。
    2)訂閱
    SUBSCRIBE指令用於接收一條消息,格式 SUBSCRIBE channel

    可以看到使用SUBSCRIBE指令后進入了訂閱模式,但沒有接收到publish發送的消息,這是因為只有在消息發出去前訂閱才會接收到。在這個模式下其他指令,只能看到回復。回復分為三種類型:
    1、如果為subscribe,第二個值表示訂閱的頻道,第三個值表示是第幾個訂閱的頻道?(理解成序號?) 
    2、如果為message(消息),第二個值為產生該消息的頻道,第三個值為消息
    3、如果為unsubscribe,第二個值表示取消訂閱的頻道,第三個值表示當前客戶端的訂閱數量。

    可以使用指令UNSUBSCRIBE退訂,如果不加參數,則會退訂所有由SUBSCRIBE指令訂閱的頻道。
   
    Redis還支持基於通配符的消息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:

   再試試推送消息會得到以下結果:

   可以看到publish指令返回的是2,而訂閱端這邊接收了兩次消息。這是因為PSUBSCRIBE指令可以重復訂閱頻道。而使用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。同時PUNSUBSCRIBE指令通配符不會展開。
例如:PUNSUBSCRIBE * 不會匹配到 channel.*, 所以要取消訂閱channel.*就要這樣寫PUBSUBSCRIBE channel.*。

代碼示范如下:

package org.yamikaze.redis.messsage.subscribe;
 
import org.yamikaze.redis.messsage.queue.StringUtils;
import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;
 
/**
 * 消息發布方
 * @author yamikaze
 */
public class Publisher {
 
    public static final String CHANNEL_KEY = "channel:message";
    private Jedis jedis;
 
    public Publisher() {
        jedis = MyJedisFactory.getLocalJedis();
    }
 
    public void publishMessage(String message) {
        if(StringUtils.isBlank(message)) {
            return;
        }
        jedis.publish(CHANNEL_KEY, message);
    }
 
    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.publishMessage("Hello Redis!");
    }
}
簡單的發送一個消息。
消息訂閱方:
package org.yamikaze.redis.messsage.subscribe;
 
import org.yamikaze.redis.test.MyJedisFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
 
import java.util.concurrent.TimeUnit;
 
/**
 * 消息訂閱方客戶端
 * @author yamikaze
 */
public class SubscribeClient {
 
    private Jedis jedis;
    private static final String EXIT_COMMAND = "exit";
 
    public SubscribeClient() {
        jedis = MyJedisFactory.getLocalJedis();
    }
 
    public void subscribe(String ...channel) {
        if(channel == null || channel.length <= 0) {
            return;
        }
        //消息處理,接收到消息時如何處理
        JedisPubSub jps = new JedisPubSub() {
            /**
             * JedisPubSub類是一個沒有抽象方法的抽象類,里面方法都是一些空實現
             * 所以可以選擇需要的方法覆蓋,這兒使用的是SUBSCRIBE指令,所以覆蓋了onMessage
             * 如果使用PSUBSCRIBE指令,則覆蓋onPMessage方法
             * 當然也可以選擇BinaryJedisPubSub,同樣是抽象類,但方法參數為byte[]
             */
            @Override
            public void onMessage(String channel, String message) {
                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("接收到消息: channel : " + message);
                    //接收到exit消息后退出
                    if(EXIT_COMMAND.equals(message)) {
                        System.exit(0);
                    }
 
                }
            }
 
            /**
             * 訂閱時
             */
            @Override
            public void onSubscribe(String channel, int subscribedChannels) {
                if(Publisher.CHANNEL_KEY.equals(channel)) {
                    System.out.println("訂閱了頻道:" + channel);
                }
            }
        };
        //可以訂閱多個頻道 當前線程會阻塞在這兒
        jedis.subscribe(jps, channel);
    }
 
    public static void main(String[] args) {
        SubscribeClient client = new SubscribeClient();
        client.subscribe(Publisher.CHANNEL_KEY);
        //並沒有 unsubscribe方法
        //相應的也沒有punsubscribe方法
    }
}
先運行client,再運行Publisher進行消息發送,輸出結果:
 
Redis的pub/sub也有其缺點,那就是如果消費者下線,生產者的消息會丟失。
 

延時隊列

背景

在業務發展過程中,會出現一些需要延時處理的場景,比如:

a.訂單下單之后超過30分鍾用戶未支付,需要取消訂單
b.訂單一些評論,如果48h用戶未對商家評論,系統會自動產生一條默認評論
c.點我達訂單下單后,超過一定時間訂單未派出,需要超時取消訂單等。。。
處理這類需求,比較直接簡單的方式就是定時任務輪訓掃表。這種處理方式在數據量不大的場景下是完全沒問題,但是當數據量大的時候高頻的輪訓數據庫就會比較的耗資源,導致數據庫的慢查或者查詢超時。所以在處理這類需求時候,采用了延時隊列來完成。

幾種延時隊列

延時隊列就是一種帶有延遲功能的消息隊列。下面會介紹幾種目前已有的延時隊列:

1.Java中java.util.concurrent.DelayQueue

優點:JDK自身實現,使用方便,量小適用
缺點:隊列消息處於jvm內存,不支持分布式運行和消息持久化

2.Rocketmq延時隊列

優點:消息持久化,分布式
缺點:不支持任意時間精度,只支持特定level的延時消息

3.Rabbitmq延時隊列(TTL+DLX實現)

優點:消息持久化,分布式
缺點:延時相同的消息必須扔在同一個隊列

Redis實現的延時消息隊列適合的項目特點:

  • Spring框架管理對象
  • 有消息需求,但不想維護mq中間件
  • 有使用redis
  • 對消息持久化並沒有很苛刻的要求

Redis實現的延時消息隊列思路

Redis由於其自身的Zset數據結構,本質就是Set結構上加了個排序的功能,除了添加數據value之外,還提供另一屬性score,這一屬性在添加修改元素時候可以指定,每次指定后,Zset會自動重新按新的值調整順序。可以理解為有兩列字段的數據表,一列存value,一列存順序編號。操作中key理解為zset的名字,那么對延時隊列又有何用呢?

試想如果score代表的是想要執行時間的時間戳,在某個時間將它插入Zset集合中,它變會按照時間戳大小進行排序,也就是對執行時間前后進行排序,這樣的話,起一個死循環線程不斷地進行取第一個key值,如果當前時間戳大於等於該key值的socre就將它取出來進行消費刪除,就可以達到延時執行的目的, 注意不需要遍歷整個Zset集合,以免造成性能浪費。

Zset的排列效果如下圖:

java代碼實現如下:

package cn.chinotan.service.delayQueueRedis;

import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Tuple;

import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * @program: test
 * @description: redis實現延時隊列
 * @author: xingcheng
 * @create: 2018-08-19
 **/
public class AppTest {

    private static final String ADDR = "127.0.0.1";
    private static final int PORT = 6379;
    private static JedisPool jedisPool = new JedisPool(ADDR, PORT);
    private static CountDownLatch cdl = new CountDownLatch(10);

    public static Jedis getJedis() {
        return jedisPool.getResource();
    }

    /**
     * 生產者,生成5個訂單
     */
    public void productionDelayMessage() {
        for (int i = 0; i < 5; i++) {
            Calendar instance = Calendar.getInstance();
            // 3秒后執行
            instance.add(Calendar.SECOND, 3 + i);
            AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1));
            System.out.println("生產訂單: " + StringUtils.join("000000000", i + 1) + " 當前時間:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            System.out.println((3 + i) + "秒后執行");
        }
    }

    //消費者,取訂單
    public static void consumerDelayMessage() {
        Jedis jedis = AppTest.getJedis();
        while (true) {
            Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0);
            if (order == null || order.isEmpty()) {
                System.out.println("當前沒有等待的任務");
                try {
                    TimeUnit.MICROSECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                continue;
            }
            Tuple tuple = (Tuple) order.toArray()[0];
            double score = tuple.getScore();
            Calendar instance = Calendar.getInstance();
            long nowTime = instance.getTimeInMillis() / 1000;
            if (nowTime >= score) {
                String element = tuple.getElement();
                Long orderId = jedis.zrem("orderId", element);
                if (orderId > 0) {
                    System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消費了一個任務:消費的訂單OrderId為" + element);
                }
            }
        }
    }

    static class DelayMessage implements Runnable{
        @Override
        public void run() {
            try {
                cdl.await();
                consumerDelayMessage();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) {
        AppTest appTest = new AppTest();
        appTest.productionDelayMessage();
        for (int i = 0; i < 10; i++) {
            new Thread(new DelayMessage()).start();
            cdl.countDown();
        }
    }
}

實現效果如下:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM