二層發送中,實現qdisc的主要函數是__dev_xmit_skb和net_tx_action,本篇將分析qdisc實現的原理,僅對框架進行分析。
其框架如下圖所示
qdisc初始化
pktsched_init注冊了幾個系統算法,register_qdisc只是添加算法到一個全局的鏈表中
注冊設備驅動的時候會調用register_netdevice(), register_netdevice()會調用dev_init_scheduler來初始化默認的qidsc為noop_qdisc,
noop_qdisc算法什么也不做,enqueue函數為丟棄數據包。
當ifconfig up網卡后,會調用dev_open()最終分配默認的qdisc算法給隊列;
/* "NOOP" scheduler: the best scheduler, recommended for all interfaces under all circumstances. It is difficult to invent anything faster or cheaper. */ static int noop_enqueue(struct sk_buff *skb, struct Qdisc *qdisc) { kfree_skb(skb); return NET_XMIT_CN; } static struct sk_buff *noop_dequeue(struct Qdisc *qdisc) { return NULL; } struct Qdisc_ops noop_qdisc_ops __read_mostly = { .id = "noop", .priv_size = 0, .enqueue = noop_enqueue, .dequeue = noop_dequeue, .peek = noop_dequeue, .owner = THIS_MODULE, }; static struct netdev_queue noop_netdev_queue = { .qdisc = &noop_qdisc, .qdisc_sleeping = &noop_qdisc, }; struct Qdisc noop_qdisc = { .enqueue = noop_enqueue, .dequeue = noop_dequeue, .flags = TCQ_F_BUILTIN, .ops = &noop_qdisc_ops, .list = LIST_HEAD_INIT(noop_qdisc.list), .q.lock = __SPIN_LOCK_UNLOCKED(noop_qdisc.q.lock), .dev_queue = &noop_netdev_queue, .busylock = __SPIN_LOCK_UNLOCKED(noop_qdisc.busylock), }; //此時網絡設備還不能發送任何數據包,必須等網絡設備啟用之后才能發送數據包 void dev_init_scheduler(struct net_device *dev) { dev->qdisc = &noop_qdisc; netdev_for_each_tx_queue(dev, dev_init_scheduler_queue, &noop_qdisc); dev_init_scheduler_queue(dev, &dev->rx_queue, &noop_qdisc); setup_timer(&dev->watchdog_timer, dev_watchdog, (unsigned long)dev); }
TC 框架的初始化
/* tc可以使用以下命令對QDisc、類和過濾器進行操作: add,在一個節點里加入一個QDisc、類或者過濾器。添加時,需要傳遞一個祖先作為參數,傳遞參數時既可以使用ID也可以直接傳遞設備的根。如果要建立一個QDisc或者過濾器,可以使用句柄(handle)來命名;如果要建立一個類,可以使用類識別符(classid)來命名。 remove,刪除有某個句柄(handle)指定的QDisc,根QDisc(root)也可以刪除。被刪除QDisc上的所有子類以及附屬於各個類的過濾器都會被自動刪除。 change,以替代的方式修改某些條目。除了句柄(handle)和祖先不能修改以外,change命令的語法和add命令相同。換句話說,change命令不能一定節點的位置。 replace,對一個現有節點進行近於原子操作的刪除/添加。如果節點不存在,這個命令就會建立節點。 link,只適用於DQisc,替代一個現有的節點。 tc qdisc [ add | change | replace | link ] dev DEV [ parent qdisc-id | root ] [ handle qdisc-id ] qdisc [ qdisc specific parameters ] tc class [ add | change | replace ] dev DEV parent qdisc-id [ classid class-id ] qdisc [ qdisc specific parameters ] tc filter [ add | change | replace ] dev DEV [ parent qdisc-id | root ] protocol protocol prio priority filtertype [ filtertype specific parameters ] flowid flow-id tc [-s | -d ] qdisc show [ dev DEV ] tc [-s | -d ] class show dev DEV tc filter show dev DEV */ /* (四)用戶空間如何和內核通信 iproute2是一個用戶空間的程序,它的功能是解釋以tc開頭的命令,如果解釋成功,把它們通過AF_NETLINK的socket傳給Linux的內核空間,使用的netlink協議類型是NETLINK_ROUTE。 發送的netlink數據包都必須包含兩個字段:protocol和msgtype,內核根據這兩個字段來定位接收函數。 在系統初始化的時候將會調用如下函數: */ static int __init pktsched_init(void) { int err; err = register_pernet_subsys(&psched_net_ops); if (err) { pr_err("pktsched_init: " "cannot initialize per netns operations\n"); return err; } register_qdisc(&pfifo_fast_ops); register_qdisc(&pfifo_qdisc_ops); register_qdisc(&bfifo_qdisc_ops); register_qdisc(&pfifo_head_drop_qdisc_ops); register_qdisc(&mq_qdisc_ops); register_qdisc(&noqueue_qdisc_ops); //tc filer 的注冊在tc_filter_init //通過rtnetlink_rcv_msg和應用層netlink方式交互 //其中的rtnl_register()函數用於注冊TC要接收的消息類型以及對應的接收函數。 //每個表頭rtnl_msg_handlers[i]上面存儲RTM_NR_MSGTYPES個rtnl_link,圖解見TC流量控制實現分析 rtnl_register(PF_UNSPEC, RTM_NEWQDISC, tc_modify_qdisc, NULL, NULL); //tc qdisc add和tc calss change的時候會調用tc_modify_qdisc rtnl_register(PF_UNSPEC, RTM_DELQDISC, tc_get_qdisc, NULL, NULL);//tc qdisc del的時候會調用tc_modify_qdisc rtnl_register(PF_UNSPEC, RTM_GETQDISC, tc_get_qdisc, tc_dump_qdisc, NULL);//tc qdisc ls 的時候會調用tc_modify_qdisc rtnl_register(PF_UNSPEC, RTM_NEWTCLASS, tc_ctl_tclass, NULL, NULL); //tc class add 的時候會調用這個 rtnl_register(PF_UNSPEC, RTM_DELTCLASS, tc_ctl_tclass, NULL, NULL);//tc class del 的時候會調用這個 rtnl_register(PF_UNSPEC, RTM_GETTCLASS, tc_ctl_tclass, tc_dump_tclass, NULL);//tc class ls的時候調用這個 return 0; }
初始化的默認算法
對於多隊列設備,會先調用mq_qdisc_ops算法來做一次代理
在mq_init中為每個隊列分配默認的pfifo_fast_ops算法保持在mq_qdisc_ops的private結構中
然后馬上調用mq_attach, 把這些分配好的qdisc,設置到設備隊列的qdisc_sleeping中。
最終在dev_activate中把qdisc_sleeping中的qdisc算法,通過transition_one_qdisc賦值到netdev_queue->qdisc
int dev_open(struct net_device *dev) { .............................. ret = __dev_open(dev); .............................. } static int __dev_open(struct net_device *dev) { ................... dev_activate(dev); add_device_randomness(dev->dev_addr, dev->addr_len); ...................... } void dev_activate(struct net_device *dev) { .................................. /* No queueing discipline is attached to device; * create default one for devices, which need queueing * and noqueue_qdisc for virtual interfaces */ /*如果沒有Qdisc的規則設置到device,就給設備create一個默認的規則*/ if (dev->qdisc == &noop_qdisc) attach_default_qdiscs(dev);//設置默認qdisc算法 ......................... }
static void attach_default_qdiscs(struct net_device *dev) { struct netdev_queue *txq; struct Qdisc *qdisc; txq = netdev_get_tx_queue(dev, 0); /*這個地方判斷dev是否是多Q的設備,如果是不是多Q的設備,或者tx_queue_len為0 就會進入if判斷 *這個地方比較疑惑,if內部使用的是一個循環去遍歷tx_queue 感覺是要處理multiqueue的狀況,現在 *卻用來處理一個的...*/ if (!netif_is_multiqueue(dev) || dev->tx_queue_len == 0) { netdev_for_each_tx_queue(dev, attach_one_default_qdisc, NULL); dev->qdisc = txq->qdisc_sleeping; atomic_inc(&dev->qdisc->refcnt); } else { /*如果不滿足上述條件,就創建一個默認的,mq_qdisc_ops,從他的注釋來看 mq 的Qdisc只使用與多Q狀況*/ qdisc = qdisc_create_dflt(txq, &mq_qdisc_ops, TC_H_ROOT);//調用mq_init if (qdisc) { dev->qdisc = qdisc; qdisc->ops->attach(qdisc);//mq_attach } } }
truct Qdisc *qdisc_create_dflt(struct netdev_queue *dev_queue, const struct Qdisc_ops *ops, unsigned int parentid) { struct Qdisc *sch; if (!try_module_get(ops->owner)) return NULL; sch = qdisc_alloc(dev_queue, ops); if (IS_ERR(sch)) { module_put(ops->owner); return NULL; } sch->parent = parentid; if (!ops->init || ops->init(sch, NULL) == 0) //mq_init return sch; qdisc_destroy(sch); return NULL; } struct mq_sched { struct Qdisc **qdiscs; }; static int mq_init(struct Qdisc *sch, struct nlattr *opt) { struct net_device *dev = qdisc_dev(sch); struct mq_sched *priv = qdisc_priv(sch); struct netdev_queue *dev_queue; struct Qdisc *qdisc; unsigned int ntx; if (sch->parent != TC_H_ROOT) return -EOPNOTSUPP; if (!netif_is_multiqueue(dev)) return -EOPNOTSUPP; /* pre-allocate qdiscs, attachment can't fail */ priv->qdiscs = kcalloc(dev->num_tx_queues, sizeof(priv->qdiscs[0]), GFP_KERNEL); if (priv->qdiscs == NULL) return -ENOMEM; for (ntx = 0; ntx < dev->num_tx_queues; ntx++) { dev_queue = netdev_get_tx_queue(dev, ntx); qdisc = qdisc_create_dflt(dev_queue, get_default_qdisc_ops(dev, ntx), //為每個隊列分配默認的qdisc算法 TC_H_MAKE(TC_H_MAJ(sch->handle), TC_H_MIN(ntx + 1))); if (qdisc == NULL) goto err; priv->qdiscs[ntx] = qdisc; qdisc->flags |= TCQ_F_ONETXQUEUE | TCQ_F_NOPARENT; } sch->flags |= TCQ_F_MQROOT; return 0; err: mq_destroy(sch); return -ENOMEM; } static inline const struct Qdisc_ops * get_default_qdisc_ops(const struct net_device *dev, int ntx) { return ntx < dev->real_num_tx_queues ? default_qdisc_ops : &pfifo_fast_ops; } static void mq_attach(struct Qdisc *sch) { struct net_device *dev = qdisc_dev(sch); struct mq_sched *priv = qdisc_priv(sch); struct Qdisc *qdisc, *old; unsigned int ntx; for (ntx = 0; ntx < dev->num_tx_queues; ntx++) { qdisc = priv->qdiscs[ntx]; old = dev_graft_qdisc(qdisc->dev_queue, qdisc); //設置qdisc到設備隊列的qdisc_sleeping中 if (old) qdisc_destroy(old); #ifdef CONFIG_NET_SCHED if (ntx < dev->real_num_tx_queues) qdisc_hash_add(qdisc); #endif } kfree(priv->qdiscs); priv->qdiscs = NULL; } /* Attach toplevel qdisc to device queue. */ struct Qdisc *dev_graft_qdisc(struct netdev_queue *dev_queue, struct Qdisc *qdisc) { struct Qdisc *oqdisc = dev_queue->qdisc_sleeping; spinlock_t *root_lock; root_lock = qdisc_lock(oqdisc); spin_lock_bh(root_lock); /* Prune old scheduler */ if (oqdisc && atomic_read(&oqdisc->refcnt) <= 1) qdisc_reset(oqdisc); /* ... and graft new one */ if (qdisc == NULL) qdisc = &noop_qdisc; dev_queue->qdisc_sleeping = qdisc; rcu_assign_pointer(dev_queue->qdisc, &noop_qdisc); spin_unlock_bh(root_lock); return oqdisc; }
我們在使用的netdevice基本上都是單隊列的,默認情況下都是這個pfifo_fast_ops隊列;
/*pfifo_fast_ops pfifo_qdisc_ops tbf_qdisc_ops sfq_qdisc_ops prio_class_ops這幾個都為出口,ingress_qdisc_ops為入口 */ struct Qdisc_ops pfifo_fast_ops { //;//__read_mostly = { .id = "pfifo_fast", .priv_size = sizeof(struct pfifo_fast_priv), .enqueue = pfifo_fast_enqueue, .dequeue = pfifo_fast_dequeue, .peek = pfifo_fast_peek, .init = pfifo_fast_init, .reset = pfifo_fast_reset, .dump = pfifo_fast_dump, .owner = THIS_MODULE, };
可以看到TCQ_F_CAN_BYPASS,這個flag置位,就表明數據包發送不一定非得走隊列的規則,可以by pass這個規則,直接通過發送到driver,不過在一般沒有阻塞的通訊狀況下,有了這個flag,基本就都是直接發送出去了!
假設interface就是使用的pfifo_fast Qdisc規則,那么我們調用的enqueue直接走到pfifo_fast_enqueue,在里面就直接放到隊列里,如果超出了最大的積攢數量就DROP掉了,返回NET_XMIT_DROP
//通過skb->priority計算出band,從而來確定把該SKB加入到pfifo_fast_priv的q[i]隊列中 static int pfifo_fast_enqueue(struct sk_buff *skb, struct Qdisc* qdisc) //和下面的pfifo_fast_enqueue結合使用 {/*先是要判斷一下在Q中的數據包的數量,是否超過了tx_queue_len的值*/ if (skb_queue_len(&qdisc->q) < qdisc_dev(qdisc)->tx_queue_len) { int band = prio2band[skb->priority & TC_PRIO_MAX]; struct pfifo_fast_priv *priv = qdisc_priv(qdisc); struct sk_buff_head *list = band2list(priv, band); //例如0 1 2頻道都有數據,則bitmap=二進制0111,也就是7,bitmap2band[7]對應0,也就是首先發送第0頻道pfifo_fast_priv->q[0]skb隊列數據, //同理如果0發送完畢,則bitmap變為0x0110,也就是6,bitmap2band[6]對應1,也就是發送第1頻道pfifo_fast_priv->q[1]skb隊列數據, priv->bitmap |= (1 << band); qdisc->q.qlen++; return __qdisc_enqueue_tail(skb, qdisc, list); } return qdisc_drop(skb, qdisc); }
__qdisc_run
回到__dev_xmit_skb 這個函數,enqueue完畢之后,如果這個Qdisc不是Running狀態,就開啟Running狀態,然后調用了這個函數__qdisc_run(q);
在其中主要是調用了qdisc_restart來從隊列中dequeue出封包,然后再調用sch_direct_xmit函數去直接發送封包,這個函數我們上面有分析過,就是直接發送給driver了
然后出現BUSY的就requeue到隊列中。因為有可能從隊列中取封包,所以這個函數可能發若干個包,要注意的是這些發送封包的過程都是出於process context
/* __QDISC_STATE_RUNNING標志用於保證一個流控對象不會同時被多個例程運行。 軟中斷線程的動作:運行加入到output_queue鏈表中的所有流控對象,如果試圖運行某個流控對象時,發現已經有其他內核路徑在運行這個對象,直接返回,並試圖運行下一個流控對象。 */ void __qdisc_run(struct Qdisc *q) { int quota = weight_p; int packets; /*這個函數是調用qdisc_restart去發送Q中的數據包,packet記錄這次發送了多少...*/ while (qdisc_restart(q, &packets)) { /* * Ordered by possible occurrence: Postpone processing if * 1. we've exceeded packet quota * 2. another process needs the CPU; *//* 這邊會有限定額度64個封包,如果超過64個就不能再次連續發了, * 需要以后執行softirq去發送了*/ quota -= packets; if (quota <= 0 || need_resched()) {//將本隊列加入軟中斷的output_queue鏈表中。 /*將隊列加入發送軟中斷NET_TX_SOFTIRQ的處理隊列,當軟中斷被執行時,隊列又會繼續發送數據包。*/ __netif_schedule(q);//激活發送軟件中的,最終調用net_tx_action break; } } qdisc_run_end(q); }

//__qdisc_run -> qdisc_restart -> dequeue_skb -> prio_dequeue(這里面有個遞歸調用過程) -> qdisc_dequeue_head static inline int qdisc_restart(struct Qdisc *q, int *packets) { struct netdev_queue *txq; struct net_device *dev; spinlock_t *root_lock; struct sk_buff *skb; bool validate; /* Dequeue packet */ skb = dequeue_skb(q, &validate, packets); if (unlikely(!skb)) return 0; root_lock = qdisc_lock(q); dev = qdisc_dev(q); txq = skb_get_tx_queue(dev, skb); /*這個函數之前在將第一種狀況的時候講過,在Qdisc的狀況下 也有通過這個函數直接發送的情況 *實際上這個函數是在直接去發送,這個可以直接發送之前留存下來的包,所以函數的解釋是 *可以發送若干個封包*/ return sch_direct_xmit(skb, q, dev, txq, root_lock, validate); }
假設已經發送了若干個封包了,已經超過64個,那么會調用__netif_schedule去打開softirq利用軟中斷去發送在queue的封包;
/* * 激活數據包輸出軟中斷有多個接口,而 * __netif_schedule()是最常用的。 *///激活發送軟件中的,最終調用net_tx_action void __netif_schedule(struct Qdisc *q) { /* * 如果輸出網絡設備沒有處於流量 * 控制的調度中,則調用__netif_reschedule() * 激活輸出軟中斷 */ if (!test_and_set_bit(__QDISC_STATE_SCHED, &q->state)) __netif_reschedule(q); }
//把Qdisc中的數據放入cpu sd的output_queue_tailp輸出隊列,將隊列加入發送軟中斷NET_TX_SOFTIRQ的處理隊列,當軟中斷被執行時,隊列又會繼續發送數據包。__netif_reschedule /* 由於軟中斷被激活,軟中斷的優先級僅次於硬中斷,這樣就保證了隊列會被及時的運行,即保證了數據包會被及時的發送。 *///激活發送軟件中的,最終調用net_tx_action //dev_queue_xmit -> __dev_xmit_skb -> __qdisc_run最終調用到該函數,把流控對象Qdisc添加到CPU軟中斷的output_queue static inline void __netif_reschedule(struct Qdisc *q) { struct softnet_data *sd; unsigned long flags; /* * 將網絡設備鏈接到softnet_data中的output_queu * 隊列上,然后激活網絡輸出軟中斷對該 * 隊列進行處理。 */ local_irq_save(flags); sd = &__get_cpu_var(softnet_data); q->next_sched = NULL; ////在net_dev_init中,sd->output_queue_tailp = &sd->output_queue;所以相當於把q添加到了output_queue隊列中 *sd->output_queue_tailp = q; sd->output_queue_tailp = &q->next_sched; raise_softirq_irqoff(NET_TX_SOFTIRQ); //激活發送軟件中的,最終調用net_tx_action local_irq_restore(flags); }
net_tx_action
net_tx_action便是軟中斷的執行函數,主要是做2件事情
第一件事情就是free使用完的skb,driver一般發送完數據之后,就會調用dev_kfree_skb_irq 屆時這個地方就是來free的
第二件事情就是調用qdisc_run發送數據包了
/* * net_tx_action()是數據包輸出軟中斷的例程, * 一旦激活便會遍歷output_queue隊列中 * 待處理的輸出網絡設備,然后調用 * qdisc_run()在合適的時機發送數據包。 * 數據包輸出軟中斷通常有netif_schedule()激活。 */ //qos tc 流量控制的時候會用到 static void net_tx_action(struct softirq_action *h) { struct softnet_data *sd = this_cpu_ptr(&softnet_data); /* * 如果當前CPU的softnet_data中存在已完成 * 輸出待釋放的數據包,則遍歷 * completion_queue隊列,釋放該隊列中所有 * 數據包,對於發送而言,硬中斷只是通過網卡把包發走,但是回收內存的事情是通過軟中斷來做的, *設備驅動發送完數據之后,會調用dev_kfree_skb_irq,不過也有的設備比較個別 *自己去free,這個其實也沒有什么問題的...省掉了軟中斷的處理*/ */ if (sd->completion_queue) { struct sk_buff *clist; local_irq_disable(); clist = sd->completion_queue; sd->completion_queue = NULL; local_irq_enable(); while (clist) { struct sk_buff *skb = clist; clist = clist->next; WARN_ON(atomic_read(&skb->users)); if (likely(get_kfree_skb_cb(skb)->reason == SKB_REASON_CONSUMED)) trace_consume_skb(skb); else trace_kfree_skb(skb, net_tx_action); if (skb->fclone != SKB_FCLONE_UNAVAILABLE) __kfree_skb(skb); else __kfree_skb_defer(skb); } __kfree_skb_flush(); } /* * 如果當前CPU的softnet_data中存在待處理的輸出網絡 * 設備,則遍歷output_queue隊列,調用qdisc_run()來發送 * 數據包或者再次調度數據包輸出軟中斷,在 * 合適的時機發送數據包。 */ if (sd->output_queue) { struct Qdisc *head; local_irq_disable(); head = sd->output_queue; sd->output_queue = NULL; sd->output_queue_tailp = &sd->output_queue; local_irq_enable(); while (head) { struct Qdisc *q = head; spinlock_t *root_lock; head = head->next_sched; root_lock = qdisc_lock(q); if (spin_trylock(root_lock)) { smp_mb__before_atomic(); clear_bit(__QDISC_STATE_SCHED, &q->state); qdisc_run(q); spin_unlock(root_lock); } else { if (!test_bit(__QDISC_STATE_DEACTIVATED, &q->state)) { __netif_reschedule(q); } else { smp_mb__before_atomic(); clear_bit(__QDISC_STATE_SCHED, &q->state); } } } }
根據設備有無enqueue的方法,可以分成兩種發送數據包的方式,第一就是有擁塞控制的數據傳輸,第二個就是什么都沒有的直接傳輸到driver的,當然大部分的於外界溝通的interface都屬於第一種,像loopback,tunnel一些設備就屬於第二種沒有enqueue的!
2. 對於有擁塞控制的數據傳輸,也有2條路徑,第一條就是在滿足3個前提條件下,直接發送數據包到硬件,和上述第二種case是一樣的, 第二條就是出現擁塞的狀況,就是有封包發送不成功,或者數據包量比較大的狀況,這時候會用到enqueue,應該是保證順序,所以一般q有包的狀況 就都需要enqueue,然后再去dequeue發送到硬件,畢竟進程的上下文不會讓你過多的占用時間,有一定的量的限制,限制條件到了就會中斷發送,改用軟中斷的方式!
從抓包工具中抓出來的網絡日志的現象以及其他可疑的點來看可以分為2類:
1. 有些數據包被不斷的重傳(並不是TCP/UDP層的重傳),同一個ipid的包不斷的被抓到
2.數據包被送到netdevice層,但是並沒有看到被送到driver
問題一:
主要原因是driver 發送數據包出現問題,返回Error,導致數據包被放到Qdisc的queue中,但是還是會不斷被嘗試着重新發送!
下面我們從code 邏輯來分析:
正常的flow一般情況下是不會enqueue的,會直接發送,如果一旦driver發送失敗了,就先將數據包enqueue,詳細的發送過程可以參考文章:
最接近driver的函數__netdev_start_xmit如果發送失敗會返回一個Error,如下面的值,例如Tx_Busy
enum netdev_tx { __NETDEV_TX_MIN = INT_MIN, /* make sure enum is signed */ NETDEV_TX_OK = 0x00, /* driver took care of packet */ NETDEV_TX_BUSY = 0x10, /* driver tx path was busy*/ NETDEV_TX_LOCKED = 0x20, /* driver tx lock was already taken */ }; static inline netdev_tx_t __netdev_start_xmit(const struct net_device_ops *ops, struct sk_buff *skb, struct net_device *dev, bool more) { skb->xmit_more = more ? 1 : 0; return ops->ndo_start_xmit(skb, dev); } 返回一個錯誤的值,表示這個數據包發送出現問題,但是目前Qdisc會進行擁塞處理,將數據包在放到queue中,下面的代碼段是在sch_direct_xmit函數中返回值處理的部分 如果發送成功,就會返回Q的len,如果NETDEV_TX_LOCKED會說明driver去lock的時候失敗,其他狀況下一般認為是默認的錯誤即TX BUSY,這個時候會調用dev_requeue_skb把數據包重新放到Qdisc的隊列中 /*進行返回值處理! 如果ret < NET_XMIT_MASK 為true 否則 flase*/ if (dev_xmit_complete(ret)) { /* Driver sent out skb successfully or skb was consumed */ /*這個地方需要注意可能有driver的負數的case,也意味着這個skb被drop了*/ ret = qdisc_qlen(q);//成功發送報文,如果緩存區中還有報文,則嘗試繼續發送報文 } else { /* Driver returned NETDEV_TX_BUSY - requeue skb */ if (unlikely(ret != NETDEV_TX_BUSY)) net_warn_ratelimited("BUG %s code %d qlen %d\n", dev->name, ret, q->q.qlen); /*發生Tx Busy的時候,重新進行requeue*/ ret = dev_requeue_skb(skb, q);//發送失敗,則 入隊重新進行發送 }
把skb放到q->gso_skb中,然后增加q數量,然后調用__netif_schedule,這個是來使用軟中斷去發送數據包!
/* Modifications to data participating in scheduling must be protected with * qdisc_lock(qdisc) spinlock. * * The idea is the following: * - enqueue, dequeue are serialized via qdisc root lock * - ingress filtering is also serialized via qdisc root lock * - updates to tree and tree walking are only done under the rtnl mutex. */ static inline int dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q) { q->gso_skb = skb; q->qstats.requeues++; qdisc_qstats_backlog_inc(q, skb); q->q.qlen++; /* it's still part of the queue */ __netif_schedule(q); return 0; }
基本上這個包被放入queue中,還會通過軟中斷,或者下一個數據包包來的時候去觸發__qdisc_run, 這里應該就是誰快就使用誰吧。
中間發送的過程中重點說一點就是dequeue skb的時候,發現TX stop的狀況下都不會dequeue出skb
從下面來看,數據包仍然是能夠被dequeue出來,之后還是會走到dev_hard_start_xmit 發送給driver, 如果driver還是無作為的話,那就看到netdevice層同一個ipid的包被不斷的重發...

/* Note that dequeue_skb can possibly return a SKB list (via skb->next). * A requeued skb (via q->gso_skb) can also be a SKB list. */ static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate, int *packets) { struct sk_buff *skb = q->gso_skb; const struct netdev_queue *txq = q->dev_queue; *packets = 1; *validate = true; if (unlikely(skb)) { /* check the reason of requeuing without tx lock first */ txq = skb_get_tx_queue(txq->dev, skb); if (!netif_xmit_frozen_or_stopped(txq)) { q->gso_skb = NULL; qdisc_qstats_backlog_dec(q, skb); q->q.qlen--; } else//如果TXQ被stop了,那么無法dequeue出skb 就返回空 skb = NULL; /* skb in gso_skb were already validated */ *validate = false; } else { //如果是多Q或者txq沒有被stop 就dequeue skb if (!(q->flags & TCQ_F_ONETXQUEUE) || !netif_xmit_frozen_or_stopped(txq)) { skb = q->dequeue(q); if (skb && qdisc_may_bulk(q)) try_bulk_dequeue_skb(q, skb, txq, packets); } } return skb; }
問題 2:
如果TCP協議中有數據包發送,送到netdevice了,但是driver並沒有收到,在網絡抓包工具中也沒有顯示出來任何發包的痕跡,不過一般情況下是 發着發着就不能發了。這個的原因主要是上層管理interface的process或者driver調用了netif_tx_stop_queue, 這個函數的作用就是把interface的txq停掉,不讓發包
來看下發包過程中會不會遇到阻礙,假設沒有enqueue的狀況下,一個數據包跑到了sch_direct_xmit里面,會判斷txq是否為stop或者frozen狀態,如果是 就不發了
直接requeue了.

if (likely(skb)) { HARD_TX_LOCK(dev, txq, smp_processor_id()); /*如果說txq被stop,即置位QUEUE_STATE_ANY_XOFF_OR_FROZEN,就直接ret = NETDEV_TX_BUSY *如果說txq 正常運行,那么直接調用dev_hard_start_xmit發送數據包*/ if (!netif_xmit_frozen_or_stopped(txq)) skb = dev_hard_start_xmit(skb, dev, txq, &ret);//調用驅動發送報文 HARD_TX_UNLOCK(dev, txq); } else { spin_lock(root_lock); return qdisc_qlen(q); } spin_lock(root_lock); /*進行返回值處理! 如果ret < NET_XMIT_MASK 為true 否則 flase*/ if (dev_xmit_complete(ret)) { /* Driver sent out skb successfully or skb was consumed */ /*這個地方需要注意可能有driver的負數的case,也意味着這個skb被drop了*/ ret = qdisc_qlen(q);//成功發送報文,如果緩存區中還有報文,則嘗試繼續發送報文 } else { /* Driver returned NETDEV_TX_BUSY - requeue skb */ if (unlikely(ret != NETDEV_TX_BUSY)) net_warn_ratelimited("BUG %s code %d qlen %d\n", dev->name, ret, q->q.qlen); /*發生Tx Busy的時候,重新進行requeue*/ ret = dev_requeue_skb(skb, q);//發送失敗,則 入隊重新進行發送 }
既然requeue了,還可以dequeue再去發,想一下,及時能夠dequeue出來,還是要調用到sch_direct_xmit,還是被down的繼續requeue,這樣其實舅舅白白的操作了鏈表,實際上kernel也沒有那么傻了,在dequeue的時候就已經做了手腳,可以參考dequeue_skb的解釋了... 如果txq 被stop直接就dequeue失敗了..

static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate, int *packets) { struct sk_buff *skb = q->gso_skb; const struct netdev_queue *txq = q->dev_queue; *packets = 1; *validate = true; if (unlikely(skb)) { /* check the reason of requeuing without tx lock first */ txq = skb_get_tx_queue(txq->dev, skb); if (!netif_xmit_frozen_or_stopped(txq)) { q->gso_skb = NULL; qdisc_qstats_backlog_dec(q, skb); q->q.qlen--; } else//如果TXQ被stop了,那么無法dequeue出skb 就返回空 skb = NULL; /* skb in gso_skb were already validated */ *validate = false; } else { //如果是多Q或者txq沒有被stop 就dequeue skb if (!(q->flags & TCQ_F_ONETXQUEUE) || !netif_xmit_frozen_or_stopped(txq)) { skb = q->dequeue(q); if (skb && qdisc_may_bulk(q)) try_bulk_dequeue_skb(q, skb, txq, packets); } } return skb; }
看到不能上網,但上層確實有數據包發出來,不過Q最多幫你頂到1000個包,如果超了就徹底被drop了。這時就會產生丟包現象,另外提一下就是在driver發送速度較慢,上層輸送數據較快的狀況下,這樣會不斷的有數據包被enqueue,當達到一定的瓶頸的時候,也就是1000的時候,后面灌輸過來的數據包就會被丟掉了,這樣就會產生掉包的現象