Overview
This article walks through the layer-2 packet receive path, covering both the NAPI and non-NAPI modes.
NAPI mode
When packets arrive, the first one raises a hardware interrupt. The interrupt handler links the device's napi_struct onto the current CPU's list of devices waiting to be polled, softnet_data->poll_list, and raises the receive softirq. The softirq handler then walks every device on softnet_data->poll_list and invokes each one's receive callback, napi_struct->poll, to pull in packets.
Non-NAPI mode
Every arriving packet raises a hardware interrupt. The interrupt handler puts the packet on the current CPU's input queue softnet_data->input_pkt_queue, links the virtual napi structure that stands in for all non-NAPI devices, softnet_data->backlog, onto the current CPU's softnet_data->poll_list, and raises the receive softirq. During softirq processing, the backlog's poll callback process_backlog splices input_pkt_queue onto the tail of softnet_data->process_queue and then processes the packets on that queue one by one.
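For orientation, these are the per-CPU softnet_data fields both paths manipulate (a simplified excerpt of the structure in include/linux/netdevice.h; most members are omitted):

struct softnet_data {
    struct list_head    poll_list;        /* napi_structs waiting to be polled */
    struct sk_buff_head process_queue;    /* packets currently being delivered upward */
    /* ... */
    unsigned int        dropped;          /* packets dropped by enqueue_to_backlog */
    struct sk_buff_head input_pkt_queue;  /* packets queued by netif_rx */
    struct napi_struct  backlog;          /* stand-in napi for non-NAPI devices */
};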
Receive flow in NAPI mode
Interrupt top half
Taking e100 as an example:
e100_intr (interrupt handler) –> __napi_schedule –> ____napi_schedule (adds the device's napi structure to the current CPU's pending-poll list softnet_data->poll_list and raises the softirq)
When packets arrive, the first one raises an interrupt and the handler runs. The key step is the call __napi_schedule(&nic->napi), which adds the device's napi structure to the current CPU's softnet_data->poll_list:
static irqreturn_t e100_intr(int irq, void *dev_id)
{
    struct net_device *netdev = dev_id;
    struct nic *nic = netdev_priv(netdev);
    u8 stat_ack = ioread8(&nic->csr->scb.stat_ack);

    netif_printk(nic, intr, KERN_DEBUG, nic->netdev,
                 "stat_ack = 0x%02X\n", stat_ack);

    if (stat_ack == stat_ack_not_ours ||    /* Not our interrupt */
        stat_ack == stat_ack_not_present)   /* Hardware is ejected */
        return IRQ_NONE;

    /* Ack interrupt(s) */
    iowrite8(stat_ack, &nic->csr->scb.stat_ack);

    /* We hit Receive No Resource (RNR); restart RU after cleaning */
    if (stat_ack & stat_ack_rnr)
        nic->ru_running = RU_SUSPENDED;

    if (likely(napi_schedule_prep(&nic->napi))) {
        e100_disable_irq(nic);
        /* add this device's napi to the current cpu's poll_list */
        __napi_schedule(&nic->napi);
    }

    return IRQ_HANDLED;
}
This adds the device's napi structure to the current CPU's softnet_data->poll_list and raises the receive softirq:
void __napi_schedule(struct napi_struct *n)
{
    unsigned long flags;

    local_irq_save(flags);
    ____napi_schedule(this_cpu_ptr(&softnet_data), n);
    local_irq_restore(flags);
}

/* add the device to poll_list and raise the receive softirq */
static inline void ____napi_schedule(struct softnet_data *sd,
                                     struct napi_struct *napi)
{
    list_add_tail(&napi->poll_list, &sd->poll_list);
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
}
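Most drivers do not call __napi_schedule() directly; they go through the napi_schedule() wrapper (condensed below from include/linux/netdevice.h), which first claims the NAPI_STATE_SCHED bit so the same napi cannot be queued twice. e100_intr above open-codes the same pair so that it can disable the device interrupt between the two steps:

/* schedule this napi only if it is not disabled and not already
 * scheduled; napi_schedule_prep() atomically claims NAPI_STATE_SCHED
 */
static inline void napi_schedule(struct napi_struct *n)
{
    if (napi_schedule_prep(n))
        __napi_schedule(n);
}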
Interrupt bottom half
net_rx_action (softirq receive handler) –> napi_poll (runs the device's packet-processing callback napi_struct->poll)
net_rx_action is the receive-softirq handler. When the softirq fires, some device has packets pending; the handler walks the pending devices on softnet_data->poll_list and dispatches each one's poll, bounded by a total packet budget (netdev_budget) and a time window (netdev_budget_usecs). The key call is napi_poll(n, &repoll);
/* receive softirq handler */
static __latent_entropy void net_rx_action(struct softirq_action *h)
{
    struct softnet_data *sd = this_cpu_ptr(&softnet_data);
    unsigned long time_limit = jiffies +
        usecs_to_jiffies(netdev_budget_usecs);
    int budget = netdev_budget;
    LIST_HEAD(list);
    LIST_HEAD(repoll);

    /*
     * splice the current cpu's pending-device list poll_list
     * onto the local list and reinitialize poll_list
     */
    local_irq_disable();
    list_splice_init(&sd->poll_list, &list);
    local_irq_enable();

    /* walk the list */
    for (;;) {
        struct napi_struct *n;

        /* list empty: nothing left to poll */
        if (list_empty(&list)) {
            if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
                goto out;
            break;
        }

        /* take the napi node at the head of the list */
        n = list_first_entry(&list, struct napi_struct, poll_list);

        /*
         * poll this node to receive packets; if it did not
         * finish, napi_poll hangs it on repoll
         */
        budget -= napi_poll(n, &repoll);

        /* If softirq window is exhausted then punt.
         * Allow this to run for 2 jiffies since which will allow
         * an average latency of 1.5/HZ.
         */
        /* total budget exhausted or time window expired: stop */
        if (unlikely(budget <= 0 ||
                     time_after_eq(jiffies, time_limit))) {
            sd->time_squeeze++;
            break;
        }
    }

    /* disable interrupts */
    local_irq_disable();

    /* rebuild poll_list: newly scheduled and unfinished entries, unfinished first */
    list_splice_tail_init(&sd->poll_list, &list);
    list_splice_tail(&repoll, &list);
    list_splice(&list, &sd->poll_list);

    /* if poll_list is not empty, raise the softirq for another pass */
    if (!list_empty(&sd->poll_list))
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);

    /* re-enable interrupts (and send any pending RPS IPIs) */
    net_rps_action_and_irq_enable(sd);
out:
    __kfree_skb_flush();
}
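The knobs that bound this loop are ordinary sysctls. Their kernel-side definitions, with defaults as of the 4.x kernels this walkthrough is based on (from net/core/dev.c, exposed under /proc/sys/net/core/):

int netdev_budget __read_mostly = 300;                  /* net.core.netdev_budget: max packets per softirq pass */
unsigned int __read_mostly netdev_budget_usecs = 2000;  /* net.core.netdev_budget_usecs: max time per pass */
int netdev_max_backlog __read_mostly = 1000;            /* net.core.netdev_max_backlog: input_pkt_queue limit */
int dev_rx_weight __read_mostly = 64;                   /* derived from net.core.dev_weight: per-device quota */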
napi_poll invokes the device's napi_struct->poll callback to receive packets; the number received in one call is capped by the device's quota (napi_struct->weight, typically 64). The key call is work = n->poll(n, weight);
static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{
    void *have;
    int work, weight;

    /* detach this napi from the list */
    list_del_init(&n->poll_list);

    have = netpoll_poll_lock(n);

    /* read the device quota */
    weight = n->weight;

    /* This NAPI_STATE_SCHED test is for avoiding a race
     * with netpoll's poll_napi(). Only the entity which
     * obtains the lock and sees NAPI_STATE_SCHED set will
     * actually make the ->poll() call. Therefore we avoid
     * accidentally calling ->poll() when NAPI is not scheduled.
     */
    work = 0;

    /* napi is in the scheduled state */
    if (test_bit(NAPI_STATE_SCHED, &n->state)) {
        /* run the device's poll callback to receive packets */
        work = n->poll(n, weight);
        trace_napi_poll(n, work, weight);
    }

    WARN_ON_ONCE(work > weight);

    /* fewer packets than the quota: everything was drained */
    if (likely(work < weight))
        goto out_unlock;

    /* from here on, the device was not fully drained */

    /* Drivers must not modify the NAPI state if they
     * consume the entire weight. In such cases this code
     * still "owns" the NAPI instance and therefore can
     * move the instance around on the list at-will.
     */
    /* napi is being disabled */
    if (unlikely(napi_disable_pending(n))) {
        /* complete the napi */
        napi_complete(n);
        goto out_unlock;
    }

    if (n->gro_list) {
        /* flush too old packets
         * If HZ < 1000, flush all packets.
         */
        napi_gro_flush(n, HZ >= 1000);
    }

    /* Some drivers may have called napi_schedule
     * prior to exhausting their budget.
     */
    if (unlikely(!list_empty(&n->poll_list))) {
        pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
                     n->dev ? n->dev->name : "backlog");
        goto out_unlock;
    }

    /* hang the unfinished napi on repoll */
    list_add_tail(&n->poll_list, repoll);

out_unlock:
    netpoll_poll_unlock(have);

    return work;
}
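To make the driver side of this contract concrete, here is a minimal sketch of a poll callback for a hypothetical NAPI driver (my_poll, my_rx_one, my_enable_irq and the priv layout are illustrative, not from a real driver). It follows the two rules napi_poll depends on: never report more work than the budget, and only leave the scheduled state when finishing early:

/* illustrative poll callback for a hypothetical NAPI driver */
static int my_poll(struct napi_struct *napi, int budget)
{
    struct my_priv *priv = container_of(napi, struct my_priv, napi);
    int work = 0;

    /* pull at most 'budget' packets off the RX ring */
    while (work < budget && my_rx_one(priv))   /* hypothetical ring helper */
        work++;

    /* Finished early: leave the scheduled state and unmask the device
     * IRQ. If work == budget we return without touching NAPI state,
     * and napi_poll above puts this napi back on the repoll list.
     */
    if (work < budget) {
        napi_complete_done(napi, work);
        my_enable_irq(priv);                   /* hypothetical IRQ unmask */
    }

    return work;
}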
Receive flow in non-NAPI mode
Interrupt top half
netif_rx (the interrupt handler ultimately calls this function to handle a received packet) -> netif_rx_internal -> enqueue_to_backlog (adds the packet to the current CPU's softnet_data->input_pkt_queue and links the default backlog device onto the softnet_data->poll_list)
The interrupt handler calls netif_rx to put the packet on the receive queue; the key call is enqueue_to_backlog(skb, get_cpu(), &qtail). Note that in this mode every single packet raises an interrupt.
int netif_rx(struct sk_buff *skb)
{
    trace_netif_rx_entry(skb);

    return netif_rx_internal(skb);
}
static int netif_rx_internal(struct sk_buff *skb)
{
    int ret;

    net_timestamp_check(netdev_tstamp_prequeue, skb);

    trace_netif_rx(skb);

#ifdef CONFIG_RPS
    if (static_key_false(&rps_needed)) {
        struct rps_dev_flow voidflow, *rflow = &voidflow;
        int cpu;

        preempt_disable();
        rcu_read_lock();

        cpu = get_rps_cpu(skb->dev, skb, &rflow);
        if (cpu < 0)
            cpu = smp_processor_id();

        ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);

        rcu_read_unlock();
        preempt_enable();
    } else
#endif
    {
        unsigned int qtail;

        ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
        put_cpu();
    }
    return ret;
}
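For comparison with e100_intr, this is roughly what the top half of a legacy non-NAPI driver looks like (a minimal sketch; the device and the helpers my_dev_frame_len and my_dev_read_frame are hypothetical): it builds the skb inside the interrupt handler itself and hands it to netif_rx, which ends in enqueue_to_backlog:

/* illustrative RX interrupt handler of a hypothetical non-NAPI driver */
static irqreturn_t my_legacy_intr(int irq, void *dev_id)
{
    struct net_device *dev = dev_id;
    struct sk_buff *skb;
    int len = my_dev_frame_len(dev);            /* hypothetical: pending frame length */

    skb = netdev_alloc_skb(dev, len + NET_IP_ALIGN);
    if (!skb) {
        dev->stats.rx_dropped++;
        return IRQ_HANDLED;
    }

    skb_reserve(skb, NET_IP_ALIGN);             /* align the IP header */
    my_dev_read_frame(dev, skb_put(skb, len));  /* hypothetical: copy frame from device */
    skb->protocol = eth_type_trans(skb, dev);

    netif_rx(skb);                              /* queues the skb via enqueue_to_backlog */
    return IRQ_HANDLED;
}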
enqueue_to_backlog adds the skb to the current CPU's softnet_data->input_pkt_queue, links the softnet_data->backlog structure onto the softnet_data->poll_list, and raises the receive softirq:
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
                              unsigned int *qtail)
{
    struct softnet_data *sd;
    unsigned long flags;
    unsigned int qlen;

    sd = &per_cpu(softnet_data, cpu);

    local_irq_save(flags);

    rps_lock(sd);

    /* check that the device is up */
    if (!netif_running(skb->dev))
        goto drop;

    /* current input queue length */
    qlen = skb_queue_len(&sd->input_pkt_queue);

    /* queue not full && skb flow limit not hit */
    if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {

        /* queue non-empty: the backlog napi is already scheduled */
        if (qlen) {
enqueue:
            /* enqueue the skb */
            __skb_queue_tail(&sd->input_pkt_queue, skb);
            input_queue_tail_incr_save(sd, qtail);
            rps_unlock(sd);
            local_irq_restore(flags);
            return NET_RX_SUCCESS;
        }

        /* Schedule NAPI for backlog device
         * We can use non atomic operation since we own the queue lock
         */
        /* queue empty: mark the backlog napi as scheduled */
        if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
            /* no RPS IPI pending for this sd: schedule the backlog napi here */
            if (!rps_ipi_queued(sd))
                ____napi_schedule(sd, &sd->backlog);
        }

        /* now that it is scheduled, enqueue */
        goto enqueue;
    }

drop:
    /* drop the packet */
    sd->dropped++;
    rps_unlock(sd);

    local_irq_restore(flags);

    atomic_long_inc(&skb->dev->rx_dropped);
    kfree_skb(skb);
    return NET_RX_DROP;
}
Interrupt bottom half
net_rx_action (softirq receive handler) –> napi_poll (runs the non-NAPI callback process_backlog)
net_rx_action is the same as in the NAPI case, so it is skipped here. What matters is the poll callback: process_backlog dequeues packets and hands them to the upper layers via __netif_receive_skb. The delivery path is covered in a later article, so it is skipped here as well:
static int process_backlog(struct napi_struct *napi, int quota)
{
    struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
    bool again = true;
    int work = 0;

    /* Check if we have pending ipi, its better to send them now,
     * not waiting net_rx_action() end.
     */
    if (sd_has_rps_ipi_waiting(sd)) {
        local_irq_disable();
        net_rps_action_and_irq_enable(sd);
    }

    /* set the receive quota for the backlog device */
    napi->weight = dev_rx_weight;
    while (again) {
        struct sk_buff *skb;

        /* dequeue skbs and feed them to the upper layers */
        while ((skb = __skb_dequeue(&sd->process_queue))) {
            rcu_read_lock();
            __netif_receive_skb(skb);
            rcu_read_unlock();
            input_queue_head_incr(sd);

            /* quota reached: done for this round */
            if (++work >= quota)
                return work;
        }

        local_irq_disable();
        rps_lock(sd);

        /* input queue empty: nothing left to process */
        if (skb_queue_empty(&sd->input_pkt_queue)) {
            /*
             * Inline a custom version of __napi_complete().
             * only current cpu owns and manipulates this napi,
             * and NAPI_STATE_SCHED is the only possible flag set
             * on backlog.
             * We can use a plain write instead of clear_bit(),
             * and we dont need an smp_mb() memory barrier.
             */

            /* reset the state: processing is finished */
            napi->state = 0;
            again = false;
        } else {
            /* splice the input queue onto the process queue and loop again */
            skb_queue_splice_tail_init(&sd->input_pkt_queue,
                                       &sd->process_queue);
        }
        rps_unlock(sd);
        local_irq_enable();
    }

    /* return the number of packets actually processed */
    return work;
}
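Finally, the backlog napi gets wired to process_backlog when the networking core initializes each CPU's softnet_data at boot. A condensed sketch of net_dev_init() in net/core/dev.c (error handling and unrelated fields omitted):

static int __init net_dev_init(void)
{
    int i;

    for_each_possible_cpu(i) {
        struct softnet_data *sd = &per_cpu(softnet_data, i);

        skb_queue_head_init(&sd->input_pkt_queue);
        skb_queue_head_init(&sd->process_queue);
        INIT_LIST_HEAD(&sd->poll_list);

        /* the stand-in napi for non-NAPI devices: its poll
         * callback is the process_backlog shown above
         */
        sd->backlog.poll = process_backlog;
        sd->backlog.weight = weight_p;
    }
    /* ... softirq handler registration etc. omitted ... */
    return 0;
}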