输入节点作为报文处理的入口,VPP支持多种类型的报文输入节点。每一种使用相同驱动类型的网卡有一个报文输入节点,比如dpdk类型的网卡,tap,af_packet类型的网卡等等。一种驱动类型一个输入节点,这主要是决定了收包的函数。输入节点使用宏进行注册,在main函数启动前注册到链表中。每一个工作线程都有相同的输入节点。线程的输入节点具体处理的队列动态分配,采用轮叫原则。
节点类型
typedef enum
{
/* An internal node on the call graph (could be output). */
VLIB_NODE_TYPE_INTERNAL,
/* Nodes which input data into the processing graph.
Input nodes are called for each iteration of main loop.
输入节点,报文流转入口 */
VLIB_NODE_TYPE_INPUT,
/* Nodes to be called before all input nodes.
Used, for example, to clean out driver TX rings before
processing input. */
VLIB_NODE_TYPE_PRE_INPUT,
/* "Process" nodes which can be suspended and later resumed. */
VLIB_NODE_TYPE_PROCESS,
VLIB_N_NODE_TYPE,
} vlib_node_type_t;
节点处理函数注册结构,用于为一个节点注册一个候选处理函数(按优先级选取)
/* Registration record for one candidate dispatch function of a node.
 * A node may have several candidates (per-CPU-architecture variants);
 * they are linked into a list and the highest-priority one is chosen
 * when the node is finally registered. */
typedef struct _vlib_node_fn_registration
{
vlib_node_function_t *function;  /* candidate dispatch function */
int priority;                    /* selection priority; highest wins */
struct _vlib_node_fn_registration *next_registration;  /* next candidate */
char *name;                      /* march-variant name of this candidate */
} vlib_node_fn_registration_t;
注册节点描述结构,用于表示一个注册节点
/* Static description of one graph node, filled in by the
 * VLIB_REGISTER_NODE macro and consumed at startup to build the
 * runtime node graph. */
typedef struct _vlib_node_registration
{
/* Vector processing function for this node. */
vlib_node_function_t *function;
/* Node function candidate registration with priority */
vlib_node_fn_registration_t *node_fn_registrations;
/* Node name. */
char *name;
/* Name of sibling (if applicable). */
char *sibling_of;
/* Node index filled in by registration. */
u32 index;
/* Type of this node. */
vlib_node_type_t type;
/* Error strings indexed by error code for this node. */
char **error_strings;
/* Buffer format/unformat for this node. */
format_function_t *format_buffer;
unformat_function_t *unformat_buffer;
/* Trace format/unformat for this node. */
format_function_t *format_trace;
unformat_function_t *unformat_trace;
/* Function to validate incoming frames. */
u8 *(*validate_frame) (struct vlib_main_t * vm,
struct vlib_node_runtime_t *,
struct vlib_frame_t * f);
/* Per-node runtime data. */
void *runtime_data;
/* Process stack size (log2 bytes), used by process/cothread nodes. */
u16 process_log2_n_stack_bytes;
/* Number of bytes of per-node run time data. */
u8 runtime_data_bytes;
/* State for input nodes. */
u8 state;
/* Node flags. */
u16 flags;
/* protocol at b->data[b->current_data] upon entry to the dispatch fn */
u8 protocol_hint;
/* Size of scalar and vector arguments in bytes. */
u16 scalar_size, vector_size;
/* Number of error codes used by this node. */
u16 n_errors;
/* Number of next node names that follow. */
u16 n_next_nodes;
/* Constructor link-list, don't ask... */
struct _vlib_node_registration *next_registration;
/* Names of next nodes which this node feeds into. */
char *next_nodes[];
} vlib_node_registration_t;
节点注册相关的宏
#ifndef CLIB_MARCH_VARIANT
/*
 * Declare a node registration 'x' plus a constructor/destructor pair
 * that run before/after main():
 *   - __vlib_add_node_registration_##x links x onto the global list
 *     vm->node_main.node_registrations;
 *   - __vlib_rm_node_registration_##x unlinks it again on unload.
 * The final expansion deliberately has no trailing ';' so the caller
 * can attach a designated initializer:
 *   VLIB_REGISTER_NODE (my_node) = { .name = "my-node", ... };
 *
 * NOTE: comments must never be placed after the '\' continuation
 * character inside the macro -- the backslash must be the last
 * character on the line, otherwise the continuation is broken.
 */
