前言
本文主要介紹如果使用 qemu
和 unicorn
來搜集程序執行的覆蓋率信息以及如何把搜集到的覆蓋率信息反饋到 fuzzer
中輔助 fuzz
的進行。
AFL Fork Server
為了后面介紹 afl
的 qemu
模式和 unicorn
模式, 首先大概講一下 afl
的 fork server
的實現機制。afl
與 fork server
的通信流程如圖所示
- 首先
afl-fuzz
調用init_forkserver
函數fork
出一個新進程作為fork server
, 然后等待fork server
發送4
個字節的數據, 如果能夠正常接收到數據則表示fork server
啟動正常。 fork server
起來后會使用read
阻塞住, 等待afl-fuzz
發送命令來啟動一個測試進程。- 當需要進行一次測試時,
afl-fuzz
會調用run_target
, 首先往管道發送 4 個字節通知fork server
去fork
一個進程來測試。 fork server
新建進程后,會通過管道發送剛剛fork
出的進程的pid
給fork server
.afl-fuzz
根據接收到的pid
等待測試進程結束,然后根據測試生成的覆蓋率信息來引導后續的測試。
AFL qemu 模式
AFL
的 qemu
模式的實現和 winafl
使用 dynamorio
來插樁的實現方式比較類似,winafl
的實現細節如下
https://xz.aliyun.com/t/5108
原始版本
源碼地址
https://github.com/google/AFL/tree/master/qemu_mode/patches
qemu
在執行一個程序時,從被執行程序的入口點開始對基本塊翻譯並執行,為了提升效率,qemu
會把翻譯出來的基本塊存放到 cache
中,當 qemu
要執行一個基本塊時首先判斷基本塊是否在 cache
中,如果在 cache
中則直接執行基本塊,否則會翻譯基本塊並執行。
AFL
的 qemu
模式就是通過在准備執行基本塊的和准備翻譯基本塊的前面增加一些代碼來實現的。首先會在每次執行一個基本塊前調用 AFL_QEMU_CPU_SNIPPET2
來和 afl
通信。
#define AFL_QEMU_CPU_SNIPPET2 do { \
if(itb->pc == afl_entry_point) { \
afl_setup(); \
afl_forkserver(cpu); \
} \
afl_maybe_log(itb->pc); \
} while (0)
如果當前執行的基本塊是 afl_entry_point
(即目標程序的入口點),就設置好與 afl
通信的命名管道和共享內存並初始化 fork server
,然后通過 afl_maybe_log
往共享內存中設置覆蓋率信息。統計覆蓋率的方式和 afl
的方式一樣。
cur_loc = (cur_loc >> 4) ^ (cur_loc << 8);
cur_loc &= MAP_SIZE - 1;
afl_area_ptr[cur_loc ^ prev_loc]++; // 和 afl 一樣 統計 edge 覆蓋率
fork server
的代碼如下
static void afl_forkserver(CPUState *cpu) {
// 通知 afl-fuzz fork server 啟動正常
if (write(FORKSRV_FD + 1, tmp, 4) != 4) return;
// fork server 的主循環,不斷地 fork 新進程
while (1) {
// 阻塞地等待 afl-fuzz 發送命令,fork 新進程
if (read(FORKSRV_FD, tmp, 4) != 4) exit(2);
child_pid = fork(); // fork 新進程
if (!child_pid) {
// 子進程會進入這,關閉通信管道描述符,然后從 afl_forkserver 返回繼續往下執行被測試程序
afl_fork_child = 1;
close(FORKSRV_FD);
close(FORKSRV_FD + 1);
close(t_fd[0]);
return;
}
// fork server 進程,發送 fork 出來的測試進程的 pid 給 afl-fuzz
if (write(FORKSRV_FD + 1, &child_pid, 4) != 4) exit(5);
// 不斷等待處理 測試進程的 翻譯基本塊的請求
afl_wait_tsl(cpu, t_fd[0]);
// 等待子進程結束
if (waitpid(child_pid, &status, 0) < 0) exit(6);
if (write(FORKSRV_FD + 1, &status, 4) != 4) exit(7);
}
}
forkserver
的代碼流程如下
- 首先發送數據給
afl-fuzz
, 表示fork server
啟動正常,通知完之后會進入循環阻塞在 read ,直到 afl-fuzz 端發送消息。 - 接收到數據后,
fork server
會fork
出新進程,此時子進程會關閉所有與afl-fuzz
通信的文件描述符並從afl_forkserver
返回繼續往下執行被測試程序。而父進程則把剛剛fork
出的測試進程的pid
通過管道發送給afl-fuzz
。 - 之后
fork server
進程進入afl_wait_tsl
,不斷循環處理子進程翻譯基本塊的請求。
下面分析 afl_wait_tsl
的原理, 首先 afl
會在 翻譯基本塊后插入一段代碼
tb = tb_gen_code(cpu, pc, cs_base, flags, 0); // 翻譯基本塊
AFL_QEMU_CPU_SNIPPET1; // 通知父進程 (fork server進程) 剛剛翻譯了一個基本塊
#define AFL_QEMU_CPU_SNIPPET1 do { \
afl_request_tsl(pc, cs_base, flags); \
} while (0)
afl_request_tsl
就是把測試進程剛剛翻譯的基本塊的信息發送給父進程(fork server
進程)
static void afl_request_tsl(target_ulong pc, target_ulong cb, uint64_t flags) {
struct afl_tsl t;
if (!afl_fork_child) return;
t.pc = pc;
t.cs_base = cb;
t.flags = flags;
// 通過管道發送信息給 父進程 (fork server 進程)
if (write(TSL_FD, &t, sizeof(struct afl_tsl)) != sizeof(struct afl_tsl))
return;
}
下面看看 afl_wait_tsl
的代碼
static void afl_wait_tsl(CPUState *cpu, int fd) {
while (1) {
// 死循環不斷接收子進程的翻譯基本塊請求
if (read(fd, &t, sizeof(struct afl_tsl)) != sizeof(struct afl_tsl))
break;
// 去fork server進程的 tb cache 中搜索
tb = tb_htable_lookup(cpu, t.pc, t.cs_base, t.flags);
// 如果該基本塊不在在 cache 中就使用 tb_gen_code 翻譯基本塊並放到 cache 中
if(!tb) {
mmap_lock();
tb_lock();
tb_gen_code(cpu, t.pc, t.cs_base, t.flags, 0);
mmap_unlock();
tb_unlock();
}
}
close(fd);
}
代碼流程如下
- 這個函數里面就是一個死循環,不斷地接收測試進程翻譯基本塊的請求。
- 接收到請求后會使用
tb_htable_lookup
在fork server
進程的cache
中搜索,如果基本塊不在cache
中的話就使用tb_gen_code
翻譯基本塊並放置到fork server
進程的cache
中。
這個函數有兩個 tips
。
- 首先函數里面是死循環,只有當
read
失敗了才會退出循環,read
又是阻塞的,所以只有fd
管道的另一端關閉了才會read
失敗退出函數,所以當子進程執行結束或者由於進程超時被afl-fuzz
殺死后,afl_wait_tsl
就會因為read
失敗而退出該函數,等待接下來的fork
請求。 - 子進程向父進程(
fork server
進程)發送基本塊翻譯請求的原因是讓fork server
進程把子進程剛剛翻譯的基本塊在fork server
進程也翻譯一遍並放入cache
,這樣在后續測試中fork
出的新進程就會由於fork
的特性繼承fork server
的tb cache
,從而避免重復翻譯之前子進程翻譯過的基本塊。
改進版本
源碼地址
https://github.com/vanhauser-thc/AFLplusplus
在原始的 AFL qemu
版本中獲取覆蓋率的方式是在每次翻譯基本塊前調用 afl_maybe_log
往 afl-fuzz
同步覆蓋率信息,這種方式有一個問題就是由於 qemu
會把順序執行的基本塊 chain
一起,這樣可以提升執行速度。但是在這種方式下有的基本塊就會由於 chain
的原因導致追蹤不到基本塊的執行, afl
的處理方式是禁用 qemu
的 chain
功能,這樣則會削減 qemu
的性能。
為此有人提出了一些改進的方式
https://abiondo.me/2018/09/21/improving-afl-qemu-mode/
為了能夠啟用 chain
功能,可以直接把統計覆蓋率的代碼插入到每個翻譯的基本塊的前面
TranslationBlock *tb_gen_code(CPUState *cpu,
............................
............................
tcg_ctx->cpu = ENV_GET_CPU(env);
afl_gen_trace(pc); // 生成統計覆蓋率的代碼
gen_intermediate_code(cpu, tb);
tcg_ctx->cpu = NULL;
............................
afl_gen_trace
的作用是插入一個函數調用在翻譯的基本塊前面,之后在每次執行基本塊前會執行 afl_maybe_log
統計程序執行的覆蓋率信息。
同時為了能夠進一步提升速度可以把子進程生成的 基本塊chain
也同步到 fork server
進程。
bool was_translated = false, was_chained = false;
tb = tb_lookup__cpu_state(cpu, &pc, &cs_base, &flags, cf_mask);
if (tb == NULL) {
mmap_lock();
tb = tb_gen_code(cpu, pc, cs_base, flags, cf_mask);
was_translated = true; // 表示當前基本塊被翻譯了
mmap_unlock();
/* See if we can patch the calling TB. */
if (last_tb) {
tb_add_jump(last_tb, tb_exit, tb);
was_chained = true; // 表示當前基本塊執行了 chain 操作
}
if (was_translated || was_chained) {
// 如果有新翻譯的基本塊或者新構建的 chain 就通知 fork server 更新 cache
afl_request_tsl(pc, cs_base, flags, cf_mask, was_chained ? last_tb : NULL, tb_exit);
}
主要流程就是當有新的基本塊和新的 chain
構建時就通知父進程 (fork server
進程)更新父進程的 cache
.
基於qemu
還可以實現 afl
的 persistent
模式,具體的實現細節就是在被測函數的開始和末尾插入指令
#define AFL_QEMU_TARGET_i386_SNIPPET \
if (is_persistent) { \
\
if (s->pc == afl_persistent_addr) { \
\
I386_RESTORE_STATE_FOR_PERSISTENT; \
\
if (afl_persistent_ret_addr == 0) { \
\
TCGv_ptr paddr = tcg_const_ptr(afl_persistent_addr); \
tcg_gen_st_tl(paddr, cpu_regs[R_ESP], persisent_retaddr_offset); \
\
} \
tcg_gen_afl_call0(&afl_persistent_loop); \
\
} else if (afl_persistent_ret_addr && s->pc == afl_persistent_ret_addr) { \
\
gen_jmp_im(s, afl_persistent_addr); \
gen_eob(s); \
\
} \
\
}
- 在被測函數的開頭(
afl_persistent_addr
)插入指令調用afl_persistent_loop
函數, 該函數的作用是在每次進入被測函數前初始化一些信息,比如存儲程序執行的覆蓋率信息的共享內存。 - 然后在 被測函數的末尾
afl_persistent_ret_addr
增加一條跳轉指令直接跳轉到函數的入口(afl_persistent_addr
) - 通過這樣可以實現不斷對函數進行循環測試
AFL unicorn 模式
源碼地址
https://github.com/vanhauser-thc/AFLplusplus
afl
可以使用 unicorn
來搜集覆蓋率,其實現方式和 qemu
模式類似(因為 unicorn
本身也就是基於 qemu
搞的).它通過在 cpu_exec
執行基本塊前插入設置forkserver
和統計覆蓋率的代碼,這樣在每次執行基本塊時 afl 就能獲取到覆蓋率信息
static tcg_target_ulong cpu_tb_exec(CPUState *cpu, uint8_t *tb_ptr);
@@ -228,6 +231,8 @@
next_tb & TB_EXIT_MASK, tb);
}
AFL_UNICORN_CPU_SNIPPET2; // unicorn 插入的代碼
/* cpu_interrupt might be called while translating the
TB, but before it is linked into a potentially
infinite loop and becomes env->current_tb. Avoid
插入的代碼如下
#define AFL_UNICORN_CPU_SNIPPET2 do { \
if(afl_first_instr == 0) { \ // 如果是第一次執行就設置 forkserver
afl_setup(); \ // 初始化管道
afl_forkserver(env); \ // 設置 fork server
afl_first_instr = 1; \
} \
afl_maybe_log(tb->pc); \ // 統計覆蓋率
} while (0)
和 qemu
類似在執行第一個基本塊時初始化 afl
的命名管道並且設置好 forkserver
,然后通過 afl_maybe_log
與 afl-fuzz
端同步覆蓋率。
forkserver
的作用和 qemu
模式中的類似,主要就是接收命令 fork
新進程並且處理子進程的基本塊翻譯請求來提升執行速度。
libFuzzer unicorn 模式
源碼地址
https://github.com/PAGalaxyLab/uniFuzzer
libfuzzer
支持從外部獲取覆蓋率信息
__attribute__((section("__libfuzzer_extra_counters")))
uint8_t Counters[PCS_N];
上面的定義表示 libfuzzer
從 Counters
里面取出覆蓋率信息來引導變異。
那么下面就簡單了,首先通過 unicorn
的基本塊 hook
事件來搜集執行的基本塊信息,然后在回調函數里面更新Counters
, 就可以把被 unicorn
模擬執行的程序的覆蓋率信息反饋給 libfuzzer
// hook basic block to get code coverage
uc_hook hookHandle;
uc_hook_add(uc, &hookHandle, UC_HOOK_BLOCK, hookBlock, NULL, 1, 0);
下面看看 hookBlock
的實現
// update code coverage counters by hooking basic block
void hookBlock(uc_engine *uc, uint64_t address, uint32_t size, void *user_data) {
uint16_t pr = crc16(address);
uint16_t idx = pr ^ prevPR;
Counters[idx]++;
prevPR = (pr >> 1);
}
其實就是模擬 libfuzzer
統計覆蓋率的方式在 Counters
更新覆蓋率信息並反饋給 libfuzzer
.
總結
通過分析 afl
的 forkserver
機制、 afl qemu
的實現機制以及 afl unicorn
的實現機制可以得出afl
的變異策略調度模塊和被測程序執行和覆蓋率信息搜集模塊是相對獨立的,兩者通過命名管道進行通信。假設我們需要實現一種新的覆蓋率搜集方式並把覆蓋率反饋給 afl
來使用 afl
的 fuzz
策略,我們主要就需要模擬 fork server
和 afl-fuzz
進行通信,然后把覆蓋率反饋給 afl-fuzz
即可。
對於 libfuzzer
而言,它本身就支持從外部獲取程序執行的覆蓋率信息(通過全局變量來傳遞),所以如果要實現新的覆蓋率搜集方式,按照 libfuzzer
的規范來實現即可。