openresty 學習筆記五:訪問RabbitMQ消息隊列
之前通過比較選擇,決定采用RabbitMQ這種消息隊列來做中間件,目的舒緩是為了讓整個架構的瓶頸環節。這里是做具體實施,用lua訪問RabbitMQ消息隊列。
RabbitMQ消息隊列有幾個比較重要的概念:生產者Producer,消費者Consumer,交換器Exchanges,隊列Queues
我的簡單理解
生產者,發布消息入隊的用戶。
消費者,訂閱隊列獲取消息的用戶。
交換器,消息可以不指定某個具體隊列,而是發送給交換器,通過不同類型交換器的規則和router key分配到具體的隊列。
隊列,消息隊列載體,每個消息都會被投入到一個或多個隊列。
lua 要訪問 RabbitMQ 目前只找到一個通過STOMP協議連接的庫lua-resty-rabbitmqstomp,前面已經介紹了這個STOMP,RabbitMQ也需要安裝適配器才可以。該庫我查不到聲明exchange和queues並進行綁定的方法,只可以實現發送消息到指定交換器和訂閱指定隊列。
二次封裝
根據自己的業務需要,先稍微做了個二次封裝,主要是封裝連接步驟和send的方法,同時規范返回值
local rabbitmq = require "resty.rabbitmqstomp" local _M = {} _M._VERSION = '0.01' local mt = { __index = _M } function _M.new(self, opts) opts = opts or {} return setmetatable({ mq_host = opts.host or '127.0.0.1', mq_port = opts.port or 61613, mq_timeout = opts.timeout or 10000, mq_user = opts.username or 'guest', mq_password = opts.password or 'guest', mq_vhost = opts.vhost or "/"}, mt) end function _M.get_connect(self) local mq, err = rabbitmq:new({ username = self.mq_user, password = self.mq_password, vhost = self.mq_vhost }) if not mq then return false,err end mq:set_timeout(self.mq_timeout) local ok, err = mq:connect(self.mq_host,self.mq_port) if not ok then return false,err end return true,mq end function _M.send(self , destination, msg) local ret, client = self:get_connect() if not ret then return false,client end local send_receipt_id = ngx.now()*1000 local headers = {} headers["destination"] = destination headers["receipt"] = send_receipt_id headers["app-id"] = "luaresty" headers["persistent"] = "true" headers["content-type"] = "application/json" local ok, err = client:send(msg, headers) if not ok then return false,err end local _,str_start = string.find(ok, "receipt%-id", 1) local str_end = string.find(ok, "\n\n", 1) if str_start == nil or str_end == nil then return false,"send receipt not receive" end local receipt_id = string.sub(ok, str_start + 2 ,str_end - 1) if receipt_id ~= send_receipt_id then return false,"receipt id not right" end local ok, err = client:set_keepalive(10000, 10000) return true,send_receipt_id end return _M
使用示例
local rabbitmq = require ("myRabbitmq") local mq = rabbitmq:new(config) local ok , err = mq:send(destination,data) if not ok then ngx.log(ngx.ERR, "failed to send mq :", err) end