#define VLIB_REGISTER_NODE(x,...)                                       \
__VA_ARGS__ vlib_node_registration_t x;                                 \
static void __vlib_add_node_registration_##x (void)                     \
__attribute__((__constructor__)) ;                                      \
static void __vlib_add_node_registration_##x (void)                     \
{                                                                       \
vlib_main_t * vm = vlib_get_main();                                     \
x.next_registration = vm->node_main.node_registrations;                 \
vm->node_main.node_registrations = &x;                                  \
}                                                                       \
static void __vlib_rm_node_registration_##x (void)                      \
__attribute__((__destructor__)) ;                                       \
static void __vlib_rm_node_registration_##x (void)                      \
{                                                                       \
vlib_main_t * vm = vlib_get_main();                                     \
VLIB_REMOVE_FROM_LINKED_LIST (vm->node_main.node_registrations,         \
&x, next_registration);                                                 \
}                                                                       \
__VA_ARGS__ vlib_node_registration_t x
#else
/* March-variant translation units must not register the node again;
 * emit an unused dummy variable instead. */
#define VLIB_REGISTER_NODE(x,...)                                       \
static __clib_unused vlib_node_registration_t __clib_unused_##x
#endif
VPP定义的输入节点样例
我们以DPDK类型的输入节点来进行分析。
/* *INDENT-OFF* */
/* DPDK input node registration.  Note there is no .function here: the
 * dispatch function candidates are registered separately via
 * VLIB_NODE_FN, and the highest-priority candidate is selected. */
VLIB_REGISTER_NODE (dpdk_input_node) = {
.type = VLIB_NODE_TYPE_INPUT,
.name = "dpdk-input",
.sibling_of = "device-input",
/* Will be enabled if/when hardware is detected. */
.state = VLIB_NODE_STATE_DISABLED,
.format_buffer = format_ethernet_header_with_length,
.format_trace = format_dpdk_rx_trace,
.n_errors = DPDK_N_ERROR,
.error_strings = dpdk_error_strings,
};
输入节点没有指明其报文接收函数,因为接收函数也是采用宏来进行注册的,是一个链表,也就是说接收报文函数可以有多个,使用优先级最高的那个。
输入节点处理函数
输入节点处理函数宏
/*
 * Define the dispatch function for 'node' and register it as a
 * priority-ordered candidate:
 *   - 'extern vlib_node_registration_t node' references the node
 *     registration of the same name, so the node and its dispatch
 *     function must share the macro argument name;
 *   - the candidate's priority comes from CLIB_MARCH_FN_PRIORITY();
 *     a node may thus have several candidates (one per march variant)
 *     linked on node.node_fn_registrations, and register_node later
 *     picks the highest-priority one as the node's final function.
 * Comments are kept outside the macro body: a comment after the '\'
 * continuation character would break the macro definition.
 */
#define VLIB_NODE_FN(node)                                              \
uword CLIB_MARCH_SFX (node##_fn)();                                     \
static vlib_node_fn_registration_t                                      \
CLIB_MARCH_SFX(node##_fn_registration) =                                \
{ .function = &CLIB_MARCH_SFX (node##_fn), };                           \
                                                                        \
static void __clib_constructor                                          \
CLIB_MARCH_SFX (node##_multiarch_register) (void)                       \
{                                                                       \
extern vlib_node_registration_t node;                                   \
vlib_node_fn_registration_t *r;                                         \
r = & CLIB_MARCH_SFX (node##_fn_registration);                          \
r->priority = CLIB_MARCH_FN_PRIORITY();                                 \
r->name = CLIB_MARCH_VARIANT_STR;                                       \
r->next_registration = node.node_fn_registrations;                      \
node.node_fn_registrations = r;                                         \
}                                                                       \
uword CLIB_CPU_OPTIMIZED CLIB_MARCH_SFX (node##_fn)
/* DPDK input dispatch function: poll every rx queue assigned to this
 * thread and return the total number of packets received. */
VLIB_NODE_FN (dpdk_input_node) (vlib_main_t * vm, vlib_node_runtime_t * node,
vlib_frame_t * f)
{
dpdk_main_t *dm = &dpdk_main;
dpdk_device_t *xd;
uword n_rx_packets = 0;
/* Input-node runtime data: devices_and_queues lists the (device, queue)
 * pairs this thread must poll.  It is updated when devices of this
 * class are added or reassigned at run time. */
vnet_device_input_runtime_t *rt = (void *) node->runtime_data;
vnet_device_and_queue_t *dq;/* current device/queue cursor */
u32 thread_index = node->thread_index;
/*
 * Poll all devices on this cpu for input/interrupts.
 */
/* *INDENT-OFF* walk every queue of every device owned by this thread */
foreach_device_and_queue (dq, rt->devices_and_queues)
{
xd = vec_elt_at_index(dm->devices, dq->dev_instance);
if (PREDICT_FALSE (xd->flags & DPDK_DEVICE_FLAG_BOND_SLAVE))
continue; /* Do not poll slave to a bonded interface */
n_rx_packets += dpdk_device_input (vm, dm, xd, node, thread_index,
dq->queue_id);
}
/* *INDENT-ON* */
return n_rx_packets;
}
构建多线程时,为每一个线程构建输入节点
该工作主要是由start_workers函数负责。
/* Create the worker threads and, for each one, clone the main thread's
 * node graph (nodes, per-type runtime arrays, processes, frame
 * allocator, error counters) so every worker owns an identical copy.
 * NOTE(review): this excerpt is abridged -- the '......' markers elide
 * code from the original source. */
static clib_error_t *
start_workers (vlib_main_t * vm)
{
int i, j;
vlib_worker_thread_t *w;
vlib_main_t *vm_clone;
void *oldheap;
vlib_thread_main_t *tm = &vlib_thread_main;
vlib_thread_registration_t *tr;
vlib_node_runtime_t *rt;
u32 n_vlib_mains = tm->n_vlib_mains;
u32 worker_thread_index;
u8 *main_heap = clib_mem_get_per_cpu_heap ();
vec_reset_length (vlib_worker_threads);
/* Set up the main thread */
vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
w->elog_track.name = "main thread";
elog_track_register (&vm->elog_main, &w->elog_track);
/* Name the main thread using the configured thread prefix */
if (vec_len (tm->thread_prefix))
{
w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
vlib_set_thread_name ((char *) w->name);
}
vm->elog_main.lock =
clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
vm->elog_main.lock[0] = 0;
/* Additional threads exist (workers and/or hqos threads) */
if (n_vlib_mains > 1)
{
/* Replace hand-crafted length-1 vector with a real vector */
vlib_mains = 0;
vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
CLIB_CACHE_LINE_BYTES);
_vec_len (vlib_mains) = 0;
vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);
vlib_worker_threads->wait_at_barrier =
clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
vlib_worker_threads->workers_at_barrier =
clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
vlib_worker_threads->node_reforks_required =
clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
/* We'll need the rpc vector lock... */
clib_spinlock_init (&vm->pending_rpc_lock);
/* Ask for an initial barrier sync */
*vlib_worker_threads->workers_at_barrier = 0;
*vlib_worker_threads->wait_at_barrier = 1;
/* Without update or refork */
*vlib_worker_threads->node_reforks_required = 0;
vm->need_vlib_worker_thread_node_runtime_update = 0;
/* init timing */
vm->barrier_epoch = 0;
vm->barrier_no_close_before = 0;
worker_thread_index = 1;
/* Walk every thread-type registration (e.g. workers) */
for (i = 0; i < vec_len (tm->registrations); i++)
{
vlib_node_main_t *nm, *nm_clone;
int k;
tr = tm->registrations[i];
if (tr->count == 0)
continue;
for (k = 0; k < tr->count; k++)/* number of cores this thread type uses */
{
vlib_node_t *n;
/* Add a worker thread descriptor */
vec_add2 (vlib_worker_threads, w, 1);
......
/* Allocate all nodes in single block for speed */
n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
/* Copy the node array (it is an array of pointers) */
for (j = 0; j < vec_len (nm->nodes); j++)
{
clib_memcpy (n, nm->nodes[j], sizeof (*n));
/* none of the copied nodes have enqueue rights given out */
n->owner_node_index = VLIB_INVALID_NODE_INDEX;
clib_memset (&n->stats_total, 0, sizeof (n->stats_total));
clib_memset (&n->stats_last_clear, 0,
sizeof (n->stats_last_clear));
vec_add1 (nm_clone->nodes, n);
n++;
}
/* Clone the internal-node runtime array */
nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
CLIB_CACHE_LINE_BYTES);
/* Initialize each cloned internal-node runtime */
vec_foreach (rt,
nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
{
vlib_node_t *n = vlib_get_node (vm, rt->node_index);
/* record the owning thread index */
rt->thread_index = vm_clone->thread_index;
/* copy initial runtime_data from node */
if (n->runtime_data && n->runtime_data_bytes > 0)
clib_memcpy (rt->runtime_data, n->runtime_data,
clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
n->runtime_data_bytes));
}
/* Clone the input-node runtime array */
nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
CLIB_CACHE_LINE_BYTES);
/* Initialize each cloned input-node runtime */
vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
{
vlib_node_t *n = vlib_get_node (vm, rt->node_index);
/* record the owning thread index */
rt->thread_index = vm_clone->thread_index;
/* copy initial runtime_data from node */
if (n->runtime_data && n->runtime_data_bytes > 0)
clib_memcpy (rt->runtime_data, n->runtime_data,
clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
n->runtime_data_bytes));
}
/* Clone all VLIB_NODE_TYPE_PRE_INPUT node runtimes */
nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
CLIB_CACHE_LINE_BYTES);
vec_foreach (rt,
nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
{
vlib_node_t *n = vlib_get_node (vm, rt->node_index);
/* record the owning thread index */
rt->thread_index = vm_clone->thread_index;
/* copy initial runtime_data from node */
/* copy the original registered runtime data so the clone is
 * independent of the source node */
if (n->runtime_data && n->runtime_data_bytes > 0)
clib_memcpy (rt->runtime_data, n->runtime_data,
clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
n->runtime_data_bytes));
}
/* Clone all process (cothread) nodes */
nm_clone->processes = vec_dup_aligned (nm->processes,
CLIB_CACHE_LINE_BYTES);
/* Create per-thread frame freelist */
nm_clone->frame_sizes = vec_new (vlib_frame_size_t, 1);
#ifdef VLIB_SUPPORTS_ARBITRARY_SCALAR_SIZES
nm_clone->frame_size_hash = hash_create (0, sizeof (uword));
#endif
/* Packet trace buffers are guaranteed to be empty, nothing to do here */
clib_mem_set_heap (oldheap);
/* Append the clone to the global vlib_mains vector */
vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);
/* Duplicate the error counter arrays */
vm_clone->error_main.counters = vec_dup_aligned
(vlib_mains[0]->error_main.counters, CLIB_CACHE_LINE_BYTES);
vm_clone->error_main.counters_last_clear = vec_dup_aligned
(vlib_mains[0]->error_main.counters_last_clear,
CLIB_CACHE_LINE_BYTES);
worker_thread_index++;
}
}
}
else
{
/* only have non-data-structure copy threads to create... */
for (i = 0; i < vec_len (tm->registrations); i++)
{
tr = tm->registrations[i];
for (j = 0; j < tr->count; j++)
{
vec_add2 (vlib_worker_threads, w, 1);
if (tr->mheap_size)
{
#if USE_DLMALLOC == 0
w->thread_mheap =
mheap_alloc (0 /* use VM */ , tr->mheap_size);
#else
w->thread_mheap =
create_mspace (tr->mheap_size, 0 /* locked */ );
#endif
}
else
w->thread_mheap = main_heap;
w->thread_stack =
vlib_thread_stack_init (w - vlib_worker_threads);
w->thread_function = tr->function;
w->thread_function_arg = w;
w->instance_id = j;
w->elog_track.name =
(char *) format (0, "%s %d", tr->name, j + 1);
w->registration = tr;
vec_add1 (w->elog_track.name, 0);
elog_track_register (&vm->elog_main, &w->elog_track);
}
}
}
......
return 0;
}
分配每一个线程处理的队列个数,采用的是轮叫策略
// DPDK component initialization: discover ports, set up per-thread
// buffer templates, and distribute each device's rx queues over the
// worker threads (round-robin).
// NOTE(review): this excerpt is abridged -- '......' elides code.
static clib_error_t *
dpdk_lib_init (dpdk_main_t * dm)
{
u32 nports;
u32 mtu, max_rx_frame;
int i;
clib_error_t *error;
vlib_main_t *vm = vlib_get_main ();
vlib_thread_main_t *tm = vlib_get_thread_main ();
vnet_device_main_t *vdm = &vnet_device_main;
vnet_sw_interface_t *sw;
vnet_hw_interface_t *hi;
dpdk_device_t *xd;
vlib_pci_addr_t last_pci_addr;
u32 last_pci_addr_port = 0;
vlib_thread_registration_t *tr_hqos;
uword *p_hqos;
u32 next_hqos_cpu = 0;
u8 af_packet_instance_num = 0;
u8 bond_ether_instance_num = 0;
last_pci_addr.as_u32 = ~0;
dm->hqos_cpu_first_index = 0;
dm->hqos_cpu_count = 0;
/* find out which cpus will be used for I/O TX */
p_hqos = hash_get_mem (tm->thread_registrations_by_name, "hqos-threads");
tr_hqos = p_hqos ? (vlib_thread_registration_t *) p_hqos[0] : 0;
if (tr_hqos && tr_hqos->count > 0)
{
dm->hqos_cpu_first_index = tr_hqos->first_index;
dm->hqos_cpu_count = tr_hqos->count;
}
vec_validate_aligned (dm->devices_by_hqos_cpu, tm->n_vlib_mains - 1,
CLIB_CACHE_LINE_BYTES);
nports = rte_eth_dev_count_avail ();
if (nports < 1)
{
dpdk_log_notice ("DPDK drivers found no ports...");
}
if (CLIB_DEBUG > 0)
dpdk_log_notice ("DPDK drivers found %d ports...", nports);
if (dm->conf->enable_tcp_udp_checksum)
dm->buffer_flags_template &= ~(VNET_BUFFER_F_L4_CHECKSUM_CORRECT
| VNET_BUFFER_F_L4_CHECKSUM_COMPUTED);
/* vlib_buffer_t template */
vec_validate_aligned (dm->per_thread_data, tm->n_vlib_mains - 1,
CLIB_CACHE_LINE_BYTES);
for (i = 0; i < tm->n_vlib_mains; i++)
{
dpdk_per_thread_data_t *ptd = vec_elt_at_index (dm->per_thread_data, i);
clib_memset (&ptd->buffer_template, 0, sizeof (vlib_buffer_t));
ptd->buffer_template.flags = dm->buffer_flags_template;
vnet_buffer (&ptd->buffer_template)->sw_if_index[VLIB_TX] = (u32) ~ 0;
}
/* *INDENT-OFF* iterate over every DPDK ethernet device */
RTE_ETH_FOREACH_DEV(i)
{
......
if (devconf->workers)
{
int i;
q = 0;
clib_bitmap_foreach (i, devconf->workers, (// iterate over each configured worker thread
{
// assign device queue q to worker thread i (round-robin)
vnet_hw_interface_assign_rx_thread (dm->vnet_main, xd->hw_if_index, q++,
vdm->first_worker_thread_index + i);
}));
}
else
// no worker threads configured: main thread handles every queue
for (q = 0; q < xd->rx_q_used; q++)
{
vnet_hw_interface_assign_rx_thread (dm->vnet_main, xd->hw_if_index, q, /* any */
~1);
}
......
}
/* *INDENT-ON* */
return 0;
}
vnet_hw_interface_assign_rx_thread
该函数在进行队列分配时,使用如下结构来控制分配过程,保证公平性:
/* Bookkeeping for distributing device rx queues across worker
 * threads; next_worker_thread_index implements the round-robin
 * assignment cursor. */
typedef struct
{
vnet_device_per_worker_data_t *workers;
uword first_worker_thread_index;
uword last_worker_thread_index;
uword next_worker_thread_index;  /* next thread to receive a queue (round-robin cursor) */
} vnet_device_main_t;
函数实现:
/* Assign rx queue 'queue_id' of hw interface 'hw_if_index' to a
 * packet-input thread.  If 'thread_index' is out of the worker range,
 * a worker is picked round-robin for fairness; with no workers at all,
 * the main thread (index 0) polls the queue.  The queue is appended to
 * the input node's runtime device/queue list under a worker barrier. */
void
vnet_hw_interface_assign_rx_thread (vnet_main_t * vnm, u32 hw_if_index,
u16 queue_id, uword thread_index)
{
vnet_device_main_t *vdm = &vnet_device_main;
vlib_main_t *vm, *vm0;
vnet_device_input_runtime_t *rt;
vnet_device_and_queue_t *dq;
vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index);// device owning the queue
ASSERT (hw->input_node_index > 0);
if (vdm->first_worker_thread_index == 0)/* no worker threads: main thread does all rx */
thread_index = 0;
if (thread_index != 0 &&/* caller did not pin a valid worker: pick one round-robin */
(thread_index < vdm->first_worker_thread_index ||
thread_index > vdm->last_worker_thread_index))
{
thread_index = vdm->next_worker_thread_index++;
if (vdm->next_worker_thread_index > vdm->last_worker_thread_index)/* wrap to first worker */
vdm->next_worker_thread_index = vdm->first_worker_thread_index;
}
vm = vlib_mains[thread_index];/* vlib_main of the rx thread */
vm0 = vlib_get_main ();/* calling thread, normally main */
vlib_worker_thread_barrier_sync (vm0);/* stop workers while mutating runtime data */
// fetch the input node's runtime data on the chosen rx thread
rt = vlib_node_get_runtime_data (vm, hw->input_node_index);/* input node runtime data */
// append a (device, queue) entry for the input node to poll
vec_add2 (rt->devices_and_queues, dq, 1);/* add one queue */
dq->hw_if_index = hw_if_index;
dq->dev_instance = hw->dev_instance;
dq->queue_id = queue_id;
dq->mode = VNET_HW_INTERFACE_RX_MODE_POLLING;/* default to polling mode */
rt->enabled_node_state = VLIB_NODE_STATE_POLLING;
vnet_device_queue_update (vnm, rt);
/* Record the queue -> thread mapping on the hw interface */
vec_validate (hw->input_node_thread_index_by_queue, queue_id);
vec_validate (hw->rx_mode_by_queue, queue_id);
hw->input_node_thread_index_by_queue[queue_id] = thread_index;
hw->rx_mode_by_queue[queue_id] = VNET_HW_INTERFACE_RX_MODE_POLLING;
vlib_worker_thread_barrier_release (vm0);
/* Update the input node's scheduling state on the rx thread */
vlib_node_set_state (vm, hw->input_node_index, rt->enabled_node_state);
}
动态增加一个设备,如何分配到哪一个线程
动态增加一个新的网络设备,将会在设备初始化的时候调用函数vnet_hw_interface_assign_rx_thread分配到指定的工作者线程。我们以af_packet类型的网卡为例进行分析。
/* Create an af_packet interface bound to host interface 'host_if_name'.
 * Sets up the AF_PACKET socket with rx/tx rings, registers an ethernet
 * hw interface, binds it to the af-packet input node and assigns its
 * single rx queue (queue 0) to an rx thread via round-robin.
 * Returns 0 on success or a VNET_API_ERROR_* code. */
