之前通過比較選擇,決定采用RabbitMQ這種消息隊列來做中間件,目的舒緩是為了讓整個架構的瓶頸環節。這里是做具體實施,用lua訪問RabbitMQ消息隊列。

RabbitMQ消息隊列有幾個比較重要的概念:生產者Producer,消費者Consumer,交換器Exchanges,隊列Queues

我的簡單理解
生產者,發布消息入隊的用戶。
消費者,訂閱隊列獲取消息的用戶。
交換器,消息可以不指定某個具體隊列,而是發送給交換器,通過不同類型交換器的規則和router key分配到具體的隊列。
隊列,消息隊列載體,每個消息都會被投入到一個或多個隊列。

lua 要訪問 RabbitMQ 目前只找到一個通過STOMP協議連接的庫lua-resty-rabbitmqstomp,前面已經介紹了這個STOMP,RabbitMQ也需要安裝適配器才可以。該庫我查不到聲明exchange和queues並進行綁定的方法,只可以實現發送消息到指定交換器和訂閱指定隊列。

二次封裝

根據自己的業務需要,先稍微做了個二次封裝,主要是封裝連接步驟和send的方法,同時規范返回值

local rabbitmq = require "resty.rabbitmqstomp"

local _M = {}
_M._VERSION = '0.01'

local mt = { __index = _M }

function _M.new(self, opts)
    opts = opts or {}
    return setmetatable({
            mq_host = opts.host or '127.0.0.1',
            mq_port = opts.port or 61613,
            mq_timeout = opts.timeout or 10000,
            mq_user = opts.username or 'guest',
            mq_password = opts.password or 'guest',
            mq_vhost = opts.vhost or "/"}, mt)
end

function _M.get_connect(self)

    local mq, err = rabbitmq:new({ username = self.mq_user,
                                   password = self.mq_password,
                                   vhost = self.mq_vhost })

    if not mq then
        return false,err
    end

    mq:set_timeout(self.mq_timeout)

    local ok, err = mq:connect(self.mq_host,self.mq_port) 

    if not ok then
        return false,err
    end

    return true,mq

end

function _M.send(self , destination, msg)

    local ret, client = self:get_connect()
    if not ret then
        return false,client
    end

    local send_receipt_id = ngx.now()*1000

    local headers = {}
    headers["destination"] = destination
    headers["receipt"] = send_receipt_id
    headers["app-id"] = "luaresty"
    headers["persistent"] = "true"
    headers["content-type"] = "application/json"

    local ok, err = client:send(msg, headers)
    if not ok then
        return false,err
    end

    local _,str_start = string.find(ok, "receipt%-id", 1)
    local str_end = string.find(ok, "\n\n", 1)
    if str_start == nil or str_end == nil then
        return false,"send receipt not     receive"
    end

    local receipt_id = string.sub(ok, str_start + 2 ,str_end - 1)
    if receipt_id ~= send_receipt_id then
        return false,"receipt id not right"
    end

    local ok, err = client:set_keepalive(10000, 10000)

    return true,send_receipt_id
end


return _M

  

使用示例

local rabbitmq = require ("myRabbitmq")
local mq = rabbitmq:new(config)
local ok , err = mq:send(destination,data)
if not ok then
    ngx.log(ngx.ERR, "failed to send mq :", err)
end