背景說明
RPS,即Receive Package Steering,其原理是單純地以軟件方式實現接收的報文在cpu之間平均分配,即利用報文的hash值找到匹配的cpu,然后將報文送至該cpu對應的backlog隊列中進行下一步的處理。適合於單隊列網卡或者虛擬網卡,把該網卡上的數據流讓多個cpu處理,在中斷處理程序中根據CPU_MAP決定將報文放入哪個CPU隊列中,然后觸發NAPI軟中斷。
配置
RPS是利用報文的hash值找到對應的cpu,然后將報文送至該cpu的backlog隊列來實現報文在多個cpu之間的負載均衡的。所以首先需要知道哪些cpu核會參與報文的分發處理。Linux是通過配置文件的方式指定哪些cpu核參與到報文的分發處理。RPS要求內核編譯了CONFIG_RPS選項(SMP上默認是打開的)。盡管編譯到內核,直到被配置了才能啟用。對於某個接收隊列,RPS可以轉發流量到哪個CPU,是由/sys/class/net//queues/rx-/rps_cpus來控制的。這個文件實現了CPU的位圖。默認,當值是0,RPS是無效的,數據包是由中斷的CPU來處理的。Documentation/IRQ-affinity.txt 解釋了CPU是怎么由位圖來設置的。
核心數據結構
/* This structure contains an instance of an RX queue. */
/* 網卡接收隊列 */
struct netdev_rx_queue {
#ifdef CONFIG_RPS
struct rps_map __rcu *rps_map;//RPS cpu映射表
struct rps_dev_flow_table __rcu *rps_flow_table;//RFS流表
#endif
struct kobject kobj;
struct net_device *dev;
struct xdp_rxq_info xdp_rxq;
} ____cacheline_aligned_in_smp;
/*
* This structure holds an RPS map which can be of variable length. The
* map is an array of CPUs.
* RPS的映射圖,即該網絡設備可以被分發的CPU映射表。是一個可變長度的數組。
* 動態分配,其長度為len,即分配的cpu的個數
*/
struct rps_map {
unsigned int len;//cpus數組的長度
struct rcu_head rcu;
u16 cpus[0];//cpu數組
};
//根據CPU的個數分配的映射表內存大小
#define RPS_MAP_SIZE(_num) (sizeof(struct rps_map) + ((_num) * sizeof(u16)))
流程圖
如上圖所示,網卡有四個接收多列,分別綁定到cpu0,2,4,6上,報文輸入處理流程如下:
0.報文進入網卡后,硬件hash選擇RX QUEUE。
1.RX QUEUE0硬件中斷CPU0。
2.CPU0在軟中斷中處理該報文時,進行RPS選擇目標CPU進行處理,通過查詢RXQUEUE0的rps_maps選擇的目標CPU為CPU1。
3.將報文送到CP1的backlog虛擬NAPI報文輸入隊列中。
4.向CPU1發送IPI。
代碼分析
解析CPU映射配置
//進行RPS的CPU映射表解析,即對文件/sys/class/net/eth0/queues/rx-0/rps_cpus
//中的內容進行解析。
static ssize_t store_rps_map(struct netdev_rx_queue *queue,//接受隊列
const char *buf, size_t len)
{
struct rps_map *old_map, *map;
cpumask_var_t mask;
int err, cpu, i;
static DEFINE_MUTEX(rps_map_mutex);//映射表互斥鎖
if (!capable(CAP_NET_ADMIN))//管理員權限
return -EPERM;
if (!alloc_cpumask_var(&mask, GFP_KERNEL))
return -ENOMEM;
//解析buf中的信息到mask中
err = bitmap_parse(buf, len, cpumask_bits(mask), nr_cpumask_bits);
if (err) {
free_cpumask_var(mask);
return err;
}
//分配映射表
map = kzalloc(max_t(unsigned int,
RPS_MAP_SIZE(cpumask_weight(mask)), L1_CACHE_BYTES),
GFP_KERNEL);
if (!map) {
free_cpumask_var(mask);
return -ENOMEM;
}
//解析每一個cpu的bit
i = 0;
for_each_cpu_and(cpu, mask, cpu_online_mask)
map->cpus[i++] = cpu;//設置對應的cpu編號
if (i) {
map->len = i;//個數
} else {
kfree(map);
map = NULL;
}
mutex_lock(&rps_map_mutex);、
//進行rcu替換
old_map = rcu_dereference_protected(queue->rps_map,
mutex_is_locked(&rps_map_mutex));
rcu_assign_pointer(queue->rps_map, map);
//設置是否需要進行rps
if (map)//新配置則加1
static_key_slow_inc(&rps_needed);
if (old_map)//去掉老的配置減掉1
static_key_slow_dec(&rps_needed);
mutex_unlock(&rps_map_mutex);
if (old_map)
kfree_rcu(old_map, rcu);
free_cpumask_var(mask);
return len;
}
NAPI驅動收包進行報文RPS分發
int netif_receive_skb(struct sk_buff *skb)
{
trace_netif_receive_skb_entry(skb);
return netif_receive_skb_internal(skb);
}
static int netif_receive_skb_internal(struct sk_buff *skb)
{
int ret;
......
rcu_read_lock();
#ifdef CONFIG_RPS
if (static_key_false(&rps_needed)) {
struct rps_dev_flow voidflow, *rflow = &voidflow;
//獲取目的CPU
int cpu = get_rps_cpu(skb->dev, skb, &rflow);
if (cpu >= 0) {
//將報文壓入虛擬的napi設備收包隊列
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
return ret;
}
}
#endif
//沒有是能RPS的話,直接上送上層協議處理,進入軟中斷中。
ret = __netif_receive_skb(skb);
rcu_read_unlock();
return ret;
}
非NAPI驅動收包進行報文RPS分發
static int netif_rx_internal(struct sk_buff *skb)
{
int ret;
......
#ifdef CONFIG_RPS
if (static_key_false(&rps_needed)) {
struct rps_dev_flow voidflow, *rflow = &voidflow;
int cpu;
preempt_disable();
rcu_read_lock();
//獲取處理的目的CPU
cpu = get_rps_cpu(skb->dev, skb, &rflow);
if (cpu < 0)
cpu = smp_processor_id();
//將報文壓入虛擬的napi設備收包隊列
ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
rcu_read_unlock();
preempt_enable();
} else
#endif
{
unsigned int qtail;
//沒有RPS的話,直接壓入本cpu的虛擬napi收包隊列中
ret = enqueue_to_backlog(skb, get_cpu(), &qtail);
put_cpu();
}
return ret;
}
獲取目標處理CPU
在這里我們刪除了除了RPS之外的代碼,這樣看起來就更加簡單明了了。
/*
* get_rps_cpu is called from netif_receive_skb and returns the target
* CPU from the RPS map of the receiving queue for a given skb.
* rcu_read_lock must be held on entry.
*/
static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,
struct rps_dev_flow **rflowp)
{
const struct rps_sock_flow_table *sock_flow_table;
struct netdev_rx_queue *rxqueue = dev->_rx;
struct rps_dev_flow_table *flow_table;
struct rps_map *map;
......
map = rcu_dereference(rxqueue->rps_map);
if (!flow_table && !map)
goto done;
skb_reset_network_header(skb);
hash = skb_get_hash(skb);//計算報文的hash值
if (!hash)
goto done;
......
try_rps:
if (map) {
//根據hash值從該設備的map表中獲取目標cpu
tcpu = map->cpus[reciprocal_scale(hash, map->len)];
if (cpu_online(tcpu)) {//cpu存在
cpu = tcpu;//則返回目標cpu
goto done;
}
}
done:
return cpu;
}
報文入隊
/*
* enqueue_to_backlog is called to queue an skb to a per CPU backlog
* queue (may be a remote CPU queue).
*/
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
unsigned int *qtail)
{
struct softnet_data *sd;
unsigned long flags;
unsigned int qlen;
//獲取目標CPU的softnet_data結構
sd = &per_cpu(softnet_data, cpu);
//關閉中斷
local_irq_save(flags);
rps_lock(sd);//鎖住目標cpu的softnet_data
if (!netif_running(skb->dev))
goto drop;
//獲取虛擬napi輸入隊列長度
qlen = skb_queue_len(&sd->input_pkt_queue);
if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
if (qlen) {//如果該輸入隊列已經有報文,說明已經觸發了軟中斷,這里只需要入隊即可。
enqueue:
__skb_queue_tail(&sd->input_pkt_queue, skb);
input_queue_tail_incr_save(sd, qtail);
rps_unlock(sd);//釋放sd鎖
local_irq_restore(flags);//恢復中斷
return NET_RX_SUCCESS;
}
/* Schedule NAPI for backlog device
* We can use non atomic operation since we own the queue lock
* 調度虛擬NAPI設備,即backlog設備。
* 如果該設備不處於調度狀態,則設置其狀態為調度狀態
*/
if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
if (!rps_ipi_queued(sd))//將sd加入到本cpu的ipi隊列中,后面會發送iqi中斷。
____napi_schedule(sd, &sd->backlog);
//將backlog napi設備加入到目標cpu的sd的napi鏈表中。
}
goto enqueue;
}
drop:
sd->dropped++;
rps_unlock(sd);
local_irq_restore(flags);
atomic_long_inc(&skb->dev->rx_dropped);
kfree_skb(skb);
return NET_RX_DROP;
}
目標CPU處理報文
backlog虛擬NAPI設備的poll函數為process_backlog。
static int process_backlog(struct napi_struct *napi, int quota)
{
struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
bool again = true;
int work = 0;
/* Check if we have pending ipi, its better to send them now,
* not waiting net_rx_action() end.
*/
if (sd_has_rps_ipi_waiting(sd)) {
local_irq_disable();
net_rps_action_and_irq_enable(sd);
}
napi->weight = dev_rx_weight;
while (again) {
struct sk_buff *skb;
//處理報文,直到隊列為空
while ((skb = __skb_dequeue(&sd->process_queue))) {
rcu_read_lock();
__netif_receive_skb(skb);
rcu_read_unlock();
input_queue_head_incr(sd);
if (++work >= quota)//超出額度
return work;
}
local_irq_disable();
rps_lock(sd);
if (skb_queue_empty(&sd->input_pkt_queue)) {//backlog輸入隊列為空,設置其狀態為0
/*
* Inline a custom version of __napi_complete().
* only current cpu owns and manipulates this napi,
* and NAPI_STATE_SCHED is the only possible flag set
* on backlog.
* We can use a plain write instead of clear_bit(),
* and we dont need an smp_mb() memory barrier.
*/
napi->state = 0;
again = false;
} else {
//將input_pkt_queue中的報文添加到process_queue隊列中
//again為true,繼續調度報文處理
skb_queue_splice_tail_init(&sd->input_pkt_queue,
&sd->process_queue);
}
rps_unlock(sd);
local_irq_enable();
}
return work;
}