redis 實現發布/訂閱模式


 

類似於MQ的主題模式-只能消費訂閱之后發布的消息,一個消息可以被多個訂閱者消費)

1.客戶端發布/訂閱

1.1   普通的發布/訂閱

   除了實現任務隊列外,redis還提供了一組命令可以讓開發者實現"發布/訂閱"(publish/subscribe)模式。"發布/訂閱"模式同樣可以實現進程間的消息傳遞,其原理如下:

  "發布/訂閱"模式包含兩種角色,分別是發布者和訂閱者。訂閱者可以訂閱一個或者多個頻道(channel),而發布者可以向指定的頻道(channel)發送消息,所有訂閱此頻道的訂閱者都會收到此消息。

(1)發布消息

  發布者發布消息的命令是  publish,用法是 publish channel message,如向 channel1.1說一聲hi

127.0.0.1:6379> publish channel:1 hi
(integer) 0

這樣消息就發出去了。返回值表示接收這條消息的訂閱者數量。發出去的消息不會被持久化,也就是有客戶端訂閱channel:1后只能接收到后續發布到該頻道的消息,之前的就接收不到了。

 

(2)訂閱頻道

  訂閱頻道的命令是 subscribe,可以同時訂閱多個頻道,用法是 subscribe channel1 [channel2 ...],例如新開一個客戶端訂閱上面頻道:(不會收到消息,因為不會收到訂閱之前就發布到該頻道的消息)

127.0.0.1:6379> subscribe channel:1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:1"
3) (integer) 1

執行上面命令客戶端會進入訂閱狀態,處於此狀態下客戶端不能使用除subscribe、unsubscribe、psubscribe和punsubscribe這四個屬於"發布/訂閱"之外的命令,否則會報錯。

  進入訂閱狀態后客戶端可能收到3種類型的回復。每種類型的回復都包含3個值,第一個值是消息的類型,根據消類型的不同,第二個和第三個參數的含義可能不同。

消息類型的取值可能是以下3個:

  (1)subscribe。表示訂閱成功的反饋信息。第二個值是訂閱成功的頻道名稱,第三個是當前客戶端訂閱的頻道數量。

  (2)message。表示接收到的消息,第二個值表示產生消息的頻道名稱,第三個值是消息的內容。

  (3)unsubscribe。表示成功取消訂閱某個頻道。第二個值是對應的頻道名稱,第三個值是當前客戶端訂閱的頻道數量,當此值為0時客戶端會退出訂閱狀態,之后就可以執行其他非"發布/訂閱"模式的命令了。

 

(3)第一個客戶端重新向channel:1發送一條消息

127.0.0.1:6379> publish channel:1 hi
(integer) 1

 

返回值表示訂閱此頻道的數量

c

上面訂閱的客戶端:

127.0.0.1:6379> subscribe channel:1
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "channel:1"
3) (integer) 1
1) "message"
2) "channel:1"
3) "hi"

紅字部分表示成功的收到消息(依次是消息類型,頻道,消息內容)

 

1.2   按照規則發布/訂閱

  除了可以使用subscribe命令訂閱指定的頻道外,還可以使用psubscribe命令訂閱指定的規則。規則支持通配符格式。命令格式為      psubscribe pattern [pattern ...]訂閱多個模式的頻道。

  通配符中?表示1個占位符,*表示任意個占位符(包括0),?*表示1個以上占位符。

例如:

(1)訂閱者訂閱三個通配符頻道

127.0.0.1:6379> psubscribe c? b* d?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?"
3) (integer) 1
1) "psubscribe"
2) "b*"
3) (integer) 2
1) "psubscribe"
2) "d?*"
3) (integer) 3

 

(2)新開一個客戶端發送到指定頻道

C:\Users\liqiang>redis-cli
127.0.0.1:6379> publish c m1
(integer) 0
127.0.0.1:6379> publish c1 m1
(integer) 1
127.0.0.1:6379> publish c11 m1
(integer) 0
127.0.0.1:6379> publish b m1
(integer) 1
127.0.0.1:6379> publish b1 m1
(integer) 1
127.0.0.1:6379> publish b11 m1
(integer) 1
127.0.0.1:6379> publish d m1
(integer) 0
127.0.0.1:6379> publish d1 m1
(integer) 1
127.0.0.1:6379> publish d11 m1
(integer) 1

