Node type roles:
VLIB_NODE_TYPE_INTERNAL
Internal node, the most common node type: it receives a vector of buffers and operates on them. Most VPP nodes play this role, doing internal processing on the packet stream, e.g. the ip4-input-no-checksum and ip4-icmp-input nodes.
VLIB_NODE_TYPE_INPUT
Input node, usually a device input node. It creates frames from scratch and dispatches them to internal nodes, e.g. the dpdk-input and af-packet-input nodes.
An input node receives packets in one of two modes, polling or interrupt, controlled by vlib_node_state_t.
VLIB_NODE_TYPE_PRE_INPUT
Currently there is only one node of this type, the epoll node, which services socket-related logic and is mainly used for control-plane work.
VLIB_NODE_TYPE_PROCESS
A node of this type can be suspended and later resumed, and it has its own run-time stack allocated on the heap. It amounts to cooperative multitasking scheduled inside a single thread and is mainly used to modify internal parameters of VPP nodes.
A process node, like a thread, can pause, wait for events, and resume; unlike a pthread, it is a coroutine implemented on top of setjmp/longjmp.
Wait for an event: always_inline f64 vlib_process_wait_for_event_or_clock (vlib_main_t * vm, f64 dt)
Signal an event: always_inline void vlib_process_signal_event (vlib_main_t * vm, uword node_index, uword type_opaque, uword data)
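As a hedged illustration of the two APIs above, here is a minimal sketch of a VLIB_NODE_TYPE_PROCESS node. The node name my-process, the function names and MY_EVENT are assumptions made for this example, not code from VPP:

/* minimal process-node sketch (hypothetical names) */
static uword
my_process_fn (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f)
{
  uword event_type, *event_data = 0;

  while (1)
    {
      /* suspend the coroutine for up to 5 seconds or until an event arrives */
      vlib_process_wait_for_event_or_clock (vm, 5.0);
      event_type = vlib_process_get_events (vm, &event_data);

      if (event_type == ~0)
        ;  /* timeout expired: do periodic work here */
      else
        ;  /* handle the event(s) carried in event_data */

      vec_reset_length (event_data);
    }
  return 0;
}

VLIB_REGISTER_NODE (my_process_node) = {
  .function = my_process_fn,
  .type = VLIB_NODE_TYPE_PROCESS,
  .name = "my-process",
};

/* another node can wake it up with (MY_EVENT is a hypothetical event code):
   vlib_process_signal_event (vm, my_process_node.index, MY_EVENT, 0); */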
Packet flow through the graph nodes:
- The input node polls (or is interrupt-driven by) an interface receive queue and pulls in a batch of packets.
- It then groups those packets into a vector, or frame, according to the next node that should handle them.
- For example, the input node collects all IPv4 packets and hands them to the ip4-input node;
- it collects all IPv6 packets and hands them to the ip6-input node.
- When the ip6-input node is scheduled, it takes its frame and processes the packets with a dual-loop or quad-loop, prefetching packets into the CPU cache, to reach optimal performance; this uses the CPU caches effectively by reducing cache misses. Once ip6-input has processed all packets of the current frame, it passes them on to different follow-on nodes: a packet that fails validation goes to the error-drop node, a normal packet goes to the ip6-lookup node (the registration sketch after this list shows how such next-node edges are declared).
- A frame of packets passes through the graph nodes one after another until the packets are sent out by the interface-output node.
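The downstream edges a node can hand packets to (such as error-drop or ip6-lookup) are declared when the node is registered. The sketch below shows the pattern for a hypothetical internal node; my-check and my_check_fn are illustrative names, only the registration shape mirrors VPP's:

/* hypothetical internal node registration showing next-node edges */
static uword my_check_fn (vlib_main_t *, vlib_node_runtime_t *, vlib_frame_t *);

typedef enum
{
  MY_CHECK_NEXT_DROP,     /* -> error-drop */
  MY_CHECK_NEXT_LOOKUP,   /* -> ip6-lookup */
  MY_CHECK_N_NEXT,
} my_check_next_t;

VLIB_REGISTER_NODE (my_check_node) = {
  .function = my_check_fn,          /* processes one frame per dispatch */
  .name = "my-check",
  .type = VLIB_NODE_TYPE_INTERNAL,
  .vector_size = sizeof (u32),      /* each frame element is a buffer index */
  .n_next_nodes = MY_CHECK_N_NEXT,
  .next_nodes = {
    [MY_CHECK_NEXT_DROP] = "error-drop",
    [MY_CHECK_NEXT_LOOKUP] = "ip6-lookup",
  },
};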
Processing logic of a VPP graph node (these steps are exactly what the dpdk_device_input walkthrough below goes through):
Step 1: obtain the packets to work on — an input node pulls a burst from the device receive queue into a new frame, an internal node takes the buffer indices from the frame it was dispatched with.
Step 2: process the packets of the frame in a dual-loop or quad-loop, prefetching upcoming buffers into the cache.
Step 3: enqueue each processed packet to the frame of its next node (e.g. via vlib_buffer_enqueue_to_next or vlib_get_new_next_frame/vlib_put_next_frame).
Step 4: record packet traces if tracing is enabled, update counters, and return the number of packets processed.
Advantages of processing a whole frame of packets per network function:
- From a software-engineering point of view, every graph node is independent and autonomous.
- From a performance point of view, the main benefit is better use of the CPU instruction cache (i-cache). The first packet of a frame loads the node's instructions into the i-cache; the remaining packets of the frame then use those instructions "for free". VPP also exploits the superscalar nature of the CPU by interleaving packet memory loads with packet processing, which keeps the CPU pipeline better utilized.
- VPP likewise exploits speculative execution for better performance. Speculatively reusing forwarding objects across packets (such as adjacencies and lookup tables), and prefetching packet contents into the CPU data cache (d-cache) for the next loop iteration, use the hardware effectively and let VPP exploit fine-grained parallelism (see the dual-loop sketch after this list).
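A minimal sketch of the dual-loop shape described above, assuming it sits inside a node function whose arguments provide vm and frame; only the loop and prefetch skeleton is shown, the per-packet processing is elided:

/* dual-loop skeleton with prefetch (hypothetical node body, VPP-style APIs) */
u32 *from = vlib_frame_vector_args (frame);
u32 n_left = frame->n_vectors;

while (n_left >= 4)
  {
    vlib_buffer_t *b0, *b1;

    /* prefetch the pair that will be processed on the next iteration */
    {
      vlib_buffer_t *p2 = vlib_get_buffer (vm, from[2]);
      vlib_buffer_t *p3 = vlib_get_buffer (vm, from[3]);
      vlib_prefetch_buffer_header (p2, LOAD);
      vlib_prefetch_buffer_header (p3, LOAD);
      CLIB_PREFETCH (p2->data, CLIB_CACHE_LINE_BYTES, LOAD);
      CLIB_PREFETCH (p3->data, CLIB_CACHE_LINE_BYTES, LOAD);
    }

    b0 = vlib_get_buffer (vm, from[0]);
    b1 = vlib_get_buffer (vm, from[1]);
    /* ... process b0 and b1, choose their next-node indices ... */

    from += 2;
    n_left -= 2;
  }

while (n_left > 0)
  {
    vlib_buffer_t *b0 = vlib_get_buffer (vm, from[0]);
    /* ... single-packet tail loop ... */
    from += 1;
    n_left -= 1;
  }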
Analysis of the VLIB_INIT_FUNCTION macro
An example:
static clib_error_t *
dpdk_init (vlib_main_t * vm)
{
  dpdk_main_t *dm = &dpdk_main;
  clib_error_t *error = 0;

  /* verify that structs are cacheline aligned */
  STATIC_ASSERT (offsetof (dpdk_device_t, cacheline0) == 0,
                 "Cache line marker must be 1st element in dpdk_device_t");
  STATIC_ASSERT (offsetof (dpdk_device_t, cacheline1) ==
                 CLIB_CACHE_LINE_BYTES,
                 "Data in cache line 0 is bigger than cache line size");
  STATIC_ASSERT (offsetof (frame_queue_trace_t, cacheline0) == 0,
                 "Cache line marker must be 1st element in frame_queue_trace_t");
  STATIC_ASSERT (RTE_CACHE_LINE_SIZE == 1 << CLIB_LOG2_CACHE_LINE_BYTES,
                 "DPDK RTE CACHE LINE SIZE does not match with 1<<CLIB_LOG2_CACHE_LINE_BYTES");

  dpdk_cli_reference ();

  dm->vlib_main = vm;
  dm->vnet_main = vnet_get_main ();
  dm->conf = &dpdk_config_main;

  dm->conf->nchannels = 4;
  vec_add1 (dm->conf->eal_init_args, (u8 *) "vnet");
  vec_add1 (dm->conf->eal_init_args, (u8 *) "--in-memory");

  /* Default vlib_buffer_t flags, DISABLES tcp/udp checksumming... */
  dm->buffer_flags_template = (VLIB_BUFFER_TOTAL_LENGTH_VALID |
                               VLIB_BUFFER_EXT_HDR_VALID |
                               VNET_BUFFER_F_L4_CHECKSUM_COMPUTED |
                               VNET_BUFFER_F_L4_CHECKSUM_CORRECT);

  dm->stat_poll_interval = DPDK_STATS_POLL_INTERVAL;
  dm->link_state_poll_interval = DPDK_LINK_POLL_INTERVAL;

  dm->log_default = vlib_log_register_class ("dpdk", 0);

  return error;
}
VLIB_INIT_FUNCTION (dpdk_init);
The VLIB_INIT_FUNCTION macro expands as shown below; the actual registration work is done by the VLIB_DECLARE_INIT_FUNCTION macro:
#ifndef CLIB_MARCH_VARIANT
#define VLIB_DECLARE_INIT_FUNCTION(x, tag) \
vlib_init_function_t * _VLIB_INIT_FUNCTION_SYMBOL (x, tag) = x; \
static void __vlib_add_##tag##_function_##x (void) \
  __attribute__((__constructor__)) ; \
static _vlib_init_function_list_elt_t _vlib_init_function_##tag_##x; \
static void __vlib_add_##tag##_function_##x (void) \
{ \
  vlib_main_t * vm = vlib_get_main(); \
  _vlib_init_function_##tag_##x.next_init_function \
    = vm->tag##_function_registrations; \
  vm->tag##_function_registrations = &_vlib_init_function_##tag_##x; \
  _vlib_init_function_##tag_##x.f = &x; \
  _vlib_init_function_##tag_##x.name = #x; \
} \
static void __vlib_rm_##tag##_function_##x (void) \
  __attribute__((__destructor__)) ; \
static void __vlib_rm_##tag##_function_##x (void) \
{ \
  vlib_main_t * vm = vlib_get_main(); \
  _vlib_init_function_list_elt_t *this, *prev; \
  this = vm->tag##_function_registrations; \
  if (this == 0) \
    return; \
  if (this->f == &x) \
    { \
      vm->tag##_function_registrations = this->next_init_function; \
      return; \
    } \
  prev = this; \
  this = this->next_init_function; \
  while (this) \
    { \
      if (this->f == &x) \
        { \
          prev->next_init_function = \
            this->next_init_function; \
          return; \
        } \
      prev = this; \
      this = this->next_init_function; \
    } \
} \
static _vlib_init_function_list_elt_t _vlib_init_function_##tag_##x
#else
/* create unused pointer to silence compiler warnings and get whole
   function optimized out */
#define VLIB_DECLARE_INIT_FUNCTION(x, tag) \
static __clib_unused void * __clib_unused_##tag##_##x = x
#endif

#define VLIB_INIT_FUNCTION(x) VLIB_DECLARE_INIT_FUNCTION(x,init)
In the VLIB_INIT_FUNCTION macro, x is the function being registered; it is passed on to the VLIB_DECLARE_INIT_FUNCTION macro together with a second argument, init.
VLIB_DECLARE_INIT_FUNCTION mainly produces two automatically invoked functions: a constructor that registers the init function and a destructor that removes the registration.
It first builds the registration side: the _VLIB_INIT_FUNCTION_SYMBOL macro constructs the symbol name and declares a variable of type vlib_init_function_t * initialized to x, and the constructor links a _vlib_init_function_list_elt_t describing x onto vm->init_function_registrations.
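For orientation, the sketch below shows roughly what VLIB_INIT_FUNCTION (dpdk_init) boils down to after preprocessing (tag == init). It is a simplified illustration: the symbol produced by _VLIB_INIT_FUNCTION_SYMBOL is abbreviated here and the destructor half is omitted:

/* simplified expansion sketch of VLIB_INIT_FUNCTION (dpdk_init) */
vlib_init_function_t *vlib_init_function_dpdk_init = dpdk_init;  /* real symbol name comes from _VLIB_INIT_FUNCTION_SYMBOL */

/* note: "tag_" is pasted literally, since tag_ is not a macro parameter */
static _vlib_init_function_list_elt_t _vlib_init_function_tag_dpdk_init;

static void __vlib_add_init_function_dpdk_init (void)
  __attribute__ ((__constructor__));

static void
__vlib_add_init_function_dpdk_init (void)
{
  vlib_main_t *vm = vlib_get_main ();

  /* push this registration onto vm->init_function_registrations */
  _vlib_init_function_tag_dpdk_init.next_init_function =
    vm->init_function_registrations;
  vm->init_function_registrations = &_vlib_init_function_tag_dpdk_init;
  _vlib_init_function_tag_dpdk_init.f = &dpdk_init;
  _vlib_init_function_tag_dpdk_init.name = "dpdk_init";
}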
Init registration and invocation flow
When VPP starts, the registered functions on the init_function_registrations list are invoked along the following call path to complete initialization:
main -> vlib_unix_main -> clib_calljmp (thread0, (uword) vm, (void *) ...) -> vlib_main -> vlib_call_all_init_functions (vm) -> vlib_call_init_exit_functions -> call_init_exit_functions_internal
static inline clib_error_t *
call_init_exit_functions_internal (vlib_main_t * vm,
                                   _vlib_init_function_list_elt_t ** headp,
                                   int call_once, int do_sort)
{
  clib_error_t *error = 0;
  _vlib_init_function_list_elt_t *i;

  if (do_sort && (error = vlib_sort_init_exit_functions (headp)))
    return (error);

  i = *headp;
  while (i)
    {
      if (call_once && !hash_get (vm->init_functions_called, i->f))
        {
          if (call_once)
            hash_set1 (vm->init_functions_called, i->f);
          error = i->f (vm);
          if (error)
            return error;
        }
      i = i->next_init_function;
    }
  return error;
}
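One detail worth noting: the init_functions_called hash also backs vlib_call_init_function, which lets an init function explicitly run another one it depends on; the hash then ensures the callee is not executed a second time when the registration list above is walked. Below is a hedged sketch of that pattern, following usage commonly seen in the VPP source (my_feature_init is a hypothetical name; the ip_main_init dependency is only an example):

/* hypothetical init function that depends on another one having run first */
static clib_error_t *
my_feature_init (vlib_main_t * vm)
{
  clib_error_t *error;

  /* run ip_main_init now if it has not run yet; it is recorded in
     vm->init_functions_called so it will not be called twice */
  if ((error = vlib_call_init_function (vm, ip_main_init)))
    return error;

  /* ... my_feature's own initialization ... */
  return 0;
}

VLIB_INIT_FUNCTION (my_feature_init);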
dpdk_device_input
static_always_inline u32
dpdk_device_input (vlib_main_t * vm, dpdk_main_t * dm, dpdk_device_t * xd,
                   vlib_node_runtime_t * node, u32 thread_index, u16 queue_id)
{
  uword n_rx_packets = 0, n_rx_bytes;
  u32 n_left, n_trace;
  u32 *buffers;
  u32 next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
  struct rte_mbuf **mb;
  vlib_buffer_t *b0;
  u16 *next;
  u16 or_flags;
  u32 n;
  int single_next = 0;

  dpdk_per_thread_data_t *ptd = vec_elt_at_index (dm->per_thread_data,
                                                  thread_index);
  vlib_buffer_t *bt = &ptd->buffer_template;

  if ((xd->flags & DPDK_DEVICE_FLAG_ADMIN_UP) == 0)
    return 0;

  /* get up to DPDK_RX_BURST_SZ buffers from PMD */
  while (n_rx_packets < DPDK_RX_BURST_SZ)
    {
      n = rte_eth_rx_burst (xd->port_id, queue_id,
                            ptd->mbufs + n_rx_packets,
                            DPDK_RX_BURST_SZ - n_rx_packets);
      n_rx_packets += n;

      if (n < 32)
        break;
    }

  if (n_rx_packets == 0)
    return 0;

  /* Update buffer template */
  vnet_buffer (bt)->sw_if_index[VLIB_RX] = xd->sw_if_index;
  bt->error = node->errors[DPDK_ERROR_NONE];
  /* as DPDK is allocating empty buffers from mempool provided before interface
     start for each queue, it is safe to store this in the template */
  bt->buffer_pool_index = xd->buffer_pool_for_queue[queue_id];
  bt->ref_count = 1;
  vnet_buffer (bt)->feature_arc_index = 0;
  bt->current_config_index = 0;

  /* receive burst of packets from DPDK PMD */
  if (PREDICT_FALSE (xd->per_interface_next_index != ~0))
    next_index = xd->per_interface_next_index;

  /* as all packets belong to the same interface feature arc lookup
     can be done once and result stored in the buffer template */
  if (PREDICT_FALSE (vnet_device_input_have_features (xd->sw_if_index)))
    vnet_feature_start_device_input_x1 (xd->sw_if_index, &next_index, bt);

  if (xd->flags & DPDK_DEVICE_FLAG_MAYBE_MULTISEG)
    n_rx_bytes = dpdk_process_rx_burst (vm, ptd, n_rx_packets, 1, &or_flags);
  else
    n_rx_bytes = dpdk_process_rx_burst (vm, ptd, n_rx_packets, 0, &or_flags);

  if (PREDICT_FALSE (or_flags & PKT_RX_FDIR))
    {
      /* some packets will need to go to different next nodes */
      for (n = 0; n < n_rx_packets; n++)
        ptd->next[n] = next_index;

      /* flow offload - process if rx flow offload enabled and at least one
         packet is marked */
      if (PREDICT_FALSE ((xd->flags & DPDK_DEVICE_FLAG_RX_FLOW_OFFLOAD) &&
                         (or_flags & PKT_RX_FDIR)))
        dpdk_process_flow_offload (xd, ptd, n_rx_packets);

      /* enqueue buffers to the next node */
      vlib_get_buffer_indices_with_offset (vm, (void **) ptd->mbufs,
                                           ptd->buffers, n_rx_packets,
                                           sizeof (struct rte_mbuf));

      vlib_buffer_enqueue_to_next (vm, node, ptd->buffers, ptd->next,
                                   n_rx_packets);
    }
  else
    {
      u32 *to_next, n_left_to_next;

      vlib_get_new_next_frame (vm, node, next_index, to_next, n_left_to_next);
      vlib_get_buffer_indices_with_offset (vm, (void **) ptd->mbufs, to_next,
                                           n_rx_packets,
                                           sizeof (struct rte_mbuf));

      if (PREDICT_TRUE (next_index == VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT))
        {
          vlib_next_frame_t *nf;
          vlib_frame_t *f;
          ethernet_input_frame_t *ef;
          nf = vlib_node_runtime_get_next_frame (vm, node, next_index);
          f = vlib_get_frame (vm, nf->frame);
          f->flags = ETH_INPUT_FRAME_F_SINGLE_SW_IF_IDX;

          ef = vlib_frame_scalar_args (f);
          ef->sw_if_index = xd->sw_if_index;
          ef->hw_if_index = xd->hw_if_index;

          /* if PMD supports ip4 checksum check and there are no packets
             marked as ip4 checksum bad we can notify ethernet input so it
             can send packets to ip4-input-no-checksum node */
          if (xd->flags & DPDK_DEVICE_FLAG_RX_IP4_CKSUM &&
              (or_flags & PKT_RX_IP_CKSUM_BAD) == 0)
            f->flags |= ETH_INPUT_FRAME_F_IP4_CKSUM_OK;
          vlib_frame_no_append (f);
        }
      n_left_to_next -= n_rx_packets;
      vlib_put_next_frame (vm, node, next_index, n_left_to_next);
      single_next = 1;
    }

  /* packet trace if enabled */
  if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node))))
    {
      if (single_next)
        vlib_get_buffer_indices_with_offset (vm, (void **) ptd->mbufs,
                                             ptd->buffers, n_rx_packets,
                                             sizeof (struct rte_mbuf));

      n_left = n_rx_packets;
      buffers = ptd->buffers;
      mb = ptd->mbufs;
      next = ptd->next;

      while (n_trace && n_left)
        {
          b0 = vlib_get_buffer (vm, buffers[0]);
          if (single_next == 0)
            next_index = next[0];
          vlib_trace_buffer (vm, node, next_index, b0, /* follow_chain */ 0);

          dpdk_rx_trace_t *t0 = vlib_add_trace (vm, node, b0, sizeof t0[0]);
          t0->queue_index = queue_id;
          t0->device_index = xd->device_index;
          t0->buffer_index = vlib_get_buffer_index (vm, b0);

          clib_memcpy_fast (&t0->mb, mb[0], sizeof t0->mb);
          clib_memcpy_fast (&t0->buffer, b0,
                            sizeof b0[0] - sizeof b0->pre_data);
          clib_memcpy_fast (t0->buffer.pre_data, b0->data,
                            sizeof t0->buffer.pre_data);
          clib_memcpy_fast (&t0->data, mb[0]->buf_addr + mb[0]->data_off,
                            sizeof t0->data);
          n_trace--;
          n_left--;
          buffers++;
          mb++;
          next++;
        }
      vlib_set_trace_count (vm, node, n_trace);
    }

  vlib_increment_combined_counter
    (vnet_get_main ()->interface_main.combined_sw_if_counters
     + VNET_INTERFACE_COUNTER_RX, thread_index, xd->sw_if_index,
     n_rx_packets, n_rx_bytes);

  vnet_device_increment_rx_packets (thread_index, n_rx_packets);

  return n_rx_packets;
}