設備收發包之NAPI/非NAPI方式收包


概述

本文主要介紹二層收包流程,包括NAPI與非NAPI方式;

NAPI方式

數據包到來,第一個數據包產生硬件中斷,中斷處理程序將設備的napi_struct結構掛在當前cpu的待收包設備鏈表softnet_data->poll_list中,並觸發軟中斷,軟中斷執行過程中,遍歷softnet_data->poll_list中的所有設備,依次調用其收包函數napi_sturct->poll,處理收包過程;

非NAPI方式

每個數據包到來,都會產生硬件中斷,中斷處理程序將收到的包放入當前cpu的收包隊列softnet_data->input_pkt_queue中,並且將非napi設備對應的虛擬設備napi結構softnet->backlog結構掛在當前cpu的待收包設備鏈表softnet_data->poll_list中,並觸發軟中斷,軟中斷處理過程中,會調用backlog的回調處理函數process_backlog,將收包隊列input_pkt_queue合並到softdata->process_queue后面,並依次處理該隊列中的數據包;

 

NAPI方式收包流程
中斷上半部

以e100為例:

e100_intr(中斷處理程序)–>__napi_schedule–>____napi_schedule(將設備對應的napi結構加入到當前cpu的待收包處理隊列softnet_data->poll_list中,並觸發軟中斷)

數據包到來,第一包產生中斷,中斷處理程序得到執行,其中關鍵步驟為調用__napi_schedule(&nic->napi)將設備對應的napi加入到當前cpu的softnet_data->poll_list中;

 1 static irqreturn_t e100_intr(int irq, void *dev_id)
 2 {
 3     struct net_device *netdev = dev_id;
 4     struct nic *nic = netdev_priv(netdev);
 5     u8 stat_ack = ioread8(&nic->csr->scb.stat_ack);
 6 
 7     netif_printk(nic, intr, KERN_DEBUG, nic->netdev,
 8              "stat_ack = 0x%02X\n", stat_ack);
 9 
10     if (stat_ack == stat_ack_not_ours ||    /* Not our interrupt */
11        stat_ack == stat_ack_not_present)    /* Hardware is ejected */
12         return IRQ_NONE;
13 
14     /* Ack interrupt(s) */
15     iowrite8(stat_ack, &nic->csr->scb.stat_ack);
16 
17     /* We hit Receive No Resource (RNR); restart RU after cleaning */
18     if (stat_ack & stat_ack_rnr)
19         nic->ru_running = RU_SUSPENDED;
20 
21     if (likely(napi_schedule_prep(&nic->napi))) {
22         e100_disable_irq(nic);
23         //將該網絡設備加入到sd的poll_list中
24         __napi_schedule(&nic->napi);
25     }
26 
27     return IRQ_HANDLED;
28 }

 

將設備對應的napi結構加入到當前cpu的softnet_data->poll_list中,並觸發收包軟中斷;

 1 void __napi_schedule(struct napi_struct *n)
 2 {
 3     unsigned long flags;
 4 
 5     local_irq_save(flags);
 6     ____napi_schedule(this_cpu_ptr(&softnet_data), n);
 7     local_irq_restore(flags);
 8 }
 9 
10 
11 //添加設備到poll_list,激活接收報文軟中斷
12 static inline void ____napi_schedule(struct softnet_data *sd,
13                      struct napi_struct *napi)
14 {
15     list_add_tail(&napi->poll_list, &sd->poll_list);
16     __raise_softirq_irqoff(NET_RX_SOFTIRQ);
17 }

 

中斷下半部

net_rx_action(軟中斷收包處理程序)–>napi_poll(執行設備包處理回調napi_struct->poll)