上面返回值為1表示被訂閱者所接受,可以匹配上面的通配符。

 

訂閱者客戶端:

127.0.0.1:6379> psubscribe c? b* d?*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "c?"
3) (integer) 1
1) "psubscribe"
2) "b*"
3) (integer) 2
1) "psubscribe"
2) "d?*"
3) (integer) 3
1) "pmessage"
2) "c?"
3) "c1"
4) "m1"
1) "pmessage"
2) "b*"
3) "b"
4) "m1"
1) "pmessage"
2) "b*"
3) "b1"
4) "m1"
1) "pmessage"
2) "b*"
3) "b11"
4) "m1"
1) "pmessage"
2) "d?*"
3) "d1"
4) "m1"
1) "pmessage"
2) "d?*"
3) "d11"
4) "m1"

 

 

 

注意:

(1)使用psubscribe命令可以重復訂閱同一個頻道,如客戶端執行了psubscribe c? c?*。這時向c1發布消息客戶端會接受到兩條消息,而同時publish命令的返回值是2而不是。.同樣的,如果有另一個客戶端執行了subscribe c1 和psubscribe c?*的話,向c1發送一條消息該客戶頓也會受到兩條消息(但是是兩種類型:message和pmessage),同時publish命令也返回2.

(2)punsubscribe命令可以退訂指定的規則,用法是: punsubscribe [pattern [pattern ...]],如果沒有參數則會退訂所有規則。

(3)使用punsubscribe只能退訂通過psubscribe命令訂閱的規則,不會影響直接通過subscribe命令訂閱的頻道;同樣unsubscribe命令也不會影響通過psubscribe命令訂閱的規則。另外需要注意punsubscribe命令退訂某個規則時不會將其中的通配符展開,而是進行嚴格的字符串匹配,所以punsubscribe * 無法退訂c*規則,而是必須使用punsubscribe c*才可以退訂。

 

2.Java程序實現發布者訂閱者模式

1.生產者

import redis.clients.jedis.Jedis;

/**
 * @Author: qlq
 * @Description
 * @Date: 21:29 2018/10/9
 */
public class MessageProducer extends Thread {
    public static final String CHANNEL_KEY = "channel:1";
    private volatile int count;

    public void putMessage(String message) {
        Jedis jedis = JedisPoolUtils.getJedis();
        Long publish = jedis.publish(CHANNEL_KEY, message);//返回訂閱者數量
        System.out.println(Thread.currentThread().getName() + " put message,count=" + count+",subscriberNum="+publish);
        count++;
    }

    @Override
    public synchronized void run() {
        for (int i = 0; i < 5; i++) {
            putMessage("message" + count);
        }
    }

    public static void main(String[] args) {
        MessageProducer messageProducer = new MessageProducer();
        Thread t1 = new Thread(messageProducer, "thread1");
        Thread t2 = new Thread(messageProducer, "thread2");
        Thread t3 = new Thread(messageProducer, "thread3");
        Thread t4 = new Thread(messageProducer, "thread4");
        Thread t5 = new Thread(messageProducer, "thread5");
        t1.start();
        t2.start();
        t3.start();
        t4.start();
        t5.start();
    }
}

結果:

thread1 put message,count=0,subscriberNum=0
thread1 put message,count=1,subscriberNum=0
thread1 put message,count=2,subscriberNum=0
thread1 put message,count=3,subscriberNum=0
thread1 put message,count=4,subscriberNum=0
thread4 put message,count=5,subscriberNum=0
thread4 put message,count=6,subscriberNum=0
thread4 put message,count=7,subscriberNum=0
thread4 put message,count=8,subscriberNum=0
thread4 put message,count=9,subscriberNum=0
thread5 put message,count=10,subscriberNum=0
thread5 put message,count=11,subscriberNum=0
thread5 put message,count=12,subscriberNum=0
thread5 put message,count=13,subscriberNum=0
thread5 put message,count=14,subscriberNum=0
thread2 put message,count=15,subscriberNum=0
thread2 put message,count=16,subscriberNum=0
thread2 put message,count=17,subscriberNum=0
thread2 put message,count=18,subscriberNum=0
thread2 put message,count=19,subscriberNum=0
thread3 put message,count=20,subscriberNum=0
thread3 put message,count=21,subscriberNum=0
thread3 put message,count=22,subscriberNum=0
thread3 put message,count=23,subscriberNum=0
thread3 put message,count=24,subscriberNum=0
View Code

 

2.消費者

(1)subscribe實現訂閱消費消息(開啟兩個線程訂閱消息)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
 * @Author: qlq
 * @Description
 * @Date: 22:34 2018/10/9
 */
public class MessageConsumer implements Runnable {
    public static final String CHANNEL_KEY = "channel:1";//頻道

    public static final String EXIT_COMMAND = "exit";//結束程序的消息

    private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//處理接收消息

    public void consumerMessage() {
        Jedis jedis = JedisPoolUtils.getJedis();
        jedis.subscribe(myJedisPubSub, CHANNEL_KEY);//第一個參數是處理接收消息,第二個參數是訂閱的消息頻道
    }

    @Override
    public void run() {
        while (true) {
            consumerMessage();
        }
    }

    public static void main(String[] args) {
        MessageConsumer messageConsumer = new MessageConsumer();
        Thread t1 = new Thread(messageConsumer, "thread5");
        Thread t2 = new Thread(messageConsumer, "thread6");
        t1.start();
        t2.start();
    }
}

/**
 * 繼承JedisPubSub,重寫接收消息的方法
 */
class MyJedisPubSub extends JedisPubSub {
    @Override
    /** JedisPubSub類是一個沒有抽象方法的抽象類,里面方法都是一些空實現
     * 所以可以選擇需要的方法覆蓋,這兒使用的是SUBSCRIBE指令,所以覆蓋了onMessage
     * 如果使用PSUBSCRIBE指令,則覆蓋onPMessage方法
     * 當然也可以選擇BinaryJedisPubSub,同樣是抽象類,但方法參數為byte[]
     **/
    public void onMessage(String channel, String message) {
        System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);
        //接收到exit消息后退出
        if (MessageConsumer.EXIT_COMMAND.equals(message)) {
            System.exit(0);
        }
    }
}

 

我們再次啟動生產者生產消息,生產者控制台:

thread5 put message,count=0,subscriberNum=2
thread5 put message,count=1,subscriberNum=2
thread5 put message,count=2,subscriberNum=2
thread5 put message,count=3,subscriberNum=2
thread5 put message,count=4,subscriberNum=2
thread3 put message,count=5,subscriberNum=2
thread3 put message,count=6,subscriberNum=2
thread3 put message,count=7,subscriberNum=2
thread3 put message,count=8,subscriberNum=2
thread3 put message,count=9,subscriberNum=2
thread2 put message,count=10,subscriberNum=2
thread2 put message,count=11,subscriberNum=2
thread2 put message,count=12,subscriberNum=2
thread2 put message,count=13,subscriberNum=2
thread2 put message,count=14,subscriberNum=2
thread4 put message,count=15,subscriberNum=2
thread4 put message,count=16,subscriberNum=2
thread4 put message,count=17,subscriberNum=2
thread4 put message,count=18,subscriberNum=2
thread4 put message,count=19,subscriberNum=2
thread1 put message,count=20,subscriberNum=2
thread1 put message,count=21,subscriberNum=2
thread1 put message,count=22,subscriberNum=2
thread1 put message,count=23,subscriberNum=2
thread1 put message,count=24,subscriberNum=2

Process finished with exit code 0

 

消費者控制台:

