Input nodes are the entry point for packet processing, and VPP supports several kinds of them. There is one input node per NIC driver type: one for dpdk devices, one for tap, one for af_packet, and so on. One node per driver type makes sense because the driver determines the receive function. Input nodes are registered with a macro that links them into a list before main() runs. Every worker thread gets an identical copy of each input node; which queues a given thread's copy actually services is decided dynamically, using a round-robin policy.
Node types
typedef enum
{
  /* An internal node on the call graph (could be output). */
  VLIB_NODE_TYPE_INTERNAL,
  /* Nodes which input data into the processing graph.
     Input nodes are called for each iteration of main loop.
     This is the entry point through which packets flow into the graph. */
  VLIB_NODE_TYPE_INPUT,
  /* Nodes to be called before all input nodes.
     Used, for example, to clean out driver TX rings before
     processing input. */
  VLIB_NODE_TYPE_PRE_INPUT,
  /* "Process" nodes which can be suspended and later resumed. */
  VLIB_NODE_TYPE_PROCESS,
  VLIB_N_NODE_TYPE,
} vlib_node_type_t;
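Before looking at the registration structures, it helps to see where each node type runs. Below is a simplified sketch of one iteration of the vlib main loop, assuming hypothetical helpers dispatch_one_node() and dispatch_pending_frames(); it is not the actual vlib source:

/* Simplified sketch of one vlib main-loop iteration; the helper names
 * are illustrative only, not real vlib internals. */
static void
main_loop_iteration_sketch (vlib_main_t * vm, vlib_node_main_t * nm)
{
  vlib_node_runtime_t *rt;

  /* PRE_INPUT nodes run first, e.g. to reclaim driver TX rings. */
  vec_foreach (rt, nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
    dispatch_one_node (vm, rt);

  /* INPUT nodes poll their assigned device queues and inject
     fresh frames into the graph. */
  vec_foreach (rt, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
    dispatch_one_node (vm, rt);

  /* INTERNAL nodes then consume the frames produced above; PROCESS
     nodes are coroutines resumed by timers/events, not called here. */
  dispatch_pending_frames (vm);
}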
The node-function registration descriptor, used to register one candidate dispatch function for a node:
typedef struct _vlib_node_fn_registration
{
  vlib_node_function_t *function;
  int priority;
  struct _vlib_node_fn_registration *next_registration;
  char *name;
} vlib_node_fn_registration_t;
The node registration descriptor, representing one registered node:
typedef struct _vlib_node_registration
{
  /* Vector processing function for this node. */
  vlib_node_function_t *function;

  /* Node function candidate registration with priority */
  vlib_node_fn_registration_t *node_fn_registrations;

  /* Node name. */
  char *name;

  /* Name of sibling (if applicable). */
  char *sibling_of;

  /* Node index filled in by registration. */
  u32 index;

  /* Type of this node. */
  vlib_node_type_t type;

  /* Error strings indexed by error code for this node. */
  char **error_strings;

  /* Buffer format/unformat for this node. */
  format_function_t *format_buffer;
  unformat_function_t *unformat_buffer;

  /* Trace format/unformat for this node. */
  format_function_t *format_trace;
  unformat_function_t *unformat_trace;

  /* Function to validate incoming frames. */
  u8 *(*validate_frame) (struct vlib_main_t * vm,
                         struct vlib_node_runtime_t *,
                         struct vlib_frame_t * f);

  /* Per-node runtime data. */
  void *runtime_data;

  /* Process stack size (log2 bytes): the process node's coroutine stack. */
  u16 process_log2_n_stack_bytes;

  /* Number of bytes of per-node run time data. */
  u8 runtime_data_bytes;

  /* State for input nodes. */
  u8 state;

  /* Node flags. */
  u16 flags;

  /* protocol at b->data[b->current_data] upon entry to the dispatch fn */
  u8 protocol_hint;

  /* Size of scalar and vector arguments in bytes. */
  u16 scalar_size, vector_size;

  /* Number of error codes used by this node. */
  u16 n_errors;

  /* Number of next node names that follow. */
  u16 n_next_nodes;

  /* Constructor link-list, don't ask... */
  struct _vlib_node_registration *next_registration;

  /* Names of next nodes which this node feeds into. */
  char *next_nodes[];
} vlib_node_registration_t;
Node registration macros
#ifndef CLIB_MARCH_VARIANT
#define VLIB_REGISTER_NODE(x,...) \
__VA_ARGS__ vlib_node_registration_t x; \ // declare the node to be registered
static void __vlib_add_node_registration_##x (void) \ // declare a static registration function; the constructor attribute makes it run before main()
__attribute__((__constructor__)) ; \
static void __vlib_add_node_registration_##x (void) \
{ \ // define it: link node x onto the vm->node_main.node_registrations list
  vlib_main_t * vm = vlib_get_main(); \
  x.next_registration = vm->node_main.node_registrations; \
  vm->node_main.node_registrations = &x; \
} \
static void __vlib_rm_node_registration_##x (void) \ // matching destructor: unlink the node from the list
__attribute__((__destructor__)) ; \
static void __vlib_rm_node_registration_##x (void) \
{ \
  vlib_main_t * vm = vlib_get_main(); \
  VLIB_REMOVE_FROM_LINKED_LIST (vm->node_main.node_registrations, \
                                &x, next_registration); \
} \
__VA_ARGS__ vlib_node_registration_t x // define the node variable; no trailing semicolon, because the macro's user supplies one along with the initializer
#else
#define VLIB_REGISTER_NODE(x,...) \
static __clib_unused vlib_node_registration_t __clib_unused_##x
#endif
An example input node defined by VPP
We take the DPDK input node as the example.
/* *INDENT-OFF* */
VLIB_REGISTER_NODE (dpdk_input_node) = {
  .type = VLIB_NODE_TYPE_INPUT,
  .name = "dpdk-input",
  .sibling_of = "device-input",

  /* Will be enabled if/when hardware is detected. */
  .state = VLIB_NODE_STATE_DISABLED,

  .format_buffer = format_ethernet_header_with_length,
  .format_trace = format_dpdk_rx_trace,

  .n_errors = DPDK_N_ERROR,
  .error_strings = dpdk_error_strings,
};
Note that the registration does not name a receive function. Dispatch functions are registered separately, through another macro, onto a per-node linked list: a node may have several candidate functions, and the one with the highest priority is used.
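To make the selection concrete, here is a minimal sketch of the choice register_node() ultimately makes over that list; pick_highest_priority_fn is a hypothetical helper, not a VPP function:

/* Minimal sketch: walk a node's function-registration list and return
 * the highest-priority candidate. Hypothetical helper, not part of VPP. */
static vlib_node_function_t *
pick_highest_priority_fn (vlib_node_fn_registration_t * fnr)
{
  vlib_node_function_t *fn = 0;
  int best_priority = -1;

  for (; fnr; fnr = fnr->next_registration)
    if (fnr->priority > best_priority)
      {
        best_priority = fnr->priority;
        fn = fnr->function;
      }
  return fn;
}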
The input node dispatch function
The dispatch-function registration macro
#define VLIB_NODE_FN(node) \
uword CLIB_MARCH_SFX (node##_fn)(); \
static vlib_node_fn_registration_t \
CLIB_MARCH_SFX(node##_fn_registration) = \
{ .function = &CLIB_MARCH_SFX (node##_fn), }; \
\
static void __clib_constructor \
CLIB_MARCH_SFX (node##_multiarch_register) (void) \
{ \
  extern vlib_node_registration_t node; \ // references a node whose name is the macro argument: the node and its dispatch function must be defined with the same name
  vlib_node_fn_registration_t *r; \
  r = & CLIB_MARCH_SFX (node##_fn_registration); \
  r->priority = CLIB_MARCH_FN_PRIORITY(); \ // dispatch-function priority; the highest-priority candidate wins
  r->name = CLIB_MARCH_VARIANT_STR; \
  r->next_registration = node.node_fn_registrations; \ // prepend onto the node's list: a node can carry several dispatch functions, and register_node() later picks the highest-priority one as the node's final dispatch function
  node.node_fn_registrations = r; \
} \
uword CLIB_CPU_OPTIMIZED CLIB_MARCH_SFX (node##_fn)
VLIB_NODE_FN (dpdk_input_node) (vlib_main_t * vm, vlib_node_runtime_t * node,
                                vlib_frame_t * f)
{
  dpdk_main_t *dm = &dpdk_main;
  dpdk_device_t *xd;
  uword n_rx_packets = 0;
  /* Fetch the input node's runtime data. Its devices_and_queues vector
     lists the queues this thread must service on this node; it is updated
     when devices of this type are added dynamically. */
  vnet_device_input_runtime_t *rt = (void *) node->runtime_data;
  vnet_device_and_queue_t *dq;
  u32 thread_index = node->thread_index;

  /*
   * Poll all devices on this cpu for input/interrupts.
   */
  /* *INDENT-OFF* walk every queue of every device owned by this thread */
  foreach_device_and_queue (dq, rt->devices_and_queues)
    {
      xd = vec_elt_at_index(dm->devices, dq->dev_instance);
      if (PREDICT_FALSE (xd->flags & DPDK_DEVICE_FLAG_BOND_SLAVE))
        continue;   /* Do not poll slave to a bonded interface */
      n_rx_packets += dpdk_device_input (vm, dm, xd, node, thread_index,
                                         dq->queue_id);
    }
  /* *INDENT-ON* */
  return n_rx_packets;
}
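Conceptually, foreach_device_and_queue above is just a walk over the rt->devices_and_queues vector. A hand-rolled equivalent, as a sketch of the idea rather than the macro's actual definition (poll_one_queue is a hypothetical helper):

/* Sketch only: visit every (device, queue) pair assigned to this thread.
 * The real macro lives in vnet and may add per-mode filtering. */
vnet_device_and_queue_t *dq;
for (dq = rt->devices_and_queues; dq < vec_end (rt->devices_and_queues); dq++)
  poll_one_queue (vm, dq->dev_instance, dq->queue_id); /* hypothetical */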
Building each thread's input nodes when worker threads are created
This work is done mainly by the start_workers() function.
static clib_error_t *
start_workers (vlib_main_t * vm)
{
  int i, j;
  vlib_worker_thread_t *w;
  vlib_main_t *vm_clone;
  void *oldheap;
  vlib_thread_main_t *tm = &vlib_thread_main;
  vlib_thread_registration_t *tr;
  vlib_node_runtime_t *rt;
  u32 n_vlib_mains = tm->n_vlib_mains;
  u32 worker_thread_index;
  u8 *main_heap = clib_mem_get_per_cpu_heap ();

  vec_reset_length (vlib_worker_threads);

  /* Set up the main thread */
  vec_add2_aligned (vlib_worker_threads, w, 1, CLIB_CACHE_LINE_BYTES);
  w->elog_track.name = "main thread";
  elog_track_register (&vm->elog_main, &w->elog_track);

  /* name the main thread; the configured thread prefix (default "vpp") is prepended */
  if (vec_len (tm->thread_prefix))
    {
      w->name = format (0, "%v_main%c", tm->thread_prefix, '\0');
      vlib_set_thread_name ((char *) w->name);
    }

  vm->elog_main.lock =
    clib_mem_alloc_aligned (CLIB_CACHE_LINE_BYTES, CLIB_CACHE_LINE_BYTES);
  vm->elog_main.lock[0] = 0;

  /* there are other threads: hqos and/or workers */
  if (n_vlib_mains > 1)
    {
      /* Replace hand-crafted length-1 vector with a real vector */
      vlib_mains = 0;
      vec_validate_aligned (vlib_mains, tm->n_vlib_mains - 1,
                            CLIB_CACHE_LINE_BYTES);
      _vec_len (vlib_mains) = 0;
      vec_add1_aligned (vlib_mains, vm, CLIB_CACHE_LINE_BYTES);

      vlib_worker_threads->wait_at_barrier =
        clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
      vlib_worker_threads->workers_at_barrier =
        clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
      vlib_worker_threads->node_reforks_required =
        clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);

      /* We'll need the rpc vector lock... */
      clib_spinlock_init (&vm->pending_rpc_lock);

      /* Ask for an initial barrier sync */
      *vlib_worker_threads->workers_at_barrier = 0;
      *vlib_worker_threads->wait_at_barrier = 1;

      /* Without update or refork */
      *vlib_worker_threads->node_reforks_required = 0;
      vm->need_vlib_worker_thread_node_runtime_update = 0;

      /* init timing */
      vm->barrier_epoch = 0;
      vm->barrier_no_close_before = 0;

      worker_thread_index = 1;

      /* iterate over the registered thread types; typically only one
         matters here, the packet-processing workers */
      for (i = 0; i < vec_len (tm->registrations); i++)
        {
          vlib_node_main_t *nm, *nm_clone;
          int k;

          tr = tm->registrations[i];

          if (tr->count == 0)
            continue;

          for (k = 0; k < tr->count; k++) /* one iteration per core requested for this thread type */
            {
              vlib_node_t *n;

              /* add a worker thread descriptor */
              vec_add2 (vlib_worker_threads, w, 1);
              ......
              /* Allocate all nodes in single block for speed */
              n = clib_mem_alloc_no_fail (vec_len (nm->nodes) * sizeof (*n));
              /* copy the node array; nm->nodes is a vector of pointers */
              for (j = 0; j < vec_len (nm->nodes); j++)
                {
                  clib_memcpy (n, nm->nodes[j], sizeof (*n));
                  /* none of the copied nodes have enqueue rights given out */
                  n->owner_node_index = VLIB_INVALID_NODE_INDEX;
                  clib_memset (&n->stats_total, 0, sizeof (n->stats_total));
                  clib_memset (&n->stats_last_clear, 0,
                               sizeof (n->stats_last_clear));
                  vec_add1 (nm_clone->nodes, n);
                  n++;
                }
              /* duplicate the internal-node runtime vector */
              nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
                vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL],
                                 CLIB_CACHE_LINE_BYTES);
              /* initialize each internal node's runtime */
              vec_foreach (rt,
                           nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL])
              {
                vlib_node_t *n = vlib_get_node (vm, rt->node_index);
                /* set the owning thread index */
                rt->thread_index = vm_clone->thread_index;
                /* copy initial runtime_data from node */
                if (n->runtime_data && n->runtime_data_bytes > 0)
                  clib_memcpy (rt->runtime_data, n->runtime_data,
                               clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
                                         n->runtime_data_bytes));
              }
              /* duplicate the input-node runtime vector */
              nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
                vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT],
                                 CLIB_CACHE_LINE_BYTES);
              /* initialize each input node's runtime */
              vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
              {
                vlib_node_t *n = vlib_get_node (vm, rt->node_index);
                /* set the owning thread index */
                rt->thread_index = vm_clone->thread_index;
                /* copy initial runtime_data from node */
                if (n->runtime_data && n->runtime_data_bytes > 0)
                  clib_memcpy (rt->runtime_data, n->runtime_data,
                               clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
                                         n->runtime_data_bytes));
              }
              /* duplicate the runtime data of all VLIB_NODE_TYPE_PRE_INPUT nodes */
              nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT] =
                vec_dup_aligned (nm->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT],
                                 CLIB_CACHE_LINE_BYTES);
              vec_foreach (rt,
                           nm_clone->nodes_by_type[VLIB_NODE_TYPE_PRE_INPUT])
              {
                vlib_node_t *n = vlib_get_node (vm, rt->node_index);
                /* set the owning thread index */
                rt->thread_index = vm_clone->thread_index;
                /* copy initial runtime_data from node, so the clone no
                   longer depends on the source node */
                if (n->runtime_data && n->runtime_data_bytes > 0)
                  clib_memcpy (rt->runtime_data, n->runtime_data,
                               clib_min (VLIB_NODE_RUNTIME_DATA_SIZE,
                                         n->runtime_data_bytes));
              }
              /* duplicate all process (coroutine) nodes */
              nm_clone->processes = vec_dup_aligned (nm->processes,
                                                     CLIB_CACHE_LINE_BYTES);

              /* Create per-thread frame freelist */
              nm_clone->frame_sizes = vec_new (vlib_frame_size_t, 1);
#ifdef VLIB_SUPPORTS_ARBITRARY_SCALAR_SIZES
              nm_clone->frame_size_hash = hash_create (0, sizeof (uword));
#endif

              /* Packet trace buffers are guaranteed to be empty, nothing to do here */
              clib_mem_set_heap (oldheap);
              /* append the clone to the global vlib_mains vector */
              vec_add1_aligned (vlib_mains, vm_clone, CLIB_CACHE_LINE_BYTES);

              /* duplicate the error counter vectors */
              vm_clone->error_main.counters = vec_dup_aligned
                (vlib_mains[0]->error_main.counters, CLIB_CACHE_LINE_BYTES);
              vm_clone->error_main.counters_last_clear = vec_dup_aligned
                (vlib_mains[0]->error_main.counters_last_clear,
                 CLIB_CACHE_LINE_BYTES);

              worker_thread_index++;
            }
        }
    }
  else
    {
      /* only have non-data-structure copy threads to create... */
      for (i = 0; i < vec_len (tm->registrations); i++)
        {
          tr = tm->registrations[i];
          for (j = 0; j < tr->count; j++)
            {
              vec_add2 (vlib_worker_threads, w, 1);
              if (tr->mheap_size)
                {
#if USE_DLMALLOC == 0
                  w->thread_mheap =
                    mheap_alloc (0 /* use VM */ , tr->mheap_size);
#else
                  w->thread_mheap =
                    create_mspace (tr->mheap_size, 0 /* locked */ );
#endif
                }
              else
                w->thread_mheap = main_heap;

              w->thread_stack =
                vlib_thread_stack_init (w - vlib_worker_threads);
              w->thread_function = tr->function;
              w->thread_function_arg = w;
              w->instance_id = j;
              w->elog_track.name =
                (char *) format (0, "%s %d", tr->name, j + 1);
              w->registration = tr;
              vec_add1 (w->elog_track.name, 0);
              elog_track_register (&vm->elog_main, &w->elog_track);
            }
        }
    }
  ......
  return 0;
}
Distributing device queues across threads, round-robin
/* DPDK component initialization */
static clib_error_t *
dpdk_lib_init (dpdk_main_t * dm)
{
  u32 nports;
  u32 mtu, max_rx_frame;
  int i;
  clib_error_t *error;
  vlib_main_t *vm = vlib_get_main ();
  vlib_thread_main_t *tm = vlib_get_thread_main ();
  vnet_device_main_t *vdm = &vnet_device_main;
  vnet_sw_interface_t *sw;
  vnet_hw_interface_t *hi;
  dpdk_device_t *xd;
  vlib_pci_addr_t last_pci_addr;
  u32 last_pci_addr_port = 0;
  vlib_thread_registration_t *tr_hqos;
  uword *p_hqos;
  u32 next_hqos_cpu = 0;
  u8 af_packet_instance_num = 0;
  u8 bond_ether_instance_num = 0;

  last_pci_addr.as_u32 = ~0;

  dm->hqos_cpu_first_index = 0;
  dm->hqos_cpu_count = 0;

  /* find out which cpus will be used for I/O TX */
  p_hqos = hash_get_mem (tm->thread_registrations_by_name, "hqos-threads");
  tr_hqos = p_hqos ? (vlib_thread_registration_t *) p_hqos[0] : 0;

  if (tr_hqos && tr_hqos->count > 0)
    {
      dm->hqos_cpu_first_index = tr_hqos->first_index;
      dm->hqos_cpu_count = tr_hqos->count;
    }

  vec_validate_aligned (dm->devices_by_hqos_cpu, tm->n_vlib_mains - 1,
                        CLIB_CACHE_LINE_BYTES);

  nports = rte_eth_dev_count_avail ();

  if (nports < 1)
    {
      dpdk_log_notice ("DPDK drivers found no ports...");
    }

  if (CLIB_DEBUG > 0)
    dpdk_log_notice ("DPDK drivers found %d ports...", nports);

  if (dm->conf->enable_tcp_udp_checksum)
    dm->buffer_flags_template &= ~(VNET_BUFFER_F_L4_CHECKSUM_CORRECT
                                   | VNET_BUFFER_F_L4_CHECKSUM_COMPUTED);

  /* vlib_buffer_t template */
  vec_validate_aligned (dm->per_thread_data, tm->n_vlib_mains - 1,
                        CLIB_CACHE_LINE_BYTES);
  for (i = 0; i < tm->n_vlib_mains; i++)
    {
      dpdk_per_thread_data_t *ptd = vec_elt_at_index (dm->per_thread_data, i);
      clib_memset (&ptd->buffer_template, 0, sizeof (vlib_buffer_t));
      ptd->buffer_template.flags = dm->buffer_flags_template;
      vnet_buffer (&ptd->buffer_template)->sw_if_index[VLIB_TX] = (u32) ~ 0;
    }

  /* *INDENT-OFF* iterate over every DPDK device */
  RTE_ETH_FOREACH_DEV(i)
    {
      ......
      if (devconf->workers)
        {
          int i;
          q = 0;
          /* iterate over each worker in the device's configured worker bitmap */
          clib_bitmap_foreach (i, devconf->workers, ({
            /* assign queue q of this device to worker thread i */
            vnet_hw_interface_assign_rx_thread (dm->vnet_main, xd->hw_if_index, q++,
                                                vdm->first_worker_thread_index + i);
          }));
        }
      else
        /* no explicit worker placement: pass "any" and let
           vnet_hw_interface_assign_rx_thread round-robin the queues
           (the main thread takes them only if no workers exist) */
        for (q = 0; q < xd->rx_q_used; q++)
          {
            vnet_hw_interface_assign_rx_thread (dm->vnet_main, xd->hw_if_index, q, /* any */
                                                ~1);
          }
      ......
    }
  /* *INDENT-ON* */

  return 0;
}
vnet_hw_interface_assign_rx_thread
When distributing queues, this function uses the following structure to track the assignment and keep it fair:
typedef struct
{
  vnet_device_per_worker_data_t *workers;
  uword first_worker_thread_index;
  uword last_worker_thread_index;
  uword next_worker_thread_index; /* next thread to receive a queue, advanced round-robin */
} vnet_device_main_t;
The implementation:
/* Assign an RX thread to an interface queue */
void
vnet_hw_interface_assign_rx_thread (vnet_main_t * vnm, u32 hw_if_index,
                                    u16 queue_id, uword thread_index)
{
  vnet_device_main_t *vdm = &vnet_device_main;
  vlib_main_t *vm, *vm0;
  vnet_device_input_runtime_t *rt;
  vnet_device_and_queue_t *dq;
  /* device that owns the queue */
  vnet_hw_interface_t *hw = vnet_get_hw_interface (vnm, hw_if_index);

  ASSERT (hw->input_node_index > 0);

  /* no worker threads: the main thread does all the work, including RX */
  if (vdm->first_worker_thread_index == 0)
    thread_index = 0;

  /* otherwise pick the next thread round-robin, which keeps the load fair */
  if (thread_index != 0 &&
      (thread_index < vdm->first_worker_thread_index ||
       thread_index > vdm->last_worker_thread_index))
    {
      thread_index = vdm->next_worker_thread_index++;
      /* wrap around for the next pass */
      if (vdm->next_worker_thread_index > vdm->last_worker_thread_index)
        vdm->next_worker_thread_index = vdm->first_worker_thread_index;
    }

  vm = vlib_mains[thread_index]; /* the chosen RX thread */
  vm0 = vlib_get_main (); /* the calling thread, usually the main thread */

  vlib_worker_thread_barrier_sync (vm0); /* barrier sync before touching runtime data */

  /* fetch the input node's runtime data; devices_and_queues lives there */
  rt = vlib_node_get_runtime_data (vm, hw->input_node_index);

  /* append one (device, queue) entry to this input node */
  vec_add2 (rt->devices_and_queues, dq, 1);
  dq->hw_if_index = hw_if_index;
  dq->dev_instance = hw->dev_instance;
  dq->queue_id = queue_id;
  dq->mode = VNET_HW_INTERFACE_RX_MODE_POLLING; /* polling mode by default */
  rt->enabled_node_state = VLIB_NODE_STATE_POLLING;

  vnet_device_queue_update (vnm, rt);

  /* record the queue-to-thread mapping */
  vec_validate (hw->input_node_thread_index_by_queue, queue_id);
  vec_validate (hw->rx_mode_by_queue, queue_id);
  hw->input_node_thread_index_by_queue[queue_id] = thread_index;
  hw->rx_mode_by_queue[queue_id] = VNET_HW_INTERFACE_RX_MODE_POLLING;

  vlib_worker_thread_barrier_release (vm0);

  /* set the input node's state on the target thread */
  vlib_node_set_state (vm, hw->input_node_index, rt->enabled_node_state);
}
How a dynamically added device is assigned to a thread
When a new network device is added at runtime, its initialization code calls vnet_hw_interface_assign_rx_thread() to place its queues on a worker thread. We use the af_packet driver as the example.
/* Create an af_packet interface */
int
af_packet_create_if (vlib_main_t * vm, u8 * host_if_name, u8 * hw_addr_set,
                     u32 * sw_if_index)
{
  af_packet_main_t *apm = &af_packet_main;
  int ret, fd = -1, fd2 = -1;
  struct tpacket_req *rx_req = 0;
  struct tpacket_req *tx_req = 0;
  struct ifreq ifr;
  u8 *ring = 0;
  af_packet_if_t *apif = 0;
  u8 hw_addr[6];
  clib_error_t *error;
  vnet_sw_interface_t *sw;
  vnet_hw_interface_t *hw;
  vlib_thread_main_t *tm = vlib_get_thread_main ();
  vnet_main_t *vnm = vnet_get_main ();
  uword *p;
  uword if_index;
  u8 *host_if_name_dup = vec_dup (host_if_name);
  int host_if_index = -1;

  p = mhash_get (&apm->if_index_by_host_if_name, host_if_name);
  if (p)
    {
      apif = vec_elt_at_index (apm->interfaces, p[0]);
      *sw_if_index = apif->sw_if_index;
      return VNET_API_ERROR_IF_ALREADY_EXISTS;
    }

  vec_validate (rx_req, 0);
  rx_req->tp_block_size = AF_PACKET_RX_BLOCK_SIZE;
  rx_req->tp_frame_size = AF_PACKET_RX_FRAME_SIZE;
  rx_req->tp_block_nr = AF_PACKET_RX_BLOCK_NR;
  rx_req->tp_frame_nr = AF_PACKET_RX_FRAME_NR;

  vec_validate (tx_req, 0);
  tx_req->tp_block_size = AF_PACKET_TX_BLOCK_SIZE;
  tx_req->tp_frame_size = AF_PACKET_TX_FRAME_SIZE;
  tx_req->tp_block_nr = AF_PACKET_TX_BLOCK_NR;
  tx_req->tp_frame_nr = AF_PACKET_TX_FRAME_NR;

  /*
   * make sure host side of interface is 'UP' before binding AF_PACKET
   * socket on it.
   */
  if ((fd2 = socket (AF_UNIX, SOCK_DGRAM, 0)) < 0)
    {
      vlib_log_debug (apm->log_class, "Failed to create socket");
      ret = VNET_API_ERROR_SYSCALL_ERROR_1;
      goto error;
    }

  clib_memcpy (ifr.ifr_name, (const char *) host_if_name,
               vec_len (host_if_name));
  /* look up the host interface index by name */
  if ((ret = ioctl (fd2, SIOCGIFINDEX, &ifr)) < 0)
    {
      vlib_log_debug (apm->log_class, "af_packet_create error: %d", ret);
      close (fd2);
      return VNET_API_ERROR_INVALID_INTERFACE;
    }

  host_if_index = ifr.ifr_ifindex;

  /* fetch the host interface flags */
  if ((ret = ioctl (fd2, SIOCGIFFLAGS, &ifr)) < 0)
    {
      vlib_log_warn (apm->log_class, "af_packet_create error: %d", ret);
      goto error;
    }

  /* bring the host interface up if it is not already */
  if (!(ifr.ifr_flags & IFF_UP))
    {
      ifr.ifr_flags |= IFF_UP;
      if ((ret = ioctl (fd2, SIOCSIFFLAGS, &ifr)) < 0)
        {
          vlib_log_warn (apm->log_class, "af_packet_create error: %d", ret);
          goto error;
        }
    }

  if (fd2 > -1)
    close (fd2);

  /* create the AF_PACKET socket */
  ret = create_packet_v2_sock (host_if_index, rx_req, tx_req, &fd, &ring);

  if (ret != 0)
    goto error;

  ret = is_bridge (host_if_name);
  if (ret == 0) /* is a bridge, ignore state */
    host_if_index = -1;

  /* So far everything looks good, let's create interface */
  pool_get (apm->interfaces, apif);
  if_index = apif - apm->interfaces;

  apif->host_if_index = host_if_index;
  apif->fd = fd;
  apif->rx_ring = ring;
  apif->tx_ring = ring + rx_req->tp_block_size * rx_req->tp_block_nr;
  apif->rx_req = rx_req;
  apif->tx_req = tx_req;
  apif->host_if_name = host_if_name_dup;
  apif->per_interface_next_index = ~0;
  apif->next_tx_frame = 0;
  apif->next_rx_frame = 0;

  if (tm->n_vlib_mains > 1) /* multiple threads: the device needs a lock */
    clib_spinlock_init (&apif->lockp);

  /* register the fd with epoll; read events are used to emulate RX interrupts */
  {
    clib_file_t template = { 0 };
    template.read_function = af_packet_fd_read_ready;
    template.file_descriptor = fd;
    template.private_data = if_index;
    template.flags = UNIX_FILE_EVENT_EDGE_TRIGGERED;
    template.description = format (0, "%U", format_af_packet_device_name,
                                   if_index);
    apif->clib_file_index = clib_file_add (&file_main, &template);
  }

  /* use the configured MAC address or generate a random one */
  if (hw_addr_set)
    clib_memcpy (hw_addr, hw_addr_set, 6);
  else
    {
      f64 now = vlib_time_now (vm);
      u32 rnd;
      rnd = (u32) (now * 1e6);
      rnd = random_u32 (&rnd);

      clib_memcpy (hw_addr + 2, &rnd, sizeof (rnd));
      hw_addr[0] = 2;
      hw_addr[1] = 0xfe;
    }

  /* register the Ethernet interface */
  error = ethernet_register_interface (vnm, af_packet_device_class.index,
                                       if_index, hw_addr, &apif->hw_if_index,
                                       af_packet_eth_flag_change);

  if (error)
    {
      clib_memset (apif, 0, sizeof (*apif));
      pool_put (apm->interfaces, apif);
      vlib_log_err (apm->log_class, "Unable to register interface: %U",
                    format_clib_error, error);
      clib_error_free (error);
      ret = VNET_API_ERROR_SYSCALL_ERROR_1;
      goto error;
    }

  sw = vnet_get_hw_sw_interface (vnm, apif->hw_if_index);
  hw = vnet_get_hw_interface (vnm, apif->hw_if_index);
  apif->sw_if_index = sw->sw_if_index;

  /* set the input node for this interface */
  vnet_hw_interface_set_input_node (vnm, apif->hw_if_index,
                                    af_packet_input_node.index);
  /* assign queue 0 of the interface to an RX thread */
  vnet_hw_interface_assign_rx_thread (vnm, apif->hw_if_index, 0, /* queue */
                                      ~0 /* any cpu */ );

  /* the interface supports interrupt mode */
  hw->flags |= VNET_HW_INTERFACE_FLAG_SUPPORTS_INT_MODE;
  vnet_hw_interface_set_flags (vnm, apif->hw_if_index,
                               VNET_HW_INTERFACE_FLAG_LINK_UP);

  /* switch queue 0 to interrupt RX mode */
  vnet_hw_interface_set_rx_mode (vnm, apif->hw_if_index, 0,
                                 VNET_HW_INTERFACE_RX_MODE_INTERRUPT);

  mhash_set_mem (&apm->if_index_by_host_if_name, host_if_name_dup, &if_index,
                 0);
  if (sw_if_index)
    *sw_if_index = apif->sw_if_index;

  return 0;

error:
  if (fd2 > -1)
    close (fd2);
  vec_free (host_if_name_dup);
  vec_free (rx_req);
  vec_free (tx_req);
  return ret;
}
Pinning a NIC queue to a thread from the command line
The following command pins an interface queue to a chosen worker thread (or to the main thread):
set interface rx-placement <interface> [queue <n>] [worker <n> | main]
And this command shows the current mapping between interfaces, queues, and threads:
show interface rx-placement
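For example, to move queue 1 of VirtualEthernet0/0/12 onto worker 0 and then verify the placement (the interface name here is taken from the CLI documentation below):
vpp# set interface rx-placement VirtualEthernet0/0/12 queue 1 worker 0
vpp# show interface rx-placement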
The command implementation
/*?
* This command is used to assign a given interface, and optionally a
* given queue, to a different thread. If the '<em>queue</em>' is not provided,
* it defaults to 0. The '<em>worker</em>' parameter is zero based and the index
* in the thread name, for example, 0 in the thread name '<em>vpp_wk_0</em>'.
*
* @cliexpar
* Example of how to display the interface placement:
* @cliexstart{show interface rx-placement}
* Thread 1 (vpp_wk_0):
* node dpdk-input:
* GigabitEthernet7/0/0 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 0 (polling)
* VirtualEthernet0/0/12 queue 2 (polling)
* VirtualEthernet0/0/13 queue 0 (polling)
* VirtualEthernet0/0/13 queue 2 (polling)
* Thread 2 (vpp_wk_1):
* node dpdk-input:
* GigabitEthernet7/0/1 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 1 (polling)
* VirtualEthernet0/0/12 queue 3 (polling)
* VirtualEthernet0/0/13 queue 1 (polling)
* VirtualEthernet0/0/13 queue 3 (polling)
* @cliexend
* Example of how to assign an interface and queue to a worker thread:
* @cliexcmd{set interface rx-placement VirtualEthernet0/0/12 queue 1 worker 0}
* Example of how to display the interface placement:
* @cliexstart{show interface rx-placement}
* Thread 1 (vpp_wk_0):
* node dpdk-input:
* GigabitEthernet7/0/0 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 0 (polling)
* VirtualEthernet0/0/12 queue 1 (polling)
* VirtualEthernet0/0/12 queue 2 (polling)
* VirtualEthernet0/0/13 queue 0 (polling)
* VirtualEthernet0/0/13 queue 2 (polling)
* Thread 2 (vpp_wk_1):
* node dpdk-input:
* GigabitEthernet7/0/1 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 3 (polling)
* VirtualEthernet0/0/13 queue 1 (polling)
* VirtualEthernet0/0/13 queue 3 (polling)
* @cliexend
?*/
/* *INDENT-OFF* */
VLIB_CLI_COMMAND (cmd_set_if_rx_placement, static) = {
  .path = "set interface rx-placement",
  .short_help = "set interface rx-placement <interface> [queue <n>] "
                "[worker <n> | main]",
  .function = set_interface_rx_placement,
  .is_mp_safe = 1,
};
/* *INDENT-ON* */
/*?
* This command is used to display the interface and queue worker
* thread placement.
*
* @cliexpar
* Example of how to display the interface placement:
* @cliexstart{show interface rx-placement}
* Thread 1 (vpp_wk_0):
* node dpdk-input:
* GigabitEthernet7/0/0 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 0 (polling)
* VirtualEthernet0/0/12 queue 2 (polling)
* VirtualEthernet0/0/13 queue 0 (polling)
* VirtualEthernet0/0/13 queue 2 (polling)
* Thread 2 (vpp_wk_1):
* node dpdk-input:
* GigabitEthernet7/0/1 queue 0 (polling)
* node vhost-user-input:
* VirtualEthernet0/0/12 queue 1 (polling)
* VirtualEthernet0/0/12 queue 3 (polling)
* VirtualEthernet0/0/13 queue 1 (polling)
* VirtualEthernet0/0/13 queue 3 (polling)
* @cliexend
?*/
/* *INDENT-OFF* */
VLIB_CLI_COMMAND (show_interface_rx_placement, static) = {
  .path = "show interface rx-placement",
  .short_help = "show interface rx-placement",
  .function = show_interface_rx_placement_fn,
};
/* *INDENT-ON* */
clib_error_t *
set_hw_interface_rx_placement (u32 hw_if_index, u32 queue_id,
                               u32 thread_index, u8 is_main)
{
  vnet_main_t *vnm = vnet_get_main ();
  vnet_device_main_t *vdm = &vnet_device_main;
  clib_error_t *error = 0;
  vnet_hw_interface_rx_mode mode = VNET_HW_INTERFACE_RX_MODE_UNKNOWN;
  int rv;

  if (is_main)
    thread_index = 0;
  else
    thread_index += vdm->first_worker_thread_index;

  if (thread_index > vdm->last_worker_thread_index)
    return clib_error_return (0,
                              "please specify valid worker thread or main");

  rv = vnet_hw_interface_get_rx_mode (vnm, hw_if_index, queue_id, &mode);
  if (rv)
    return clib_error_return (0, "not found");

  /* undo the previous assignment */
  rv = vnet_hw_interface_unassign_rx_thread (vnm, hw_if_index, queue_id);
  if (rv)
    return clib_error_return (0, "not found");

  /* re-assign to the requested thread and restore the RX mode */
  vnet_hw_interface_assign_rx_thread (vnm, hw_if_index, queue_id,
                                      thread_index);
  vnet_hw_interface_set_rx_mode (vnm, hw_if_index, queue_id, mode);

  return (error);
}
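As a worked example of the index arithmetic above: with one main thread and two workers, first_worker_thread_index is 1 and last_worker_thread_index is 2, so the CLI's worker 0 (vpp_wk_0) maps to thread_index 1 and worker 1 (vpp_wk_1) to thread_index 2, while worker 2 would exceed last_worker_thread_index and be rejected.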