收包軟中斷處理程序,軟中斷觸發,說明有設備的數據包到達,此時本處理程序遍歷softnet_data->poll_list中的待收包設備,並執行napi中的poll調度,關鍵代碼napi_poll(n, &repoll);

 1 /* 收包軟中斷處理程序 */
 2 static __latent_entropy void net_rx_action(struct softirq_action *h)
 3 {
 4     struct softnet_data *sd = this_cpu_ptr(&softnet_data);
 5     unsigned long time_limit = jiffies +
 6         usecs_to_jiffies(netdev_budget_usecs);
 7     int budget = netdev_budget;
 8     LIST_HEAD(list);
 9     LIST_HEAD(repoll);
10 
11     /* 
12         將當前cpu的待收包設備列表poll_list合並到list,
13         並且重新初始化poll_list 
14     */
15     local_irq_disable();
16     list_splice_init(&sd->poll_list, &list);
17     local_irq_enable();
18 
19     /* 遍歷列表 */
20     for (;;) {
21         struct napi_struct *n;
22 
23         /* 列表為空,則跳出 */
24         if (list_empty(&list)) {
25             if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
26                 goto out;
27             break;
28         }
29 
30         /* 取鏈表頭napi節點 */
31         n = list_first_entry(&list, struct napi_struct, poll_list);
32 
33         /* 
34             調用該節點的poll函數收包 ,
35             若未處理完,則掛到repoll上
36         */
37         budget -= napi_poll(n, &repoll);
38 
39         /* If softirq window is exhausted then punt.
40          * Allow this to run for 2 jiffies since which will allow
41          * an average latency of 1.5/HZ.
42          */
43         /* 總配額用盡,或者中斷時間窗口用盡,跳出 */
44         if (unlikely(budget <= 0 ||
45                  time_after_eq(jiffies, time_limit))) {
46             sd->time_squeeze++;
47             break;
48         }
49     }
50 
51     /* 禁用中斷 */
52     local_irq_disable();
53 
54     /* 整合poll_list鏈表,包括新產成的,未完成的,未完成的在前 */
55     list_splice_tail_init(&sd->poll_list, &list);
56     list_splice_tail(&repoll, &list);
57     list_splice(&list, &sd->poll_list);
58 
59     /* 如果poll_list不為空,則觸發下一次收包中斷 */
60     if (!list_empty(&sd->poll_list))
61         __raise_softirq_irqoff(NET_RX_SOFTIRQ);
62 
63     /* 啟用中斷 */
64     net_rps_action_and_irq_enable(sd);
65 out:
66     __kfree_skb_flush();
67 }
68 
69 struct netdev_adjacent {
70     struct net_device *dev;
71 
72     /* upper master flag, there can only be one master device per list */
73     bool master;
74 
75     /* counter for the number of times this device was added to us */
76     u16 ref_nr;
77 
78     /* private field for the users */
79     void *private;
80 
81     struct list_head list;
82     struct rcu_head rcu;
83 };

 

調用設備對應的napi_struct->poll回調接收數據包,接收數量要根據配額進行限制,關鍵代碼為 work = n->poll(n, weight);

 1 static int napi_poll(struct napi_struct *n, struct list_head *repoll)
 2 {
 3     void *have;
 4     int work, weight;
 5 
 6     /* 將napi從鏈表中拿掉 */
 7     list_del_init(&n->poll_list);
 8 
 9     have = netpoll_poll_lock(n);
10 
11     /* 讀取配額 */
12     weight = n->weight;
13 
14     /* This NAPI_STATE_SCHED test is for avoiding a race
15      * with netpoll's poll_napi().  Only the entity which
16      * obtains the lock and sees NAPI_STATE_SCHED set will
17      * actually make the ->poll() call.  Therefore we avoid
18      * accidentally calling ->poll() when NAPI is not scheduled.
19      */
20     work = 0;
21 
22     /* napi在調度狀態 */
23     if (test_bit(NAPI_STATE_SCHED, &n->state)) {
24         /* 執行設備napi的poll回調進行收包 */
25         work = n->poll(n, weight);
26         trace_napi_poll(n, work, weight);
27     }
28 
29     WARN_ON_ONCE(work > weight);
30 
31     /* 收包數量小於配額,全部讀完 */
32     if (likely(work < weight))
33         goto out_unlock;
34 
35     /* 以下未讀完 */
36 
37     /* Drivers must not modify the NAPI state if they
38      * consume the entire weight.  In such cases this code
39      * still "owns" the NAPI instance and therefore can
40      * move the instance around on the list at-will.
41      */
42     /* napi在禁用狀態 */
43     if (unlikely(napi_disable_pending(n))) {
44         /* 執行完成項 */
45         napi_complete(n);
46         goto out_unlock;
47     }
48 
49     if (n->gro_list) {
50         /* flush too old packets
51          * If HZ < 1000, flush all packets.
52          */
53         napi_gro_flush(n, HZ >= 1000);
54     }
55 
56     /* Some drivers may have called napi_schedule
57      * prior to exhausting their budget.
58      */
59     if (unlikely(!list_empty(&n->poll_list))) {
60         pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
61                  n->dev ? n->dev->name : "backlog");
62         goto out_unlock;
63     }
64 
65     /* 將為處理完的掛到repoll上 */
66     list_add_tail(&n->poll_list, repoll);
67 
68 out_unlock:
69     netpoll_poll_unlock(have);
70 
71     return work;
72 }

 

非NAPI方式收包流程
中斷上半部

netif_rx(中斷處理程序最終會調用次函數處理收到的包)->netif_rx_internal->enqueue_to_backlog(將收到的包加入到當前cpu的softnet->input_pkt_queue中,並將默認設備backlog加入到softnet_data結構的poll_list鏈表)

中斷處理程序會調用netif_rx來將數據包加入到收包隊列中,關鍵代碼:enqueue_to_backlog(skb, get_cpu(), &qtail); 注意數每包都會中斷;

1 int netif_rx(struct sk_buff *skb)
2 {
3     trace_netif_rx_entry(skb);
4 
5     return netif_rx_internal(skb);
6 }

 

 1 static int netif_rx_internal(struct sk_buff *skb)
 2 {
 3     int ret;
 4 
 5     net_timestamp_check(netdev_tstamp_prequeue, skb);
 6 
 7     trace_netif_rx(skb);
 8 
 9 #ifdef CONFIG_RPS
10     if (static_key_false(&rps_needed)) {
11         struct rps_dev_flow voidflow, *rflow = &voidflow;
12         int cpu;
13 
14         preempt_disable();
15         rcu_read_lock();
16 
17         cpu = get_rps_cpu(skb->dev, skb, &rflow);
18         if (cpu < 0)
19             cpu = smp_processor_id();
20 
21         ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
22 
23         rcu_read_unlock();
24         preempt_enable();
25     } else
26 #endif
27     {
28         unsigned int qtail;
29 
30         ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
31         put_cpu();
32     }
33     return ret;
34 }

 

enqueue_to_backlog將skb加入到當前cpu的softnet_data->input_pkt_queue中,並將softnet_data->backlog結構加入到softnet_data->poll_list鏈表中,並觸發收包軟中斷;

 1 static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
 2                   unsigned int *qtail)
 3 {
 4     struct softnet_data *sd;
 5     unsigned long flags;
 6     unsigned int qlen;
 7 
 8     sd = &per_cpu(softnet_data, cpu);
 9 
10     local_irq_save(flags);
11 
12     rps_lock(sd);
13 
14     //檢查設備狀態
15     if (!netif_running(skb->dev))
16         goto drop;
17 
18     //獲取隊列長度
19     qlen = skb_queue_len(&sd->input_pkt_queue);
20 
21     //如果隊列未滿&& 未達到skb流限制
22     if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
23 
24         //長度不為空,設備已經得到了調度
25         if (qlen) {
26 enqueue:
27             //skb入隊
28             __skb_queue_tail(&sd->input_pkt_queue, skb);
29             input_queue_tail_incr_save(sd, qtail);
30             rps_unlock(sd);
31             local_irq_restore(flags);
32             return NET_RX_SUCCESS;
33         }
34 
35         /* Schedule NAPI for backlog device
36          * We can use non atomic operation since we own the queue lock
37          */
38         //為空,則設置napi調度
39         if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
40 
41             //alextodo
42             if (!rps_ipi_queued(sd))
43                 ____napi_schedule(sd, &sd->backlog);
44         }
45 
46         //設置調度之后,入隊
47         goto enqueue;
48     }
49 
50 //丟包
51 drop:
52     sd->dropped++;
53     rps_unlock(sd);
54 
55     local_irq_restore(flags);
56 
57     atomic_long_inc(&skb->dev->rx_dropped);
58     kfree_skb(skb);
59     return NET_RX_DROP;
60 }

 

中斷下半部

net_rx_action(軟中斷收包處理程序)–>napi_poll(執行非napi回調函數process_backlog)

net_rx_action與napi方式相同,這里略過,主要看下其poll回調函數,其將數據包從隊列中移出,調用__netif_receive_skb傳遞到上層,后續介紹傳遞流程,此處略過:

 1 static int process_backlog(struct napi_struct *napi, int quota)
 2 {
 3     struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
 4     bool again = true;
 5     int work = 0;
 6 
 7     /* Check if we have pending ipi, its better to send them now,
 8      * not waiting net_rx_action() end.
 9      */
10     if (sd_has_rps_ipi_waiting(sd)) {
11         local_irq_disable();
12         net_rps_action_and_irq_enable(sd);
13     }
14 
15     //設置設備接收配額
16     napi->weight = dev_rx_weight;
17     while (again) {
18         struct sk_buff *skb;
19 
20         //從隊列中取skb向上層輸入
21         while ((skb = __skb_dequeue(&sd->process_queue))) {
22             rcu_read_lock();
23             __netif_receive_skb(skb);
24             rcu_read_unlock();
25             input_queue_head_incr(sd);
26 
27             //如果達到配額,則完成
28             if (++work >= quota)
29                 return work;
30 
31         }
32 
33         local_irq_disable();
34         rps_lock(sd);
35 
36         //如果輸入隊列為空,沒有需要處理
37         if (skb_queue_empty(&sd->input_pkt_queue)) {
38             /*
39              * Inline a custom version of __napi_complete().
40              * only current cpu owns and manipulates this napi,
41              * and NAPI_STATE_SCHED is the only possible flag set
42              * on backlog.
43              * We can use a plain write instead of clear_bit(),
44              * and we dont need an smp_mb() memory barrier.
45              */
46 
47             //重置狀態,處理完畢
48             napi->state = 0;
49             again = false;
50         } else {
51             //合並輸入隊列到處理隊列,繼續走循環處理
52             skb_queue_splice_tail_init(&sd->input_pkt_queue,
53                            &sd->process_queue);
54         }
55         rps_unlock(sd);
56         local_irq_enable();
57     }
58 
59     //返回實際處理的包數
60     return work;
61 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM