Concept Analysis
XPS stands for Transmit Packet Steering. It selects the NIC TX queue based on the CPU that is currently processing the softirq, and is meant for multi-queue NICs. If the current CPU maps to several TX queues, a hash of the packet decides which queue is used; if the TX queues are governed by a queuing discipline such as mqprio, the packet's priority is first mapped to a traffic class, the class then narrows the choice of TX queue, and the packet is finally sent through that queue's qdisc. The main point of XPS is to keep a flow's TX processing on the same CPU that handled its RX interrupt, avoiding the CPU cache misses that switching CPUs would cause. XPS is configured per TX queue by writing a CPU mask to /sys/class/net/<dev>/queues/tx-<n>/xps_cpus, which ends up calling netif_set_xps_queue() described below.
Basic Data Structures
#ifdef CONFIG_XPS
/*
* This structure holds an XPS map which can be of variable length. The
* map is an array of queues.
* The queue map for a single CPU.
*/
struct xps_map {
unsigned int len;//number of queues in the map
unsigned int alloc_len;//allocated capacity (alloc_len >= len)
struct rcu_head rcu;
u16 queues[0];//array of TX queue indices
};
#define XPS_MAP_SIZE(_num) (sizeof(struct xps_map) + ((_num) * sizeof(u16)))
#define XPS_MIN_MAP_ALLOC ((L1_CACHE_ALIGN(offsetof(struct xps_map, queues[1])) \
- sizeof(struct xps_map)) / sizeof(u16))
/*
* This structure holds all XPS maps for device. Maps are indexed by CPU.
*/
struct xps_dev_maps {
struct rcu_head rcu;
struct xps_map __rcu *cpu_map[0];//one xps_map per (CPU, traffic class) pair, indexed by cpu * num_tc + tc
};
//size of the whole table: nr_cpu_ids * _tcs map pointers
#define XPS_DEV_MAPS_SIZE(_tcs) (sizeof(struct xps_dev_maps) + \
(nr_cpu_ids * (_tcs) * sizeof(struct xps_map *)))
#endif /* CONFIG_XPS */
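The cpu_map array is flat: each CPU owns num_tc consecutive slots, one per traffic class, so the map for a (CPU, class) pair lives at index cpu * num_tc + tc. A minimal illustrative sketch of the lookup (lookup_xps_map is a hypothetical helper, not kernel code; the real lookup is in get_xps_queue below):
/* Illustration only: with num_tc == 4, CPU 2 / class 1 lands in slot 2 * 4 + 1 == 9. */
static struct xps_map *lookup_xps_map(struct xps_dev_maps *dev_maps,
int cpu, int num_tc, int tc)
{
int tci = cpu * num_tc + tc;//flat index into cpu_map
return rcu_dereference(dev_maps->cpu_map[tci]);//caller must hold rcu_read_lock()
}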
Code Implementation
Building the mapping from CPU and packet priority to TX queue
When a packet is transmitted, we know the number of the CPU currently processing it, and we can read the packet's priority from skb->priority. XPS derives the TX queue from these two values. If the outgoing device has no mqprio qdisc configured, skb->priority is not used and the CPU number alone determines the queue. One CPU may map to several TX queues, in which case a hash picks one of them. The first step in implementing XPS is processing its configuration.
netif_set_xps_queue
Builds the mapping for TX queue index of device dev, given the mask of CPUs the queue is assigned to.
int netif_set_xps_queue(struct net_device *dev, const struct cpumask *mask,
u16 index)
{
struct xps_dev_maps *dev_maps, *new_dev_maps = NULL;
int i, cpu, tci, numa_node_id = -2;
int maps_sz, num_tc = 1, tc = 0;
struct xps_map *map, *new_map;
bool active = false;
if (dev->num_tc) {//mqprio is configured, so traffic classes must be considered: on transmit, the packet priority is first mapped to a class, and the class then selects among the CPU's TX queues
num_tc = dev->num_tc;
//find the traffic class this TX queue belongs to; the relation was built when mqprio was configured
tc = netdev_txq_to_tc(dev, index);
if (tc < 0)
return -EINVAL;
}
//compute the total map size: nr_cpu_ids * num_tc entries
maps_sz = XPS_DEV_MAPS_SIZE(num_tc);
if (maps_sz < L1_CACHE_BYTES)
maps_sz = L1_CACHE_BYTES;
mutex_lock(&xps_map_mutex);//serialize XPS map updates
dev_maps = xmap_dereference(dev->xps_maps);
/* allocate memory for queue storage */
for_each_cpu_and(cpu, cpu_online_mask, mask) {//iterate over every online CPU in the queue's mask
if (!new_dev_maps)
new_dev_maps = kzalloc(maps_sz, GFP_KERNEL);
if (!new_dev_maps) {
mutex_unlock(&xps_map_mutex);
return -ENOMEM;
}
//index of this (CPU, class) slot in the flat map array
tci = cpu * num_tc + tc;
map = dev_maps ? xmap_dereference(dev_maps->cpu_map[tci]) :
NULL;
//add this TX queue index to the CPU's map
map = expand_xps_map(map, cpu, index);
if (!map)
goto error;
RCU_INIT_POINTER(new_dev_maps->cpu_map[tci], map);
}
if (!new_dev_maps)
goto out_no_new_maps;
//iterate over every possible CPU
for_each_possible_cpu(cpu) {
/* copy maps belonging to foreign traffic classes */
for (i = tc, tci = cpu * num_tc; dev_maps && i--; tci++) {
/* fill in the new device map from the old device map */
map = xmap_dereference(dev_maps->cpu_map[tci]);
RCU_INIT_POINTER(new_dev_maps->cpu_map[tci], map);
}
/* We need to explicitly update tci as previous loop
* could break out early if dev_maps is NULL.
*/
tci = cpu * num_tc + tc;
if (cpumask_test_cpu(cpu, mask) && cpu_online(cpu)) {
/* add queue to CPU maps */
int pos = 0;
map = xmap_dereference(new_dev_maps->cpu_map[tci]);
while ((pos < map->len) && (map->queues[pos] != index))
pos++;
if (pos == map->len)
map->queues[map->len++] = index;
#ifdef CONFIG_NUMA
if (numa_node_id == -2)
numa_node_id = cpu_to_node(cpu);
else if (numa_node_id != cpu_to_node(cpu))
numa_node_id = -1;
#endif
} else if (dev_maps) {
/* fill in the new device map from the old device map */
map = xmap_dereference(dev_maps->cpu_map[tci]);
RCU_INIT_POINTER(new_dev_maps->cpu_map[tci], map);
}
/* copy maps belonging to foreign traffic classes */
for (i = num_tc - tc, tci++; dev_maps && --i; tci++) {
/* fill in the new device map from the old device map */
map = xmap_dereference(dev_maps->cpu_map[tci]);
RCU_INIT_POINTER(new_dev_maps->cpu_map[tci], map);
}
}
//publish the new map table in place of the old one
rcu_assign_pointer(dev->xps_maps, new_dev_maps);
/* Cleanup old maps */
if (!dev_maps)
goto out_no_old_maps;
//free maps that are no longer referenced
for_each_possible_cpu(cpu) {
for (i = num_tc, tci = cpu * num_tc; i--; tci++) {
new_map = xmap_dereference(new_dev_maps->cpu_map[tci]);
map = xmap_dereference(dev_maps->cpu_map[tci]);
if (map && map != new_map)
kfree_rcu(map, rcu);
}
}
kfree_rcu(dev_maps, rcu);
out_no_old_maps:
dev_maps = new_dev_maps;
active = true;
out_no_new_maps:
/* update Tx queue numa node */
netdev_queue_numa_node_write(netdev_get_tx_queue(dev, index),
(numa_node_id >= 0) ? numa_node_id :
NUMA_NO_NODE);
if (!dev_maps)
goto out_no_maps;
/* removes queue from unused CPUs */
for_each_possible_cpu(cpu) {
for (i = tc, tci = cpu * num_tc; i--; tci++)
active |= remove_xps_queue(dev_maps, tci, index);
if (!cpumask_test_cpu(cpu, mask) || !cpu_online(cpu))
active |= remove_xps_queue(dev_maps, tci, index);
for (i = num_tc - tc, tci++; --i; tci++)
active |= remove_xps_queue(dev_maps, tci, index);
}
/* free map if not active */
if (!active) {
RCU_INIT_POINTER(dev->xps_maps, NULL);
kfree_rcu(dev_maps, rcu);
}
out_no_maps:
mutex_unlock(&xps_map_mutex);
return 0;
error:
/* remove any maps that we added */
for_each_possible_cpu(cpu) {
for (i = num_tc, tci = cpu * num_tc; i--; tci++) {
new_map = xmap_dereference(new_dev_maps->cpu_map[tci]);
map = dev_maps ?
xmap_dereference(dev_maps->cpu_map[tci]) :
NULL;
if (new_map && new_map != map)
kfree(new_map);
}
}
mutex_unlock(&xps_map_mutex);
kfree(new_dev_maps);
return -ENOMEM;
}
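For reference, this is how a driver could use netif_set_xps_queue() to give each TX queue affinity to one CPU. A minimal sketch, assuming the device has one TX queue per online CPU; setup_xps_one_to_one is a hypothetical helper, not a kernel function:
//Hypothetical helper: map TX queue i to CPU i, one queue per CPU.
static void setup_xps_one_to_one(struct net_device *dev)
{
int i;
for (i = 0; i < dev->real_num_tx_queues; i++)
netif_set_xps_queue(dev, cpumask_of(i), i);//cpumask containing only CPU i
}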
TX Queue Selection
dev_queue_xmit
int dev_queue_xmit(struct sk_buff *skb)
{
return __dev_queue_xmit(skb, NULL);
}
__dev_queue_xmit
static int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv)
{
struct net_device *dev = skb->dev;
struct netdev_queue *txq;
struct Qdisc *q;
int rc = -ENOMEM;
bool again = false;
/* reset the MAC header offset to the current data pointer */
skb_reset_mac_header(skb);
if (unlikely(skb_shinfo(skb)->tx_flags & SKBTX_SCHED_TSTAMP))
__skb_tstamp_tx(skb, NULL, skb->sk, SCM_TSTAMP_SCHED);
/* Disable soft irqs for various locks below. Also
* stops preemption for RCU.
*/
rcu_read_lock_bh();
skb_update_prio(skb);
qdisc_pkt_len_init(skb);
#ifdef CONFIG_NET_CLS_ACT
skb->tc_at_ingress = 0;
# ifdef CONFIG_NET_EGRESS
if (static_key_false(&egress_needed)) {
skb = sch_handle_egress(skb, &rc, dev);
if (!skb)
goto out;
}
# endif
#endif
/* If device/qdisc don't need skb->dst, release it right now while
* its hot in this cpu cache.
*/
if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
skb_dst_drop(skb);
else
skb_dst_force(skb);
//select the TX queue; accel_priv is NULL when called via dev_queue_xmit, as seen above
txq = netdev_pick_tx(dev, skb, accel_priv);
q = rcu_dereference_bh(txq->qdisc);//get the queue's qdisc
trace_net_dev_queue(skb);
if (q->enqueue) {//the qdisc has an enqueue hook, so the packet is scheduled through it
rc = __dev_xmit_skb(skb, q, dev, txq);
goto out;
}
......
return rc;
}
netdev_pick_tx
struct netdev_queue *netdev_pick_tx(struct net_device *dev,
struct sk_buff *skb,
void *accel_priv)
{
int queue_index = 0;//a single-queue device simply uses queue 0
#ifdef CONFIG_XPS
u32 sender_cpu = skb->sender_cpu - 1;//skb->sender_cpu is stored 1-based; the maps are indexed from 0
if (sender_cpu >= (u32)NR_CPUS)//invalid CPU number: fall back to the CPU currently processing this packet
//transmission is not tied to the current CPU: a caller that wants another
//CPU's queues can set skb->sender_cpu before calling
skb->sender_cpu = raw_smp_processor_id() + 1;
#endif
if (dev->real_num_tx_queues != 1) {//multi-queue device: a TX queue must be chosen
const struct net_device_ops *ops = dev->netdev_ops;
if (ops->ndo_select_queue)//use the driver's own queue selection if it implements one
queue_index = ops->ndo_select_queue(dev, skb, accel_priv,
__netdev_pick_tx);//__netdev_pick_tx is passed as a fallback for when the driver cannot pick a queue itself
else
queue_index = __netdev_pick_tx(dev, skb);//otherwise use the generic __netdev_pick_tx
//clamp the index to the valid range
queue_index = netdev_cap_txqueue(dev, queue_index);
}
//record the chosen queue in the skb
skb_set_queue_mapping(skb, queue_index);
//return the netdev_queue for that index
return netdev_get_tx_queue(dev, queue_index);
}
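netdev_cap_txqueue guards against an out-of-range index coming back from the driver: it falls back to queue 0 with a rate-limited warning. From net/core/dev.c (the message wording may vary slightly across kernel versions):
static u16 netdev_cap_txqueue(struct net_device *dev, u16 queue_index)
{
if (unlikely(queue_index >= dev->real_num_tx_queues)) {
net_warn_ratelimited("%s selects TX queue %d, but real number of TX queues is %d\n",
dev->name, queue_index,
dev->real_num_tx_queues);
return 0;//fall back to queue 0
}
return queue_index;
}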
__netdev_pick_tx
//generic TX queue selection
static u16 __netdev_pick_tx(struct net_device *dev, struct sk_buff *skb)
{
struct sock *sk = skb->sk;
int queue_index = sk_tx_queue_get(sk);//if the packet comes from a socket, reuse the queue index cached on it
if (queue_index < 0 || skb->ooo_okay ||//no cached index, reordering is acceptable, or the index is out of range: select again
queue_index >= dev->real_num_tx_queues) {
int new_index = get_xps_queue(dev, skb);
if (new_index < 0)
new_index = skb_tx_hash(dev, skb);//XPS did not pick a queue, so fall back to hashing
if (queue_index != new_index && sk &&//cache the new index on the socket so later packets skip the selection
sk_fullsock(sk) &&
rcu_access_pointer(sk->sk_dst_cache))
sk_tx_queue_set(sk, new_index);//i.e. sk->sk_tx_queue_mapping = new_index
queue_index = new_index;
}
return queue_index;
}
get_xps_queue
The logic of this function is shown in detail in the figure above: given the outgoing device dev and the packet skb, it derives the final TX queue index.
//XPS: pick the NIC TX queue based on the CPU currently processing the softirq; meant for multi-queue NICs.
//Selection is driven by the current CPU. If that CPU maps to several TX queues
//(ideally each queue maps to exactly one CPU), a hash picks one of them.
static inline int get_xps_queue(struct net_device *dev, struct sk_buff *skb)
{
#ifdef CONFIG_XPS
struct xps_dev_maps *dev_maps;//the device's per-CPU map table
struct xps_map *map;
int queue_index = -1;
rcu_read_lock();
dev_maps = rcu_dereference(dev->xps_maps);//fetch the map table
if (dev_maps) {
unsigned int tci = skb->sender_cpu - 1;//CPU numbering in the map starts at 0
if (dev->num_tc) {//the device has mqprio configured: fold the traffic class into the index
tci *= dev->num_tc;//each CPU owns num_tc consecutive slots
//map the packet priority to its traffic class index
tci += netdev_get_prio_tc_map(dev, skb->priority);
}
//fetch the map for this (CPU, class) slot
map = rcu_dereference(dev_maps->cpu_map[tci]);
if (map) {
if (map->len == 1)//only one queue mapped
queue_index = map->queues[0];
else//several queues: pick one by hash
queue_index = map->queues[reciprocal_scale(skb_get_hash(skb),
map->len)];
if (unlikely(queue_index >= dev->real_num_tx_queues))
queue_index = -1;
}
}
rcu_read_unlock();
return queue_index;
#else
return -1;
#endif
}
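netdev_get_prio_tc_map, used above, is a plain table lookup into the priority-to-class map that mqprio filled in at configuration time. From include/linux/netdevice.h (TC_BITMASK is 15, so only 16 distinct priorities are mapped):
static inline
int netdev_get_prio_tc_map(const struct net_device *dev, u32 prio)
{
return dev->prio_tc_map[prio & TC_BITMASK];//class index for this priority
}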
If XPS does not select a TX queue, skb_tx_hash makes the choice:
/*
* Returns a Tx hash for the given packet when dev->real_num_tx_queues is used
* as a distribution range limit for the returned value.
*/
static inline u16 skb_tx_hash(const struct net_device *dev,
struct sk_buff *skb)
{
return __skb_tx_hash(dev, skb, dev->real_num_tx_queues);
}
/*
* Returns a Tx hash based on the given packet descriptor and a TX queue
* count to be used as a distribution range.
*/
u16 __skb_tx_hash(const struct net_device *dev, struct sk_buff *skb,
unsigned int num_tx_queues)
{
u32 hash;
u16 qoffset = 0;
u16 qcount = num_tx_queues;
if (skb_rx_queue_recorded(skb)) {
hash = skb_get_rx_queue(skb);
while (unlikely(hash >= num_tx_queues))
hash -= num_tx_queues;
return hash;
}
if (dev->num_tc) {//the device has multiple traffic classes
//map the packet priority to its class index
u8 tc = netdev_get_prio_tc_map(dev, skb->priority);
//get the range of TX queues assigned to that class
qoffset = dev->tc_to_txq[tc].offset;
qcount = dev->tc_to_txq[tc].count;
}
//final physical queue index: hash within the class range, plus the class offset
return (u16) reciprocal_scale(skb_get_hash(skb), qcount) + qoffset;
}
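Both get_xps_queue and __skb_tx_hash rely on reciprocal_scale to reduce a 32-bit hash into the range [0, ep_ro) with a multiply and a shift instead of a modulo. From include/linux/kernel.h:
static inline u32 reciprocal_scale(u32 val, u32 ep_ro)
{
return (u32)(((u64) val * ep_ro) >> 32);//(val / 2^32) * ep_ro, truncated
}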