在用戶線程綁定某個核的情況下,從某個線程發送的udp報文,偶爾出現了亂序。我們來分析下發包流程:
0xffffffff81593b30 : dev_hard_start_xmit+0x0/0x1a0 [kernel]------------進入driver層 0xffffffff81596b08 : __dev_queue_xmit+0x448/0x550 [kernel]--------------其實少了一個qdisc層的堆棧 0xffffffff81596c20 : dev_queue_xmit+0x10/0x20 [kernel]-----------------dev層 0xffffffff815a284d : neigh_resolve_output+0x11d/0x220 [kernel] 0xffffffff815dd65c : ip_finish_output+0x2ac/0x7a0 [kernel] 0xffffffff815dde53 : ip_output+0x73/0xe0 [kernel] 0xffffffff815dba87 : ip_local_out_sk+0x37/0x40 [kernel] 0xffffffff815dbdf3 : ip_queue_xmit+0x143/0x3a0 [kernel]----------------ip層 0xffffffff815f60fc : tcp_transmit_skb+0x52c/0xa20 [kernel] 0xffffffff815f687c : tcp_write_xmit+0x28c/0xcf0 [kernel] 0xffffffff815f755e : __tcp_push_pending_frames+0x2e/0xc0 [kernel] 0xffffffff815e54ac : tcp_push+0xec/0x120 [kernel] 0xffffffff815e8e70 : tcp_sendmsg+0xd0/0xc30 [kernel]---------------------tcp層 0xffffffff816153d9 : inet_sendmsg+0x69/0xb0 [kernel]---------------------inet層 0xffffffff815765ad : sock_aio_write+0x15d/0x180 [kernel] 0xffffffff81208093 : do_sync_write+0x93/0xe0 [kernel]---------------------vfs層 0xffffffff81208c75 : vfs_write+0x1c5/0x1f0 [kernel] 0xffffffff8120998f : sys_write+0x7f/0xe0 [kernel] 0xffffffff816c5715 : system_call_fastpath+0x1c/0x21 [kernel]
沒有故意去抓udp的堆棧,除了tcp層那部分不太一樣,其他都應該一樣,不影響我們分析。
可以看到,這個是sys態直接發送的案例,后面其實就是
也就是,整個發送過程都體現出來了。而且這個流程是在sys態完成的,不是在軟中斷中,軟中斷的話,需要從net_tx_action 中看起。
但是,如果對整個流程非常了解的人,可以看到堆棧中缺少一部分,那就是
從qdisc角度來看,雖然它的作用在於流量控制,但是也可以看做是dev層到驅動層的一個緩存層,從前面堆棧看,如果一個socket 固定從某個
cpu上發送,不會出現亂序,到了網卡驅動層,更不會亂序,那么到了qdisc層會怎么樣?
下面詳細分析下qdisc的可能亂序行為:
int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv) { 。。。 txq = netdev_pick_tx(dev, skb, accel_priv); q = rcu_dereference_bh(txq->qdisc); 。。。 if (q->enqueue) {------------enqueue不為空,流控 rc = __dev_xmit_skb(skb, q, dev, txq); goto out; } 。。。 }
拿到skb之后,怎么選擇哪個queue發送呢?
//隊列選擇函數,如果不是多隊列,ops->ndo_select_queue非空的話,則調用ops->ndo_select_queue,否則__netdev_pick_tx struct netdev_queue *netdev_pick_tx(struct net_device *dev, struct sk_buff *skb, void *accel_priv) { int queue_index = 0; #ifdef CONFIG_XPS u32 sender_cpu = skb->sender_cpu - 1; if (sender_cpu >= (u32)NR_CPUS) skb->sender_cpu = raw_smp_processor_id() + 1;-------------開啟了xps的話,會設置sender_cpu的值為當前cpu號+1 #endif if (dev->real_num_tx_queues != 1) {---------------------------多隊列的設備 const struct net_device_ops *ops = dev->netdev_ops; if (ops->ndo_select_queue)-----------i40e這個地方是NULL queue_index = ops->ndo_select_queue(dev, skb, accel_priv, __netdev_pick_tx); else queue_index = __netdev_pick_tx(dev, skb);-------------所以i40e走這個流程 if (!accel_priv) queue_index = netdev_cap_txqueue(dev, queue_index); } skb_set_queue_mapping(skb, queue_index);------------------保存對應的queue_index到skb->queue_mapping,發送的時候,會取這個來獲取對應的queue return netdev_get_tx_queue(dev, queue_index); }
既然i40e走的是默認的選queue_index的流程,那么就需要看一下這個函數:
static u16 __netdev_pick_tx(struct net_device *dev, struct sk_buff *skb) { struct sock *sk = skb->sk; int queue_index = sk_tx_queue_get(sk);------之前保存的queue_index if (queue_index < 0 || skb->ooo_okay || queue_index >= dev->real_num_tx_queues) { int new_index = get_xps_queue(dev, skb);------開啟了xps,優先選擇 if (new_index < 0)--------沒選到 new_index = skb_tx_hash(dev, skb);--------則直接hash if (queue_index != new_index && sk && rcu_access_pointer(sk->sk_dst_cache)) sk_tx_queue_set(sk, new_index); queue_index = new_index; } return queue_index; }
在開啟了xps的情況下,get_xps_queue 會根據xps_map來選擇隊列,如果xps只綁定了一個cpu,則用那個對應的queue-index,否則根據skb來hash選擇:
//開啟xps,根據sender_cpu來選擇map static inline int get_xps_queue(struct net_device *dev, struct sk_buff *skb) { #ifdef CONFIG_XPS struct xps_dev_maps *dev_maps; struct xps_map *map; int queue_index = -1; rcu_read_lock(); dev_maps = rcu_dereference(dev->xps_maps); if (dev_maps) { map = rcu_dereference( dev_maps->cpu_map[skb->sender_cpu - 1]);//由於之前設置了sendercpu,所以這里取該cpu,找到對應的map if (map) { if (map->len == 1) queue_index = map->queues[0];//如果cpu只關聯當前net_device的一個隊列,當然直接選擇 else queue_index = map->queues[reciprocal_scale(skb_get_hash(skb), map->len)];//當前cpu關聯了多個隊列,則做下hash選擇 if (unlikely(queue_index >= dev->real_num_tx_queues)) queue_index = -1; } } rcu_read_unlock(); return queue_index; #else return -1; #endif }
我們目前設置的xps_map如下:
cat /sys/class/net/eth0/queues/tx-0/xps_cpus 00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000000,00000001
也就是一個隊列,只對應一個cpu。這個展示的結果,是從隊列的角度來展示對應的cpu,但是內核實現的時候,
實際上代碼內使用了反向映射,通過xps_dev_maps存放到cpu到tx隊列集合的映射:參照:dev_maps->cpu_map 函數的 實現過程,
按照我們目前這樣的這樣的話,一個隊列對應的是一個核,一個核針對某個網卡,也只是某一個隊列,按道理也不會亂序啊。因為txq因為queue-index唯一確定,
而qdisc 又是由queue->qdisc唯一確定的。
選擇好了queue,那么來看對應的qdisc實現:
static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q, struct net_device *dev, struct netdev_queue *txq) { spinlock_t *root_lock = qdisc_lock(q); bool contended; int rc; qdisc_pkt_len_init(skb); qdisc_calculate_pkt_len(skb, q); /* * Heuristic to force contended enqueues to serialize on a * separate lock before trying to get qdisc main lock. * This permits __QDISC_STATE_RUNNING owner to get the lock more often * and dequeue packets faster. */ contended = qdisc_is_running(q); if (unlikely(contended)) spin_lock(&q->busylock); spin_lock(root_lock); ........//只保留一種條件,其他忽略 rc = q->enqueue(skb, q) & NET_XMIT_MASK;//入隊qdisc,對於fq,其實就是將當前的skb按順序加到flow的尾部 if (qdisc_run_begin(q)) { if (unlikely(contended)) { spin_unlock(&q->busylock); contended = false; } __qdisc_run(q);//取包調dequeue,也是按順序的, } } spin_unlock(root_lock); if (unlikely(contended)) spin_unlock(&q->busylock); return rc; }
好的,正式進入了qdisc層,來看一下q->enqueue的實現:
當然如果把qdisc看做一個對象的話,它的背后還有一堆class和filter,由於我們環境使用的是:
[root@localhost ~]# tc qdisc show dev eth0 qdisc mq 0: root qdisc fq 0: parent :1 limit 10000p flow_limit 100p buckets 1024 quantum 3028 initial_quantum 15140
針對的是fq的實現:
static struct Qdisc_ops fq_qdisc_ops __read_mostly = { .id = "fq", .priv_size = sizeof(struct fq_sched_data), .enqueue = fq_enqueue,
看看實現:
static int fq_enqueue(struct sk_buff *skb, struct Qdisc *sch) { struct fq_sched_data *q = qdisc_priv(sch); struct fq_flow *f; if (unlikely(sch->q.qlen >= sch->limit))//fq管理的flow個數超過閾值 return qdisc_drop(skb, sch);------------------------------丟包,但不會亂序 f = fq_classify(skb, q);-------------根據skb的sk來獲取對應的flow,如果沒有,則申請一個。 if (unlikely(f->qlen >= q->flow_plimit && f != &q->internal)) { q->stat_flows_plimit++; return qdisc_drop(skb, sch); } f->qlen++; if (skb_is_retransmit(skb)) q->stat_tcp_retrans++; qdisc_qstats_backlog_inc(sch, skb); if (fq_flow_is_detached(f)) { fq_flow_add_tail(&q->new_flows, f);----------------------------------加到skb加到對應的flow中去,關鍵函數 if (time_after(jiffies, f->age + q->flow_refill_delay)) f->credit = max_t(u32, f->credit, q->quantum); q->inactive_flows--; } /* Note: this overwrites f->age */ flow_queue_add(f, skb); if (unlikely(f == &q->internal)) { q->stat_internal_packets++; } sch->q.qlen++; return NET_XMIT_SUCCESS; }
我們看一下加skb到flow中去的過程:
static void flow_queue_add(struct fq_flow *flow, struct sk_buff *skb) { struct sk_buff *prev, *head = flow->head; skb->next = NULL; if (!head) { flow->head = skb; flow->tail = skb; return; } if (likely(!skb_is_retransmit(skb))) { flow->tail->next = skb;--------------------skb加到鏈表尾,所以絕對不會亂序 flow->tail = skb; return; }
不管怎么樣,skb是加入到了對應的flow中了,就等着dequeue的時候發送了,間接地保證了時序。
同理也可以分析 :
static bool i40e_clean_tx_irq(struct i40e_vsi *vsi, struct i40e_ring *tx_ring, int napi_budget)//發送完之后資源回收 { 。。。。 if (__netif_subqueue_stopped(tx_ring->netdev, tx_ring->queue_index) && !test_bit(__I40E_DOWN, &vsi->state)) { netif_wake_subqueue(tx_ring->netdev, tx_ring->queue_index); ++tx_ring->tx_stats.restart_queue; } 。。。。。 }
具體查看 netif_wake_subqueue 的實現,
static inline void __netif_reschedule(struct Qdisc *q) { struct softnet_data *sd; unsigned long flags; local_irq_save(flags); sd = this_cpu_ptr(&softnet_data); q->next_sched = NULL; *sd->output_queue_tailp = q; sd->output_queue_tailp = &q->next_sched;//output_queue_tailp只會在中斷中操作,net_tx_action會獲取sd->output_queue,然后調用qdisc_run發包。
raise_softirq_irqoff(NET_TX_SOFTIRQ);
local_irq_restore(flags);
}
觸發軟中斷,而根據中斷綁核的設置,該中斷號綁定的cpu並不一定和xps的queue映射的cpu是同一個,這樣的話,就存在兩個cpu發送一個流的情況。一個是在sys態,調用qdisc_run發包,
一個是在軟中斷處理時,調用qdisc_run發包,這兩個cpu可能不是同一個。
由於發送的時候,還是要調用qdisc的spin_lock,所以雖然cpu不是同一個,但還是依靠自旋鎖控制了並發dequeue一個skb的情況。那就是不可能兩個cpu都dequeue到同一個skb的情況。
中斷里面處理流程:
root_lock = qdisc_lock(q); if (spin_trylock(root_lock)) {-------------獲取qdisc的自旋鎖 smp_mb__before_clear_bit(); clear_bit(__QDISC_STATE_SCHED, &q->state);//清理__QDISC_STATE_SCHED狀態, qdisc_run(q);-------------------------這個里面還是會判斷qdisc的state,如果是running狀態,則直接返回,不會調用__qdisc_run spin_unlock(root_lock);
殊途同歸,進入__qdisc_run:
void __qdisc_run(struct Qdisc *q)//進入該函數,此時應持有qdisc_lock { int quota = weight_p; while (qdisc_restart(q)) { /* * Ordered by possible occurrence: Postpone processing if * 1. we've exceeded packet quota * 2. another process needs the CPU; */ if (--quota <= 0 || need_resched()) {//配額用完了,__netif_schedule會觸發軟中斷 __netif_schedule(q); break; } } qdisc_run_end(q); }
所以按道理也能保證發送時序,但是由於我劫持了響應的網卡發包驅動,自己再做了一次緩存,而這些緩存的管理,是percpu的,所以qdisc持有鎖發送的時候,最終到了兩個不同的percpu的緩存,然后就存在發包亂序的可能了,畢竟兩個cpu再也看不到對方的存在了,也不會按順序從qdisc中取skb了,他們只管自己percpu緩存中的skb,所以會分別嘗試獲取:
也可以減小因為發送完成中斷造成的cache miss。
因此xps_cpus的配置最好結合 /proc/irq//smp_affinity, 映射最好在同一個cpu或者同一個numa node的cpu上。
當然還有一種情況是cpu的熱插拔,這個不在本文討論范圍之內。
附:
一個net_device在初始化的時候:
void dev_init_scheduler(struct net_device *dev) { dev->qdisc = &noop_qdisc; 。。。。 }
但是當網卡up的時候,調用dev_activate,會重新設置,如果是多隊列網卡,則設置為 mq_qdisc_ops:
void dev_activate(struct net_device *dev) { int need_watchdog; /* No queueing discipline is attached to device; * create default one for devices, which need queueing * and noqueue_qdisc for virtual interfaces */ if (dev->qdisc == &noop_qdisc) attach_default_qdiscs(dev); 。。。。。 } static void attach_default_qdiscs(struct net_device *dev) { struct netdev_queue *txq; struct Qdisc *qdisc; txq = netdev_get_tx_queue(dev, 0); if (!netif_is_multiqueue(dev) || dev->priv_flags & IFF_NO_QUEUE) { netdev_for_each_tx_queue(dev, attach_one_default_qdisc, NULL); dev->qdisc = txq->qdisc_sleeping; atomic_inc(&dev->qdisc->refcnt); } else { qdisc = qdisc_create_dflt(txq, &mq_qdisc_ops, TC_H_ROOT);-------------默認使用多隊列策略設置root的策略,即mq_qdisc_ops if (qdisc) { dev->qdisc = qdisc; qdisc->ops->attach(qdisc); } } }