LINUX RPS分析


背景說明

RPS,即Receive Package Steering,其原理是單純地以軟件方式實現接收的報文在cpu之間平均分配,即利用報文的hash值找到匹配的cpu,然后將報文送至該cpu對應的backlog隊列中進行下一步的處理。適合於單隊列網卡或者虛擬網卡,把該網卡上的數據流讓多個cpu處理,在中斷處理程序中根據CPU_MAP決定將報文放入哪個CPU隊列中,然后觸發NAPI軟中斷。

配置

RPS是利用報文的hash值找到對應的cpu,然后將報文送至該cpu的backlog隊列來實現報文在多個cpu之間的負載均衡的。所以首先需要知道哪些cpu核會參與報文的分發處理。Linux是通過配置文件的方式指定哪些cpu核參與到報文的分發處理。RPS要求內核編譯了CONFIG_RPS選項(SMP上默認是打開的)。盡管編譯到內核,直到被配置了才能啟用。對於某個接收隊列,RPS可以轉發流量到哪個CPU,是由/sys/class/net//queues/rx-/rps_cpus來控制的。這個文件實現了CPU的位圖。默認,當值是0,RPS是無效的,數據包是由中斷的CPU來處理的。Documentation/IRQ-affinity.txt 解釋了CPU是怎么由位圖來設置的。

核心數據結構

/* This structure contains an instance of an RX queue. */
/* 網卡接收隊列 */
struct netdev_rx_queue {
#ifdef CONFIG_RPS
	struct rps_map __rcu		*rps_map;//RPS cpu映射表
	struct rps_dev_flow_table __rcu	*rps_flow_table;//RFS流表
#endif
	struct kobject			kobj;
	struct net_device		*dev;
	struct xdp_rxq_info		xdp_rxq;
} ____cacheline_aligned_in_smp;

/*
 * This structure holds an RPS map which can be of variable length.  The
 * map is an array of CPUs.
 * RPS的映射圖,即該網絡設備可以被分發的CPU映射表。是一個可變長度的數組。
 * 動態分配,其長度為len,即分配的cpu的個數
 */
struct rps_map {
	unsigned int len;//cpus數組的長度
	struct rcu_head rcu;
	u16 cpus[0];//cpu數組
};
//根據CPU的個數分配的映射表內存大小
#define RPS_MAP_SIZE(_num) (sizeof(struct rps_map) + ((_num) * sizeof(u16)))

流程圖

如上圖所示,網卡有四個接收多列,分別綁定到cpu0,2,4,6上,報文輸入處理流程如下:

0.報文進入網卡后,硬件hash選擇RX QUEUE。

1.RX QUEUE0硬件中斷CPU0。

2.CPU0在軟中斷中處理該報文時,進行RPS選擇目標CPU進行處理,通過查詢RXQUEUE0的rps_maps選擇的目標CPU為CPU1。

3.將報文送到CP1的backlog虛擬NAPI報文輸入隊列中。

4.向CPU1發送IPI。

代碼分析

解析CPU映射配置

//進行RPS的CPU映射表解析,即對文件/sys/class/net/eth0/queues/rx-0/rps_cpus
//中的內容進行解析。
static ssize_t store_rps_map(struct netdev_rx_queue *queue,//接受隊列
			                       const char *buf, size_t len)
{
	struct rps_map *old_map, *map;
	cpumask_var_t mask;
	int err, cpu, i;
	static DEFINE_MUTEX(rps_map_mutex);//映射表互斥鎖

	if (!capable(CAP_NET_ADMIN))//管理員權限
		return -EPERM;

	if (!alloc_cpumask_var(&mask, GFP_KERNEL))
		return -ENOMEM;
    //解析buf中的信息到mask中
	err = bitmap_parse(buf, len, cpumask_bits(mask), nr_cpumask_bits);
	if (err) {
		free_cpumask_var(mask);
		return err;
	}
    //分配映射表
	map = kzalloc(max_t(unsigned int,
			    RPS_MAP_SIZE(cpumask_weight(mask)), L1_CACHE_BYTES),
		      GFP_KERNEL);
	if (!map) {
		free_cpumask_var(mask);
		return -ENOMEM;
	}
    //解析每一個cpu的bit
	i = 0;
	for_each_cpu_and(cpu, mask, cpu_online_mask)
		map->cpus[i++] = cpu;//設置對應的cpu編號

	if (i) {
		map->len = i;//個數
	} else {
		kfree(map);
		map = NULL;
	}

	mutex_lock(&rps_map_mutex);、
	//進行rcu替換
	old_map = rcu_dereference_protected(queue->rps_map,
					    mutex_is_locked(&rps_map_mutex));
	rcu_assign_pointer(queue->rps_map, map);

	//設置是否需要進行rps
	if (map)//新配置則加1
		static_key_slow_inc(&rps_needed);
	if (old_map)//去掉老的配置減掉1
		static_key_slow_dec(&rps_needed);

	mutex_unlock(&rps_map_mutex);

	if (old_map)
		kfree_rcu(old_map, rcu);

	free_cpumask_var(mask);
	return len;
}

NAPI驅動收包進行報文RPS分發

int netif_receive_skb(struct sk_buff *skb)
{
	trace_netif_receive_skb_entry(skb);

	return netif_receive_skb_internal(skb);
}

static int netif_receive_skb_internal(struct sk_buff *skb)
{
	int ret;
    ......
        
	rcu_read_lock();
#ifdef CONFIG_RPS
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
        //獲取目的CPU
		int cpu = get_rps_cpu(skb->dev, skb, &rflow);

		if (cpu >= 0) {
            //將報文壓入虛擬的napi設備收包隊列
			ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
			rcu_read_unlock();
			return ret;
		}
	}
#endif
    //沒有是能RPS的話,直接上送上層協議處理,進入軟中斷中。
	ret = __netif_receive_skb(skb);
	rcu_read_unlock();
	return ret;
}

非NAPI驅動收包進行報文RPS分發

static int netif_rx_internal(struct sk_buff *skb)
{
	int ret;
    ......

#ifdef CONFIG_RPS
	if (static_key_false(&rps_needed)) {
		struct rps_dev_flow voidflow, *rflow = &voidflow;
		int cpu;

		preempt_disable();
		rcu_read_lock();
        //獲取處理的目的CPU
		cpu = get_rps_cpu(skb->dev, skb, &rflow);
		if (cpu < 0)
			cpu = smp_processor_id();
        //將報文壓入虛擬的napi設備收包隊列
		ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);

		rcu_read_unlock();
		preempt_enable();
	} else
#endif
	{
		unsigned int qtail;
        //沒有RPS的話,直接壓入本cpu的虛擬napi收包隊列中
		ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
		put_cpu();
	}
	return ret;
}

獲取目標處理CPU

在這里我們刪除了除了RPS之外的代碼,這樣看起來就更加簡單明了了。

/*
 * get_rps_cpu is called from netif_receive_skb and returns the target
 * CPU from the RPS map of the receiving queue for a given skb.
 * rcu_read_lock must be held on entry.
 */
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
		       struct rps_dev_flow **rflowp)
{
    const struct rps_sock_flow_table *sock_flow_table;
	struct netdev_rx_queue *rxqueue = dev->_rx;
	struct rps_dev_flow_table *flow_table;
	struct rps_map *map;
    
	......

	map = rcu_dereference(rxqueue->rps_map);
	if (!flow_table && !map)
		goto done;

	skb_reset_network_header(skb);
	hash = skb_get_hash(skb);//計算報文的hash值
	if (!hash)
		goto done;

    ......

try_rps:

	if (map) {
        //根據hash值從該設備的map表中獲取目標cpu
		tcpu = map->cpus[reciprocal_scale(hash, map->len)];
		if (cpu_online(tcpu)) {//cpu存在
			cpu = tcpu;//則返回目標cpu
			goto done;
		}
	}

done:
	return cpu;
}

報文入隊

/*
 * enqueue_to_backlog is called to queue an skb to a per CPU backlog
 * queue (may be a remote CPU queue).
 */
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
			      unsigned int *qtail)
{
	struct softnet_data *sd;
	unsigned long flags;
	unsigned int qlen;
    //獲取目標CPU的softnet_data結構
	sd = &per_cpu(softnet_data, cpu);
    //關閉中斷
	local_irq_save(flags);

	rps_lock(sd);//鎖住目標cpu的softnet_data
	if (!netif_running(skb->dev))
		goto drop;
    //獲取虛擬napi輸入隊列長度
	qlen = skb_queue_len(&sd->input_pkt_queue);
	if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
		if (qlen) {//如果該輸入隊列已經有報文,說明已經觸發了軟中斷,這里只需要入隊即可。
enqueue:
			__skb_queue_tail(&sd->input_pkt_queue, skb);
			input_queue_tail_incr_save(sd, qtail);
			rps_unlock(sd);//釋放sd鎖
			local_irq_restore(flags);//恢復中斷
			return NET_RX_SUCCESS;
		}

		/* Schedule NAPI for backlog device
		 * We can use non atomic operation since we own the queue lock
		 * 調度虛擬NAPI設備,即backlog設備。
		 * 如果該設備不處於調度狀態,則設置其狀態為調度狀態
		 */
		if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
			if (!rps_ipi_queued(sd))//將sd加入到本cpu的ipi隊列中,后面會發送iqi中斷。
				____napi_schedule(sd, &sd->backlog);
            //將backlog napi設備加入到目標cpu的sd的napi鏈表中。
		}
		goto enqueue;
	}

drop:
	sd->dropped++;
	rps_unlock(sd);

	local_irq_restore(flags);

	atomic_long_inc(&skb->dev->rx_dropped);
	kfree_skb(skb);
	return NET_RX_DROP;
}

目標CPU處理報文

backlog虛擬NAPI設備的poll函數為process_backlog

static int process_backlog(struct napi_struct *napi, int quota)
{
	struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
	bool again = true;
	int work = 0;

	/* Check if we have pending ipi, its better to send them now,
	 * not waiting net_rx_action() end.
	 */
	if (sd_has_rps_ipi_waiting(sd)) {
		local_irq_disable();
		net_rps_action_and_irq_enable(sd);
	}

	napi->weight = dev_rx_weight;
	while (again) {
		struct sk_buff *skb;
        //處理報文,直到隊列為空
		while ((skb = __skb_dequeue(&sd->process_queue))) {
			rcu_read_lock();
			__netif_receive_skb(skb);
			rcu_read_unlock();
			input_queue_head_incr(sd);
			if (++work >= quota)//超出額度
				return work;

		}

		local_irq_disable();
		rps_lock(sd);
		if (skb_queue_empty(&sd->input_pkt_queue)) {//backlog輸入隊列為空,設置其狀態為0
			/*
			 * Inline a custom version of __napi_complete().
			 * only current cpu owns and manipulates this napi,
			 * and NAPI_STATE_SCHED is the only possible flag set
			 * on backlog.
			 * We can use a plain write instead of clear_bit(),
			 * and we dont need an smp_mb() memory barrier.
			 */
			napi->state = 0;
			again = false;
		} else {
			//將input_pkt_queue中的報文添加到process_queue隊列中
			//again為true,繼續調度報文處理
			skb_queue_splice_tail_init(&sd->input_pkt_queue,
						   &sd->process_queue);
		}
		rps_unlock(sd);
		local_irq_enable();
	}

	return work;
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM