Linux NAPI處理流程分析


2017-05-10

今天重點對linux網絡數據包的處理做下分析,但是並不關系到上層協議,僅僅到鏈路層。

之前轉載過一篇文章,對NAPI做了比較詳盡的分析,本文結合Linux內核源代碼,對當前網絡數據包的處理進行梳理。根據NAPI的處理特性,對設備提出一定的要求

1、設備需要有足夠的緩沖區,保存多個數據分組

2、可以禁用當前設備中斷,然而不影響其他的操作。

當前大部分的設備都支持NAPI,但是為了對之前的保持兼容,內核還是對之前中斷方式提供了兼容。我們先看下NAPI具體的處理方式。我們都知道中斷分為中斷上半部和下半部,上半部完成的任務很是簡單,僅僅負責把數據保存下來;而下半部負責具體的處理。為了處理下半部,每個CPU有維護一個softnet_data結構。我們不對此結構做詳細介紹,僅僅描述和NAPI相關的部分。結構中有一個poll_list字段,連接所有的輪詢設備。還 維護了兩個隊列input_pkt_queue和process_queue。這兩個用戶傳統不支持NAPI方式的處理。前者由中斷上半部的處理函數吧數據包入隊,在具體的處理時,使用后者做中轉,相當於前者負責接收,后者負責處理。最后是一個napi_struct的backlog,代表一個虛擬設備供輪詢使用。在支持NAPI的設備下,每個設備具備一個緩沖隊列,存放到來數據。每個設備對應一個napi_struct結構,該結構代表該設備存放在poll_list中被輪詢。而設備還需要提供一個poll函數,在設備被輪詢到后,會調用poll函數對數據進行處理。基本邏輯就是這樣,下面看下具體流程。

中斷上半部:

非NAPI:

非NAPI對應的上半部函數為netif_rx,位於Dev.,c中

int netif_rx(struct sk_buff *skb)
{
    int ret;

    /* if netpoll wants it, pretend we never saw it */
    /*如果是net_poll想要的,則不作處理*/
    if (netpoll_rx(skb))
        return NET_RX_DROP;
    /*檢查時間戳*/
    net_timestamp_check(netdev_tstamp_prequeue, skb);

    trace_netif_rx(skb);
#ifdef CONFIG_RPS
    if (static_key_false(&rps_needed)) {
        struct rps_dev_flow voidflow, *rflow = &voidflow;
        int cpu;
        /*禁用搶占*/
        preempt_disable();
        rcu_read_lock();
        
        cpu = get_rps_cpu(skb->dev, skb, &rflow);
        if (cpu < 0)
            cpu = smp_processor_id();
        /*把數據入隊*/
        ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);

        rcu_read_unlock();
        preempt_enable();
    } else
#endif
    {
        unsigned int qtail;
        ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
        put_cpu();
    }
    return ret;
}

 

中間RPS暫時不關心,這里直接調用enqueue_to_backlog放入CPU的全局隊列input_pkt_queue

static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
                  unsigned int *qtail)
{
    struct softnet_data *sd;
    unsigned long flags;
    /*獲取cpu相關的softnet_data變量*/
    sd = &per_cpu(softnet_data, cpu);
    /*關中斷*/
    local_irq_save(flags);

    rps_lock(sd);
    /*如果input_pkt_queue的長度小於最大限制,則符合條件*/
    if (skb_queue_len(&sd->input_pkt_queue) <= netdev_max_backlog) {
        /*如果input_pkt_queue不為空,說明虛擬設備已經得到調度,此時僅僅把數據加入
            input_pkt_queue隊列即可
        */
        if (skb_queue_len(&sd->input_pkt_queue)) {
enqueue:
            __skb_queue_tail(&sd->input_pkt_queue, skb);
            input_queue_tail_incr_save(sd, qtail);
            rps_unlock(sd);
            local_irq_restore(flags);
            return NET_RX_SUCCESS;
        }

        /* Schedule NAPI for backlog device
         * We can use non atomic operation since we own the queue lock
         */
         /*否則需要調度backlog 即虛擬設備,然后再入隊。napi_struct結構中的state字段如果標記了NAPI_STATE_SCHED,則表明該設備已經在調度,不需要再次調度*/
        if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
            if (!rps_ipi_queued(sd))
                ____napi_schedule(sd, &sd->backlog);
        }
        goto enqueue;
    }
    /*到這里緩沖區已經不足了,必須丟棄*/
    sd->dropped++;
    rps_unlock(sd);
    local_irq_restore(flags);
    atomic_long_inc(&skb->dev->rx_dropped);
    kfree_skb(skb);
    return NET_RX_DROP;
}

 

該函數邏輯也比較簡單,主要注意的是設備必須先添加調度然后才能接受數據,添加調度調用了____napi_schedule函數,該函數把設備對應的napi_struct結構插入到softnet_data的poll_list鏈表尾部,然后喚醒軟中斷,這樣在下次軟中斷得到處理時,中斷下半部就會得到處理。不妨看下源碼

static inline void ____napi_schedule(struct softnet_data *sd,
                     struct napi_struct *napi)
{
    list_add_tail(&napi->poll_list, &sd->poll_list);
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
}

 

NAPI方式

NAPI的方式相對於非NAPI要簡單許多,看下e100網卡的中斷處理函數e100_intr,核心部分

if (likely(napi_schedule_prep(&nic->napi))) {
        e100_disable_irq(nic);//屏蔽當前中斷
        __napi_schedule(&nic->napi);//把設備加入到輪訓隊列
    }

 

if條件檢查當前設備是否 可被調度,主要檢查兩個方面:1、是否已經在調度 2、是否禁止了napi pending.如果符合條件,就關閉當前設備的中斷,調用__napi_schedule函數把設備假如到輪訓列表,從而開啟輪詢模式。

分析:結合上面兩種方式,還是可以發現兩種方式的異同。其中softnet_data作為主導結構,在NAPI的處理方式下,主要維護輪詢鏈表。NAPI設備均對應一個napi_struct結構,添加到鏈表中;非NAPI沒有對應的napi_struct結構,為了使用NAPI的處理流程,使用了softnet_data結構中的back_log作為一個虛擬設備添加到輪詢鏈表。同時由於非NAPI設備沒有各自的接收隊列,所以利用了softnet_data結構的input_pkt_queue作為全局的接收隊列。這樣就處理而言,可以和NAPI的設備進行兼容。但是還有一個重要區別,在NAPI的方式下,首次數據包的接收使用中斷的方式,而后續的數據包就會使用輪詢處理了;而非NAPI每次都是通過中斷通知。

下半部:

下半部的處理函數,之前提到,網絡數據包的接發對應兩個不同的軟中斷,接收軟中斷NET_RX_SOFTIRQ的處理函數對應net_rx_action

static void net_rx_action(struct softirq_action *h)
{
    struct softnet_data *sd = &__get_cpu_var(softnet_data);
    unsigned long time_limit = jiffies + 2;
    int budget = netdev_budget;
    void *have;

    local_irq_disable();
    /*遍歷輪詢表*/
    while (!list_empty(&sd->poll_list)) {
        struct napi_struct *n;
        int work, weight;

        /* If softirq window is exhuasted then punt.
         * Allow this to run for 2 jiffies since which will allow
         * an average latency of 1.5/HZ.
         */
         /*如果開支用完了或者時間用完了*/
        if (unlikely(budget <= 0 || time_after_eq(jiffies, time_limit)))
            goto softnet_break;

        local_irq_enable();

        /* Even though interrupts have been re-enabled, this
         * access is safe because interrupts can only add new
         * entries to the tail of this list, and only ->poll()
         * calls can remove this head entry from the list.
         */
         /*獲取鏈表中首個設備*/
        n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list);

        have = netpoll_poll_lock(n);
        weight = n->weight;
        /* This NAPI_STATE_SCHED test is for avoiding a race
         * with netpoll's poll_napi().  Only the entity which
         * obtains the lock and sees NAPI_STATE_SCHED set will
         * actually make the ->poll() call.  Therefore we avoid
         * accidentally calling ->poll() when NAPI is not scheduled.
         */
        work = 0;
        /*如果被設備已經被調度,則調用其處理函數poll函數*/
        if (test_bit(NAPI_STATE_SCHED, &n->state)) {
            work = n->poll(n, weight);//后面weight指定了一個額度
            trace_napi_poll(n);
        }

        WARN_ON_ONCE(work > weight);
        /*總額度遞減*/
        budget -= work;

        local_irq_disable();

        /* Drivers must not modify the NAPI state if they
         * consume the entire weight.  In such cases this code
         * still "owns" the NAPI instance and therefore can
         * move the instance around on the list at-will.
         */
         /*如果work=weight的話。任務就完成了,把設備從輪詢鏈表刪除*/
        if (unlikely(work == weight)) {
            if (unlikely(napi_disable_pending(n))) {
                local_irq_enable();
                napi_complete(n);
                local_irq_disable();
            } else {
                if (n->gro_list) {
                    /* flush too old packets
                     * If HZ < 1000, flush all packets.
                     */
                    local_irq_enable();
                    napi_gro_flush(n, HZ >= 1000);
                    local_irq_disable();
                }
                /*每次處理完就把設備移動到列表尾部*/
                list_move_tail(&n->poll_list, &sd->poll_list);
            }
        }
        netpoll_poll_unlock(have);
    }
out:
    net_rps_action_and_irq_enable(sd);

#ifdef CONFIG_NET_DMA
    /*
     * There may not be any more sk_buffs coming right now, so push
     * any pending DMA copies to hardware
     */
    dma_issue_pending_all();
#endif

    return;

softnet_break:
    sd->time_squeeze++;
    __raise_softirq_irqoff(NET_RX_SOFTIRQ);
    goto out;
}

 

這里有處理方式比較直觀,直接遍歷poll_list鏈表,處理之前設置了兩個限制:budget和time_limit。前者限制本次處理數據包的總量,后者限制本次處理總時間。只有二者均有剩余的情況下,才會繼續處理。處理期間同樣是開中斷的,每次總是從鏈表表頭取設備進行處理,如果設備被調度,其實就是檢查NAPI_STATE_SCHED位,則調用 napi_struct的poll函數,處理結束如果沒有處理完,則把設備移動到鏈表尾部,否則從鏈表刪除。NAPI設備對應的poll函數會同樣會調用__netif_receive_skb函數上傳協議棧,這里就不做分析了,感興趣可以參考e100的poll函數e100_poll。

 

而非NAPI對應poll函數為process_backlog。

static int process_backlog(struct napi_struct *napi, int quota)
{
    int work = 0;
    struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);

#ifdef CONFIG_RPS
    /* Check if we have pending ipi, its better to send them now,
     * not waiting net_rx_action() end.
     */
    if (sd->rps_ipi_list) {
        local_irq_disable();
        net_rps_action_and_irq_enable(sd);
    }
#endif
    napi->weight = weight_p;
    local_irq_disable();
    while (work < quota) {
        struct sk_buff *skb;
        unsigned int qlen;
        /*涉及到兩個隊列process_queue和input_pkt_queue,數據包到來時首先填充input_pkt_queue,
        而在處理時從process_queue中取,根據這個邏輯,首次處理process_queue必定為空,檢查input_pkt_queue
        如果input_pkt_queue不為空,則把其中的數據包遷移到process_queue中,然后繼續處理,減少鎖沖突。
        */
        while ((skb = __skb_dequeue(&sd->process_queue))) {
            local_irq_enable();
            /*進入協議棧*/
            __netif_receive_skb(skb);
            local_irq_disable();
            input_queue_head_incr(sd);
            if (++work >= quota) {
                local_irq_enable();
                return work;
            }
        }

        rps_lock(sd);
        qlen = skb_queue_len(&sd->input_pkt_queue);
        if (qlen)
            skb_queue_splice_tail_init(&sd->input_pkt_queue,
                           &sd->process_queue);

        if (qlen < quota - work) {
            /*
             * Inline a custom version of __napi_complete().
             * only current cpu owns and manipulates this napi,
             * and NAPI_STATE_SCHED is the only possible flag set on backlog.
             * we can use a plain write instead of clear_bit(),
             * and we dont need an smp_mb() memory barrier.
             */
            list_del(&napi->poll_list);
            napi->state = 0;

            quota = work + qlen;
        }
        rps_unlock(sd);
    }
    local_irq_enable();

    return work;
}

 

函數還是比較簡單的,需要注意的每次處理都攜帶一個配額,即本次只能處理quota個數據包,如果超額了,即使沒處理完也要返回,這是為了保證處理器的公平使用。處理在一個while循環中完成,循環條件正是work < quota,首先會從process_queue中取出skb,調用__netif_receive_skb上傳給協議棧,然后增加work。當work即將大於quota時,即++work >= quota時,就要返回。當work還有剩余額度,但是process_queue中數據處理完了,就需要檢查input_pkt_queue,因為在具體處理期間是開中斷的,那么期間就有可能有新的數據包到來,如果input_pkt_queue不為空,則調用skb_queue_splice_tail_init函數把數據包遷移到process_queue。如果剩余額度足夠處理完這些數據包,那么就把虛擬設備移除輪詢隊列。這里有些疑惑就是最后為何要增加額度,剩下的額度已經足夠處理這些數據了呀?根據此流程不難發現,其實執行的是在兩個隊列之間移動數據包,然后再做處理。

 

 

參考:linux內核源碼


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM