2016-11-15
本來這是在前端驅動后期分析的,但是這部分內容比較多,且分析了后端notify前端的機制,所以還是單獨拿出一節分析比較好!
還是拿網絡驅動部分做案例,網絡驅動部分有兩個隊列,(忽略控制隊列):接收隊列和發送隊列;每個隊列都對應一個virtqueue,兩個隊列之間是互不影響的。
前后端利用virtqueue的方式如下圖所示:
這里再詳細的描述下,當兩個queue都需要客戶機填充buffer,ReceiveQueue需要客戶機 driver提前填充分配好的空buffer,然后記錄到availRing,並在恰當的時機通知后端,當外部網絡有數據包到達時,qemu后端就從availRing 中獲取一個buffer,然后填充數據,完事后記錄buffer head index到usedRing.最后在恰當的時機通知客戶機(向客戶機注入中斷),客戶機接收到信號便知道有數據包到達,這里只需要從usedRing 中獲取到index,然后取data數組的第i個元素即可。因為在客戶機填充buffer的時候把邏輯buffer的指針保存在data數組中。
而SendQueue同樣需要客戶機去填充,只不過這里是當客戶機需要發送數據包時,把數據包構造成邏輯buffer,然后填充到send Queue,並在恰當的時機通知后端,qemu后端收到通知就知道那個隊列有請求到達,如果當前沒有處理其他數據包就着手處理這個數據包。具體就同樣是從AvailRing中取出buffer head index,然后從描述符表中get到buffer,這時就需要從buffer中copy數據了,因為要把數據包從host發送出去,然后更新usedRing。最后同樣要在恰當的時機通知客戶機。注意這里客戶機同樣需要從usedRing 中get index,但是這里主要是用於delay notify,因為數據包由客戶機構造,其占用的buffer並不能重復使用,只是每次有數據包就把其構造成buffer而已。
以上便是基本的使用sendqueue和receive的原理,但是還有一點上面我沒有提到,就是通知的那個恰當的時機,那么這個恰當的實際究竟是什么時候呢??在virtIO中有兩種方式控制前后端的notify.
1、flags字段
2、事件觸發
1、在vring_avail和vring_used的flags字段,控制前后端的通信。vring_used中的flags用於通知driver端,當add一個buffer的時候不用notify后端。而vring_avail中的flags用於通知qemu端,當消費一個buffer的時候不用interrupt 客戶機。
2、在virtIO中又加入了另一種機制,需要由driver和qemu自己判斷是否需要通知,也就是設置一個限額,當一端添加buffer或者消費buffer的數量達到指定數目,就觸發事件,從而發生notify或者interrupt。在有這種機制的情況下就忽略了前面所說的flags。
這里我們以receiveQueue為例,分析下前后端的delay notify機制。
在front driver端:
客戶機driver通過NAPI接收數據時,會在可用buffer不足的時候調用函數添加,具體就是try_fill_recv:
static bool try_fill_recv(struct receive_queue *rq, gfp_t gfp)
{
struct virtnet_info *vi = rq->vq->vdev->priv; int err; bool oom; /*循環,每循環一次添加一個buffer,一直到填充滿,即描述符表滿*/ do { if (vi->mergeable_rx_bufs) err = add_recvbuf_mergeable(rq, gfp); else if (vi->big_packets) err = add_recvbuf_big(rq, gfp); else err = add_recvbuf_small(rq, gfp); oom = err == -ENOMEM; if (err) break; ++rq->num; } while (rq->vq->num_free); if (unlikely(rq->num > rq->max)) rq->max = rq->num; /*通知后端*/ virtqueue_kick(rq->vq); return !oom; }
至於添加的是哪種類型的buffer,我們這里並不關心,循環結束就調用virtqueue_kick(rq->vq)函數,此時參數是接收隊列的virtqueue,
接下來就調用到了virtqueue_kick_prepare函數,該函數判斷當前應不應該通知后端。先看下函數的代碼:
1 bool virtqueue_kick_prepare(struct virtqueue *_vq)
2 { 3 struct vring_virtqueue *vq = to_vvq(_vq); 4 u16 new, old; 5 bool needs_kick; 6 7 START_USE(vq); 8 /* We need to expose available array entries before checking avail 9 * event. */ 10 virtio_mb(vq->weak_barriers); 11 12 old = vq->vring.avail->idx - vq->num_added; 13 new = vq->vring.avail->idx; 14 vq->num_added = 0; 15 16 #ifdef DEBUG 17 if (vq->last_add_time_valid) { 18 WARN_ON(ktime_to_ms(ktime_sub(ktime_get(), 19 vq->last_add_time)) > 100); 20 } 21 vq->last_add_time_valid = false; 22 #endif 23 24 if (vq->event) { 25 needs_kick = vring_need_event(vring_avail_event(&vq->vring), 26 new, old); 27 } else { 28 needs_kick = !(vq->vring.used->flags & VRING_USED_F_NO_NOTIFY); 29 } 30 END_USE(vq); 31 return needs_kick;
這里面涉及到幾個變量,old是add_sg之前的avail.idx,而new是當前的avail.idx,還有一個是vring_avail_event(&vq->vring),看具體的實現:
1 #define vring_avail_event(vr) (*(__u16 *)&(vr)->used->ring[(vr)->num])
可以看到這里是VRingUsed中的ring數組最后一項的值,該值在后端驅動從virtqueue中pop一個elem之前設置成相應隊列的下一個將要使用的index,即last_avail_index。
看下vring_need_event函數:
1 static inline int vring_need_event(__u16 event_idx, __u16 new_idx, __u16 old)
2 { 3 /* Note: Xen has similar logic for notification hold-off 4 * in include/xen/interface/io/ring.h with req_event and req_prod 5 * corresponding to event_idx + 1 and new_idx respectively. 6 * Note also that req_event and req_prod in Xen start at 1, 7 * event indexes in virtio start at 0. */ 8 return (__u16)(new_idx - event_idx - 1) < (__u16)(new_idx - old); 9 }
前后端通過對比 (__u16)(new_idx - event_idx - 1) < (__u16)(new_idx - old)來判斷是否需要notify后端,這在數據量比較大的時候顯得很實用。在初始狀態下,即在qemu一個buffer還沒有使用的情況下,event_idx必然是0,那么此時這里的判斷肯定為真,所以notify后端。后端收到通知就從virtqueue中pop buffer,同時在此之前需要設置event_idx,代碼見qemu virtio.c的virtqueue_pop函數:
1 void *virtqueue_pop(VirtQueue *vq, size_t sz)
2 { 3 ...... 4 5 i = head = virtqueue_get_head(vq, vq->last_avail_idx++); 6 if (virtio_vdev_has_feature(vdev, VIRTIO_RING_F_EVENT_IDX)) { 7 vring_set_avail_event(vq, vq->last_avail_idx); 8 } 10 ...... 11 }
如果是初始化狀態,即當前是首次執行virtqueue_pop函數,last_avail_idx=0,在++后就成了1,然后設置此值到UsedRing.ring[]數組的最后一項:
1 static inline void vring_set_avail_event(VirtQueue *vq, uint16_t val)
2 { 3 hwaddr pa; 4 if (!vq->notification) { 5 return; 6 } 7 pa = vq->vring.used + offsetof(VRingUsed, ring[vq->vring.num]); 8 virtio_stw_phys(vq->vdev, pa, val); 9 }
設置成功后就執行pop之后的處理,寫入數據完成后,調用后端的 virtio_notify(vdev, q->rx_vq)函數。該函數執行前同樣需要判斷是否需要notify,具體函數為virtio_should_notify
bool virtio_should_notify(VirtIODevice *vdev, VirtQueue *vq)
{
uint16_t old, new; bool v; /* We need to expose used array entries before checking used event. */ smp_mb(); /* Always notify when queue is empty (when feature acknowledge) */ if (virtio_vdev_has_feature(vdev, VIRTIO_F_NOTIFY_ON_EMPTY) && !vq->inuse && virtio_queue_empty(vq)) { return true; } if (!virtio_vdev_has_feature(vdev, VIRTIO_RING_F_EVENT_IDX)) { return !(vring_avail_flags(vq) & VRING_AVAIL_F_NO_INTERRUPT); } v = vq->signalled_used_valid; vq->signalled_used_valid = true; old = vq->signalled_used; new = vq->signalled_used = vq->used_idx; return !v || vring_need_event(vring_get_used_event(vq), new, old); }
該函數邏輯和前端driver總的判斷函數大致類似,但是還是有些不同,首先,如果隊列為空即當前沒有可用buffer了,那么必然會notify前端;
接着判斷是否支持這樣事件觸發式的方式即VIRTIO_RING_F_EVENT_IDX,如果不支持,就通過flags字段來判斷。而如果支持,就通過事件觸發來通知。
這里有兩個條件:第一個是v = vq->signalled_used_valid和vring_need_event(vring_get_used_event(vq), new, old)
v = vq->signalled_used_valid在初始化的時候被設置成false,表示還沒有向前端做任何通知,而后再每次的virtio_should_notify中就會設置成true,並更新vq->signalled_used = vq->used_idx;所以如果是首次嘗試通知前端,則總能成功,否則需要判斷vring_need_event(vring_get_used_event(vq), new, old),該函數具體是根前面邏輯是一樣的,正如前面所說,這是第一次嘗試通知,所以總能成功。而vring_get_used_event(vq)是VRingAvail.ring[]數組的最后一項的值,該值在客戶機driver中被設置
在次回到linux driver中,就會從usedRing中取buffer,同樣每取出一個buffer就會設置used_event,代碼見virtio_ring.c的virtqueue_get_buf函數,設置的值是vq->last_used_idx,記錄客戶機處理位置。
1 void *virtqueue_get_buf(struct virtqueue *_vq, unsigned int *len)
2 { 3 4 ...... 5 if (!(vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) { 6 vring_used_event(&vq->vring) = vq->last_used_idx; 7 virtio_mb(vq->weak_barriers); 8 } 9 ...... 10 11 }
到目前為止,基本一次完整的交互已經完成了,但是由於是初次交互,前后端的delay機制都沒起作用,判斷條件中使用到的event_idx已經更新了,假如說首次add 8個buffer,然后通知了后端,並且后端使用了三個buffer並首次notify前端,此時 后端向第4個buffer中寫數據,last_avail_idx=4(從0開始),那么used_event=4,此時前端發現可用buffer不足,需要添加,那么本次添加了5個,即new=8+5=13,old=8,new-old=5,而此時new-used_event-1=8,條件不滿足,所以此時前端driver添加的buffer就不用notify后端。而話說這段時間后端又處理好了第二個數據包,使用了3個buffer。但不幸,前端還在處理第二個buffer,即last_used_idx=2,則used_event=2;對於后端來講new-old=3,new-used_event-1=3,條件不滿足,所以也不用通知。這樣delay notify的機制便顯示出效果了。筆者認為這其實本質上就是一場速度的對決,為了保證公平,即使一方處理快,也不能任意向另一端發送數據,只能待對方處理的差不多了你才能發,這樣發送一方可以歇歇,而接受一方也不會因為處理不及而丟棄,從而造成浪費!哈哈,真是無規矩不成方圓!
具體通知方式:
前面已經提到前端或者后端完成某個操作需要通知另一端的時候需要某種notify機制。這個notify機制是啥呢?這里分為兩個方向
1、guest->host
前面也已經介紹,當前端想通知后端時,會調用virtqueue_kick函數,繼而調用virtqueue_notify,對應virtqueue結構中的notify函數,在初始化的時候被初始化成vp_notify(virtio_pci.c中),看下該函數的實現
static void vp_notify(struct virtqueue *vq) { struct virtio_pci_device *vp_dev = to_vp_device(vq->vdev); /* we write the queue's selector into the notification register to * signal the other end */ iowrite16(vq->index, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_NOTIFY); }
可以看到這里僅僅是吧vq的index編號寫入到設備的IO地址空間中,實際上就是設備對應的PCI配置空間中VIRTIO_PCI_QUEUE_NOTIFY位置。這里執行IO操作會引發VM-exit,繼而退出到KVM->qemu中處理。看下后端驅動的處理方式。在qemu代碼中virtio-pci.c文件中有函數virtio_ioport_write專門處理前端驅動的IO寫操作,看
case VIRTIO_PCI_QUEUE_NOTIFY: if (val < VIRTIO_PCI_QUEUE_MAX) { virtio_queue_notify(vdev, val); } break;
這里首先判斷隊列號是否在合法范圍內,然后調用virtio_queue_notify函數,而最終會調用到virtio_queue_notify_vq,該函數其實僅僅調用了VirtQueue結構中綁定的處理函數handle_output,該函數根據不同的設備有不同的實現,比如網卡有網卡的實現,而塊設備有塊設備的實現。以網卡為例看看創建VirtQueue的時候給綁定的是哪個函數。在virtio-net,c中的virtio_net_init,可以看到這里給接收隊列綁定的是virtio_net_handle_rx,而給發送隊列綁定的是virtio_net_handle_tx_bh或者virtio_net_handle_tx_timer。而對於塊設備則對應的是virtio_blk_handle_output函數。
2、host->guest
host通知guest當然是通過注入中斷的方式,首先調用的是virtio_notify,繼而調用virtio_notify_vector並把中斷向量作為參數傳遞進去。這里就調用了設備關聯的notify函數,具體實現為virtio_pci_notify函數,常規中斷(非MSI)會調用qemu_set_irq,在8259a中斷控制器的情況下回調用kvm_pic_set_irq,然后到了kvm_set_irq,這里就會通過kvm_vm_ioctl和KVM交互,接口為KVM_IRQ_LINE,通知KVM對guest進行中斷的注入。KVm里的kvm_vm_ioctl函數會對此調用進行處理,具體就是調用kvm_vm_ioctl_irq_line,之后就調用kvm_set_irq函數進行注入了。之后的流程參看中斷虛擬化部分。
共享內存
前面提到,在guest通知host的時候,是把隊列的索引寫入到了配置空間的VIRTIO_PCI_QUEUE_NOTIFY字段,但是僅僅一個索引是怎么找到指定的隊列,且數據時什么時候到達后端的呢?這就用到了共享內存。我們知道的是前后端的確通過共享內存的方式傳遞數據,但是數據的地址是怎么傳遞到后端的,這是個問題。本小節主要分析下這個問題。
為了便於理解我們先闡述其原理,然后結合代碼看具體的實現。實際上前后端在初始化后就共享了一段連續的內存區,注意這里是物理上連續的內存區(GPA),由客戶機內部初始化隊列的時候分配,所以這里就是需要和伙伴系統交互。這段內存區的結構如下圖所示
對於vring了解的朋友應該很熟悉這個結構,沒錯,這就是通過vring管理的結構,換句話說,前后端直接共享的其實是vring。也就是說針對同一個隊列(比如網卡的發送隊列),前后端已經形成一種協議,通過這段內存區交換數據的地址信息。在把數據的地址信息寫入到desc數組中后,僅僅需要通知另一端,另一端就知道從哪里取出數據。當然還是通過desc數組。具體數據的傳遞過程參見其他小結。因此在初始化階段,前端分配好內存區,並初始化好前端的vring后,就把內存區的信息傳遞到后端,后端也利用這個內存區的信息初始化隊列相關的vring。這樣vring就在前后端保持了一致。原理就是如此,下面看具體初始化代碼:
前端:
virtnet_probe->init_vqs->virtnet_find_vqs->vi->vdev->config->find_vqs(vp_find_vqs)->vp_try_to_find_vqs->setup_vq,在setup_vp中通過IO端口和后端交互完成前面我們說的協議。看下該函數
static struct virtqueue *setup_vq(struct virtio_device *vdev, unsigned index, void (*callback)(struct virtqueue *vq), const char *name, u16 msix_vec) { struct virtio_pci_device *vp_dev = to_vp_device(vdev); struct virtio_pci_vq_info *info; struct virtqueue *vq; unsigned long flags, size; u16 num; int err; /* Select the queue we're interested in */ iowrite16(index, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_SEL); /* Check if queue is either not available or already active. */ num = ioread16(vp_dev->ioaddr + VIRTIO_PCI_QUEUE_NUM); if (!num || ioread32(vp_dev->ioaddr + VIRTIO_PCI_QUEUE_PFN)) return ERR_PTR(-ENOENT); /* allocate and fill out our structure the represents an active * queue */ info = kmalloc(sizeof(struct virtio_pci_vq_info), GFP_KERNEL); if (!info) return ERR_PTR(-ENOMEM); info->num = num; info->msix_vector = msix_vec; size = PAGE_ALIGN(vring_size(num, VIRTIO_PCI_VRING_ALIGN)); info->queue = alloc_pages_exact(size, GFP_KERNEL|__GFP_ZERO); if (info->queue == NULL) { err = -ENOMEM; goto out_info; } /* activate the queue */ /*這里實際上把info->queue的GPA(頁框號寫入到了設備的VIRTIO_PCI_QUEUE_PFN)*/ iowrite32(virt_to_phys(info->queue) >> VIRTIO_PCI_QUEUE_ADDR_SHIFT, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_PFN); /* create the vring */ vq = vring_new_virtqueue(index, info->num, VIRTIO_PCI_VRING_ALIGN, vdev, true, info->queue, vp_notify, callback, name); if (!vq) { err = -ENOMEM; goto out_activate_queue; } vq->priv = info; info->vq = vq; if (msix_vec != VIRTIO_MSI_NO_VECTOR) { iowrite16(msix_vec, vp_dev->ioaddr + VIRTIO_MSI_QUEUE_VECTOR); msix_vec = ioread16(vp_dev->ioaddr + VIRTIO_MSI_QUEUE_VECTOR); if (msix_vec == VIRTIO_MSI_NO_VECTOR) { err = -EBUSY; goto out_assign; } } if (callback) { spin_lock_irqsave(&vp_dev->lock, flags); list_add(&info->node, &vp_dev->virtqueues); spin_unlock_irqrestore(&vp_dev->lock, flags); } else { INIT_LIST_HEAD(&info->node); } return vq; out_assign: vring_del_virtqueue(vq); out_activate_queue: iowrite32(0, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_PFN); free_pages_exact(info->queue, size); out_info: kfree(info); return ERR_PTR(err); }
注意協商的步驟,首先通過VIRTIO_PCI_QUEUE_SEL標記本次操作的隊列索引,因為每個隊列都有自己的vring,即需要自己的共享內存區。然后檢查隊列是否可用,這是通過VIRTIO_PCI_QUEUE_NUM,如果返回的結果是0,則表示沒有隊列可用,則返回錯誤。接着通過VIRTIO_PCI_QUEUE_PFN檢查是否已經激活,如果已經激活,同樣返回錯誤。這些檢查通過就可以予以初始化了,具體先分配一個中間結構virtio_pci_vq_info,這不是重點,后面通過alloc_pages_exact向伙伴系統分配了不小於size的連續物理內存,等會我們再說size的問題,然后把這塊物理頁框號(GPA>>VIRTIO_PCI_QUEUE_ADDR_SHIFT)寫入到VIRTIO_PCI_QUEUE_PFN,這樣后端就會得到這塊內存區的信息。然后我們先看下前端利用這塊內存區做了什么?看下面的vring_new_virtqueue函數,該函數中調用vring_init來初始化vring
static inline void vring_init(struct vring *vr, unsigned int num, void *p, unsigned long align) { vr->num = num; vr->desc = p; vr->avail = p + num*sizeof(struct vring_desc); vr->used = (void *)(((unsigned long)&vr->avail->ring[num] + sizeof(__u16) + align-1) & ~(align - 1)); }
這個函數正好體現了我們前面那個結構圖。這樣前端vring就初始化好了。對隊列填充數據時就是根據這個vring填充信息。
后端(qemu端)
主要操作都在virtio_ioport_write中,我們只關注三個case
case VIRTIO_PCI_QUEUE_PFN: /*pa就是desc的GPA*/ pa = (target_phys_addr_t)val << VIRTIO_PCI_QUEUE_ADDR_SHIFT; if (pa == 0) { virtio_pci_stop_ioeventfd(proxy); virtio_reset(proxy->vdev); msix_unuse_all_vectors(&proxy->pci_dev); } else virtio_queue_set_addr(vdev, vdev->queue_sel, pa); break; case VIRTIO_PCI_QUEUE_SEL: if (val < VIRTIO_PCI_QUEUE_MAX) vdev->queue_sel = val; break; case VIRTIO_PCI_QUEUE_NOTIFY: if (val < VIRTIO_PCI_QUEUE_MAX) { virtio_queue_notify(vdev, val); } break;
可以看到在VIRTIO_PCI_QUEUE_SEL時候,僅僅是標記了下設備中的queue_sel表示當前操作的隊列索引。下面在通過VIRTIO_PCI_QUEUE_PFN傳遞地址的時候,調用virtio_queue_set_addr設置后端相關隊列的vring該函數實現較簡單
void virtio_queue_set_addr(VirtIODevice *vdev, int n, target_phys_addr_t addr) { vdev->vq[n].pa = addr; virtqueue_init(&vdev->vq[n]); }
static void virtqueue_init(VirtQueue *vq) { target_phys_addr_t pa = vq->pa; vq->vring.desc = pa; vq->vring.avail = pa + vq->vring.num * sizeof(VRingDesc); vq->vring.used = vring_align(vq->vring.avail + offsetof(VRingAvail, ring[vq->vring.num]), VIRTIO_PCI_VRING_ALIGN); }
看到這里有么有很面熟,沒錯,這個函數和前端初始化vring的函數很是類似,這樣前后端的vring就同步起來了……
而在guest通知后端的時候,通過VIRTIO_PCI_QUEUE_NOTIFY接口,該函數調用了virtio_queue_notify_vq繼而調用 vq->handle_output……就這樣,后端就得到通知着手處理了!
后記:
到此,virtIO部分已經分析的差不多了,分析期間真實感覺到了自己知識的匱乏,其間多次向開發者求助,並均得到認真回復,在此在此感謝這些優秀的開發者。有時候看內核代碼就感覺工程師和硬件在干仗,站在工程師的角度,需要盡其所能榨取硬件的性能。大到實現算法的優化,小到分析程序執行流的概率,從而針對編譯做優化。站在硬件的角度,你處理不好,我就不給你工作。而從這方面,工程師自然是完勝,並且還在不遺余力的朝着勝利的另一個境界挺近,即征服硬件!哈哈,不過誰都知道,這是一場沒有勝負的戰爭,工程師自然優秀,但是,因為工程師內部的競爭,這樣戰斗將永無休止!!唉,瞎扯淡了,各位朋友,下篇文章見!