一個發包亂序問題記錄


在用戶線程綁定某個核的情況下,從某個線程發送的udp報文,偶爾出現了亂序。我們來分析下發包流程:

 0xffffffff81593b30 : dev_hard_start_xmit+0x0/0x1a0 [kernel]------------進入driver層
 0xffffffff81596b08 : __dev_queue_xmit+0x448/0x550 [kernel]--------------其實少了一個qdisc層的堆棧
 0xffffffff81596c20 : dev_queue_xmit+0x10/0x20 [kernel]-----------------dev層
 0xffffffff815a284d : neigh_resolve_output+0x11d/0x220 [kernel]
 0xffffffff815dd65c : ip_finish_output+0x2ac/0x7a0 [kernel]
 0xffffffff815dde53 : ip_output+0x73/0xe0 [kernel]
 0xffffffff815dba87 : ip_local_out_sk+0x37/0x40 [kernel]
 0xffffffff815dbdf3 : ip_queue_xmit+0x143/0x3a0 [kernel]----------------ip層
 0xffffffff815f60fc : tcp_transmit_skb+0x52c/0xa20 [kernel]
 0xffffffff815f687c : tcp_write_xmit+0x28c/0xcf0 [kernel]
 0xffffffff815f755e : __tcp_push_pending_frames+0x2e/0xc0 [kernel]
 0xffffffff815e54ac : tcp_push+0xec/0x120 [kernel]
 0xffffffff815e8e70 : tcp_sendmsg+0xd0/0xc30 [kernel]---------------------tcp層
 0xffffffff816153d9 : inet_sendmsg+0x69/0xb0 [kernel]---------------------inet層
 0xffffffff815765ad : sock_aio_write+0x15d/0x180 [kernel]
 0xffffffff81208093 : do_sync_write+0x93/0xe0 [kernel]---------------------vfs層
 0xffffffff81208c75 : vfs_write+0x1c5/0x1f0 [kernel]
 0xffffffff8120998f : sys_write+0x7f/0xe0 [kernel]
 0xffffffff816c5715 : system_call_fastpath+0x1c/0x21 [kernel]

沒有故意去抓udp的堆棧,除了tcp層那部分不太一樣,其他都應該一樣,不影響我們分析。

可以看到,這個是sys態直接發送的案例,后面其實就是

dev_hard_start_xmit--》xmit_one--》
netdev_start_xmit--》
__netdev_start_xmit--》
ops->ndo_start_xmit,這個對於bond,這個是
bond_start_xmit,最終還是會走到實體設備,如i40e的驅動,是 i40e_lan_xmit_frame ,而ixegb的驅動則是:ixgbe_xmit_frame

也就是,整個發送過程都體現出來了。而且這個流程是在sys態完成的,不是在軟中斷中,軟中斷的話,需要從net_tx_action 中看起。

但是,如果對整個流程非常了解的人,可以看到堆棧中缺少一部分,那就是

__qdisc_run--》qdisc_restart--》
sch_direct_xmit--》
dev_hard_start_xmit這部分並沒有在此體現。

從qdisc角度來看,雖然它的作用在於流量控制,但是也可以看做是dev層到驅動層的一個緩存層,從前面堆棧看,如果一個socket 固定從某個

cpu上發送,不會出現亂序,到了網卡驅動層,更不會亂序,那么到了qdisc層會怎么樣?

下面詳細分析下qdisc的可能亂序行為:

int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv)
{
。。。
    txq = netdev_pick_tx(dev, skb, accel_priv);
    q = rcu_dereference_bh(txq->qdisc);
。。。
    if (q->enqueue) {------------enqueue不為空,流控
        rc = __dev_xmit_skb(skb, q, dev, txq);
        goto out;
    }
。。。
}

拿到skb之后,怎么選擇哪個queue發送呢?

//隊列選擇函數,如果不是多隊列,ops->ndo_select_queue非空的話,則調用ops->ndo_select_queue,否則__netdev_pick_tx
struct netdev_queue *netdev_pick_tx(struct net_device *dev,
                    struct sk_buff *skb,
                    void *accel_priv)
{
    int queue_index = 0;

#ifdef CONFIG_XPS
    u32 sender_cpu = skb->sender_cpu - 1;

    if (sender_cpu >= (u32)NR_CPUS)
        skb->sender_cpu = raw_smp_processor_id() + 1;-------------開啟了xps的話,會設置sender_cpu的值為當前cpu號+1
#endif

    if (dev->real_num_tx_queues != 1) {---------------------------多隊列的設備
        const struct net_device_ops *ops = dev->netdev_ops;
        if (ops->ndo_select_queue)-----------i40e這個地方是NULL
            queue_index = ops->ndo_select_queue(dev, skb, accel_priv,
                                __netdev_pick_tx);
        else
            queue_index = __netdev_pick_tx(dev, skb);-------------所以i40e走這個流程

        if (!accel_priv)
            queue_index = netdev_cap_txqueue(dev, queue_index);
    }

    skb_set_queue_mapping(skb, queue_index);------------------保存對應的queue_index到skb->queue_mapping,發送的時候,會取這個來獲取對應的queue
    return netdev_get_tx_queue(dev, queue_index);
}

既然i40e走的是默認的選queue_index的流程,那么就需要看一下這個函數:

static u16 __netdev_pick_tx(struct net_device *dev, struct sk_buff *skb)
{
    struct sock *sk = skb->sk;
    int queue_index = sk_tx_queue_get(sk);------之前保存的queue_index

    if (queue_index < 0 || skb->ooo_okay ||
        queue_index >= dev->real_num_tx_queues) {
        int new_index = get_xps_queue(dev, skb);------開啟了xps,優先選擇
        if (new_index < 0)--------沒選到
            new_index = skb_tx_hash(dev, skb);--------則直接hash

        if (queue_index != new_index && sk &&
            rcu_access_pointer(sk->sk_dst_cache))
            sk_tx_queue_set(sk, new_index);

        queue_index = new_index;
    }

    return queue_index;
}

 在開啟了xps的情況下,get_xps_queue 會根據xps_map來選擇隊列,如果xps只綁定了一個cpu,則用那個對應的queue-index,否則根據skb來hash選擇:

//開啟xps,根據sender_cpu來選擇map
static inline int get_xps_queue(struct net_device *dev, struct sk_buff *skb)
{
#ifdef CONFIG_XPS
    struct xps_dev_maps *dev_maps;
    struct xps_map *map;
    int queue_index = -1;

    rcu_read_lock();
    dev_maps = rcu_dereference(dev->xps_maps);
    if (dev_maps) {
        map = rcu_dereference(
            dev_maps->cpu_map[skb->sender_cpu - 1]);//由於之前設置了sendercpu,所以這里取該cpu,找到對應的map
        if (map) {
            if (map->len == 1)
                queue_index = map->queues[0];//如果cpu只關聯當前net_device的一個隊列,當然直接選擇
            else
                queue_index = map->queues[reciprocal_scale(skb_get_hash(skb),
                                       map->len)];//當前cpu關聯了多個隊列,則做下hash選擇
            if (unlikely(queue_index >= dev->real_num_tx_queues))
                queue_index = -1;
        }
    }
    rcu_read_unlock();

    return queue_index;
#else
    return -1;
#endif
}

 我們目前設置的xps_map如下:

cat /sys/class/net/eth0/queues/tx-0/xps_cpus
00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000001

也就是一個隊列,只對應一個cpu。這個展示的結果,是從隊列的角度來展示對應的cpu,但是內核實現的時候,
實際上代碼內使用了反向映射,通過xps_dev_maps存放到cpu到tx隊列集合的映射:參照:dev_maps->cpu_map 函數的 實現過程,

按照我們目前這樣的這樣的話,一個隊列對應的是一個核,一個核針對某個網卡,也只是某一個隊列,按道理也不會亂序啊。因為txq因為queue-index唯一確定,

而qdisc 又是由queue->qdisc唯一確定的。

選擇好了queue,那么來看對應的qdisc實現:

static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
                 struct net_device *dev,
                 struct netdev_queue *txq)
{
    spinlock_t *root_lock = qdisc_lock(q);
    bool contended;
    int rc;

    qdisc_pkt_len_init(skb);
    qdisc_calculate_pkt_len(skb, q);
    /*
     * Heuristic to force contended enqueues to serialize on a
     * separate lock before trying to get qdisc main lock.
     * This permits __QDISC_STATE_RUNNING owner to get the lock more often
     * and dequeue packets faster.
     */
    contended = qdisc_is_running(q);
    if (unlikely(contended))
        spin_lock(&q->busylock);

    spin_lock(root_lock);
    ........//只保留一種條件,其他忽略
        rc = q->enqueue(skb, q) & NET_XMIT_MASK;//入隊qdisc,對於fq,其實就是將當前的skb按順序加到flow的尾部
        if (qdisc_run_begin(q)) {
            if (unlikely(contended)) {
                spin_unlock(&q->busylock);
                contended = false;
            }
            __qdisc_run(q);//取包調dequeue,也是按順序的,
        }
    }
    spin_unlock(root_lock);
    if (unlikely(contended))
        spin_unlock(&q->busylock);
    return rc;
}

好的,正式進入了qdisc層,來看一下q->enqueue的實現:

當然如果把qdisc看做一個對象的話,它的背后還有一堆class和filter,由於我們環境使用的是:

[root@localhost ~]# tc qdisc show dev eth0
qdisc mq 0: root
qdisc fq 0: parent :1 limit 10000p flow_limit 100p buckets 1024 quantum 3028 initial_quantum 15140

針對的是fq的實現:

static struct Qdisc_ops fq_qdisc_ops __read_mostly = {
    .id        =    "fq",
    .priv_size    =    sizeof(struct fq_sched_data),

    .enqueue    =    fq_enqueue,

看看實現:

static int fq_enqueue(struct sk_buff *skb, struct Qdisc *sch)
{
    struct fq_sched_data *q = qdisc_priv(sch);
    struct fq_flow *f;

    if (unlikely(sch->q.qlen >= sch->limit))//fq管理的flow個數超過閾值
        return qdisc_drop(skb, sch);------------------------------丟包,但不會亂序

    f = fq_classify(skb, q);-------------根據skb的sk來獲取對應的flow,如果沒有,則申請一個。
    if (unlikely(f->qlen >= q->flow_plimit && f != &q->internal)) {
        q->stat_flows_plimit++;
        return qdisc_drop(skb, sch);
    }

    f->qlen++;
    if (skb_is_retransmit(skb))
        q->stat_tcp_retrans++;
    qdisc_qstats_backlog_inc(sch, skb);
    if (fq_flow_is_detached(f)) {
        fq_flow_add_tail(&q->new_flows, f);----------------------------------加到skb加到對應的flow中去,關鍵函數
        if (time_after(jiffies, f->age + q->flow_refill_delay))
            f->credit = max_t(u32, f->credit, q->quantum);
        q->inactive_flows--;
    }

    /* Note: this overwrites f->age */
    flow_queue_add(f, skb);

    if (unlikely(f == &q->internal)) {
        q->stat_internal_packets++;
    }
    sch->q.qlen++;

    return NET_XMIT_SUCCESS;
}

我們看一下加skb到flow中去的過程:

static void flow_queue_add(struct fq_flow *flow, struct sk_buff *skb)
{
    struct sk_buff *prev, *head = flow->head;

    skb->next = NULL;
    if (!head) {
        flow->head = skb;
        flow->tail = skb;
        return;
    }
    if (likely(!skb_is_retransmit(skb))) {
        flow->tail->next = skb;--------------------skb加到鏈表尾,所以絕對不會亂序
        flow->tail = skb;
        return;
    }

不管怎么樣,skb是加入到了對應的flow中了,就等着dequeue的時候發送了,間接地保證了時序。

同理也可以分析 :

static struct sk_buff *fq_dequeue(struct Qdisc *sch)
這個也是按順序取包發送,不會亂序。
從這個流程看,應該不會亂序,那么最終亂序的原因是?仔細查看我們的發包流程,我們一直強調是在sys態,還有一個中斷時發包的流程沒有分析。
我們來看i40e的napi發包模式:
i40e_napi_poll函數調用 i40e_clean_tx_irq,清理發送隊列的數據。
static bool i40e_clean_tx_irq(struct i40e_vsi *vsi,
                  struct i40e_ring *tx_ring, int napi_budget)//發送完之后資源回收
{
。。。。
        if (__netif_subqueue_stopped(tx_ring->netdev,
                         tx_ring->queue_index) &&
           !test_bit(__I40E_DOWN, &vsi->state)) {
            netif_wake_subqueue(tx_ring->netdev,
                        tx_ring->queue_index);
            ++tx_ring->tx_stats.restart_queue;
        }
。。。。。
}

具體查看 netif_wake_subqueue 的實現,

netif_wake_subqueue --》__netif_schedule --》__netif_reschedule 
static inline void __netif_reschedule(struct Qdisc *q)
{
    struct softnet_data *sd;
    unsigned long flags;

    local_irq_save(flags);
    sd = this_cpu_ptr(&softnet_data);
    q->next_sched = NULL;
    *sd->output_queue_tailp = q;
    sd->output_queue_tailp = &q->next_sched;//output_queue_tailp只會在中斷中操作,net_tx_action會獲取sd->output_queue,然后調用qdisc_run發包。
    raise_softirq_irqoff(NET_TX_SOFTIRQ);
    local_irq_restore(flags);
}

觸發軟中斷,而根據中斷綁核的設置,該中斷號綁定的cpu並不一定和xps的queue映射的cpu是同一個,這樣的話,就存在兩個cpu發送一個流的情況。一個是在sys態,調用qdisc_run發包,

一個是在軟中斷處理時,調用qdisc_run發包,這兩個cpu可能不是同一個。

由於發送的時候,還是要調用qdisc的spin_lock,所以雖然cpu不是同一個,但還是依靠自旋鎖控制了並發dequeue一個skb的情況。那就是不可能兩個cpu都dequeue到同一個skb的情況。

中斷里面處理流程:

        root_lock = qdisc_lock(q);
            if (spin_trylock(root_lock)) {-------------獲取qdisc的自旋鎖
                smp_mb__before_clear_bit();
                clear_bit(__QDISC_STATE_SCHED,
                      &q->state);//清理__QDISC_STATE_SCHED狀態,
                qdisc_run(q);-------------------------這個里面還是會判斷qdisc的state,如果是running狀態,則直接返回,不會調用__qdisc_run
                spin_unlock(root_lock);

殊途同歸,進入__qdisc_run:

void __qdisc_run(struct Qdisc *q)//進入該函數,此時應持有qdisc_lock
{
    int quota = weight_p;

    while (qdisc_restart(q)) {
        /*
         * Ordered by possible occurrence: Postpone processing if
         * 1. we've exceeded packet quota
         * 2. another process needs the CPU;
         */
        if (--quota <= 0 || need_resched()) {//配額用完了,__netif_schedule會觸發軟中斷
            __netif_schedule(q);
            break;
        }
    }

    qdisc_run_end(q);
}


static inline int qdisc_restart(struct Qdisc *q)
{
    struct netdev_queue *txq;
    struct net_device *dev;
    spinlock_t *root_lock;
    struct sk_buff *skb;
    bool validate;

    /* Dequeue packet */
    skb = dequeue_skb(q, &validate);
    if (unlikely(!skb))
        return 0;

    root_lock = qdisc_lock(q);
    dev = qdisc_dev(q);
    txq = skb_get_tx_queue(dev, skb);

    return sch_direct_xmit(skb, q, dev, txq, root_lock, validate);
}

所以按道理也能保證發送時序,但是由於我劫持了響應的網卡發包驅動,自己再做了一次緩存,而這些緩存的管理,是percpu的,所以qdisc持有鎖發送的時候,最終到了兩個不同的percpu的緩存,然后就存在發包亂序的可能了,畢竟兩個cpu再也看不到對方的存在了,也不會按順序從qdisc中取skb了,他們只管自己percpu緩存中的skb,所以會分別嘗試獲取:

HARD_TX_LOCK(dev, txq, smp_processor_id());這把鎖,然后發包。亂序就很正常了。
 
結論:
1.如果過多的cpu使用相同的tx隊列,那么加重tx對應的qdisc鎖的爭搶,也會增加對txq->_xmit_lock的爭搶,
為了將qdisc的鎖爭搶降低到最低,最好就是: 如果每個cpu只關聯了一個tx,甚至能消除競爭
也可以減小因為發送完成中斷造成的cache miss。
因此xps_cpus的配置最好結合 /proc/irq//smp_affinity, 映射最好在同一個cpu或者同一個numa node的cpu上。
 
2.如果在驅動層再做了緩存,要保證各個cpu的時序非常困難,所以只能保證開啟了xps的情況下,將xps映射到的cpu和網卡對應中斷完全對應,比1的要求還要嚴格,
因為多個cpu訪問的percpu緩存再也不相關了,同一個流如果有skb在不同的percpu緩存,時序無法保證。
 

當然還有一種情況是cpu的熱插拔,這個不在本文討論范圍之內。

 附:

一個net_device在初始化的時候:

void dev_init_scheduler(struct net_device *dev)
{
    dev->qdisc = &noop_qdisc;
。。。。
}

但是當網卡up的時候,調用dev_activate,會重新設置,如果是多隊列網卡,則設置為 mq_qdisc_ops:

void dev_activate(struct net_device *dev)
{
    int need_watchdog;

    /* No queueing discipline is attached to device;
     * create default one for devices, which need queueing
     * and noqueue_qdisc for virtual interfaces
     */

    if (dev->qdisc == &noop_qdisc)
        attach_default_qdiscs(dev);
。。。。。
}

static void attach_default_qdiscs(struct net_device *dev)
{
    struct netdev_queue *txq;
    struct Qdisc *qdisc;

    txq = netdev_get_tx_queue(dev, 0);

    if (!netif_is_multiqueue(dev) ||
        dev->priv_flags & IFF_NO_QUEUE) {
        netdev_for_each_tx_queue(dev, attach_one_default_qdisc, NULL);
        dev->qdisc = txq->qdisc_sleeping;
        atomic_inc(&dev->qdisc->refcnt);
    } else {
        qdisc = qdisc_create_dflt(txq, &mq_qdisc_ops, TC_H_ROOT);-------------默認使用多隊列策略設置root的策略,即mq_qdisc_ops
        if (qdisc) {
            dev->qdisc = qdisc;
            qdisc->ops->attach(qdisc);
        }
    }
}
 
參考資料:
https://lwn.net/Articles/412062/
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM