高並發 Nginx+Lua OpenResty系列(11)——流量復制/AB測試/協程


流量復制

在實際開發中經常涉及到項目的升級,而該升級不能簡單的上線就完事了,需要驗證該升級是否兼容老的上線,因此可能需要並行運行兩個項目一段時間進行數據比對和校驗,待沒問題后再進行上線。這其實就需要進行流量復制,把流量復制到其他服務器上,一種方式是使用如tcpcopy引流;另外我們還可以使用nginx的HttpLuaModule模塊中的ngx.location.capture_multi進行並發執行來模擬復制。
構造兩個服務:

location /test1 {
    keepalive_timeout 60s;
    keepalive_requests 1000;
    content_by_lua '
        ngx.print("test1 : ", ngx.req.get_uri_args()["a"])
        ngx.log(ngx.ERR, "request test1")
    ';
}

location /test2 {
    keepalive_timeout 60s;
    keepalive_requests 1000;
    content_by_lua '
        ngx.print("test2 : ", ngx.req.get_uri_args()["a"])
        ngx.log(ngx.ERR, "request test1")
    ';
}

通過ngx.location.capture_multi調用

location /test {
    lua_socket_connect_timeout 3s;
    lua_socket_send_timeout 3s;
    lua_socket_read_timeout 3s;
    lua_socket_pool_size 100;
    lua_socket_keepalive_timeout 60s;
    lua_socket_buffer_size 8k;

    content_by_lua '
        local res1, res2 = ngx.location.capture_multi{
            { "/test1", { args = ngx.req.get_uri_args() } },
            { "/test2", { args = ngx.req.get_uri_args()} },
        }
        if res1.status == ngx.HTTP_OK then
            ngx.print(res1.body)
        end
        if res2.status ~= ngx.HTTP_OK then
        --記錄錯誤
        end
    ';
}

此處可以根據需求設置相應的超時時間和長連接連接池等;ngx.location.capture底層通過cosocket實現,而其支持Lua中的協程,通過它可以以同步的方式寫非阻塞的代碼實現。
此處要考慮記錄失敗的情況,對失敗的數據進行重放還是放棄根據自己業務做處理。

AB測試

AB測試即多版本測試,有時候我們開發了新版本需要灰度測試,即讓一部分人看到新版,一部分人看到老版,然后通過訪問數據決定是否切換到新版。比如可以通過根據區域、用戶等信息進行切版本。
比如京東商城有一個cookie叫做__jda,該cookie是在用戶訪問網站時種下的,因此我們可以拿到這個cookie,根據這個cookie進行版本選擇。
比如兩次清空cookie訪問發現第二個數字串是變化的,即我們可以根據第二個數字串進行判斷。
__jda=122270672.1059377902.1425691107.1425691107.1425699059.1
__jda=122270672.556927616.1425699216.1425699216.1425699216.1。
判斷規則可以比較多的選擇,比如通過尾號;要切30%的流量到新版,可以通過選擇尾號為1,3,5的切到新版,其余的還停留在老版。

1. 使用map選擇版本

map $cookie___jda $ab_key {
    default                                       "0";
    ~^\d+\.\d+(?P<k>(1|3|5))\.                    "1";
}

使用map映射規則,即如果是到新版則等於"1",到老版等於“0”; 然后我們就可以通過ngx.var.ab_key獲取到該數據。

location /abtest1 {
    if ($ab_key = "1") {  
        echo_location /test1 ngx.var.args;
    }
    if ($ab_key = "0") {
        echo_location /test2 ngx.var.args;
    }
}

此處也可以使用proxy_pass到不同版本的服務器上

location /abtest2 {
    if ($ab_key = "1") {
        rewrite ^ /test1 break;
        proxy_pass http://backend1;
    }
    rewrite ^ /test2 break;
    proxy_pass http://backend2;
}

2. 直接在Lua中使用lua-resty-cookie獲取該Cookie進行解析

首先下載lua-resty-cookie

cd /usr/openResty/lualib/resty/
wget https://raw.githubusercontent.com/cloudflare/lua-resty-cookie/master/lib/resty/cookie.lua

openResty.conf配置文件

location /abtest3 {
    content_by_lua '  
         local ck = require("resty.cookie")
         local cookie = ck:new()
         local ab_key = "0"
         local jda = cookie:get("__jda")
         if jda then
             local v = ngx.re.match(jda, [[^\d+\.\d+(1|3|5)\.]])
             if v then
                ab_key = "1"
             end
         end
 
         if ab_key == "1" then
             ngx.exec("/test1", ngx.var.args)
         else
             ngx.print(ngx.location.capture("/test2", {args = ngx.req.get_uri_args()}).body)
         end
    ';
}

首先使用lua-resty-cookie獲取cookie,然后使用ngx.re.match進行規則的匹配,最后使用ngx.exec或者ngx.location.capture進行處理。此處同時使用ngx.exec和ngx.location.capture目的是為了演示,此外沒有對ngx.location.capture進行異常處理。

協程

Lua中沒有線程和異步編程編程的概念,對於並發執行提供了協程的概念,個人認為協程是在A運行中發現自己忙則把CPU使用權讓出來給B使用,最后A能從中斷位置繼續執行,本地還是單線程,CPU獨占的;因此如果寫網絡程序需要配合非阻塞I/O來實現。
ngx_lua 模塊對協程做了封裝,我們可以直接調用ngx.thread API使用,雖然稱其為“輕量級線程”,但其本質還是Lua協程。該API必須配合該ngx_lua模塊提供的非阻塞I/O API一起使用,比如我們之前使用的ngx.location.capture_multi和lua-resty-redis、lua-resty-mysql等基於cosocket實現的都是支持的。
通過Lua協程我們可以並發的調用多個接口,然后誰先執行成功誰先返回,類似於BigPipe模型。

1. 依賴的API

location /api1 {
    echo_sleep 3;
    echo api1 : $arg_a;
}
location /api2 {
    echo_sleep 3;
    echo api2 : $arg_a;
}

我們使用echo_sleep等待3秒。

2. 串行實現

location /serial {
    content_by_lua '
        local t1 = ngx.now()
        local res1 = ngx.location.capture("/api1", {args = ngx.req.get_uri_args()})
        local res2 = ngx.location.capture("/api2", {args = ngx.req.get_uri_args()})
        local t2 = ngx.now()
        ngx.print(res1.body, "<br/>", res2.body, "<br/>", tostring(t2-t1))
    ';
}

即一個個的調用,總的執行時間在6秒以上,比如訪問http://127.0.0.1/serial?a=22

api1 : 22
api2 : 22
6.0040001869202

ngx.location.capture_multi實現

location /concurrency1 {
    content_by_lua '
        local t1 = ngx.now()
        local res1,res2 = ngx.location.capture_multi({
              {"/api1", {args = ngx.req.get_uri_args()}},
              {"/api2", {args = ngx.req.get_uri_args()}}
        })
        local t2 = ngx.now()
        ngx.print(res1.body, "<br/>", res2.body, "<br/>", tostring(t2-t1))
    ';
}

直接使用ngx.location.capture_multi來實現,比如訪問http://127.0.0.1/concurrency1?a=22

api1 : 22
api2 : 22
3.0020000934601

4. 協程API實現

location /concurrency2 {
    content_by_lua '
        local t1 = ngx.now()
        local function capture(uri, args)
           return ngx.location.capture(uri, args)
        end
        local thread1 = ngx.thread.spawn(capture, "/api1", {args = ngx.req.get_uri_args()})
        local thread2 = ngx.thread.spawn(capture, "/api2", {args = ngx.req.get_uri_args()})
        local ok1, res1 = ngx.thread.wait(thread1)
        local ok2, res2 = ngx.thread.wait(thread2)
        local t2 = ngx.now()
        ngx.print(res1.body, "<br/>", res2.body, "<br/>", tostring(t2-t1))
    ';
}

使用ngx.thread.spawn創建一個輕量級線程,然后使用ngx.thread.wait等待該線程的執行成功。比如訪問http://127.0.0.1/concurrency2?a=22

api1 : 22
api2 : 22
3.0030000209808

其有點類似於Java中的線程池執行模型,但不同於線程池,其每次只執行一個函數,遇到IO等待則讓出CPU讓下一個執行。我們可以通過下面的方式實現任意一個成功即返回,之前的是等待所有執行成功才返回。

local  ok, res = ngx.thread.wait(thread1, thread2)

Lua協程參考資料
《Programming in Lua》
http://timyang.net/lua/lua-coroutine-vs-java-wait-notify/
https://github.com/andycai/luaprimer/blob/master/05.md
http://my.oschina.net/wangxuanyihaha/blog/186401
http://manual.luaer.cn/2.11.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM