Qemu IO事件處理框架
qemu是基於事件驅動的,在基於KVM的qemu模型中,每一個VCPU對應一個qemu線程,且qemu主線程負責各種事件的監聽,這里有一個小的IO監聽框架,本節對此進行介紹。
1.1 涉及結構
struct GArray { gchar *data; guint len; };
Data指向一個GpollFD數組,len表示數組的個數。
struct GPollFD { gint fd; gushort events; gushort revents; };
Fd為監聽的fd,event為請求監聽的事件,是一組bit的組合。Revents為poll收到的事件,根據此判定當前什么事件可用。
typedef struct IOHandlerRecord { IOCanReadHandler *fd_read_poll; IOHandler *fd_read; IOHandler *fd_write; void *opaque; QLIST_ENTRY(IOHandlerRecord) next; int fd; int pollfds_idx; bool deleted; } IOHandlerRecord;
該結構是qemu中IO框架的處理單位,fd_read和fd_write為注冊的對應處理函數。Next表示該結構會連接在一個全局的鏈表上,fd是對應的fd,delete標志是否需要從鏈表中刪除該結構。
1.2 代碼分析
1.2.1 初始化階段
Main-> qemu_init_main_loop
Main-> main_loop-> main_loop_wait
qemu_init_main_loop
int qemu_init_main_loop(void) { int ret; GSource *src; init_clocks(); ret = qemu_signal_init(); if (ret) { return ret; } //malloc a globle fd array gpollfds = g_array_new(FALSE, FALSE, sizeof(GPollFD)); //create a aio context qemu_aio_context = aio_context_new(); //get event source from aio context src = aio_get_g_source(qemu_aio_context); //add source to main loop g_source_attach(src, NULL); g_source_unref(src); return 0; }
Qemu中的main loop主要采用 了glib中的事件循環,關於此詳細內容,准備后面專門寫一小節,本節主要看主體IO框架。
該函數主要就分配了一個Garray結構存儲全局的GpollFD,在main_loop中的main_loop_wait階段有兩個比較重要的函數:qemu_iohandler_fill,os_host_main_loop_wait和qemu_iohandler_poll,前者把用戶添加的fd信息注冊到剛才分配的Garray結構中,os_host_main_loop_wait對事件進行監聽,qemu_iohandler_poll對接收到的事件進行處理。
1.2.2 添加fd
用戶添加fd的函數為qemu_set_fd_handler,參數中fd為本次添加的fd,后面分別是對該fd的處理函數(read or write),最后opaque為處理函數的參數。
int qemu_set_fd_handler(int fd, IOHandler *fd_read, IOHandler *fd_write, void *opaque) { return qemu_set_fd_handler2(fd, NULL, fd_read, fd_write, opaque); }
可見該函數直接調用了qemu_set_fd_handler2:
int qemu_set_fd_handler2(int fd, IOCanReadHandler *fd_read_poll, IOHandler *fd_read, IOHandler *fd_write, void *opaque) { IOHandlerRecord *ioh; assert(fd >= 0); //if read and write are null,delete if (!fd_read && !fd_write) { QLIST_FOREACH(ioh, &io_handlers, next) { if (ioh->fd == fd) { ioh->deleted = 1; break; } } } else {//find and goto find QLIST_FOREACH(ioh, &io_handlers, next) { if (ioh->fd == fd) goto found; } ioh = g_malloc0(sizeof(IOHandlerRecord)); //insert ioh to io_handlers list QLIST_INSERT_HEAD(&io_handlers, ioh, next); found: ioh->fd = fd; ioh->fd_read_poll = fd_read_poll; ioh->fd_read = fd_read; ioh->fd_write = fd_write; ioh->opaque = opaque; ioh->pollfds_idx = -1; ioh->deleted = 0; qemu_notify_event(); } return 0; }
這里判斷如果read和write函數均為空的話就表示本次是要delete某個fd,就遍歷所有的io_handlers,對指定的fd對應的IOHandlerRecord標志delete。
否則還有兩種情況,添加或者更新。所以首先還是要從io_handlers找一下,如果找到直接更新,否則新創建一個IOHandlerRecord,然后再添加信息。具體信息內容就比較簡單。
1.2.3 處理fd
在main_loop_wait函數中,通過os_host_main_loop_wait對fd進行監聽,當然並不是它直接監聽,而是通過glib的接口。
當os_host_main_loop_wait返回后,就表示當前有可用的事件,在main_loop_wait函數中,調用了qemu_iohandler_poll函數對fd進行處理。
void qemu_iohandler_poll(GArray *pollfds, int ret) { if (ret > 0) { IOHandlerRecord *pioh, *ioh; QLIST_FOREACH_SAFE(ioh, &io_handlers, next, pioh) { int revents = 0; if (!ioh->deleted && ioh->pollfds_idx != -1) { GPollFD *pfd = &g_array_index(pollfds, GPollFD, ioh->pollfds_idx); revents = pfd->revents; } if (!ioh->deleted && ioh->fd_read && (revents & (G_IO_IN | G_IO_HUP | G_IO_ERR))) { ioh->fd_read(ioh->opaque); } if (!ioh->deleted && ioh->fd_write && (revents & (G_IO_OUT | G_IO_ERR))) { ioh->fd_write(ioh->opaque); } /* Do this last in case read/write handlers marked it for deletion */ if (ioh->deleted) { QLIST_REMOVE(ioh, next); g_free(ioh); } } } }
具體的處理倒也簡單,逐個遍歷io_handlers,對於每個GpollFD,取其revents,判斷delete標志並校驗狀態,根據不同的狀態,調用read或者write回調。最后如果是delete的GpollFD,就從鏈表中remove掉,釋放GpollFD。
補充:針對qemu進程中線程數目的問題,從本節可以發現qemu主線程主要負責事件循環,針對每個虛擬機的VCPU,會有一個子線程為之服務,因此qemu線程數目至少要大於等於1+VCPU數目。
以馬內利!
參考資料:
1、qemu 2.7源碼