wrk 及擴展支持 tcp 字節流協議壓測
高性能、方便使用的 HTTP(s)
的流量壓測工具,結合了多個開源項目開發而成:
- redis 的 ae 事件框架
- luajit
- openssl
- http-parser
減少造輪子、復用他人的成功項目,贊👍;我們定制化也走這條路線,代碼見此。
要支持 tcp 字節流協議壓測,只需要增加一個函數 stream_response
,實現見此
-- data 的是請求的響應結果,這里假設為 {"error_code":0,"error_msg":"","data":{}}
-- stream_response 表示使用tcp字節流協議壓測,對返回 error_code 進行校驗,為0表示狀態正常。
function stream_response(data)
local t = json.decode(data)
return t["error_code"] == 0
end
lua 腳本
wrk 的第一大特色就是支持 lua 腳本,直接對 c 修改進行壓測成本比較高:c 的業務開發速度較慢及每次都需要編譯。
腳本則克服了開發時間過長的缺點,使用 luajit 速度可以保證在開發和運行的速度中得到一個平衡。
具體的腳本的變量和函數的邏輯見 官方文檔,一定要熟讀這個文檔,精華部分,比其它個人的表述准確非常多。
一些官方文檔之外的補充
腳本文件
分為兩個文件組成
- wrk.lua 內置腳本,提供了基本的 API 和一些變量
- 命令行 -s <foo.lua>, foo.lua 用戶自己使用的腳本文件,為可選項
線程
每個線程都包含一個自己的lua狀態機,所以在腳本文件中定義的變量(如自增的請求計數),不同線程的中的結果是不相同的。
線程的結構是一個用戶數據(userdata), 在 c 中的定義為 struct thread
。關聯的 addr 也同樣為用戶數據,在 c 中的定義為 struct addrinfo
,支持取和存的操作(可以存在 = 的左右)
thread:set(key, value),value 不能夠為表,在使用 get 操作后會發生 panic,應該是 script_copy_value()
函數中出現棧頂設置錯誤的問題
加速
如果構造的 request 內容比較耗時的話,優先放在 init() 使用提前生成並且混緩存起來,后面的 request() 直接從緩存的結果中獲取。
高性能 && 請求收發邏輯
基於 redis 的 ae 事件框架是。和 ab 不同的是可以充分利用多核資源,減小線程間的切換,以此獲得高性能。一般而言,ab 在請求量不是很大的情況下是ok的,但是在請求量到達上w req/s 后,自身就會成為瓶頸。
在每個線程中創建 connections / threads
個連接,並且將這些建立連接的 fd 添加至事件循環中,然后 fd 就緒后,將 readble
和 writeable
函數添加再添加至事件循環中;
writeable
對應着請求發送的邏輯,調用lua接口 reqeust()
獲取發送內容就在其中;
在發送完成后會將自身從事件循環中刪除,發送(write)可能調用多次,但是一定會等到將緩沖區中的內容全部發送完成,除非發送失敗產生錯誤。
延時發送 delay
delay() 為 lua 的一個可選接口,發揮延遲發送的間隔,單位為毫秒(ms).
當lua腳本中出現了該函數時,writeable 就會從事件循環的文件事件刪除自身,並且將 writeable 作為定時任務添加至事件循環中,從而達到延時發送的效果。
readable
對應響應接收的邏輯,對應返回的內容校驗,在官方版本中,為 http 請求的解析。對解析的結果進行統計,當判斷響應結束時,刪除該連接在事件循環中的事件,並且重新進行最初的動作。
我們可以接管這個解析結果的過程,豐富化使用場景。
收發事件簡單的時序圖
統計
wrk 對兩個維度對壓測的結果做了統計,結果如下
Thread Stats Avg Stdev Max +/- Stdev
Latency 19.85ms 3.71ms 49.11ms 81.44%
Req/Sec 98.50 16.32 121.00 88.33%
延遲 Latency && 請求速度 Req/Sec
統計每個請求的延遲情況
- Avg, 平均延遲
- Stdev, 樣本標准差
- Max, 最大延遲
- +/- Stdev, 正負一倍標准差概率
實現
wrk 的實現為通用結構,適用延遲和請求速度的統計
typedef struct {
uint64_t count; // 樣本數量
uint64_t limit; // 最大樣本變量限制
uint64_t min; // 最小樣本變量
uint64_t max; // 最大樣本變量
uint64_t data[]; // 索引為樣本變量,值為出現的次數
} stats;
limit
防止 data[]
容量不夠,也起到一個剔除不滿足要求的情況,如延遲超過 limit 后直接歸為 timeout 中。
⚠️ 注意 data[]
數據每個元素的值為出現的次數,而不是樣本變量。
樣本變量統計
__sync_*
為編譯器的同步函數,wrk 將統計的變量作為一個全局存在,故多個線程內就需要一些同步操作保證正確性。
理論上可以將這些統計變量放在線程內,在所以線程結束后,匯集處理,這里就不要這些同步元語了。不過目前還算簡單,這樣做問題也不大。
n 為樣本變量,stats->data[n]
為該樣本變量出現的次數。min 和 max 為之后的統計過程加速。
int stats_record(stats *stats, uint64_t n) {
if (n >= stats->limit) return 0;
__sync_fetch_and_add(&stats->data[n], 1);
__sync_fetch_and_add(&stats->count, 1);
uint64_t min = stats->min;
uint64_t max = stats->max;
while (n < min) min = __sync_val_compare_and_swap(&stats->min, min, n);
while (n > max) max = __sync_val_compare_and_swap(&stats->max, max, n);
return 1;
}
樣本標准差
數學公式為
wrk 實現如下,L6 處 * stats->data[i]
表示有多個樣本變量為 i
1 long double stats_stdev(stats *stats, long double mean) {
2 long double sum = 0.0;
3 if (stats->count < 2) return 0.0;
4 for (uint64_t i = stats->min; i <= stats->max; i++) {
5 if (stats->data[i]) {
6 sum += powl(i - mean, 2) * stats->data[i];
7 }
8 }
9 return sqrtl(sum / (stats->count - 1));
10 }
擴展支持 tcp 壓測
由於 wrk 支持 http(s) 的壓測,但實際的場景中有很多不是 http 的協議,可以就是很簡單的 json 文本協議。
所以這里對 wrk 做一個簡單的擴展,支持普通的4層流量壓測,功能上支持 json 和 md5。json庫使用 yyjson,md5 使用 nginx/md5,充分利用前人的成功經驗。
提供的庫功能
-- scripts/test.lua
local data = '{"host":"129.168.10.10","os":"linux","open_ports":[22,80,3306]}'
local data_tbl = json.decode(data)
local function print_tables(t, indent)
local tab_indent = ""
for i = 1, indent do tab_indent = tab_indent .. "\t" end
for k, v in pairs(t) do
if type(v) == "table" then
print_tables(v, indent + 1)
else
print(string.format("%s %s\t%s", tab_indent, tostring(k), tostring(v)))
end
end
end
print_tables(data_tbl, 0)
-- $ ./wrk -t 1 -c 1 -d3s -L -s scripts/test.lua https://www.baidu.com
-- host 129.168.10.10
-- os linux
-- 1 22
-- 2 80
-- 3 3306
json
- json.encode, json轉字符串
- json.decode, 字符串轉json
- json.encode_empty_table_as_object, 空table時作為 object 使用(參考 openresty)
md5
- md5.sum, 16字節md5sum
- md5.sumhexa, 32字節16進制格式的sum
支持 tcp 文本的壓測
主要修改提交為 點此查看。
將wrk擴展為支持普通的tcp流量,主要是在 readable 中接管返回數據的解析過程。也就是 L23-L13
,這個地方改動后應該形成一個分支,如果定義了支持 tcp,就走tcp的解析邏輯。
1 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
2 connection *c = data;
3 size_t n;
4
5 do {
6 switch (sock.read(c, &n)) {
7 case OK: break;
8 case ERROR: goto error;
9 case RETRY: return;
10 }
11
12 if (http_parser_execute(&c->parser, &parser_settings, c->buf, n) != n) goto error;
13 if (n == 0 && !http_body_is_final(&c->parser)) goto error;
14
15 c->thread->bytes += n;
16 } while (n == RECVBUF && sock.readable(c) > 0);
17
18 return;
19
20 error:
21 c->thread->errors.read++;
22 reconnect_socket(c->thread, c);
23 }
如何決定為tcp文本協議解析
想來想去,為不破壞原來的接口定義並且盡量減少改動,通過以lua腳本中是否定義 stream_response()
來決定是否支持tcp的文本協議:
bool script_want_stream_response(lua_State *L) {
return script_is_function(L, "stream_response");
}
使用函數指針 response_complete
來代替分支邏輯,減少干擾。 在程序的起始階段,根據是否支持為tcp文本協議將具體的處理函數賦值給函數指針。
函數指針的定義為
// 舊的 response_complete -> message_complete
typedef bool (*response_complete_func)(connection *c, size_t n);
static response_complete_func response_complete;
修改后的結果為, socket_readable L12-L3 被替換為一個函數指針執行。
1 static void socket_readable(aeEventLoop *loop, int fd, void *data, int mask) {
...
12 if (!response_complete(c, n))
13 goto error;
...
23 }
導出tcp文本協議解析
除了代碼位置調整及函數名修改,舊的關於 http 流量的解析邏輯不變。
stream_response_complete
為 response_complete
的具體實現,script_stream_response
將響應內容導出至lua的 stream_response
處理。
當響應的內容長度為0時,直接重連。
1 bool stream_response_complete(connection *c, size_t n) {
2 uint64_t now = time_us();
3 thread *thread = c->thread;
4
5 thread->complete++;
6 thread->requests++;
7
8 if (!script_stream_response(thread->L, c->buf, n))
9 thread->errors.status++;
10
11 if (!stats_record(statistics.latency, now - c->start))
12 thread->errors.timeout++;
13
14 c->delayed = cfg.delay;
15 aeCreateFileEvent(thread->loop, c->fd, AE_WRITABLE, socket_writeable, c);
16
17 if (n == 0)
18 reconnect_socket(thread, c);
19
20 return true;
21 }
22
23 bool script_stream_response(lua_State *L, const char *data, size_t n){
24 lua_getglobal(L, "stream_response");
25 lua_pushlstring(L, data, n);
26 lua_call(L, 1, 1);
27 bool ok = lua_toboolean(L, -1);
28 lua_pop(L, 1);
29 return ok;
30 }
其它修改
命令行中的url參數不帶 http 的scheme,則自動補全為 http 避免相關邏輯導致退出;因為對於一個 tcp 的流量壓測,加一個 http 的scheme看起來怪怪的。
TODO
- 修復
script_copy_value()
不能夠 copy 復合表的問題 - 增加類似端口敲門的功能:在每個tcp連接建立后,先發送一段字節流進行驗證請求是否合法
- 支持 unix domain socket 的字節流壓測
參考
- 官方腳本文檔,對相關腳本的描述,一定要熟悉
- 雲風的lua5.3中文文檔,luajit 使用的是 lua5.1 的語法,但是雲風的這個文檔足夠了
- nginx/md5, nginx 的md5模塊
- yyjson,json解析器
- 導出jsoncpp給lua使用,一個正確遞歸處理復合表的方法
- zxhio/wrk, 魔改后的wrk