LINUX RPS分析


背景说明

RPS,即Receive Package Steering,其原理是单纯地以软件方式实现接收的报文在cpu之间平均分配,即利用报文的hash值找到匹配的cpu,然后将报文送至该cpu对应的backlog队列中进行下一步的处理。适合于单队列网卡或者虚拟网卡,把该网卡上的数据流让多个cpu处理,在中断处理程序中根据CPU_MAP决定将报文放入哪个CPU队列中,然后触发NAPI软中断。

配置

RPS是利用报文的hash值找到对应的cpu,然后将报文送至该cpu的backlog队列来实现报文在多个cpu之间的负载均衡的。所以首先需要知道哪些cpu核会参与报文的分发处理。Linux是通过配置文件的方式指定哪些cpu核参与到报文的分发处理。RPS要求内核编译了CONFIG_RPS选项(SMP上默认是打开的)。尽管编译到内核,直到被配置了才能启用。对于某个接收队列,RPS可以转发流量到哪个CPU,是由/sys/class/net//queues/rx-/rps_cpus来控制的。这个文件实现了CPU的位图。默认,当值是0,RPS是无效的,数据包是由中断的CPU来处理的。Documentation/IRQ-affinity.txt 解释了CPU是怎么由位图来设置的。

核心数据结构

/* This structure contains an instance of an RX queue. */
/* 网卡接收队列 */
struct netdev_rx_queue {
#ifdef CONFIG_RPS
	struct rps_map __rcu		*rps_map;//RPS cpu映射表
	struct rps_dev_flow_table __rcu	*rps_flow_table;//RFS流表
#endif
	struct kobject			kobj;
	struct net_device		*dev;
	struct xdp_rxq_info		xdp_rxq;
} ____cacheline_aligned_in_smp;

/*
 * This structure holds an RPS map which can be of variable length.  The
 * map is an array of CPUs.
 * RPS的映射图,即该网络设备可以被分发的CPU映射表。是一个可变长度的数组。
 * 动态分配,其长度为len,即分配的cpu的个数
 */
struct rps_map {
	unsigned int len;//cpus数组的长度
	struct rcu_head rcu;
	u16 cpus[0];//cpu数组
};
//根据CPU的个数分配的映射表内存大小
#define RPS_MAP_SIZE(_num) (sizeof(struct rps_map) + ((_num) * sizeof(u16)))

流程图

如上图所示,网卡有四个接收多列,分别绑定到cpu0,2,4,6上,报文输入处理流程如下:

0.报文进入网卡后,硬件hash选择RX QUEUE。

1.RX QUEUE0硬件中断CPU0。

2.CPU0在软中断中处理该报文时,进行RPS选择目标CPU进行处理,通过查询RXQUEUE0的rps_maps选择的目标CPU为CPU1。

3.将报文送到CP1的backlog虚拟NAPI报文输入队列中。

4.向CPU1发送IPI。

代码分析

解析CPU映射配置

//进行RPS的CPU映射表解析,即对文件/sys/class/net/eth0/queues/rx-0/rps_cpus
//中的内容进行解析。
static ssize_t store_rps_map(struct netdev_rx_queue *queue,//接受队列
			                       const char *buf, size_t len)
{
	struct rps_map *old_map, *map;
	cpumask_var_t mask;
	int err, cpu, i;
	static DEFINE_MUTEX(rps_map_mutex);//映射表互斥锁

	if (!capable(CAP_NET_ADMIN))//管理员权限
		return -EPERM;

	if (!alloc_cpumask_var(&mask, GFP_KERNEL))
		return -ENOMEM;
    //解析buf中的信息到mask中
	err = bitmap_parse(buf, len, cpumask_bits(mask), nr_cpumask_bits);
	if (err) {
		free_cpumask_var(mask);
		return err;
	}
    //分配映射表
	map = kzalloc(max_t(unsigned int,
			    RPS_MAP_SIZE(cpumask_weight(mask)), L1_CACHE_BYTES),
		      GFP_KERNEL);
	if (!map) {
		free_cpumask_var(mask);
		return -ENOMEM;
	}
    //解析每一个cpu的bit
	i = 0;
	for_each_cpu_and(cpu, mask, cpu_online_mask)
		map->cpus[i++] = cpu;//设置对应的cpu编号

	if (i) {
		map->len = i;//个数
	} else {
		kfree(map);
		map = NULL;
	}

	mutex_lock(&rps_map_mutex);、
	//进行rcu替换
	old_map = rcu_dereference_protected(queue->rps_map,
					    mutex_is_locked(&rps_map_mutex));
	rcu_assign_pointer(queue->rps_map, map);

	//设置是否需要进行rps
	if (map)//新配置则加1
		static_key_slow_inc(&rps_needed);
	if (old_map)//去掉老的配置减掉1
		static_key_slow_dec(&rps_needed);

	mutex_unlock(&rps_map_mutex);

	if (old_map)
		kfree_rcu(old_map, rcu);

	free_cpumask_var(mask);
	return len;
}

NAPI驱动收包进行报文RPS分发

int netif_receive_skb(struct sk_buff *skb)
{
	trace_netif_receive_skb_entry(skb);

	return netif_receive_skb_internal(skb);
}

static int netif_receive_skb_internal(struct sk_buff *skb)
{
	int ret;
    ......
        
	rcu_read_lock();
#ifdef CONFIG_RPS
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
        //获取目的CPU
		int cpu = get_rps_cpu(skb->dev, skb, &rflow);

		if (cpu >= 0) {
            //将报文压入虚拟的napi设备收包队列
			ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
			rcu_read_unlock();
			return ret;
		}
	}
