2016-11-08
前段時間大致整理了下virtIO后端驅動的工作模式以及原理,今天就從前端驅動的角度描述下目前Linux內核代碼中的virtIO驅動是如何配合后端進行工作的。
注:本節代碼參考Linux 內核3.11.1代碼
virtIO驅動從架構上來講可以分為兩部分,一個是其作為PCI設備本身的驅動,此驅動需要提供一些基本的操作PCI設備本身的函數比如PCI設備的探測、刪除、配置空間的設置和寄存器空間的讀寫等。而另一個就是其virtIO設備本身實現的功能驅動例如網絡驅動、塊設備驅動、console驅動等。所以我們要看還是分兩部分,先介紹PCI設備本身的驅動,然后在介紹實際功能驅動。
一、PCI設備本身驅動
在前面的PCI系列文章中對Linux內核中PCI設備驅動做了分析,所以這里我們只分析和virtIO相關的部分。
二、功能驅動部分
其實大部分的功能在后端驅動已經介紹,只是有些功能是在前端實現的,比如說virtqueue的初始化、avail buffer的添加以及used buffer的消費,還有比較很重要的是前后端vring的同步。
鑒於前面已經有了基本的概念基礎,那么我們直接從網絡驅動下手,分析驅動從注冊到接受數據的整個流程。(參考代碼virtio-net.c)
看下網絡驅動注冊的操作函數:
1 static const struct net_device_ops virtnet_netdev = { 2 .ndo_open = virtnet_open, 3 .ndo_stop = virtnet_close, 4 .ndo_start_xmit = start_xmit, 5 .ndo_validate_addr = eth_validate_addr, 6 .ndo_set_mac_address = virtnet_set_mac_address, 7 .ndo_set_rx_mode = virtnet_set_rx_mode, 8 .ndo_change_mtu = virtnet_change_mtu, 9 .ndo_get_stats64 = virtnet_stats, 10 .ndo_vlan_rx_add_vid = virtnet_vlan_rx_add_vid, 11 .ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid, 12 .ndo_select_queue = virtnet_select_queue, 13 #ifdef CONFIG_NET_POLL_CONTROLLER 14 .ndo_poll_controller = virtnet_netpoll, 15 #endif 16 };
發送數據的函數為start_xmit,該函數接收來自網絡協議棧的函數並寫入到ring buffer中,然后通知后端驅動。
1 static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev) 2 { 3 struct virtnet_info *vi = netdev_priv(dev); 4 int qnum = skb_get_queue_mapping(skb); 5 struct send_queue *sq = &vi->sq[qnum]; 6 int err; 7 8 /* Free up any pending old buffers before queueing new ones. */ 9 free_old_xmit_skbs(sq); 10 11 /* Try to transmit */ 12 err = xmit_skb(sq, skb); 13 14 /* This should not happen! */ 15 if (unlikely(err)) { 16 dev->stats.tx_fifo_errors++; 17 if (net_ratelimit()) 18 dev_warn(&dev->dev, 19 "Unexpected TXQ (%d) queue failure: %d\n", qnum, err); 20 dev->stats.tx_dropped++; 21 kfree_skb(skb); 22 return NETDEV_TX_OK; 23 } 24 /*通知后端驅動*/ 25 virtqueue_kick(sq->vq); 26 27 /* Don't wait up for transmitted skbs to be freed. */ 28 skb_orphan(skb); 29 nf_reset(skb); 30 31 /* Apparently nice girls don't return TX_BUSY; stop the queue 32 * before it gets out of hand. Naturally, this wastes entries. */ 33 if (sq->vq->num_free < 2+MAX_SKB_FRAGS) { 34 netif_stop_subqueue(dev, qnum); 35 if (unlikely(!virtqueue_enable_cb_delayed(sq->vq))) { 36 /* More just got used, free them then recheck. */ 37 free_old_xmit_skbs(sq); 38 if (sq->vq->num_free >= 2+MAX_SKB_FRAGS) { 39 netif_start_subqueue(dev, qnum); 40 virtqueue_disable_cb(sq->vq); 41 } 42 } 43 } 45 return NETDEV_TX_OK; 46 }
函數中首先獲取了buffer對應的發送隊列sendqueue,調用了一個關鍵的函數xmit_skb,具體的添加buffer到queue中的操作就是在此函數實現的:
1 static int xmit_skb(struct send_queue *sq, struct sk_buff *skb) 2 { 3 struct skb_vnet_hdr *hdr = skb_vnet_hdr(skb); 4 const unsigned char *dest = ((struct ethhdr *)skb->data)->h_dest; 5 struct virtnet_info *vi = sq->vq->vdev->priv; 6 unsigned num_sg; 7 8 pr_debug("%s: xmit %p %pM\n", vi->dev->name, skb, dest); 9 10 if (skb->ip_summed == CHECKSUM_PARTIAL) { 11 hdr->hdr.flags = VIRTIO_NET_HDR_F_NEEDS_CSUM; 12 hdr->hdr.csum_start = skb_checksum_start_offset(skb); 13 hdr->hdr.csum_offset = skb->csum_offset; 14 } else { 15 hdr->hdr.flags = 0; 16 hdr->hdr.csum_offset = hdr->hdr.csum_start = 0; 17 } 18 19 if (skb_is_gso(skb)) { 20 hdr->hdr.hdr_len = skb_headlen(skb); 21 hdr->hdr.gso_size = skb_shinfo(skb)->gso_size; 22 if (skb_shinfo(skb)->gso_type & SKB_GSO_TCPV4) 23 hdr->hdr.gso_type = VIRTIO_NET_HDR_GSO_TCPV4; 24 else if (skb_shinfo(skb)->gso_type & SKB_GSO_TCPV6) 25 hdr->hdr.gso_type = VIRTIO_NET_HDR_GSO_TCPV6; 26 else if (skb_shinfo(skb)->gso_type & SKB_GSO_UDP) 27 hdr->hdr.gso_type = VIRTIO_NET_HDR_GSO_UDP; 28 else 29 BUG(); 30 if (skb_shinfo(skb)->gso_type & SKB_GSO_TCP_ECN) 31 hdr->hdr.gso_type |= VIRTIO_NET_HDR_GSO_ECN; 32 } else { 33 hdr->hdr.gso_type = VIRTIO_NET_HDR_GSO_NONE; 34 hdr->hdr.gso_size = hdr->hdr.hdr_len = 0; 35 } 36 37 hdr->mhdr.num_buffers = 0; 38 39 /* Encode metadata header at front. 首個sg entry存儲頭部信息*/ 40 if (vi->mergeable_rx_bufs) 41 sg_set_buf(sq->sg, &hdr->mhdr, sizeof hdr->mhdr); 42 else 43 sg_set_buf(sq->sg, &hdr->hdr, sizeof hdr->hdr); 44 /*映射數據到sg,當前在sq->sg里面已經記錄數據的地址信息了*/ 45 num_sg = skb_to_sgvec(skb, sq->sg + 1, 0, skb->len) + 1; 46 /*調用函數把sg 信息記錄到隊列中的desc中*/ 47 return virtqueue_add_outbuf(sq->vq, sq->sg, num_sg, skb, GFP_ATOMIC); 48 }
這里我們先介紹下兩種virtIO 頭部:
1 struct skb_vnet_hdr { 2 union { 3 struct virtio_net_hdr hdr; 4 struct virtio_net_hdr_mrg_rxbuf mhdr; 5 }; 6 };
里面包含一個union分別是virtio_net_hdr和virtio_net_hdr_mrg_rxbuf,前者是普通的數據包頭部,后者是支持合並buffer的數據包的頭部,並且virtio_net_hdr是virtio_net_hdr_mrg_rxbuf的一個內嵌結構,這樣再看前面的函數代碼
首先判斷硬件是否已經添加了校驗字段,設置virtio_net_hdr中相關的值;然后判斷數據包是否是GSO類型,再次設置virtio_net_hdr相關字段的值。關於GSO類型,文章最后會介紹。設置好頭部后,進入下一個if,判斷設備是否支持合並buffer,是的話就調用函數sg_set_buf把virtio_net_hdr_mrg_rxbuf記錄到首個sg table的第一個表項 中,否則添加virtio_net_hdr。這樣設置好頭部,就調用skb_to_sgvec函數把skb buffer記錄到sg table中,然后調用virtqueue_add_outbuf把sg table轉換到發送隊列的ring desc中。回到上層的函數start_xmit中,在xmit_skb返回后,如果返回值正常,就調用virtqueue_kick函數通知后端驅動。
在通知后端驅動后判斷剩余可用的desc是否小於2+MAX_SKB_FRAGS(為保證安全,一個數據包最多可能使用2+MAX_SKB_FRAGS個物理buffer,virtIO 頭部占用一個,數據包頭部占用一個,剩下的是數據包最大分片數),不小於的話需要調用netif_stop_subqueue禁止下一個數據包的發送。
下面回過頭分析sg_set_buf、skb_to_sgvec和virtqueue_add_outbuf。
1 static inline void sg_set_buf(struct scatterlist *sg, const void *buf, 2 unsigned int buflen) 3 { 4 sg_set_page(sg, virt_to_page(buf), buflen, offset_in_page(buf)); 5 }
在分析的同時我們也看下scatter list是如何組織的。首先看參數sg是sg table 的指針,buf指向數據,buflen是數據的長度。可以看到函數中僅僅是調用了sg_set_page函數,所以這里具體的物理buffer塊是按照頁為單位的。由於buf並不一定是頁對齊的,所以需要一個buf指針到所在頁基址的偏移。
1 static inline void sg_set_page(struct scatterlist *sg, struct page *page, 2 unsigned int len, unsigned int offset) 3 { 4 sg_assign_page(sg, page); 5 sg->offset = offset;//data在頁面中的偏移 6 sg->length = len;//data的長度 7 }
到該函數中,page是一個指向一個頁的指針,該函數中調用了sg_assign_page函數設置sg->page_link指向page,這樣在sg table entry和具體的buffer就聯系起來了。然后把buffer的offset和length記錄到sg entry中。
結合xmit_skb函數,那么在經過sg_set_buffer之后,sg table的第一個表項便和hdr->mhdr或者hdr->hdr聯系起來。
Function skb_to_sgvec
1 int skb_to_sgvec(struct sk_buff *skb, struct scatterlist *sg, int offset, int len) 2 { 3 /*buffer數據存儲在sg的個數*/ 4 int nsg = __skb_to_sgvec(skb, sg, offset, len); 5 /*標記最后一個sg entry結束*/ 6 sg_mark_end(&sg[nsg - 1]); 7 8 return nsg; 9 }
該函數直接調用了__skb_to_sgvec函數,有其實現具體的功能,然后設置最后一個entry為end end entry,以此表明sg list的結束。
1 static int 2 __skb_to_sgvec(struct sk_buff *skb, struct scatterlist *sg, int offset, int len) 3 { 4 int start = skb_headlen(skb); 5 int i, copy = start - offset; 6 struct sk_buff *frag_iter; 7 int elt = 0;/*elt記錄sg entry的個數*/ 8 9 if (copy > 0) {/*copy是頭部的長度*/ 10 if (copy > len)/*頭部大於總長度。。。幾乎不可能*/ 11 copy = len; 12 /*skb->data + offset是數據起始位置,盡管offset一般是0,所以可以看出頭部是占用一個sg entry*/ 13 sg_set_buf(sg, skb->data + offset, copy); 14 elt++; 15 if ((len -= copy) == 0) 16 return elt; 17 /*offset記錄數據copy的位置*/ 18 offset += copy; 19 } 20 /*映射非線性數據即skb_shared_info相關的數據*/ 21 for (i = 0; i < skb_shinfo(skb)->nr_frags; i++) { 22 int end; 23 24 WARN_ON(start > offset + len); 25 26 end = start + skb_frag_size(&skb_shinfo(skb)->frags[i]); 27 if ((copy = end - offset) > 0) { 28 skb_frag_t *frag = &skb_shinfo(skb)->frags[i]; 29 30 if (copy > len) 31 copy = len; 32 sg_set_page(&sg[elt], skb_frag_page(frag), copy, 33 frag->page_offset+offset-start); 34 elt++; 35 if (!(len -= copy)) 36 return elt; 37 offset += copy; 38 } 39 start = end; 40 } 41 42 skb_walk_frags(skb, frag_iter) { 43 int end; 44 45 WARN_ON(start > offset + len); 46 47 end = start + frag_iter->len; 48 if ((copy = end - offset) > 0) { 49 if (copy > len) 50 copy = len; 51 elt += __skb_to_sgvec(frag_iter, sg+elt, offset - start, 52 copy); 53 if ((len -= copy) == 0) 54 return elt; 55 offset += copy; 56 } 57 start = end; 58 } 59 BUG_ON(len); 60 return elt; 61 }
該函數把一個完整的skbuffer記錄到sg table,要搞清楚這些最好對sk_buffer結構比較清楚,而對sk_buffer結構可以參考其的有關專門的介紹。本節我們只介紹相關的部分,這里可以把skbuffer分成兩部分:
1、skbuffer本身的數據
2、skb_shared_info記錄的分片數據
而上面的函數也是把這兩部分分開記錄的,首先調用skb_headlen函數獲取sk_buffer本身的頭部以及數據(不包含分片數據),copy為實際的長度,不過這里傳遞進來的offset為0,所以copy即start,接着就調用了sg_set_buf函數把從skb_buffer->data+offset起始的有效數據記錄到sg table,elt是一個變量記錄使用的sg entry個數。
如果這里沒有分片數據,那么直接返回elt,否則需要記錄offset的位置,便於下次知道上次數據的記錄位置。
下面一個for循環時完成第二部分數據的記錄,即分片數據。sk_buffer->end指向一個skb_shared_info結構,該結構管理分片數據,nr_frags表示分片的數量,所以以此為基添加分片。
循環內部的內容有點混亂感覺,這里詳解解釋下:
注意一下幾個變量:
/*
*len是未復制的數據的長度
*offset是已經復制的數據的長度
*copy是本次要復制的數據的長度
*start 是線性數據段的長度
*映射非線性數據即skb_shared_info相關的數據
*/
其實說實話我個人覺得這幾個變量的命名很是失敗,start和end咋一看容易讓人感覺這是指針,但是沒辦法,說讓咱寫不出這種代碼勒!
在循環之前,start是代表線性數據段的長度,offset在完成映射后就執行offset += copy,所以offset=start。
在循環中end=start +分片size,copy=end-offset,那么實際上,本次copy的長度也就是分片的size。如果分片size大於0,則表示分片存在數據,那么copy=end-offset必定大於0,直接調用sg_set_page函數把當前分片擴展成一個page然后映射到sg table.接着更新len,判斷是否映射完成,即len是否為0,不為0的話更新offset。最后更新start=end.
下面遍歷所有的sk_buffer->frag_list,對於每個sk_buffer,都調用__skb_to_sgvec對其中數據進行映射,最后返回elt即使用的sg entry個數。
/*關於sk_buffer,確實其組織方式很復雜,會單獨講解,礙於篇幅,就不在這里詳細描述*/
回到skb_to_sgvec函數中,調用sg_mark_end對最后一個entry做末端標記。具體而言就是設置sg->page_link第二位為1:sg->page_link |= 0x02;
到這里就把buffer映射到了sg table中。那么如何把sg填入ring desc數組中呢?看virtqueue_add_outbuf
這里需要注意一下傳入的data指針是skb,即數據的虛擬起始地址
1 int virtqueue_add_outbuf(struct virtqueue *vq, 2 struct scatterlist sg[], unsigned int num, 3 void *data, 4 gfp_t gfp) 5 { 6 return virtqueue_add(vq, &sg, sg_next_arr, num, 0, 1, 0, data, gfp); 7 }
這里就是簡單的調用了下virtqueue_add
1 static inline int virtqueue_add(struct virtqueue *_vq, 2 struct scatterlist *sgs[], 3 struct scatterlist *(*next) 4 (struct scatterlist *, unsigned int *), 5 unsigned int total_out, 6 unsigned int total_in, 7 unsigned int out_sgs, 8 unsigned int in_sgs, 9 void *data, 10 gfp_t gfp) 11 { 12 struct vring_virtqueue *vq = to_vvq(_vq); 13 struct scatterlist *sg; 14 unsigned int i, n, avail, uninitialized_var(prev), total_sg; 15 int head; 16 17 START_USE(vq); 18 19 BUG_ON(data == NULL); 20 21 #ifdef DEBUG 22 { 23 ktime_t now = ktime_get(); 24 25 /* No kick or get, with .1 second between? Warn. */ 26 if (vq->last_add_time_valid) 27 WARN_ON(ktime_to_ms(ktime_sub(now, vq->last_add_time)) 28 > 100); 29 vq->last_add_time = now; 30 vq->last_add_time_valid = true; 31 } 32 #endif 33 34 total_sg = total_in + total_out; 35 //這里判斷是否支持間接描述符並且總的entry數要大於1且,vring里至少有一個空buffer 36 /* If the host supports indirect descriptor tables, and we have multiple 37 * buffers, then go indirect. FIXME: tune this threshold */ 38 if (vq->indirect && total_sg > 1 && vq->vq.num_free) { 39 head = vring_add_indirect(vq, sgs, next, total_sg, total_out, 40 total_in, 41 out_sgs, in_sgs, gfp); 42 if (likely(head >= 0))//如果執行成功,就直接執行add_head段 43 goto add_head; 44 } 45 /*否則就可能是不支持間接描述符,那么這是需要有足夠的desc來裝載哪些entry*/ 46 47 BUG_ON(total_sg > vq->vring.num); 48 BUG_ON(total_sg == 0); 49 /*如果可用的desc數量不夠,則不能執行成功*/ 50 if (vq->vq.num_free < total_sg) { 51 pr_debug("Can't add buf len %i - avail = %i\n", 52 total_sg, vq->vq.num_free); 53 /* FIXME: for historical reasons, we force a notify here if 54 * there are outgoing parts to the buffer. Presumably the 55 * host should service the ring ASAP. */ 56 if (out_sgs) 57 vq->notify(&vq->vq); 58 END_USE(vq); 59 return -ENOSPC; 60 } 61 62 /* We're about to use some buffers from the free list. */ 63 vq->vq.num_free -= total_sg; 64 /*可用的desc數量夠的話就可以直接使用這些desc,針對desc的操作都是一樣的*/ 65 head = i = vq->free_head; 66 for (n = 0; n < out_sgs; n++) { 67 for (sg = sgs[n]; sg; sg = next(sg, &total_out)) { 68 vq->vring.desc[i].flags = VRING_DESC_F_NEXT; 69 vq->vring.desc[i].addr = sg_phys(sg); 70 vq->vring.desc[i].len = sg->length; 71 prev = i; 72 i = vq->vring.desc[i].next; 73 } 74 } 75 for (; n < (out_sgs + in_sgs); n++) { 76 for (sg = sgs[n]; sg; sg = next(sg, &total_in)) { 77 vq->vring.desc[i].flags = VRING_DESC_F_NEXT|VRING_DESC_F_WRITE; 78 vq->vring.desc[i].addr = sg_phys(sg); 79 vq->vring.desc[i].len = sg->length; 80 prev = i; 81 i = vq->vring.desc[i].next; 82 } 83 } 84 /* Last one doesn't continue. */ 85 vq->vring.desc[prev].flags &= ~VRING_DESC_F_NEXT; 86 87 /* Update free pointer */ 88 vq->free_head = i; 89 90 add_head: 91 /* Set token. */ 92 /*在客戶機驅動寫入數據到buffer以后,設置data數組以head為下標的內容為buffer的虛擬地址*/ 93 vq->data[head] = data; 94 95 /* Put entry in available array (but don't update avail->idx until they 96 * do sync). */ 97 98 //然后把本次傳送所用到的描述符表的信息寫入avail結構中 99 /*&應該是要保證idx小於vq->vring.num*/ 100 avail = (vq->vring.avail->idx & (vq->vring.num-1)); 101 /*設置avail_ring*/ 102 vq->vring.avail->ring[avail] = head; 103 104 /* Descriptors and available array need to be set before we expose the 105 * new available array entries. */ 106 virtio_wmb(vq->weak_barriers); 107 vq->vring.avail->idx++; 108 vq->num_added++; 109 110 /* This is very unlikely, but theoretically possible. Kick 111 * just in case. */ 112 if (unlikely(vq->num_added == (1 << 16) - 1)) 113 virtqueue_kick(_vq); 114 115 pr_debug("Added buffer head %i to %p\n", head, vq); 116 END_USE(vq); 117 118 return 0; 119 }
該函數實現了把sg table中記錄的信息,復制到發送隊列的ring的desc數組中。看下該函數幾個重要的參數:
struct virtqueue *_vq 添加的目的隊列
struct scatterlist *sgs[] 要添加的sg table
struct scatterlist *(*next) 一個函數指針,用於獲取下一個sg entry
(struct scatterlist *, unsigned int *)
unsigned int total_out 輸出的sg entry的個數
unsigned int total_in 輸入的sg entry的個數
unsigned int out_sgs 輸出的sg list的個數,這里一個out_sgs代表一個完整的skb_buffer
unsigned int in_sgs 輸入的sg list的個數,這里一個in_sgs代表一個完整的skb_buffer
void *data 一個指向sk_buffer的指針。
介紹完這些,下面的就很明確了,
首先判斷是否支持隊列是否支持indirect descriptor,首選也是使用這種方式,不過這種方式需要占用主描述符表的一個表項,並且在total_sg>1的時候使用(total_sg=1時只是使用主描述符表即可),如果滿足條件就調用vring_add_indirect函數添加間接描述符表,並把sg table中記錄的信息寫入到描述符表中。
如果不支持,就只能使用主描述符表,此時主描述符表的空閑表項數必須大於等於total_sg,具體可用的數目記錄在vq->vq.num_free,而首個可用的表項的下標記錄在vq->free_head中,下面的for循環就依次把sg table中entry的信息記錄到對應的desc表中,需要注意的是desc中的addr記錄的是buffer的物理地址,而sg是記錄的頁虛擬地址。下面的一個for循環是添加in_sg,關於in_sg和out_sg,目前在網絡驅動部分的發送隊列只使用out_sg,而接受隊列只使用in_sg,而控制隊列就可能兩個都使。這里我們忽略此點即可。
最后依然需要設置最后一個desc為末端desc,並移動vq->free_head便於下次使用。
add_head后面的部分是和后端驅動相關的。
主要是在發送隊列的ring[]中獲取一項,寫入前面寫入的sk_buffer 對應的desc表中 的head,即首個描述符的下標。然后更新vq->vring.avail->idx。
到現在前端驅動已經設置完成,剩下就要通知后端驅動讀取數據了,回到start_xmit函數中,看到調用了virtqueue_kick函數
1 void virtqueue_kick(struct virtqueue *vq) 2 { 3 if (virtqueue_kick_prepare(vq)) 4 virtqueue_notify(vq); 5 }
virtqueue_kick函數調用了virtqueue_kick_prepare判斷下當前是否應該notify后端,如果應該,就調用virtqueue_notify函數,該函數直接調用了vq->notify函數,參數是隊列指針。
而具體實現的是下面的函數vp_notify在virtio_pci.c中
1 static void vp_notify(struct virtqueue *vq) 2 { 3 struct virtio_pci_device *vp_dev = to_vp_device(vq->vdev); 4 5 /* we write the queue's selector into the notification register to 6 * signal the other end */ 7 iowrite16(vq->index, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_NOTIFY); 8 }
可以看到,實際上前端通知后端僅僅是把隊列的索引寫入到對應的設備寄存器中,這樣在后端qemu就會知道是哪個隊列發生了add buffer,然后就從對應隊列的buffer取出數據。
而對於前后端notify機制的分析,這里我們單獨拿出一節來講,感興趣可以參考:
參考:
- LInux 3.11.1內核源代碼
- qemu 2.7.0源代碼
- qemu 開發者的幫助