int
af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
u32 * sw_if_index)
{
af_packet_main_t *apm = &af_packet_main;
int ret, fd = -1, fd2 = -1;
struct tpacket_req *rx_req = 0;
struct tpacket_req *tx_req = 0;
struct ifreq ifr;
u8 *ring = 0;
af_packet_if_t *apif = 0;
u8 hw_addr[6];
clib_error_t *error;
vnet_sw_interface_t *sw;
vnet_hw_interface_t *hw;
vlib_thread_main_t *tm = vlib_get_thread_main ();
vnet_main_t *vnm = vnet_get_main ();
uword *p;
uword if_index;
u8 *host_if_name_dup = vec_dup (host_if_name);
int host_if_index = -1;
p = mhash_get (&apm->if_index_by_host_if_name, host_if_name);
if (p)
{
apif = vec_elt_at_index (apm->interfaces, p[0]);
*sw_if_index = apif->sw_if_index;
return VNET_API_ERROR_IF_ALREADY_EXISTS;
}
vec_validate (rx_req, 0);
rx_req->tp_block_size = AF_PACKET_RX_BLOCK_SIZE;
rx_req->tp_frame_size = AF_PACKET_RX_FRAME_SIZE;
rx_req->tp_block_nr = AF_PACKET_RX_BLOCK_NR;
rx_req->tp_frame_nr = AF_PACKET_RX_FRAME_NR;
vec_validate (tx_req, 0);
tx_req->tp_block_size = AF_PACKET_TX_BLOCK_SIZE;
tx_req->tp_frame_size = AF_PACKET_TX_FRAME_SIZE;
tx_req->tp_block_nr = AF_PACKET_TX_BLOCK_NR;
tx_req->tp_frame_nr = AF_PACKET_TX_FRAME_NR;
/*
 * make sure host side of interface is 'UP' before binding AF_PACKET
 * socket on it.
 */
if ((fd2 = socket (AF_UNIX, SOCK_DGRAM, 0)) < 0)
{
vlib_log_debug (apm->log_class, "Failed to create socket");
ret = VNET_API_ERROR_SYSCALL_ERROR_1;
goto error;
}
clib_memcpy (ifr.ifr_name, (const char *) host_if_name,
vec_len (host_if_name));
/* Look up the host interface index by name */
if ((ret = ioctl (fd2, SIOCGIFINDEX, &ifr)) < 0)
{
vlib_log_debug (apm->log_class, "af_packet_create error: %d", ret);
close (fd2);
return VNET_API_ERROR_INVALID_INTERFACE;
}
host_if_index = ifr.ifr_ifindex;
/* Fetch the host interface flags */
if ((ret = ioctl (fd2, SIOCGIFFLAGS, &ifr)) < 0)
{
vlib_log_warn (apm->log_class, "af_packet_create error: %d", ret);
goto error;
}
/* Bring the host interface up if it is down */
if (!(ifr.ifr_flags & IFF_UP))
{
ifr.ifr_flags |= IFF_UP;
if ((ret = ioctl (fd2, SIOCSIFFLAGS, &ifr)) < 0)
{
vlib_log_warn (apm->log_class, "af_packet_create error: %d", ret);
goto error;
}
}
if (fd2 > -1)
close (fd2);
/* Create the AF_PACKET socket with mmap'd rx/tx rings */
ret = create_packet_v2_sock (host_if_index, rx_req, tx_req, &fd, &ring);
if (ret != 0)
goto error;
ret = is_bridge (host_if_name);
if (ret == 0) /* is a bridge, ignore state */
host_if_index = -1;
/* So far everything looks good, let's create interface */
pool_get (apm->interfaces, apif);
if_index = apif - apm->interfaces;
apif->host_if_index = host_if_index;
apif->fd = fd;
apif->rx_ring = ring;
apif->tx_ring = ring + rx_req->tp_block_size * rx_req->tp_block_nr;
apif->rx_req = rx_req;
apif->tx_req = tx_req;
apif->host_if_name = host_if_name_dup;
apif->per_interface_next_index = ~0;
apif->next_tx_frame = 0;
apif->next_rx_frame = 0;
if (tm->n_vlib_mains > 1)/* multi-threaded: the tx path needs a lock */
clib_spinlock_init (&apif->lockp);
/* Register the socket fd with the unix file poller so rx events can
 * be delivered (interrupt-style) instead of pure polling */
{
clib_file_t template = { 0 };
template.read_function = af_packet_fd_read_ready;
template.file_descriptor = fd;
template.private_data = if_index;
template.flags = UNIX_FILE_EVENT_EDGE_TRIGGERED;
template.description = format (0, "%U", format_af_packet_device_name,
if_index);
apif->clib_file_index = clib_file_add (&file_main, &template);
}
/*use configured or generate random MAC address */
if (hw_addr_set)
clib_memcpy (hw_addr, hw_addr_set, 6);
else
{
f64 now = vlib_time_now (vm);
u32 rnd;
rnd = (u32) (now * 1e6);
rnd = random_u32 (&rnd);
clib_memcpy (hw_addr + 2, &rnd, sizeof (rnd));
hw_addr[0] = 2;
hw_addr[1] = 0xfe;
}
/* Register an ethernet hw interface with vnet */
error = ethernet_register_interface (vnm, af_packet_device_class.index,
if_index, hw_addr, &apif->hw_if_index,
af_packet_eth_flag_change);
if (error)
{
clib_memset (apif, 0, sizeof (*apif));
pool_put (apm->interfaces, apif);
vlib_log_err (apm->log_class, "Unable to register interface: %U",
format_clib_error, error);
clib_error_free (error);
ret = VNET_API_ERROR_SYSCALL_ERROR_1;
goto error;
}
sw = vnet_get_hw_sw_interface (vnm, apif->hw_if_index);
hw = vnet_get_hw_interface (vnm, apif->hw_if_index);
apif->sw_if_index = sw->sw_if_index;
/* Bind this interface to the af-packet input node */
vnet_hw_interface_set_input_node (vnm, apif->hw_if_index,
af_packet_input_node.index);
/* Assign queue 0 to an rx thread (round-robin over workers) */
vnet_hw_interface_assign_rx_thread (vnm, apif->hw_if_index, 0, /* queue */
~0 /* any cpu */ );
/* Interface supports interrupt rx mode */
hw->flags |= VNET_HW_INTERFACE_FLAG_SUPPORTS_INT_MODE;
vnet_hw_interface_set_flags (vnm, apif->hw_if_index,
VNET_HW_INTERFACE_FLAG_LINK_UP);
/* Start in interrupt rx mode */
vnet_hw_interface_set_rx_mode (vnm, apif->hw_if_index, 0,
VNET_HW_INTERFACE_RX_MODE_INTERRUPT);
mhash_set_mem (&apm->if_index_by_host_if_name, host_if_name_dup, &if_index,
0);
if (sw_if_index)
*sw_if_index = apif->sw_if_index;
return 0;
error:
if (fd2 > -1)
close (fd2);
vec_free (host_if_name_dup);
vec_free (rx_req);
vec_free (tx_req);
return ret;
}
通过命令行强制指定网卡队列处理线程
可以使用如下命令强制指定网卡的队列所在的工作线程:
set interface rx-placement <interface> [queue <n>]
可以使用如下命令查看当前网卡与队列之间的关系:
show interface rx-placement
具体命令实现
/*?
* This command is used to assign a given interface, and optionally a
* given queue, to a different thread. If the '<em>queue</em>' is not provided,
* it defaults to 0. The '<em>worker</em>' parameter is zero based and the index
* in the thread name, for example, 0 in the thread name '<em>vpp_wk_0</em>'.
*
* @cliexpar
* Example of how to display the interface placement:
* @cliexstart{show interface rx-placement}
* Thread 1 (vpp_wk_0):
* node dpdk-input:
* GigabitEthernet7/0/0 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 0 (polling)
* VirtualEthernet0/0/12 queue 2 (polling)
* VirtualEthernet0/0/13 queue 0 (polling)
* VirtualEthernet0/0/13 queue 2 (polling)
* Thread 2 (vpp_wk_1):
* node dpdk-input:
* GigabitEthernet7/0/1 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 1 (polling)
* VirtualEthernet0/0/12 queue 3 (polling)
* VirtualEthernet0/0/13 queue 1 (polling)
* VirtualEthernet0/0/13 queue 3 (polling)
* @cliexend
* Example of how to assign a interface and queue to a worker thread:
* @cliexcmd{set interface rx-placement VirtualEthernet0/0/12 queue 1 worker 0}
* Example of how to display the interface placement:
* @cliexstart{show interface rx-placement}
* Thread 1 (vpp_wk_0):
* node dpdk-input:
* GigabitEthernet7/0/0 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 0 (polling)
* VirtualEthernet0/0/12 queue 1 (polling)
* VirtualEthernet0/0/12 queue 2 (polling)
* VirtualEthernet0/0/13 queue 0 (polling)
* VirtualEthernet0/0/13 queue 2 (polling)
* Thread 2 (vpp_wk_1):
* node dpdk-input:
* GigabitEthernet7/0/1 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 3 (polling)
* VirtualEthernet0/0/13 queue 1 (polling)
* VirtualEthernet0/0/13 queue 3 (polling)
* @cliexend
?*/
/* *INDENT-OFF* */
/* CLI: move an interface rx queue to another worker (or main) thread.
 * Marked mp-safe: takes the barrier itself via the assign path. */
