基於qemu和unicorn的Fuzz技術分析


前言

本文主要介紹如果使用 qemuunicorn 來搜集程序執行的覆蓋率信息以及如何把搜集到的覆蓋率信息反饋到 fuzzer 中輔助 fuzz 的進行。

AFL Fork Server

為了后面介紹 aflqemu 模式和 unicorn 模式, 首先大概講一下 aflfork server 的實現機制。aflfork server 的通信流程如圖所示

  1. 首先 afl-fuzz 調用 init_forkserver 函數 fork 出一個新進程作為 fork server , 然后等待 fork server 發送 4 個字節的數據, 如果能夠正常接收到數據則表示 fork server 啟動正常。
  2. fork server 起來后會使用 read 阻塞住, 等待 afl-fuzz 發送命令來啟動一個測試進程。
  3. 當需要進行一次測試時,afl-fuzz 會調用 run_target , 首先往管道發送 4 個字節通知 fork serverfork 一個進程來測試。
  4. fork server 新建進程后,會通過管道發送剛剛 fork 出的進程的 pidfork server.
  5. afl-fuzz 根據接收到的 pid 等待測試進程結束,然后根據測試生成的覆蓋率信息來引導后續的測試。

AFL qemu 模式

AFLqemu 模式的實現和 winafl 使用 dynamorio 來插樁的實現方式比較類似,winafl 的實現細節如下

https://xz.aliyun.com/t/5108

原始版本

源碼地址

https://github.com/google/AFL/tree/master/qemu_mode/patches

qemu 在執行一個程序時,從被執行程序的入口點開始對基本塊翻譯並執行,為了提升效率,qemu會把翻譯出來的基本塊存放到 cache 中,當 qemu 要執行一個基本塊時首先判斷基本塊是否在 cache 中,如果在 cache 中則直接執行基本塊,否則會翻譯基本塊並執行。

AFLqemu 模式就是通過在准備執行基本塊的和准備翻譯基本塊的前面增加一些代碼來實現的。首先會在每次執行一個基本塊前調用 AFL_QEMU_CPU_SNIPPET2 來和 afl 通信。

#define AFL_QEMU_CPU_SNIPPET2 do { \
    if(itb->pc == afl_entry_point) { \
      afl_setup(); \
      afl_forkserver(cpu); \
    } \
    afl_maybe_log(itb->pc); \
  } while (0)

如果當前執行的基本塊是 afl_entry_point (即目標程序的入口點),就設置好與 afl 通信的命名管道和共享內存並初始化 fork server ,然后通過 afl_maybe_log 往共享內存中設置覆蓋率信息。統計覆蓋率的方式和 afl 的方式一樣。

  cur_loc  = (cur_loc >> 4) ^ (cur_loc << 8);
  cur_loc &= MAP_SIZE - 1;
  afl_area_ptr[cur_loc ^ prev_loc]++;  // 和 afl 一樣 統計 edge 覆蓋率

fork server 的代碼如下

static void afl_forkserver(CPUState *cpu) {
  
  // 通知 afl-fuzz fork server 啟動正常
  if (write(FORKSRV_FD + 1, tmp, 4) != 4) return;

  // fork server 的主循環,不斷地 fork 新進程
  while (1) {
    // 阻塞地等待 afl-fuzz 發送命令,fork 新進程
    if (read(FORKSRV_FD, tmp, 4) != 4) exit(2);
    
    child_pid = fork(); // fork 新進程
    if (!child_pid) {
	  // 子進程會進入這,關閉通信管道描述符,然后從 afl_forkserver 返回繼續往下執行被測試程序
      afl_fork_child = 1;
      close(FORKSRV_FD);
      close(FORKSRV_FD + 1);
      close(t_fd[0]);
      return;

    }

	// fork server 進程,發送 fork 出來的測試進程的 pid 給 afl-fuzz
    if (write(FORKSRV_FD + 1, &child_pid, 4) != 4) exit(5);
	
	// 不斷等待處理 測試進程的 翻譯基本塊的請求
    afl_wait_tsl(cpu, t_fd[0]);
	
	// 等待子進程結束
    if (waitpid(child_pid, &status, 0) < 0) exit(6);
    if (write(FORKSRV_FD + 1, &status, 4) != 4) exit(7);

  }
}

forkserver 的代碼流程如下

  1. 首先發送數據給 afl-fuzz, 表示 fork server 啟動正常,通知完之后會進入循環阻塞在 read ,直到 afl-fuzz 端發送消息。
  2. 接收到數據后,fork serverfork 出新進程,此時子進程會關閉所有與 afl-fuzz 通信的文件描述符並從 afl_forkserver 返回繼續往下執行被測試程序。而父進程則把剛剛 fork出的測試進程的 pid 通過管道發送給 afl-fuzz
  3. 之后 fork server 進程進入 afl_wait_tsl ,不斷循環處理子進程翻譯基本塊的請求。

下面分析 afl_wait_tsl 的原理, 首先 afl 會在 翻譯基本塊后插入一段代碼

                 tb = tb_gen_code(cpu, pc, cs_base, flags, 0); // 翻譯基本塊
                 AFL_QEMU_CPU_SNIPPET1;  // 通知父進程 (fork server進程) 剛剛翻譯了一個基本塊
                 
#define AFL_QEMU_CPU_SNIPPET1 do { \
    afl_request_tsl(pc, cs_base, flags); \
  } while (0)

afl_request_tsl 就是把測試進程剛剛翻譯的基本塊的信息發送給父進程(fork server 進程)

static void afl_request_tsl(target_ulong pc, target_ulong cb, uint64_t flags) {
  struct afl_tsl t;
  if (!afl_fork_child) return;
  t.pc      = pc;
  t.cs_base = cb;
  t.flags   = flags;
  // 通過管道發送信息給 父進程 (fork server 進程)
  if (write(TSL_FD, &t, sizeof(struct afl_tsl)) != sizeof(struct afl_tsl))
    return;
}

下面看看 afl_wait_tsl 的代碼

static void afl_wait_tsl(CPUState *cpu, int fd) {

  while (1) {
   
    // 死循環不斷接收子進程的翻譯基本塊請求
    if (read(fd, &t, sizeof(struct afl_tsl)) != sizeof(struct afl_tsl))
      break;
	// 去fork server進程的 tb cache 中搜索
    tb = tb_htable_lookup(cpu, t.pc, t.cs_base, t.flags);
	// 如果該基本塊不在在 cache 中就使用 tb_gen_code 翻譯基本塊並放到 cache 中 
    if(!tb) {
      mmap_lock();
      tb_lock();
      tb_gen_code(cpu, t.pc, t.cs_base, t.flags, 0);
      mmap_unlock();
      tb_unlock();
    }
  }
  close(fd);
}

代碼流程如下

  1. 這個函數里面就是一個死循環,不斷地接收測試進程翻譯基本塊的請求。
  2. 接收到請求后會使用 tb_htable_lookupfork server 進程的 cache 中搜索,如果基本塊不在 cache 中的話就使用 tb_gen_code 翻譯基本塊並放置到 fork server 進程的 cache 中。

這個函數有兩個 tips

  1. 首先函數里面是死循環,只有當 read 失敗了才會退出循環,read 又是阻塞的,所以只有 fd 管道的另一端關閉了才會 read 失敗退出函數,所以當子進程執行結束或者由於進程超時被 afl-fuzz 殺死后, afl_wait_tsl 就會因為 read 失敗而退出該函數,等待接下來的 fork 請求。
  2. 子進程向父進程( fork server 進程)發送基本塊翻譯請求的原因是讓 fork server 進程把子進程剛剛翻譯的基本塊在 fork server 進程也翻譯一遍並放入 cache,這樣在后續測試中 fork 出的新進程就會由於 fork 的特性繼承 fork servertb cache,從而避免重復翻譯之前子進程翻譯過的基本塊。

改進版本

源碼地址

https://github.com/vanhauser-thc/AFLplusplus

在原始的 AFL qemu 版本中獲取覆蓋率的方式是在每次翻譯基本塊前調用 afl_maybe_logafl-fuzz 同步覆蓋率信息,這種方式有一個問題就是由於 qemu 會把順序執行的基本塊 chain 一起,這樣可以提升執行速度。但是在這種方式下有的基本塊就會由於 chain 的原因導致追蹤不到基本塊的執行, afl 的處理方式是禁用 qemuchain 功能,這樣則會削減 qemu 的性能。

為此有人提出了一些改進的方式

https://abiondo.me/2018/09/21/improving-afl-qemu-mode/

為了能夠啟用 chain 功能,可以直接把統計覆蓋率的代碼插入到每個翻譯的基本塊的前面

TranslationBlock *tb_gen_code(CPUState *cpu,
     ............................
     ............................
     tcg_ctx->cpu = ENV_GET_CPU(env);
     afl_gen_trace(pc);  // 生成統計覆蓋率的代碼
     gen_intermediate_code(cpu, tb);
     tcg_ctx->cpu = NULL;
     ............................

afl_gen_trace 的作用是插入一個函數調用在翻譯的基本塊前面,之后在每次執行基本塊前會執行 afl_maybe_log 統計程序執行的覆蓋率信息。

同時為了能夠進一步提升速度可以把子進程生成的 基本塊chain 也同步到 fork server 進程。

     bool was_translated = false, was_chained = false;
     tb = tb_lookup__cpu_state(cpu, &pc, &cs_base, &flags, cf_mask);
     if (tb == NULL) {
         mmap_lock();
         tb = tb_gen_code(cpu, pc, cs_base, flags, cf_mask);
         was_translated = true; // 表示當前基本塊被翻譯了
         mmap_unlock();

     /* See if we can patch the calling TB. */
     if (last_tb) {
         tb_add_jump(last_tb, tb_exit, tb);
         was_chained = true; // 表示當前基本塊執行了 chain 操作
     }
     if (was_translated || was_chained) {
     	 // 如果有新翻譯的基本塊或者新構建的 chain 就通知 fork server 更新 cache
         afl_request_tsl(pc, cs_base, flags, cf_mask, was_chained ? last_tb : NULL, tb_exit);
     }

主要流程就是當有新的基本塊和新的 chain 構建時就通知父進程 (fork server進程)更新父進程的 cache.

基於qemu還可以實現 aflpersistent 模式,具體的實現細節就是在被測函數的開始和末尾插入指令

#define AFL_QEMU_TARGET_i386_SNIPPET                                          \
  if (is_persistent) {                                                        \
                                                                              \
    if (s->pc == afl_persistent_addr) {                                       \
                                                                              \
      I386_RESTORE_STATE_FOR_PERSISTENT;                                      \
                                                                              \
      if (afl_persistent_ret_addr == 0) {                                     \
                                                                              \
        TCGv_ptr paddr = tcg_const_ptr(afl_persistent_addr);                  \
        tcg_gen_st_tl(paddr, cpu_regs[R_ESP], persisent_retaddr_offset);      \
                                                                              \
      }                                                                       \
      tcg_gen_afl_call0(&afl_persistent_loop);                                \
                                                                              \
    } else if (afl_persistent_ret_addr && s->pc == afl_persistent_ret_addr) { \
                                                                              \
      gen_jmp_im(s, afl_persistent_addr);                                     \
      gen_eob(s);                                                             \
                                                                              \
    }                                                                         \
                                                                              \
  }
  1. 在被測函數的開頭(afl_persistent_addr)插入指令調用 afl_persistent_loop 函數, 該函數的作用是在每次進入被測函數前初始化一些信息,比如存儲程序執行的覆蓋率信息的共享內存。
  2. 然后在 被測函數的末尾 afl_persistent_ret_addr 增加一條跳轉指令直接跳轉到函數的入口(afl_persistent_addr)
  3. 通過這樣可以實現不斷對函數進行循環測試

AFL unicorn 模式

源碼地址

https://github.com/vanhauser-thc/AFLplusplus

afl 可以使用 unicorn 來搜集覆蓋率,其實現方式和 qemu 模式類似(因為 unicorn 本身也就是基於 qemu 搞的).它通過在 cpu_exec 執行基本塊前插入設置forkserver和統計覆蓋率的代碼,這樣在每次執行基本塊時 afl 就能獲取到覆蓋率信息

 static tcg_target_ulong cpu_tb_exec(CPUState *cpu, uint8_t *tb_ptr);
@@ -228,6 +231,8 @@
                             next_tb & TB_EXIT_MASK, tb);
                 }
 
                 AFL_UNICORN_CPU_SNIPPET2; // unicorn 插入的代碼
                 /* cpu_interrupt might be called while translating the
                    TB, but before it is linked into a potentially
                    infinite loop and becomes env->current_tb. Avoid

插入的代碼如下

#define AFL_UNICORN_CPU_SNIPPET2 do { \
    if(afl_first_instr == 0) { \  // 如果是第一次執行就設置 forkserver
      afl_setup(); \  // 初始化管道
      afl_forkserver(env); \  // 設置 fork server
      afl_first_instr = 1; \
    } \
    afl_maybe_log(tb->pc); \  // 統計覆蓋率
  } while (0)

qemu 類似在執行第一個基本塊時初始化 afl 的命名管道並且設置好 forkserver,然后通過 afl_maybe_logafl-fuzz 端同步覆蓋率。

forkserver 的作用和 qemu 模式中的類似,主要就是接收命令 fork 新進程並且處理子進程的基本塊翻譯請求來提升執行速度。

libFuzzer unicorn 模式

源碼地址

https://github.com/PAGalaxyLab/uniFuzzer

libfuzzer 支持從外部獲取覆蓋率信息

__attribute__((section("__libfuzzer_extra_counters")))
uint8_t Counters[PCS_N];

上面的定義表示 libfuzzerCounters 里面取出覆蓋率信息來引導變異。

那么下面就簡單了,首先通過 unicorn 的基本塊 hook 事件來搜集執行的基本塊信息,然后在回調函數里面更新Counters, 就可以把被 unicorn 模擬執行的程序的覆蓋率信息反饋給 libfuzzer

    // hook basic block to get code coverage
    uc_hook hookHandle;
    uc_hook_add(uc, &hookHandle, UC_HOOK_BLOCK, hookBlock, NULL, 1, 0);

下面看看 hookBlock 的實現

// update code coverage counters by hooking basic block
void hookBlock(uc_engine *uc, uint64_t address, uint32_t size, void *user_data) {
    uint16_t pr = crc16(address);
    uint16_t idx = pr ^ prevPR;
    Counters[idx]++;
    prevPR = (pr >> 1);
}

其實就是模擬 libfuzzer 統計覆蓋率的方式在 Counters 更新覆蓋率信息並反饋給 libfuzzer.

總結

通過分析 aflforkserver 機制、 afl qemu的實現機制以及 afl unicorn 的實現機制可以得出afl 的變異策略調度模塊和被測程序執行和覆蓋率信息搜集模塊是相對獨立的,兩者通過命名管道進行通信。假設我們需要實現一種新的覆蓋率搜集方式並把覆蓋率反饋給 afl 來使用 aflfuzz 策略,我們主要就需要模擬 fork serverafl-fuzz 進行通信,然后把覆蓋率反饋給 afl-fuzz 即可。

對於 libfuzzer 而言,它本身就支持從外部獲取程序執行的覆蓋率信息(通過全局變量來傳遞),所以如果要實現新的覆蓋率搜集方式,按照 libfuzzer 的規范來實現即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM