Concept analysis
XPS stands for Transmit Packet Steering. It selects the NIC TX queue based on the CPU currently running the transmit softirq and is intended for multiqueue NICs. If the current CPU maps to several TX queues, a hash over the packet decides which one to use; if a queuing discipline such as mqprio is configured, the packet's priority is additionally mapped to a traffic class, the class narrows the choice down to its own queues, and the packet is finally handed to the chosen queue's qdisc for transmission. The main point of XPS is to avoid migrating between CPUs when processing moves from the RX queue's interrupt to the TX queue's, since such migrations cause CPU cache misses and cost performance.
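To make that selection chain concrete before diving into the kernel code, here is a small user-space model (not kernel code; every table and value below is invented for illustration): the priority picks a traffic class, the (CPU, class) pair picks a queue list, and a hash picks one queue from the list.

#include <stdio.h>
#include <stdint.h>

#define NUM_TC   2	/* traffic classes, as with an mqprio qdisc */
#define NUM_CPUS 2

/* priorities 0-3 -> class 0, 4-7 -> class 1 (invented mapping) */
static const int prio_tc_map[8] = { 0, 0, 0, 0, 1, 1, 1, 1 };

/* two queues per (cpu, class) slot, flattened as cpu * NUM_TC + tc */
static const uint16_t queues[NUM_CPUS * NUM_TC][2] = {
	{ 0, 1 }, { 2, 3 },	/* cpu 0: class 0, class 1 */
	{ 4, 5 }, { 6, 7 },	/* cpu 1: class 0, class 1 */
};

static uint16_t pick_queue(int cpu, int prio, uint32_t hash)
{
	int tci = cpu * NUM_TC + prio_tc_map[prio & 7];

	/* scale the hash onto the slot's two queues, the way
	 * get_xps_queue() does with reciprocal_scale() */
	return queues[tci][(uint32_t)(((uint64_t)hash * 2) >> 32)];
}

int main(void)
{
	printf("cpu 1, prio 5 -> queue %u\n", pick_queue(1, 5, 0x9e3779b9u));
	return 0;
}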
Basic data structures
#ifdef CONFIG_XPS
/*
 * This structure holds an XPS map which can be of variable length. The
 * map is an array of queues.
 * One queue map per CPU.
 */
struct xps_map {
	unsigned int len;	/* number of queues in use */
	unsigned int alloc_len;	/* allocated capacity, alloc_len >= len */
	struct rcu_head rcu;
	u16 queues[0];		/* array of TX queue indices */
};
#define XPS_MAP_SIZE(_num) (sizeof(struct xps_map) + ((_num) * sizeof(u16)))
#define XPS_MIN_MAP_ALLOC ((L1_CACHE_ALIGN(offsetof(struct xps_map, queues[1])) \
	- sizeof(struct xps_map)) / sizeof(u16))

/*
 * This structure holds all XPS maps for device. Maps are indexed by CPU.
 */
struct xps_dev_maps {
	struct rcu_head rcu;
	struct xps_map __rcu *cpu_map[0];	/* one queue map per (CPU, class) slot */
};

/* total size of the map table */
#define XPS_DEV_MAPS_SIZE(_tcs) (sizeof(struct xps_dev_maps) +		\
	(nr_cpu_ids * (_tcs) * sizeof(struct xps_map *)))
#endif /* CONFIG_XPS */
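A quick worked example of the size arithmetic, as a user-space sketch: assuming a 64-bit build (struct rcu_head modelled as two pointers, making the header 24 bytes) and 64-byte cache lines, the first allocation is rounded up to one cache line, so XPS_MIN_MAP_ALLOC = (64 - 24) / 2 = 20 queue slots.

#include <stdio.h>
#include <stddef.h>
#include <stdint.h>

#define L1_CACHE_BYTES 64
#define L1_CACHE_ALIGN(x) (((x) + L1_CACHE_BYTES - 1) & ~(L1_CACHE_BYTES - 1))

struct xps_map_model {
	unsigned int len;
	unsigned int alloc_len;
	struct { void *next; void *func; } rcu;	/* rcu_head stand-in */
	uint16_t queues[];
};

int main(void)
{
	size_t min_alloc = (L1_CACHE_ALIGN(offsetof(struct xps_map_model, queues[1]))
			    - sizeof(struct xps_map_model)) / sizeof(uint16_t);

	printf("XPS_MIN_MAP_ALLOC = %zu\n", min_alloc);	/* 20 on x86-64 */
	return 0;
}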
Implementation
Building the mapping from CPU and packet priority to TX queue
When we transmit a packet we know the number of the CPU currently handling it, and skb->priority gives the packet's priority. From these two parameters XPS derives the TX queue. If the transmitting device has no mqprio queuing discipline configured, skb->priority is not used and the CPU number alone determines the queue. A single CPU can map to several TX queues, in which case a hash selects one of them. Before XPS can do any of this, its configuration has to be processed first.
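In practice this configuration usually arrives through sysfs: writing a hexadecimal CPU mask to /sys/class/net/<dev>/queues/tx-<n>/xps_cpus ends up invoking netif_set_xps_queue() below. A minimal user-space sketch (the interface eth0 and queue tx-0 are placeholder examples):

#include <stdio.h>

int main(void)
{
	/* map TX queue 0 of eth0 to CPUs 0-3 (hex mask 0xf) */
	FILE *f = fopen("/sys/class/net/eth0/queues/tx-0/xps_cpus", "w");

	if (!f) {
		perror("fopen");
		return 1;
	}
	fprintf(f, "f\n");
	fclose(f);
	return 0;
}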
netif_set_xps_queue
Builds the mapping information for TX queue index of device dev from the parameter mask, the set of CPUs the queue is to be mapped to.
int netif_set_xps_queue(struct net_device *dev, const struct cpumask *mask,
			u16 index)
{
	struct xps_dev_maps *dev_maps, *new_dev_maps = NULL;
	int i, cpu, tci, numa_node_id = -2;
	int maps_sz, num_tc = 1, tc = 0;
	struct xps_map *map, *new_map;
	bool active = false;

	if (dev->num_tc) {	/* an mqprio qdisc is configured, so traffic classes
				 * must be taken into account: on transmit the packet
				 * priority is mapped to a class first, and the class
				 * then selects a queue within the CPU's map */
		num_tc = dev->num_tc;
		/* find the class this TX queue belongs to; the relation was
		 * built when the mqprio qdisc was configured */
		tc = netdev_txq_to_tc(dev, index);
		if (tc < 0)
			return -EINVAL;
	}

	/* total size of the map table: nr_cpu_ids * num_tc entries */
	maps_sz = XPS_DEV_MAPS_SIZE(num_tc);
	if (maps_sz < L1_CACHE_BYTES)
		maps_sz = L1_CACHE_BYTES;

	mutex_lock(&xps_map_mutex);	/* serialize map updates */

	dev_maps = xmap_dereference(dev->xps_maps);

	/* allocate memory for queue storage */
	for_each_cpu_and(cpu, cpu_online_mask, mask) {	/* every online CPU in the mask */
		if (!new_dev_maps)
			new_dev_maps = kzalloc(maps_sz, GFP_KERNEL);
		if (!new_dev_maps) {
			mutex_unlock(&xps_map_mutex);
			return -ENOMEM;
		}

		/* index of this CPU's slot for the queue's traffic class */
		tci = cpu * num_tc + tc;

		map = dev_maps ? xmap_dereference(dev_maps->cpu_map[tci]) :
				 NULL;
		/* add this TX queue index to the CPU's map */
		map = expand_xps_map(map, cpu, index);
		if (!map)
			goto error;

		RCU_INIT_POINTER(new_dev_maps->cpu_map[tci], map);
	}

	if (!new_dev_maps)
		goto out_no_new_maps;

	/* walk every possible CPU */
	for_each_possible_cpu(cpu) {
		/* copy maps belonging to foreign traffic classes */
		for (i = tc, tci = cpu * num_tc; dev_maps && i--; tci++) {
			/* fill in the new device map from the old device map */
			map = xmap_dereference(dev_maps->cpu_map[tci]);
			RCU_INIT_POINTER(new_dev_maps->cpu_map[tci], map);
		}

		/* We need to explicitly update tci as previous loop
		 * could break out early if dev_maps is NULL.
		 */
		tci = cpu * num_tc + tc;

		if (cpumask_test_cpu(cpu, mask) && cpu_online(cpu)) {
			/* add queue to CPU maps */
			int pos = 0;

			map = xmap_dereference(new_dev_maps->cpu_map[tci]);
			while ((pos < map->len) && (map->queues[pos] != index))
				pos++;

			if (pos == map->len)
				map->queues[map->len++] = index;
#ifdef CONFIG_NUMA
			if (numa_node_id == -2)
				numa_node_id = cpu_to_node(cpu);
			else if (numa_node_id != cpu_to_node(cpu))
				numa_node_id = -1;
#endif
		} else if (dev_maps) {
			/* fill in the new device map from the old device map */
			map = xmap_dereference(dev_maps->cpu_map[tci]);
			RCU_INIT_POINTER(new_dev_maps->cpu_map[tci], map);
		}

		/* copy maps belonging to foreign traffic classes */
		for (i = num_tc - tc, tci++; dev_maps && --i; tci++) {
			/* fill in the new device map from the old device map */
			map = xmap_dereference(dev_maps->cpu_map[tci]);
			RCU_INIT_POINTER(new_dev_maps->cpu_map[tci], map);
		}
	}

	/* publish the new map table in place of the old one */
	rcu_assign_pointer(dev->xps_maps, new_dev_maps);

	/* Cleanup old maps */
	if (!dev_maps)
		goto out_no_old_maps;

	/* free per-CPU maps that are no longer referenced */
	for_each_possible_cpu(cpu) {
		for (i = num_tc, tci = cpu * num_tc; i--; tci++) {
			new_map = xmap_dereference(new_dev_maps->cpu_map[tci]);
			map = xmap_dereference(dev_maps->cpu_map[tci]);
			if (map && map != new_map)
				kfree_rcu(map, rcu);
		}
	}

	kfree_rcu(dev_maps, rcu);

out_no_old_maps:
	dev_maps = new_dev_maps;
	active = true;

out_no_new_maps:
	/* update Tx queue numa node */
	netdev_queue_numa_node_write(netdev_get_tx_queue(dev, index),
				     (numa_node_id >= 0) ? numa_node_id :
				     NUMA_NO_NODE);

	if (!dev_maps)
		goto out_no_maps;

	/* removes queue from unused CPUs */
	for_each_possible_cpu(cpu) {
		for (i = tc, tci = cpu * num_tc; i--; tci++)
			active |= remove_xps_queue(dev_maps, tci, index);
		if (!cpumask_test_cpu(cpu, mask) || !cpu_online(cpu))
			active |= remove_xps_queue(dev_maps, tci, index);
		for (i = num_tc - tc, tci++; --i; tci++)
			active |= remove_xps_queue(dev_maps, tci, index);
	}

	/* free map if not active */
	if (!active) {
		RCU_INIT_POINTER(dev->xps_maps, NULL);
		kfree_rcu(dev_maps, rcu);
	}

out_no_maps:
	mutex_unlock(&xps_map_mutex);

	return 0;
error:
	/* remove any maps that we added */
	for_each_possible_cpu(cpu) {
		for (i = num_tc, tci = cpu * num_tc; i--; tci++) {
			new_map = xmap_dereference(new_dev_maps->cpu_map[tci]);
			map = dev_maps ?
			      xmap_dereference(dev_maps->cpu_map[tci]) :
			      NULL;
			if (new_map && new_map != map)
				kfree(new_map);
		}
	}

	mutex_unlock(&xps_map_mutex);

	kfree(new_dev_maps);
	return -ENOMEM;
}
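Drivers can also seed these maps themselves, since netif_set_xps_queue() is exported. A hedged sketch, not taken from any particular driver, of installing a simple 1:1 CPU-to-queue hint at initialization time:

#include <linux/netdevice.h>
#include <linux/cpumask.h>

static void example_set_default_xps(struct net_device *dev)
{
	int i;

	/* pin TX queue i to CPU (i mod number of online CPUs) */
	for (i = 0; i < dev->real_num_tx_queues; i++)
		netif_set_xps_queue(dev, cpumask_of(i % num_online_cpus()), i);
}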
TX queue selection
dev_queue_xmit
int dev_queue_xmit(struct sk_buff *skb)
{
	return __dev_queue_xmit(skb, NULL);
}
__dev_queue_xmit
static int __dev_queue_xmit(struct sk_buff *skb, void *accel_priv)
{
	struct net_device *dev = skb->dev;
	struct netdev_queue *txq;
	struct Qdisc *q;
	int rc = -ENOMEM;
	bool again = false;

	/* reset the link-layer (MAC) header offset */
	skb_reset_mac_header(skb);

	if (unlikely(skb_shinfo(skb)->tx_flags & SKBTX_SCHED_TSTAMP))
		__skb_tstamp_tx(skb, NULL, skb->sk, SCM_TSTAMP_SCHED);

	/* Disable soft irqs for various locks below. Also
	 * stops preemption for RCU.
	 */
	rcu_read_lock_bh();

	skb_update_prio(skb);

	qdisc_pkt_len_init(skb);
#ifdef CONFIG_NET_CLS_ACT
	skb->tc_at_ingress = 0;
# ifdef CONFIG_NET_EGRESS
	if (static_key_false(&egress_needed)) {
		skb = sch_handle_egress(skb, &rc, dev);
		if (!skb)
			goto out;
	}
# endif
#endif
	/* If device/qdisc don't need skb->dst, release it right now while
	 * its hot in this cpu cache.
	 */
	if (dev->priv_flags & IFF_XMIT_DST_RELEASE)
		skb_dst_drop(skb);
	else
		skb_dst_force(skb);

	/* pick the TX queue; as seen above, accel_priv is NULL on this path */
	txq = netdev_pick_tx(dev, skb, accel_priv);
	q = rcu_dereference_bh(txq->qdisc);	/* the queue's qdisc */

	trace_net_dev_queue(skb);
	if (q->enqueue) {	/* an enqueue hook exists: let the qdisc schedule the packet */
		rc = __dev_xmit_skb(skb, q, dev, txq);
		goto out;
	}
	......
	return rc;
}
netdev_pick_tx
struct netdev_queue *netdev_pick_tx(struct net_device *dev,
				    struct sk_buff *skb,
				    void *accel_priv)
{
	int queue_index = 0;	/* a single-queue device simply uses queue 0 */

#ifdef CONFIG_XPS
	u32 sender_cpu = skb->sender_cpu - 1;	/* the stored value is cpu + 1;
						 * the map is indexed from 0 */

	/* Invalid CPU number: fall back to the CPU currently processing the
	 * packet. Note that transmission is not tied to the current CPU; a
	 * caller that wants another CPU's queues can set skb->sender_cpu
	 * before calling in. */
	if (sender_cpu >= (u32)NR_CPUS)
		skb->sender_cpu = raw_smp_processor_id() + 1;
#endif

	if (dev->real_num_tx_queues != 1) {	/* multiqueue device: select a TX queue */
		const struct net_device_ops *ops = dev->netdev_ops;

		if (ops->ndo_select_queue)
			/* the driver supplies its own selector; __netdev_pick_tx is
			 * passed as a fallback for when the driver cannot decide */
			queue_index = ops->ndo_select_queue(dev, skb, accel_priv,
							    __netdev_pick_tx);
		else
			queue_index = __netdev_pick_tx(dev, skb);

		/* clamp the index to the device's real queue count */
		queue_index = netdev_cap_txqueue(dev, queue_index);
	}

	/* record the chosen queue in the skb */
	skb_set_queue_mapping(skb, queue_index);
	/* and return the corresponding TX queue */
	return netdev_get_tx_queue(dev, queue_index);
}
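The off-by-one handling of sender_cpu is easy to misread: the field stores cpu + 1 so that 0 can mean "not set". A tiny user-space model of just that convention (struct and names invented):

#include <stdio.h>

struct skb_model { unsigned int sender_cpu; };	/* 0 means unset */

static unsigned int pick_cpu(struct skb_model *skb, unsigned int cur_cpu)
{
	if (skb->sender_cpu == 0)	/* unset: record the current CPU */
		skb->sender_cpu = cur_cpu + 1;
	return skb->sender_cpu - 1;	/* stored value is cpu + 1 */
}

int main(void)
{
	struct skb_model skb = { 0 };

	printf("picked cpu %u\n", pick_cpu(&skb, 3));
	return 0;
}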
__netdev_pick_tx
/* generic TX queue selection */
static u16 __netdev_pick_tx(struct net_device *dev, struct sk_buff *skb)
{
	struct sock *sk = skb->sk;
	/* if the packet comes from a socket, reuse the queue index cached there */
	int queue_index = sk_tx_queue_get(sk);

	if (queue_index < 0 || skb->ooo_okay ||
	    queue_index >= dev->real_num_tx_queues) {
		/* no cached index, out-of-order delivery is acceptable, or the
		 * index is out of range: select a queue again */
		int new_index = get_xps_queue(dev, skb);

		if (new_index < 0)
			/* XPS did not choose a queue; fall back to hashing */
			new_index = skb_tx_hash(dev, skb);

		if (queue_index != new_index && sk &&
		    sk_fullsock(sk) &&
		    rcu_access_pointer(sk->sk_dst_cache))
			/* cache the choice in the socket so later packets skip
			 * the selection: sk->sk_tx_queue_mapping = new_index */
			sk_tx_queue_set(sk, new_index);

		queue_index = new_index;
	}

	return queue_index;
}
get_xps_queue
The logic of this function is laid out in detail in the figure above: given the transmitting device dev and the packet skb, it yields the final TX queue index.
/* XPS: choose the NIC TX queue based on the CPU currently processing the
 * softirq, meant for multiqueue NICs. If the CPU maps to several TX queues
 * (ideally it is one queue per CPU), one of them is picked by hash. */
static inline int get_xps_queue(struct net_device *dev, struct sk_buff *skb)
{
#ifdef CONFIG_XPS
	struct xps_dev_maps *dev_maps;	/* the device's per-CPU map table */
	struct xps_map *map;
	int queue_index = -1;

	rcu_read_lock();
	dev_maps = rcu_dereference(dev->xps_maps);	/* fetch the map table */
	if (dev_maps) {
		unsigned int tci = skb->sender_cpu - 1;	/* CPU numbering starts at 0 */

		if (dev->num_tc) {	/* an mqprio qdisc configured traffic classes */
			tci *= dev->num_tc;	/* each CPU owns num_tc slots */
			/* map the packet priority to its traffic class */
			tci += netdev_get_prio_tc_map(dev, skb->priority);
		}

		/* the map for this (CPU, class) slot */
		map = rcu_dereference(dev_maps->cpu_map[tci]);
		if (map) {
			if (map->len == 1)	/* only one queue */
				queue_index = map->queues[0];
			else	/* several queues: pick one by hash */
				queue_index = map->queues[reciprocal_scale(skb_get_hash(skb),
									   map->len)];
			if (unlikely(queue_index >= dev->real_num_tx_queues))
				queue_index = -1;
		}
	}
	rcu_read_unlock();

	return queue_index;
#else
	return -1;
#endif
}
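The hash pick relies on reciprocal_scale(), which maps a 32-bit hash uniformly onto [0, len) with a multiply and a shift instead of a modulo. A user-space equivalent of that helper:

#include <stdio.h>
#include <stdint.h>

/* map val onto [0, ep_ro) without a division: (val * ep_ro) >> 32 */
static inline uint32_t reciprocal_scale(uint32_t val, uint32_t ep_ro)
{
	return (uint32_t)(((uint64_t)val * ep_ro) >> 32);
}

int main(void)
{
	uint32_t hash = 0xdeadbeef;

	/* e.g. pick one of 4 queues mapped to this CPU */
	printf("queue index %u\n", reciprocal_scale(hash, 4));
	return 0;
}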
If XPS did not pick a TX queue, skb_tx_hash is used to select one:
/*
 * Returns a Tx hash for the given packet when dev->real_num_tx_queues is used
 * as a distribution range limit for the returned value.
 */
static inline u16 skb_tx_hash(const struct net_device *dev,
			      struct sk_buff *skb)
{
	return __skb_tx_hash(dev, skb, dev->real_num_tx_queues);
}

/*
 * Returns a Tx hash based on the given packet descriptor and a Tx queue
 * count to be used as a distribution range.
 */
u16 __skb_tx_hash(const struct net_device *dev, struct sk_buff *skb,
		  unsigned int num_tx_queues)
{
	u32 hash;
	u16 qoffset = 0;
	u16 qcount = num_tx_queues;

	if (skb_rx_queue_recorded(skb)) {
		/* an RX queue was recorded: use the matching TX queue */
		hash = skb_get_rx_queue(skb);
		while (unlikely(hash >= num_tx_queues))
			hash -= num_tx_queues;
		return hash;
	}

	if (dev->num_tc) {	/* the device has traffic classes configured */
		/* map the packet priority to its class */
		u8 tc = netdev_get_prio_tc_map(dev, skb->priority);
		/* the contiguous queue range owned by that class */
		qoffset = dev->tc_to_txq[tc].offset;
		qcount = dev->tc_to_txq[tc].count;
	}

	/* scale the flow hash into the range and add the class offset */
	return (u16) reciprocal_scale(skb_get_hash(skb), qcount) + qoffset;
}
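Finally, the tc_to_txq step above is worth a small worked example (user-space model, values invented): each traffic class owns a contiguous [offset, offset + count) block of TX queues, and the flow hash is scaled into that block.

#include <stdio.h>
#include <stdint.h>

struct tc_txq { uint16_t count, offset; };

/* e.g. 8 queues split into two classes of 4 */
static const struct tc_txq tc_to_txq[2] = { { 4, 0 }, { 4, 4 } };

int main(void)
{
	int tc = 1;			/* class picked from skb->priority */
	uint32_t hash = 0xcafebabe;
	uint16_t qcount = tc_to_txq[tc].count;
	uint16_t qoffset = tc_to_txq[tc].offset;

	/* scale hash into [0, qcount) and shift into the class's block */
	uint16_t q = (uint16_t)(((uint64_t)hash * qcount) >> 32) + qoffset;

	printf("tc %d -> queue %u\n", tc, q);
	return 0;
}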