發布和訂閱模式是常用和很方便的模式,下面記錄redis中對pub/sub的支持;
Pub/Sub: "發布/訂閱"在redis中,被設計的非常輕量級和簡潔,它做到了消息的“發布”和“訂閱”的 基本能力;但是尚未提供關於消息的持久化等各種企業級的特性。 一個Redis client發布消息,其他多個redis client訂閱消息,發布的消息“即發即失”,redis 不會持久保存發布的消息;消息訂閱者也將只能得到訂閱之后的消息,通道中此前的消息將無 從獲得。 消息發布者,即publish客戶端,無需獨占鏈接,你可以在publish消息的同時,使用同一個redis-client鏈接進行其他操作(例如:INCR等) 消息訂閱者,即subscribe客戶端,需要獨占鏈接,即進行subscribe期間,redis-client無法穿插其他操作, 此時client以阻塞的方式等待“publish端”的消息;因此這里subscribe端需要使用單獨的鏈接,甚至需要在額外的線程中使用。
Tcp默認連接時間固定,如果在這時間內sub端沒有接收到pub端消息,或pub端沒有消息產生,sub端的連接都會被強制回收,
這里就需要使用特殊手段解決,用定時器來模擬pub和sub之間的保活機制,定時器時間不能超過TCP最大連接時間,具體根據機器環境來定;
一旦subscribe端斷開鏈接,將會失去部分消息,即鏈接失效期間的消息將會丟失,所以這里就需要考慮到借助redis的list來持久化;
如果你非常關注每個消息,那么你應該基於Redis做一些額外的補充工作,如果你期望訂閱是持久的,那么如下的設計思路可以借鑒: 1) subscribe端: 首先向一個Set集合中增加“訂閱者ID”, 此Set集合保存了“活躍訂閱”者, 訂閱者ID標記每個唯一的訂閱者,此Set為 "活躍訂閱者集合"
2) subcribe端開啟訂閱操作,並基於Redis創建一個以 "訂閱者ID" 為KEY的LIST數據結構, 此LIST中存儲了所有的尚未消費的消息,此List稱為 "訂閱者消息隊列" 3) publish端: 每發布一條消息之后,publish端都需要遍歷 "活躍訂閱者集合",並依次 向每個 "訂閱者消息隊列" 尾部追加此次發布的消息. 4) 到此為止,我們可以基本保證,發布的每一條消息,都會持久保存在每個 "訂閱者消息隊列" 中. 5) subscribe端,每收到一個訂閱消息,在消費之后,必須刪除自己的 "訂閱者消息隊列" 頭部的一條記錄. 6) subscribe端啟動時,如果發現自己的 "訂閱者消息隊列" 有殘存記錄, 那么將會首先消費這些記錄,然后再去訂閱.
以上方法可以保證成功到達的消息必消費不丟失;
但還是會存在ngx業務機方自丟失數據問題,也就是ngx業務機自身問題或網絡問題導致ngx業務機發布的消息沒有送達redis機器;
更完善的確認機制才能徹底解決上述存在問題; 注意,在實際ngx_lua_redis應用中,redis單個客戶端訂閱模式下僅能使用有限的幾個命令,不能使用其它結構命令,如lpop,rpush等;
因為 publish是普通的request/response模式, 但subscribe不是,否則會報錯:
ERR only (P)SUBSCRIBE \/ (P)UNSUBSCRIBE \/ PING \/ QUIT allowed in this cont
關於這點以下是官網一般解釋: You are required to use two connections for pub and sub. A subscriber connection cannot issue any commands other than subscribe, psubscribe, unsubscribe, punsubscribe (although @Antirez has hinted of a subscriber-safe ping in the future). If you try to do anything else, redis tells you: -ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context (note that you can't test this with redis-cli, since that understands the protocol well enough to prevent you from issuing commands once you have subscribed - but any other basic socket tool should work fine) This is because subscriber connections work very differently - rather than working on a request/response basis, incoming messages can now come in at any time, unsolicited. publish is a regular request/response command, so must be sent on a regular connection, not a subscriber connection.
於是該特性不適用單例模式,要解決上面局限,需要多客戶端輔助操作同一結果,下列代碼中會有展示;
下面示例是ngx_lua_redis生產環境下實驗結果,有興趣的可以分析

--[[ cosocket即coroutine+socket 順序執行,但它是非阻塞執行方式 因為nginx core是非阻塞執行; redis中subscribe是阻塞方式, 因此在nginx_lua平台中使用redis 中sub特性無法保持阻塞連接狀態; 流程模型:http://www.cnblogs.com/foundwant/p/6382083.html ]] local args = ngx.req.get_uri_args() local ttype = args.type -- pub/sub local function newRedis(timeout, ip, port, section) local red = redis.new() red:set_timeout(timeout) local ok, err = red:connect(ip, port) if not ok then nlog.dinfo("connect:" .. err) end red:select(section) return red end local red = newRedis(10000, "127.0.0.1", "6379", 0) local bak = newRedis(10000, "127.0.0.1", "6379", 0) local function subscribe(channel) local res, err = red:subscribe(channel) if not res then nlog.dinfo("subscribe error.") return nil, err end --這里以函數返回,不然sub會在這里斷連失去可操作性 --這就是提到的特殊之一 local function read_func(do_read) if nil == do_read or true == do_read then res, err = red:read_reply() if not res then return nil, err end return res end red:unsubscribe(channel) red:set_keepalive(60000, 100) --連接回收 bak:close() bak:set_keepalive(60000, 100) --斷連后重啟等待 red = newRedis(10000, "127.0.0.1", "6379", 0) red:subscribe(channel) bak = newRedis(10000, "127.0.0.1", "6379", 0) return end return read_func end local subset = "subset" --set local channel = "test" --list consume = function(length) --若訂閱者消息隊列有殘余,先消費,再訂閱 for i=1, llength do local recv, err = red:lpop(channel) --頭部開始消費 nlog.dinfo("recv:" .. cjson.encode(recv)) end redis_util.coroutine_count = 1 coroutine.yield() end --訂閱者 if "sub" == ttype then --向set集合增加"訂閱者id" red:sadd(subset, channel) --為每個"訂閱者id"建立list local llength = red:llen(channel) if 0 == llength then red:rpush(channel, "hello") else --若訂閱者消息隊列有殘余,先消費,再訂閱 for i=1, llength do local recv, err = red:lpop(channel) --頭部開始消費 nlog.dinfo("recv:" .. cjson.encode(recv)) end end nlog.dinfo("run coroutine after...") --開始訂閱 local func, err = subscribe(channel) while true do local res, err = func() --res:["message","test","world"] if err then func(false) end --在redis的訂閱模式中, --單例模式下只能使用固定幾個命令[ (P)SUBSCRIBE,(P)UNSUBSCRIBE,QUIT,PING,... ], --無法使用其它命令,比如lpop, rpush等命令, --所以這里無法使用red:lpop()來執行出隊刪除操作, --只能另起一個客戶端對象來進行刪除操作; local oo, ooerr = bak:lpop(channel) nlog.dinfo("bak lpop:" .. cjson.encode(oo)) nlog.dinfo("res:" .. cjson.encode(res)) ngx.sleep(1) end end --發布者,測試用,實際調用是在業務層 if "pub" == ttype then --先發布,再追加隊列 --local subchannel, err = red:spop(subset) --nlog.dinfo("subchannel:" .. type(subchannel)) --if "userdata" ~= type(subchannel) then for i=1, 1000 do local str = "world_" .. i red:publish(channel, str) red:rpush(channel, str) --尾部追加 ngx.sleep(0.1) end --end end --監聽器,crontab定時運行 if "spy" == ttype then while true do red:publish(channel, "0") ngx.sleep(60) end end ok, err = red:set_keepalive(60000, 100) if not ok then ngx.say("set_keepalive:", err) end ngx.print("rpush done.") ngx.exit(200)