#endif
    //没有是能RPS的话,直接上送上层协议处理,进入软中断中。
	ret = __netif_receive_skb(skb);
	rcu_read_unlock();
	return ret;
}

非NAPI驱动收包进行报文RPS分发

static int netif_rx_internal(struct sk_buff *skb)
{
	int ret;
    ......

#ifdef CONFIG_RPS
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
		int cpu;

		preempt_disable();
		rcu_read_lock();
        //获取处理的目的CPU
		cpu = get_rps_cpu(skb->dev, skb, &rflow);
		if (cpu < 0)
			cpu = smp_processor_id();
        //将报文压入虚拟的napi设备收包队列
		ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);

		rcu_read_unlock();
		preempt_enable();
	} else
#endif
	{
		unsigned int qtail;
        //没有RPS的话,直接压入本cpu的虚拟napi收包队列中
		ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
		put_cpu();
	}
	return ret;
}

获取目标处理CPU

在这里我们删除了除了RPS之外的代码,这样看起来就更加简单明了了。

/*
 * get_rps_cpu is called from netif_receive_skb and returns the target
 * CPU from the RPS map of the receiving queue for a given skb.
 * rcu_read_lock must be held on entry.
 */
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
		       struct rps_dev_flow **rflowp)
{
    const struct rps_sock_flow_table *sock_flow_table;
	struct netdev_rx_queue *rxqueue = dev->_rx;
	struct rps_dev_flow_table *flow_table;
	struct rps_map *map;
    
	......

	map = rcu_dereference(rxqueue->rps_map);
	if (!flow_table && !map)
		goto done;

	skb_reset_network_header(skb);
	hash = skb_get_hash(skb);//计算报文的hash值
	if (!hash)
		goto done;

    ......

try_rps:

	if (map) {
        //根据hash值从该设备的map表中获取目标cpu
		tcpu = map->cpus[reciprocal_scale(hash, map->len)];
		if (cpu_online(tcpu)) {//cpu存在
			cpu = tcpu;//则返回目标cpu
			goto done;
		}
	}

done:
	return cpu;
}

报文入队

/*
 * enqueue_to_backlog is called to queue an skb to a per CPU backlog
 * queue (may be a remote CPU queue).
 */
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
			      unsigned int *qtail)
{
	struct softnet_data *sd;
	unsigned long flags;
	unsigned int qlen;
    //获取目标CPU的softnet_data结构
	sd = &per_cpu(softnet_data, cpu);
    //关闭中断
	local_irq_save(flags);

	rps_lock(sd);//锁住目标cpu的softnet_data
	if (!netif_running(skb->dev))
		goto drop;
    //获取虚拟napi输入队列长度
	qlen = skb_queue_len(&sd->input_pkt_queue);
	if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
		if (qlen) {//如果该输入队列已经有报文,说明已经触发了软中断,这里只需要入队即可。
enqueue:
			__skb_queue_tail(&sd->input_pkt_queue, skb);
			input_queue_tail_incr_save(sd, qtail);
			rps_unlock(sd);//释放sd锁
			local_irq_restore(flags);//恢复中断
			return NET_RX_SUCCESS;
		}

		/* Schedule NAPI for backlog device
		 * We can use non atomic operation since we own the queue lock
		 * 调度虚拟NAPI设备,即backlog设备。
		 * 如果该设备不处于调度状态,则设置其状态为调度状态
		 */
		if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
			if (!rps_ipi_queued(sd))//将sd加入到本cpu的ipi队列中,后面会发送iqi中断。
				____napi_schedule(sd, &sd->backlog);
            //将backlog napi设备加入到目标cpu的sd的napi链表中。
		}
		goto enqueue;
	}

drop:
	sd->dropped++;
	rps_unlock(sd);

	local_irq_restore(flags);

	atomic_long_inc(&skb->dev->rx_dropped);
	kfree_skb(skb);
	return NET_RX_DROP;
}

目标CPU处理报文

backlog虚拟NAPI设备的poll函数为process_backlog

static int process_backlog(struct napi_struct *napi, int quota)
{
	struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
	bool again = true;
	int work = 0;

	/* Check if we have pending ipi, its better to send them now,
	 * not waiting net_rx_action() end.
	 */
	if (sd_has_rps_ipi_waiting(sd)) {
		local_irq_disable();
		net_rps_action_and_irq_enable(sd);
	}

	napi->weight = dev_rx_weight;
	while (again) {
		struct sk_buff *skb;
        //处理报文,直到队列为空
		while ((skb = __skb_dequeue(&sd->process_queue))) {
			rcu_read_lock();
			__netif_receive_skb(skb);
			rcu_read_unlock();
			input_queue_head_incr(sd);
			if (++work >= quota)//超出额度
				return work;

		}

		local_irq_disable();
		rps_lock(sd);
		if (skb_queue_empty(&sd->input_pkt_queue)) {//backlog输入队列为空,设置其状态为0
			/*
			 * Inline a custom version of __napi_complete().
			 * only current cpu owns and manipulates this napi,
			 * and NAPI_STATE_SCHED is the only possible flag set
			 * on backlog.
			 * We can use a plain write instead of clear_bit(),
			 * and we dont need an smp_mb() memory barrier.
			 */
			napi->state = 0;
			again = false;
		} else {
			//将input_pkt_queue中的报文添加到process_queue队列中
			//again为true,继续调度报文处理
			skb_queue_splice_tail_init(&sd->input_pkt_queue,
						   &sd->process_queue);
		}
		rps_unlock(sd);
		local_irq_enable();
	}

	return work;
}


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM