openresty 學習筆記五:訪問RabbitMQ消息隊列
之前通過比較選擇,決定采用RabbitMQ這種消息隊列來做中間件,目的舒緩是為了讓整個架構的瓶頸環節。這里是做具體實施,用lua訪問RabbitMQ消息隊列。
RabbitMQ消息隊列有幾個比較重要的概念:生產者Producer,消費者Consumer,交換器Exchanges,隊列Queues
我的簡單理解
生產者,發布消息入隊的用戶。
消費者,訂閱隊列獲取消息的用戶。
交換器,消息可以不指定某個具體隊列,而是發送給交換器,通過不同類型交換器的規則和router key分配到具體的隊列。
隊列,消息隊列載體,每個消息都會被投入到一個或多個隊列。
lua 要訪問 RabbitMQ 目前只找到一個通過STOMP協議連接的庫lua-resty-rabbitmqstomp,前面已經介紹了這個STOMP,RabbitMQ也需要安裝適配器才可以。該庫我查不到聲明exchange和queues並進行綁定的方法,只可以實現發送消息到指定交換器和訂閱指定隊列。
二次封裝
根據自己的業務需要,先稍微做了個二次封裝,主要是封裝連接步驟和send的方法,同時規范返回值
local rabbitmq = require "resty.rabbitmqstomp"
local _M = {}
_M._VERSION = '0.01'
local mt = { __index = _M }
function _M.new(self, opts)
opts = opts or {}
return setmetatable({
mq_host = opts.host or '127.0.0.1',
mq_port = opts.port or 61613,
mq_timeout = opts.timeout or 10000,
mq_user = opts.username or 'guest',
mq_password = opts.password or 'guest',
mq_vhost = opts.vhost or "/"}, mt)
end
function _M.get_connect(self)
local mq, err = rabbitmq:new({ username = self.mq_user,
password = self.mq_password,
vhost = self.mq_vhost })
if not mq then
return false,err
end
mq:set_timeout(self.mq_timeout)
local ok, err = mq:connect(self.mq_host,self.mq_port)
if not ok then
return false,err
end
return true,mq
end
function _M.send(self , destination, msg)
local ret, client = self:get_connect()
if not ret then
return false,client
end
local send_receipt_id = ngx.now()*1000
local headers = {}
headers["destination"] = destination
headers["receipt"] = send_receipt_id
headers["app-id"] = "luaresty"
headers["persistent"] = "true"
headers["content-type"] = "application/json"
local ok, err = client:send(msg, headers)
if not ok then
return false,err
end
local _,str_start = string.find(ok, "receipt%-id", 1)
local str_end = string.find(ok, "\n\n", 1)
if str_start == nil or str_end == nil then
return false,"send receipt not receive"
end
local receipt_id = string.sub(ok, str_start + 2 ,str_end - 1)
if receipt_id ~= send_receipt_id then
return false,"receipt id not right"
end
local ok, err = client:set_keepalive(10000, 10000)
return true,send_receipt_id
end
return _M
使用示例
local rabbitmq = require ("myRabbitmq")
local mq = rabbitmq:new(config)
local ok , err = mq:send(destination,data)
if not ok then
ngx.log(ngx.ERR, "failed to send mq :", err)
end