VLIB_CLI_COMMAND (cmd_set_if_rx_placement,static) =
{
.path = "set interface rx-placement",
.short_help = "set interface rx-placement <interface> [queue <n>] "
"[worker <n> | main]",
.function = set_interface_rx_placement,
.is_mp_safe = 1,
};
/* *INDENT-ON* */
/*?
* This command is used to display the interface and queue worker
* thread placement.
*
* @cliexpar
* Example of how to display the interface placement:
* @cliexstart{show interface rx-placement}
* Thread 1 (vpp_wk_0):
* node dpdk-input:
* GigabitEthernet7/0/0 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 0 (polling)
* VirtualEthernet0/0/12 queue 2 (polling)
* VirtualEthernet0/0/13 queue 0 (polling)
* VirtualEthernet0/0/13 queue 2 (polling)
* Thread 2 (vpp_wk_1):
* node dpdk-input:
* GigabitEthernet7/0/1 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 1 (polling)
* VirtualEthernet0/0/12 queue 3 (polling)
* VirtualEthernet0/0/13 queue 1 (polling)
* VirtualEthernet0/0/13 queue 3 (polling)
* @cliexend
?*/
/* *INDENT-OFF* */
/* CLI: display the current interface/queue -> thread placement. */
VLIB_CLI_COMMAND (show_interface_rx_placement, static) =
{
.path = "show interface rx-placement",
.short_help = "show interface rx-placement",
.function = show_interface_rx_placement_fn,
};
/* *INDENT-ON* */
/* Move rx queue 'queue_id' of 'hw_if_index' to a given worker thread
 * (zero-based worker index) or to the main thread when 'is_main' is
 * set.  Preserves the queue's current rx mode across the move.
 * Returns a clib error on an invalid thread or unknown queue. */
clib_error_t *
set_hw_interface_rx_placement (u32 hw_if_index, u32 queue_id,
u32 thread_index, u8 is_main)
{
vnet_main_t *vnm = vnet_get_main ();
vnet_device_main_t *vdm = &vnet_device_main;
clib_error_t *error = 0;
vnet_hw_interface_rx_mode mode = VNET_HW_INTERFACE_RX_MODE_UNKNOWN;
int rv;
if (is_main)
thread_index = 0;
else
thread_index += vdm->first_worker_thread_index;
if (thread_index > vdm->last_worker_thread_index)
return clib_error_return (0,
"please specify valid worker thread or main");
rv = vnet_hw_interface_get_rx_mode (vnm, hw_if_index, queue_id, &mode);
if (rv)
return clib_error_return (0, "not found");
// detach the queue from its current thread
rv = vnet_hw_interface_unassign_rx_thread (vnm, hw_if_index, queue_id);
if (rv)
return clib_error_return (0, "not found");
// re-assign it to the requested thread, restoring the saved rx mode
vnet_hw_interface_assign_rx_thread (vnm, hw_if_index, queue_id,
thread_index);
vnet_hw_interface_set_rx_mode (vnm, hw_if_index, queue_id, mode);
return (error);
}