Redis的Pub/Sub機制使用非常簡單的方式實現了觀察者模式,但是在使用過程中我們發現,它僅僅是實現了發布訂閱機制,但是很多的場景沒有考慮到。例如一下的幾種場景:
1.數據可靠性無法保證
一個redis_cli發送消息的時候,消息是無狀態的,也就是說負責發送消息的redis_cli只管發送消息,並不會理會消息是否被訂閱者接收到,也不會理會是否在傳輸過程中丟失,即對於發布者來說,消息是”即發即失”的。
2.擴展性差
不能通過增加消費者來加快消耗發布者的寫入的數據,如果發布者發布的消息很多,則數據阻塞在通道中已等待被消費着來消耗。阻塞時間越久,數據丟失的風險越大(網絡或者服務器的一個不穩定就會導致數據的丟失)
3.資源消耗高
在pub/sub中消息發布者不需要獨占一個Redis的鏈接,而消費者則需要單獨占用一個Redis的鏈接,在java中便不得獨立出分出一個線程來處理消費者。這種場景一般對應這多個消費者,此時則有着過高的資源消耗。
對於如上的幾種不足,如果在項目中需要考慮的話可以使用JMS來實現該功能。JMS提供了消息的持久化/耐久性等各種企業級的特性。如果依然想使用Redis來實現並做一些數據的持久化操作,則可以根據JMS的特性來通過Redis模擬出來.
模擬的步驟如下:
1.subscribe端首先向一個Set集合中增加“訂閱者ID”,此Set集合保存了“活躍訂閱”者,訂閱者ID標記每個唯一的訂閱者,例如:sub:email,sub:web。此SET稱為“活躍訂閱者集合”
2.subcribe端開啟訂閱操作,並基於Redis創建一個以“訂閱者ID”為KEY的LIST數據結構,此LIST中存儲了所有的尚未消費的消息。此LIST稱為“訂閱者消息隊列”
3.publish端:每發布一條消息之后,publish端都需要遍歷“活躍訂閱者集合”,並依次向每個“訂閱者消息隊列”尾部追加此次發布的消息。到此為止,我們可以基本保證,發布的每一條消息,都會持久保存在每個“訂閱者消息隊列”中。
4.subscribe端,每收到一個訂閱消息,在消費之后,必須刪除自己的“訂閱者消息隊列”頭部的一條記錄。subscribe端啟動時,如果發現自己的自己的“訂閱者消息隊列”有殘存記錄,那么將會首先消費這些記錄,然后再去訂閱。
實現如下:
public class PubSubListener extends JedisPubSub{ private String clientId; private RedisHandler redisHandler; public PubSubListener(String clientId, Jedis jedis) { this.clientId = clientId; this.redisHandler = new RedisHandler(jedis); } @Override public void onMessage(String channel, String message) { if ("exit".equals(message)) { redisHandler.onUnsubscribe(channel); } redisHandler.hanlder(channel,message); } @Override public void onSubscribe(String channel, int subscribedChannels) { redisHandler.subscribe(channel); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { redisHandler.onUnsubscribe(channel); } class RedisHandler{ private Jedis jedis =null; public RedisHandler(Jedis jedis) { this.jedis = jedis; } /** * 訂閱操作步驟: * 1.判斷clientID是否在PERSITS_SUB隊列中 * 2.如果在隊列中說明已經訂閱,或則把clientID添加到隊列中 * @param channel */ public void subscribe(String channel){ String key = clientId + "/" + channel; boolean isExists = this.jedis.sismember("PERSITS_SUB",key); if(!isExists){ this.jedis.sadd("PERSITS_SUB",key); } } /** * 取消訂閱 * @param channel */ public void onUnsubscribe(String channel){ String key = clientId + "/" + channel; //從訂閱者隊列中刪除 this.jedis.srem("",key); //刪除訂閱者消息隊列 this.jedis.del(channel); } public void hanlder(String channel,String message){ int index = message.indexOf("/"); if(index < 0){ //消息不合法,丟棄 return; } Long txid = Long.valueOf(message.substring(0,index)); String key = clientId + "/" + channel; while(true){ String lm = this.jedis.lindex(key,0);//獲取第一個消息 if(lm == null){ break; } int li = lm.indexOf("/"); if(li < 0){ //消息不合法 String result = this.jedis.lpop(key); if(result == null){ break; } message(channel,message); continue; } long lmid = Long.parseLong(lm.substring(0,li)); if(txid >= lmid){ this.jedis.lpop(key); message(channel,message); continue; }else{ break; } } } } private void message(String channel, String message) { System.out.println("receive message " + message); } }
public class SubClient { private Jedis jedis = null; private PubSubListener listener = null; public SubClient(Jedis jedis, PubSubListener listener) { this.jedis = jedis; this.listener = listener; } public void subscribe(String channel){ jedis.subscribe(listener,channel); } public void onUnsubscribe(String channel){ listener.unsubscribe(channel); } }
public class PubClient { private Jedis jedis = null; public PubClient(String host) { this.jedis = new Jedis(host); } public void put(String message){ Set<String> clients = this.jedis.smembers("PERSITS_SUB"); for(String client : clients){ //在每個客戶端對應的消息隊列中持久化消息 this.jedis.rpush(client,message); } } /** * 每個消息,都有具有一個全局唯一的id * txid為了防止訂閱端在數據處理時“亂序”,這就要求訂閱者需要解析message * @param channel * @param message */ public void publish(String channel, String message) { Long txid = jedis.incr("MESSAGE_TXID"); String content = txid + "/" + message; this.put(content); jedis.publish(channel, content);//為每個消息設定id,最終消息格式1000/messageContent } public void close(String channel){ jedis.publish(channel, "exit"); jedis.del(channel);//刪除 } }