thread6-接收到消息:channel=channel:1,message=message0
thread5-接收到消息:channel=channel:1,message=message0
thread5-接收到消息:channel=channel:1,message=message1
thread6-接收到消息:channel=channel:1,message=message1
thread5-接收到消息:channel=channel:1,message=message2
thread6-接收到消息:channel=channel:1,message=message2
thread5-接收到消息:channel=channel:1,message=message3
thread6-接收到消息:channel=channel:1,message=message3
thread5-接收到消息:channel=channel:1,message=message4
thread6-接收到消息:channel=channel:1,message=message4
thread5-接收到消息:channel=channel:1,message=message5
thread6-接收到消息:channel=channel:1,message=message5
thread5-接收到消息:channel=channel:1,message=message6
thread6-接收到消息:channel=channel:1,message=message6
thread5-接收到消息:channel=channel:1,message=message7
thread6-接收到消息:channel=channel:1,message=message7
thread5-接收到消息:channel=channel:1,message=message8
thread6-接收到消息:channel=channel:1,message=message8
thread5-接收到消息:channel=channel:1,message=message9
thread6-接收到消息:channel=channel:1,message=message9
thread5-接收到消息:channel=channel:1,message=message10
thread6-接收到消息:channel=channel:1,message=message10
thread5-接收到消息:channel=channel:1,message=message11
thread6-接收到消息:channel=channel:1,message=message11
thread5-接收到消息:channel=channel:1,message=message12
thread6-接收到消息:channel=channel:1,message=message12
thread5-接收到消息:channel=channel:1,message=message13
thread6-接收到消息:channel=channel:1,message=message13
thread5-接收到消息:channel=channel:1,message=message14
thread6-接收到消息:channel=channel:1,message=message14
thread5-接收到消息:channel=channel:1,message=message15
thread6-接收到消息:channel=channel:1,message=message15
thread5-接收到消息:channel=channel:1,message=message16
thread6-接收到消息:channel=channel:1,message=message16
thread5-接收到消息:channel=channel:1,message=message17
thread6-接收到消息:channel=channel:1,message=message17
thread5-接收到消息:channel=channel:1,message=message18
thread6-接收到消息:channel=channel:1,message=message18
thread5-接收到消息:channel=channel:1,message=message19
thread6-接收到消息:channel=channel:1,message=message19
thread5-接收到消息:channel=channel:1,message=message20
thread6-接收到消息:channel=channel:1,message=message20
thread5-接收到消息:channel=channel:1,message=message21
thread6-接收到消息:channel=channel:1,message=message21
thread5-接收到消息:channel=channel:1,message=message22
thread6-接收到消息:channel=channel:1,message=message22
thread5-接收到消息:channel=channel:1,message=message23
thread6-接收到消息:channel=channel:1,message=message23
thread5-接收到消息:channel=channel:1,message=message24
thread6-接收到消息:channel=channel:1,message=message24

 

 

(2)psubscribe實現訂閱消費消息(開啟兩個線程訂閱消息)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
 * @Author: qlq
 * @Description
 * @Date: 22:34 2018/10/9
 */
public class MessageConsumer implements Runnable {
    public static final String CHANNEL_KEY = "channel*";//頻道

    public static final String EXIT_COMMAND = "exit";//結束程序的消息

    private MyJedisPubSub myJedisPubSub = new MyJedisPubSub();//處理接收消息

    public void consumerMessage() {
        Jedis jedis = JedisPoolUtils.getJedis();
        jedis.psubscribe(myJedisPubSub, CHANNEL_KEY);//第一個參數是處理接收消息,第二個參數是訂閱的消息頻道
    }

    @Override
    public void run() {
        while (true) {
            consumerMessage();
        }
    }

    public static void main(String[] args) {
        MessageConsumer messageConsumer = new MessageConsumer();
        Thread t1 = new Thread(messageConsumer, "thread5");
        Thread t2 = new Thread(messageConsumer, "thread6");
        t1.start();
        t2.start();
    }
}

/**
 * 繼承JedisPubSub,重寫接收消息的方法
 */
class MyJedisPubSub extends JedisPubSub {
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println(Thread.currentThread().getName()+"-接收到消息:pattern="+pattern+",channel=" + channel + ",message=" + message);
        //接收到exit消息后退出
        if (MessageConsumer.EXIT_COMMAND.equals(message)) {
            System.exit(0);
        }
    }
}

 

重寫JedisPubSub 的onPMessage方法即可

 

啟動生產者生產消息之后查看消費者控制台:

thread6-接收到消息:pattern=channel*,channel=channel:1,message=message0
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message0
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message1
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message1
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message2
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message2
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message3
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message3
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message4
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message4
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message5
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message5
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message6
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message6
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message7
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message7
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message8
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message8
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message9
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message9
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message10
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message10
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message11
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message11
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message12
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message12
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message13
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message13
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message14
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message14
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message15
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message15
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message16
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message16
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message17
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message17
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message18
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message18
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message19
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message19
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message20
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message20
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message21
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message21
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message22
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message22
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message23
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message23
thread5-接收到消息:pattern=channel*,channel=channel:1,message=message24
thread6-接收到消息:pattern=channel*,channel=channel:1,message=message24
View Code

補充:訂閱的時候subscribe()和psubscribe()的第二個參數支持可變參數,也就是可以實現訂閱多個頻道。

  至此實現了兩種方式的消息隊列:

    redis自帶的list類型(lpush和rpop或者brpop,rpush和lpop或者blpop)---blpop和brpop是阻塞讀取。

    "發布/訂閱"模式(publish channel message 和 subscribe channel [channel ...] 或者 psubscribe pattern [pattern ...] 通配符訂閱多個頻道)

 

補充:

1.發布訂閱執行訂閱之后該線程處於阻塞狀態,線程不會終止,如果終止線程需要退訂,需要調用JedisPubSub的unsubscribe()方法

例如:

package plainTest;

import cn.xm.redisChat.util.JedisPoolUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
 * @Author: qlq
 * @Description
 * @Date: 23:36 2018/10/13
 */
public class Test111 {
    public static void main(String[] args) {
        Jedis jedis = JedisPoolUtils.getJedis();
        System.out.println("訂閱前");
        jedis.subscribe(new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                super.onMessage(channel, message);
            }
        }, "c1");
        System.out.println("訂閱后");
    }
}

 

結果只會打印訂閱前,而且線程不會終止。

 

為了使線程可以停止,必須退訂,而且退訂只能調用  JedisPubSub.unsubscribe()方法,例如:收到quit消息之后會退訂,線程會回到主線程打印訂閱后。

package plainTest;

import cn.xm.redisChat.util.JedisPoolUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

/**
 * @Author: qlq
 * @Description
 * @Date: 23:36 2018/10/13
 */
public class Test111 {
    public static void main(String[] args) {
        Jedis jedis = JedisPoolUtils.getJedis();
        System.out.println("訂閱前");
        jedis.subscribe(new JedisPubSub() {
            @Override
            public void onMessage(String channel, String message) {
                if("quit".equals(message)){
                    unsubscribe("c1");
                }
                System.out.println(message);
            }

            @Override
            public void unsubscribe(String... channels) {
                super.unsubscribe(channels);
            }
        }, "c1");
        System.out.println("訂閱后");
    }
}

 

2.BRPOP:當給定列表內沒有任何元素可供彈出的時候,連接將被BRPOP命令阻塞,直到等待超時或發現可彈出元素為止。(每次只彈出一個元素,當沒有元素的時候處於阻塞,當彈出一個元素之后就會解除阻塞)

package plainTest;

import cn.xm.redisChat.util.JedisPoolUtils;
import redis.clients.jedis.Jedis;

import java.util.List;

/**
 * @Author: qlq
 * @Description
 * @Date: 23:36 2018/10/13
 */
public class Test111 {
    public static void main(String[] args) {
        Jedis jedis = JedisPoolUtils.getJedis();
        System.out.println("brpop之前");
        List<String> messages = jedis.brpop(0,"list1");
        System.out.println(messages);
        System.out.println("brpop之后");
    }
}

 

沒有元素的時候只會打印brpop之前。

 

 

原文:https://www.cnblogs.com/qlqwjy/p/9763754.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM