wrk 及擴展支持 tcp 字節流協議壓測


wrk 及擴展支持 tcp 字節流協議壓測

高性能、方便使用的 HTTP(s) 的流量壓測工具,結合了多個開源項目開發而成:

  1. redis 的 ae 事件框架
  2. luajit
  3. openssl
  4. http-parser

減少造輪子、復用他人的成功項目,贊👍;我們定制化也走這條路線,代碼見此

要支持 tcp 字節流協議壓測,只需要增加一個函數 stream_response實現見此

-- data 的是請求的響應結果,這里假設為 {"error_code":0,"error_msg":"","data":{}}
-- stream_response 表示使用tcp字節流協議壓測,對返回 error_code 進行校驗,為0表示狀態正常。

function stream_response(data)
    local t = json.decode(data)
    return t["error_code"] == 0
end

lua 腳本

wrk 的第一大特色就是支持 lua 腳本,直接對 c 修改進行壓測成本比較高:c 的業務開發速度較慢及每次都需要編譯。

腳本則克服了開發時間過長的缺點,使用 luajit 速度可以保證在開發和運行的速度中得到一個平衡。

具體的腳本的變量和函數的邏輯見 官方文檔,一定要熟讀這個文檔,精華部分,比其它個人的表述准確非常多。

一些官方文檔之外的補充

腳本文件

分為兩個文件組成

  1. wrk.lua 內置腳本,提供了基本的 API 和一些變量
  2. 命令行 -s <foo.lua>, foo.lua 用戶自己使用的腳本文件,為可選項

線程

每個線程都包含一個自己的lua狀態機,所以在腳本文件中定義的變量(如自增的請求計數),不同線程的中的結果是不相同的。

線程的結構是一個用戶數據(userdata), 在 c 中的定義為 struct thread。關聯的 addr 也同樣為用戶數據,在 c 中的定義為 struct addrinfo,支持取和存的操作(可以存在 = 的左右)

thread:set(key, value),value 不能夠為表,在使用 get 操作后會發生 panic,應該是 script_copy_value() 函數中出現棧頂設置錯誤的問題

加速

如果構造的 request 內容比較耗時的話,優先放在 init() 使用提前生成並且混緩存起來,后面的 request() 直接從緩存的結果中獲取。

高性能 && 請求收發邏輯

基於 redis 的 ae 事件框架是。和 ab 不同的是可以充分利用多核資源,減小線程間的切換,以此獲得高性能。一般而言,ab 在請求量不是很大的情況下是ok的,但是在請求量到達上w req/s 后,自身就會成為瓶頸。

在每個線程中創建 connections / threads 個連接,並且將這些建立連接的 fd 添加至事件循環中,然后 fd 就緒后,將 readblewriteable 函數添加再添加至事件循環中;

writeable

對應着請求發送的邏輯,調用lua接口 reqeust() 獲取發送內容就在其中;

在發送完成后會將自身從事件循環中刪除,發送(write)可能調用多次,但是一定會等到將緩沖區中的內容全部發送完成,除非發送失敗產生錯誤。

延時發送 delay

delay() 為 lua 的一個可選接口,發揮延遲發送的間隔,單位為毫秒(ms).

當lua腳本中出現了該函數時,writeable 就會從事件循環的文件事件刪除自身,並且將 writeable 作為定時任務添加至事件循環中,從而達到延時發送的效果。

readable

對應響應接收的邏輯,對應返回的內容校驗,在官方版本中,為 http 請求的解析。對解析的結果進行統計,當判斷響應結束時,刪除該連接在事件循環中的事件,並且重新進行最初的動作。

我們可以接管這個解析結果的過程,豐富化使用場景。

收發事件簡單的時序圖

統計

wrk 對兩個維度對壓測的結果做了統計,結果如下

  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    19.85ms    3.71ms  49.11ms   81.44%
    Req/Sec    98.50     16.32   121.00     88.33%

延遲 Latency && 請求速度 Req/Sec

統計每個請求的延遲情況

  • Avg, 平均延遲
  • Stdev, 樣本標准差
  • Max, 最大延遲
  • +/- Stdev, 正負一倍標准差概率

實現

wrk 的實現為通用結構,適用延遲和請求速度的統計

typedef struct {
    uint64_t count;     // 樣本數量
    uint64_t limit;     // 最大樣本變量限制
    uint64_t min;       // 最小樣本變量
    uint64_t max;       // 最大樣本變量
    uint64_t data[];    // 索引為樣本變量,值為出現的次數
} stats;

limit 防止 data[] 容量不夠,也起到一個剔除不滿足要求的情況,如延遲超過 limit 后直接歸為 timeout 中。

⚠️ 注意 data[] 數據每個元素的值為出現的次數,而不是樣本變量。

樣本變量統計

__sync_* 為編譯器的同步函數,wrk 將統計的變量作為一個全局存在,故多個線程內就需要一些同步操作保證正確性。

理論上可以將這些統計變量放在線程內,在所以線程結束后,匯集處理,這里就不要這些同步元語了。不過目前還算簡單,這樣做問題也不大。

n 為樣本變量,stats->data[n] 為該樣本變量出現的次數。min 和 max 為之后的統計過程加速。

int stats_record(stats *stats, uint64_t n) {
    if (n >= stats->limit) return 0;
    __sync_fetch_and_add(&stats->data[n], 1);
    __sync_fetch_and_add(&stats->count, 1);
    uint64_t min = stats->min;
    uint64_t max = stats->max;
    while (n < min) min = __sync_val_compare_and_swap(&stats->min, min, n);
    while (n > max) max = __sync_val_compare_and_swap(&stats->max, max, n);
    return 1;
}

樣本標准差

數學公式為

\[\delta = \sqrt{ \frac{\Sigma(x_i-\bar{x})^2}{n-1}} \]

wrk 實現如下,L6 處 * stats->data[i] 表示有多個樣本變量為 i

 1 long double stats_stdev(stats *stats, long double mean) {
 2     long double sum = 0.0;
 3     if (stats->count < 2) return 0.0;
 4     for (uint64_t i = stats->min; i <= stats->max; i++) {
 5         if (stats->data[i]) {
 6             sum += powl(i - mean, 2) * stats->data[i];
 7         }
 8     }
 9     return sqrtl(sum / (stats->count - 1));
10 }

擴展支持 tcp 壓測

由於 wrk 支持 http(s) 的壓測,但實際的場景中有很多不是 http 的協議,可以就是很簡單的 json 文本協議。

所以這里對 wrk 做一個簡單的擴展,支持普通的4層流量壓測,功能上支持 json 和 md5。json庫使用 yyjson,md5 使用 nginx/md5,充分利用前人的成功經驗。

提供的庫功能

--  scripts/test.lua
local data = '{"host":"129.168.10.10","os":"linux","open_ports":[22,80,3306]}'
local data_tbl = json.decode(data)

local function print_tables(t, indent)
    local tab_indent = ""
    for i = 1, indent do tab_indent = tab_indent .. "\t" end

    for k, v in pairs(t) do
        if type(v) == "table" then
            print_tables(v, indent + 1)
        else
            print(string.format("%s %s\t%s", tab_indent, tostring(k), tostring(v)))
        end
    end
end

print_tables(data_tbl, 0)

-- $ ./wrk -t 1 -c 1 -d3s -L -s scripts/test.lua https://www.baidu.com
-- host   129.168.10.10
-- os     linux
--         1      22
--         2      80
--         3      3306

json

  • json.encode, json轉字符串
  • json.decode, 字符串轉json
  • json.encode_empty_table_as_object, 空table時作為 object 使用(參考 openresty)

md5

  • md5.sum, 16字節md5sum
  • md5.sumhexa, 32字節16進制格式的sum

支持 tcp 文本的壓測

主要修改提交為 點此查看

將wrk擴展為支持普通的tcp流量,主要是在 readable 中接管返回數據的解析過程。也就是 L23-L13,這個地方改動后應該形成一個分支,如果定義了支持 tcp,就走tcp的解析邏輯。

 1 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
 2     connection *c = data;
 3     size_t n;
 4 
 5     do {
 6         switch (sock.read(c, &n)) {
 7             case OK:    break;
 8             case ERROR: goto error;
 9             case RETRY: return;
10         }
11 
12         if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error;
13         if (n == 0 && !http_body_is_final(&c->parser)) goto error;
14 
15         c->thread->bytes += n;
16     } while (n == RECVBUF && sock.readable(c) > 0);
17 
18     return;
19 
20   error:
21     c->thread->errors.read++;
22     reconnect_socket(c->thread, c);
23 }

如何決定為tcp文本協議解析

想來想去,為不破壞原來的接口定義並且盡量減少改動,通過以lua腳本中是否定義 stream_response() 來決定是否支持tcp的文本協議:

bool script_want_stream_response(lua_State *L) {
    return script_is_function(L, "stream_response");
}

使用函數指針 response_complete 來代替分支邏輯,減少干擾。 在程序的起始階段,根據是否支持為tcp文本協議將具體的處理函數賦值給函數指針。

函數指針的定義為

// 舊的 response_complete -> message_complete

typedef bool (*response_complete_func)(connection *c, size_t n);
static response_complete_func response_complete;

修改后的結果為, socket_readable L12-L3 被替換為一個函數指針執行。

 1 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
...
12         if (!response_complete(c, n))
13             goto error;
...
23 }

導出tcp文本協議解析

除了代碼位置調整及函數名修改,舊的關於 http 流量的解析邏輯不變。

stream_response_completeresponse_complete 的具體實現,script_stream_response 將響應內容導出至lua的 stream_response 處理。

當響應的內容長度為0時,直接重連。

 1 bool stream_response_complete(connection *c, size_t n) {
 2     uint64_t now = time_us();
 3     thread *thread = c->thread;
 4 
 5     thread->complete++;
 6     thread->requests++;
 7 
 8     if (!script_stream_response(thread->L, c->buf, n))
 9         thread->errors.status++;
10 
11     if (!stats_record(statistics.latency, now - c->start))
12         thread->errors.timeout++;
13 
14     c->delayed = cfg.delay;
15     aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
16 
17     if (n == 0)
18         reconnect_socket(thread, c);
19 
20     return true;
21 }
22
23 bool script_stream_response(lua_State *L, const char *data, size_t n){
24     lua_getglobal(L, "stream_response");
25     lua_pushlstring(L, data, n);
26     lua_call(L, 1, 1);
27     bool ok = lua_toboolean(L, -1);
28     lua_pop(L, 1);
29     return ok;
30 }

其它修改

命令行中的url參數不帶 http 的scheme,則自動補全為 http 避免相關邏輯導致退出;因為對於一個 tcp 的流量壓測,加一個 http 的scheme看起來怪怪的。

TODO

  1. 修復 script_copy_value() 不能夠 copy 復合表的問題
  2. 增加類似端口敲門的功能:在每個tcp連接建立后,先發送一段字節流進行驗證請求是否合法
  3. 支持 unix domain socket 的字節流壓測

參考

  1. 官方腳本文檔,對相關腳本的描述,一定要熟悉
  2. 雲風的lua5.3中文文檔,luajit 使用的是 lua5.1 的語法,但是雲風的這個文檔足夠了
  3. nginx/md5, nginx 的md5模塊
  4. yyjson,json解析器
  5. 導出jsoncpp給lua使用,一個正確遞歸處理復合表的方法
  6. zxhio/wrk, 魔改后的wrk


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM