背景说明
RPS,即Receive Package Steering,其原理是单纯地以软件方式实现接收的报文在cpu之间平均分配,即利用报文的hash值找到匹配的cpu,然后将报文送至该cpu对应的backlog队列中进行下一步的处理。适合于单队列网卡或者虚拟网卡,把该网卡上的数据流让多个cpu处理,在中断处理程序中根据CPU_MAP决定将报文放入哪个CPU队列中,然后触发NAPI软中断。
配置
RPS是利用报文的hash值找到对应的cpu,然后将报文送至该cpu的backlog队列来实现报文在多个cpu之间的负载均衡的。所以首先需要知道哪些cpu核会参与报文的分发处理。Linux是通过配置文件的方式指定哪些cpu核参与到报文的分发处理。RPS要求内核编译了CONFIG_RPS选项(SMP上默认是打开的)。尽管编译到内核,直到被配置了才能启用。对于某个接收队列,RPS可以转发流量到哪个CPU,是由/sys/class/net//queues/rx-/rps_cpus来控制的。这个文件实现了CPU的位图。默认,当值是0,RPS是无效的,数据包是由中断的CPU来处理的。Documentation/IRQ-affinity.txt 解释了CPU是怎么由位图来设置的。
核心数据结构
/* This structure contains an instance of an RX queue. */
/* 网卡接收队列 */
struct netdev_rx_queue {
#ifdef CONFIG_RPS
struct rps_map __rcu *rps_map;//RPS cpu映射表
struct rps_dev_flow_table __rcu *rps_flow_table;//RFS流表
#endif
struct kobject kobj;
struct net_device *dev;
struct xdp_rxq_info xdp_rxq;
} ____cacheline_aligned_in_smp;
/*
* This structure holds an RPS map which can be of variable length. The
* map is an array of CPUs.
* RPS的映射图,即该网络设备可以被分发的CPU映射表。是一个可变长度的数组。
* 动态分配,其长度为len,即分配的cpu的个数
*/
struct rps_map {
unsigned int len;//cpus数组的长度
struct rcu_head rcu;
u16 cpus[0];//cpu数组
};
//根据CPU的个数分配的映射表内存大小
#define RPS_MAP_SIZE(_num) (sizeof(struct rps_map) + ((_num) * sizeof(u16)))
流程图
如上图所示,网卡有四个接收多列,分别绑定到cpu0,2,4,6上,报文输入处理流程如下:
0.报文进入网卡后,硬件hash选择RX QUEUE。
1.RX QUEUE0硬件中断CPU0。
2.CPU0在软中断中处理该报文时,进行RPS选择目标CPU进行处理,通过查询RXQUEUE0的rps_maps选择的目标CPU为CPU1。
3.将报文送到CP1的backlog虚拟NAPI报文输入队列中。
4.向CPU1发送IPI。
代码分析
解析CPU映射配置
//进行RPS的CPU映射表解析,即对文件/sys/class/net/eth0/queues/rx-0/rps_cpus
//中的内容进行解析。
static ssize_t store_rps_map(struct netdev_rx_queue *queue,//接受队列
const char *buf, size_t len)
{
struct rps_map *old_map, *map;
cpumask_var_t mask;
int err, cpu, i;
static DEFINE_MUTEX(rps_map_mutex);//映射表互斥锁
if (!capable(CAP_NET_ADMIN))//管理员权限
return -EPERM;
if (!alloc_cpumask_var(&mask, GFP_KERNEL))
return -ENOMEM;
//解析buf中的信息到mask中
err = bitmap_parse(buf, len, cpumask_bits(mask), nr_cpumask_bits);
if (err) {
free_cpumask_var(mask);
return err;
}
//分配映射表
map = kzalloc(max_t(unsigned int,
RPS_MAP_SIZE(cpumask_weight(mask)), L1_CACHE_BYTES),
GFP_KERNEL);
if (!map) {
free_cpumask_var(mask);
return -ENOMEM;
}
//解析每一个cpu的bit
i = 0;
for_each_cpu_and(cpu, mask, cpu_online_mask)
map->cpus[i++] = cpu;//设置对应的cpu编号
if (i) {
map->len = i;//个数
} else {
kfree(map);
map = NULL;
}
mutex_lock(&rps_map_mutex);、
//进行rcu替换
old_map = rcu_dereference_protected(queue->rps_map,
mutex_is_locked(&rps_map_mutex));
rcu_assign_pointer(queue->rps_map, map);
//设置是否需要进行rps
if (map)//新配置则加1
static_key_slow_inc(&rps_needed);
if (old_map)//去掉老的配置减掉1
static_key_slow_dec(&rps_needed);
mutex_unlock(&rps_map_mutex);
if (old_map)
kfree_rcu(old_map, rcu);
free_cpumask_var(mask);
return len;
}
NAPI驱动收包进行报文RPS分发
int netif_receive_skb(struct sk_buff *skb)
{
trace_netif_receive_skb_entry(skb);
return netif_receive_skb_internal(skb);
}
static int netif_receive_skb_internal(struct sk_buff *skb)
{
int ret;
......
rcu_read_lock();
#ifdef CONFIG_RPS
if (static_key_false(&rps_needed)) {
struct rps_dev_flow voidflow, *rflow = &voidflow;
//获取目的CPU
int cpu = get_rps_cpu(skb->dev, skb, &rflow);
if (cpu >= 0) {
//将报文压入虚拟的napi设备收包队列
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
return ret;
}
}
#endif
//没有是能RPS的话,直接上送上层协议处理,进入软中断中。
ret = __netif_receive_skb(skb);
rcu_read_unlock();
return ret;
}
非NAPI驱动收包进行报文RPS分发
static int netif_rx_internal(struct sk_buff *skb)
{
int ret;
......
#ifdef CONFIG_RPS
if (static_key_false(&rps_needed)) {
struct rps_dev_flow voidflow, *rflow = &voidflow;
int cpu;
preempt_disable();
rcu_read_lock();
//获取处理的目的CPU
cpu = get_rps_cpu(skb->dev, skb, &rflow);
if (cpu < 0)
cpu = smp_processor_id();
//将报文压入虚拟的napi设备收包队列
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
preempt_enable();
} else
#endif
{
unsigned int qtail;
//没有RPS的话,直接压入本cpu的虚拟napi收包队列中
ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
put_cpu();
}
return ret;
}
获取目标处理CPU
在这里我们删除了除了RPS之外的代码,这样看起来就更加简单明了了。
/*
* get_rps_cpu is called from netif_receive_skb and returns the target
* CPU from the RPS map of the receiving queue for a given skb.
* rcu_read_lock must be held on entry.
*/
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
struct rps_dev_flow **rflowp)
{
const struct rps_sock_flow_table *sock_flow_table;
struct netdev_rx_queue *rxqueue = dev->_rx;
struct rps_dev_flow_table *flow_table;
struct rps_map *map;
......
map = rcu_dereference(rxqueue->rps_map);
if (!flow_table && !map)
goto done;
skb_reset_network_header(skb);
hash = skb_get_hash(skb);//计算报文的hash值
if (!hash)
goto done;
......
try_rps:
if (map) {
//根据hash值从该设备的map表中获取目标cpu
tcpu = map->cpus[reciprocal_scale(hash, map->len)];
if (cpu_online(tcpu)) {//cpu存在
cpu = tcpu;//则返回目标cpu
goto done;
}
}
done:
return cpu;
}
报文入队
/*
* enqueue_to_backlog is called to queue an skb to a per CPU backlog
* queue (may be a remote CPU queue).
*/
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
unsigned int qlen;
//获取目标CPU的softnet_data结构
sd = &per_cpu(softnet_data, cpu);
//关闭中断
local_irq_save(flags);
rps_lock(sd);//锁住目标cpu的softnet_data
if (!netif_running(skb->dev))
goto drop;
//获取虚拟napi输入队列长度
qlen = skb_queue_len(&sd->input_pkt_queue);
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
if (qlen) {//如果该输入队列已经有报文,说明已经触发了软中断,这里只需要入队即可。
enqueue:
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);//释放sd锁
local_irq_restore(flags);//恢复中断
return NET_RX_SUCCESS;
}
/* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
* 调度虚拟NAPI设备,即backlog设备。
* 如果该设备不处于调度状态,则设置其状态为调度状态
*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))//将sd加入到本cpu的ipi队列中,后面会发送iqi中断。
____napi_schedule(sd, &sd->backlog);
//将backlog napi设备加入到目标cpu的sd的napi链表中。
}
goto enqueue;
}
drop:
sd->dropped++;
rps_unlock(sd);
local_irq_restore(flags);
atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
return NET_RX_DROP;
}
目标CPU处理报文
backlog虚拟NAPI设备的poll函数为process_backlog。
static int process_backlog(struct napi_struct *napi, int quota)
{
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
bool again = true;
int work = 0;
/* Check if we have pending ipi, its better to send them now,
* not waiting net_rx_action() end.
*/
if (sd_has_rps_ipi_waiting(sd)) {
local_irq_disable();
net_rps_action_and_irq_enable(sd);
}
napi->weight = dev_rx_weight;
while (again) {
struct sk_buff *skb;
//处理报文,直到队列为空
while ((skb = __skb_dequeue(&sd->process_queue))) {
rcu_read_lock();
__netif_receive_skb(skb);
rcu_read_unlock();
input_queue_head_incr(sd);
if (++work >= quota)//超出额度
return work;
}
local_irq_disable();
rps_lock(sd);
if (skb_queue_empty(&sd->input_pkt_queue)) {//backlog输入队列为空,设置其状态为0
/*
* Inline a custom version of __napi_complete().
* only current cpu owns and manipulates this napi,
* and NAPI_STATE_SCHED is the only possible flag set
* on backlog.
* We can use a plain write instead of clear_bit(),
* and we dont need an smp_mb() memory barrier.
*/
napi->state = 0;
again = false;
} else {
//将input_pkt_queue中的报文添加到process_queue队列中
//again为true,继续调度报文处理
skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
}
rps_unlock(sd);
local_irq_enable();
}
return work;
}