Redis的Pub/Sub機制存在的問題以及解決方案


  Redis的Pub/Sub機制使用非常簡單的方式實現了觀察者模式,但是在使用過程中我們發現,它僅僅是實現了發布訂閱機制,但是很多的場景沒有考慮到。例如一下的幾種場景:

  1.數據可靠性無法保證

  一個redis_cli發送消息的時候,消息是無狀態的,也就是說負責發送消息的redis_cli只管發送消息,並不會理會消息是否被訂閱者接收到,也不會理會是否在傳輸過程中丟失,即對於發布者來說,消息是”即發即失”的。

  2.擴展性差

  不能通過增加消費者來加快消耗發布者的寫入的數據,如果發布者發布的消息很多,則數據阻塞在通道中已等待被消費着來消耗。阻塞時間越久,數據丟失的風險越大(網絡或者服務器的一個不穩定就會導致數據的丟失)

  3.資源消耗高

  在pub/sub中消息發布者不需要獨占一個Redis的鏈接,而消費者則需要單獨占用一個Redis的鏈接,在java中便不得獨立出分出一個線程來處理消費者。這種場景一般對應這多個消費者,此時則有着過高的資源消耗。

  對於如上的幾種不足,如果在項目中需要考慮的話可以使用JMS來實現該功能。JMS提供了消息的持久化/耐久性等各種企業級的特性。如果依然想使用Redis來實現並做一些數據的持久化操作,則可以根據JMS的特性來通過Redis模擬出來.

  模擬的步驟如下:
  1.subscribe端首先向一個Set集合中增加“訂閱者ID”,此Set集合保存了“活躍訂閱”者,訂閱者ID標記每個唯一的訂閱者,例如:sub:email,sub:web。此SET稱為“活躍訂閱者集合”

  2.subcribe端開啟訂閱操作,並基於Redis創建一個以“訂閱者ID”為KEY的LIST數據結構,此LIST中存儲了所有的尚未消費的消息。此LIST稱為“訂閱者消息隊列”

  3.publish端:每發布一條消息之后,publish端都需要遍歷“活躍訂閱者集合”,並依次向每個“訂閱者消息隊列”尾部追加此次發布的消息。到此為止,我們可以基本保證,發布的每一條消息,都會持久保存在每個“訂閱者消息隊列”中。

  4.subscribe端,每收到一個訂閱消息,在消費之后,必須刪除自己的“訂閱者消息隊列”頭部的一條記錄。subscribe端啟動時,如果發現自己的自己的“訂閱者消息隊列”有殘存記錄,那么將會首先消費這些記錄,然后再去訂閱。

  實現如下:

public class PubSubListener extends JedisPubSub{

    private String clientId;
    private RedisHandler redisHandler;

    public PubSubListener(String clientId, Jedis jedis) {
        this.clientId = clientId;
        this.redisHandler = new RedisHandler(jedis);
    }

    @Override
    public void onMessage(String channel, String message) {
        if ("exit".equals(message)) {
            redisHandler.onUnsubscribe(channel);
        }

        redisHandler.hanlder(channel,message);
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        redisHandler.subscribe(channel);
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        redisHandler.onUnsubscribe(channel);
    }

    class RedisHandler{

        private Jedis jedis =null;

        public RedisHandler(Jedis jedis) {
            this.jedis = jedis;
        }

        /**
         * 訂閱操作步驟:
         * 1.判斷clientID是否在PERSITS_SUB隊列中
         * 2.如果在隊列中說明已經訂閱,或則把clientID添加到隊列中
         * @param channel
         */
        public void subscribe(String channel){
            String key = clientId + "/" + channel;
            boolean isExists = this.jedis.sismember("PERSITS_SUB",key);
            if(!isExists){
                this.jedis.sadd("PERSITS_SUB",key);
            }
        }

        /**
         * 取消訂閱
         * @param channel
         */
        public void onUnsubscribe(String channel){
            String key = clientId + "/" + channel;
            //從訂閱者隊列中刪除
            this.jedis.srem("",key);
            //刪除訂閱者消息隊列
            this.jedis.del(channel);
        }

        public void hanlder(String channel,String message){
            int index = message.indexOf("/");
            if(index  < 0){
                //消息不合法,丟棄
                return;
            }

            Long txid = Long.valueOf(message.substring(0,index));
            String key = clientId + "/" + channel;
            while(true){
                String lm = this.jedis.lindex(key,0);//獲取第一個消息
                if(lm == null){
                    break;
                }
                int li = lm.indexOf("/");
                if(li < 0){
                    //消息不合法
                    String result = this.jedis.lpop(key);
                    if(result == null){
                        break;
                    }
                    message(channel,message);
                    continue;
                }
                long lmid = Long.parseLong(lm.substring(0,li));
                if(txid >= lmid){
                    this.jedis.lpop(key);
                    message(channel,message);
                    continue;
                }else{
                    break;
                }
            }
        }
    }

    private void message(String channel, String message) {
        System.out.println("receive message " + message);
    }
}

  

public class SubClient {

    private Jedis jedis = null;
    private PubSubListener listener = null;

    public SubClient(Jedis jedis, PubSubListener listener) {
        this.jedis = jedis;
        this.listener = listener;
    }

    public void subscribe(String channel){
        jedis.subscribe(listener,channel);
    }

    public void onUnsubscribe(String channel){
        listener.unsubscribe(channel);
    }
}

  

public class PubClient {

    private Jedis jedis = null;

    public PubClient(String host) {
        this.jedis = new Jedis(host);
    }

    public void put(String message){
        Set<String> clients = this.jedis.smembers("PERSITS_SUB");
        for(String client : clients){
            //在每個客戶端對應的消息隊列中持久化消息
            this.jedis.rpush(client,message);
        }
    }

    /**
     * 每個消息,都有具有一個全局唯一的id
     * txid為了防止訂閱端在數據處理時“亂序”,這就要求訂閱者需要解析message
     * @param channel
     * @param message
     */
    public void publish(String channel, String message) {
        Long txid = jedis.incr("MESSAGE_TXID");
        String content = txid + "/" + message;
        this.put(content);
        jedis.publish(channel, content);//為每個消息設定id,最終消息格式1000/messageContent
    }

    public void close(String channel){
        jedis.publish(channel, "exit");
        jedis.del(channel);//刪除
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM