VPP Packet Processing Study Notes


Node types and their roles:

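VLIB_NODE_TYPE_INTERNAL
  Internal node. This is the most typical kind of node: it receives a vector of buffers and operates on them. Most VPP nodes play this role and handle the internal processing of the data flow; examples are functional nodes such as ip4-input-no-checksum and ip4-icmp-input.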

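VLIB_NODE_TYPE_INPUT
  Input node, usually a device input node. It creates frames from scratch and dispatches them to internal nodes; examples are dpdk-input and af-packet-input.
  An input node receives packets in one of two modes, polling or interrupt (see vlib_node_state_t).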

VLIB_NODE_TYPE_PRE_INPUT
  Currently there is only one node of this type, the epoll node. It serves socket-related logic and is used mainly for control-plane traffic.

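VLIB_NODE_TYPE_PROCESS
  Process node. A node of this type can be suspended and resumed, and it has its own run-time stack allocated on the heap. The effect is a multitasking scheduler inside a single thread; it is used mainly to modify the internal parameters of VPP nodes.
  Like a thread, a process node can pause, wait for an event, and resume. Unlike a pthread, it is a cooperative thread (coroutine) built on setjmp/longjmp.
  Waiting for an event: always_inline f64 vlib_process_wait_for_event_or_clock (vlib_main_t * vm, f64 dt)
  Sending an event: always_inline void vlib_process_signal_event (vlib_main_t * vm, uword node_index, uword type_opaque, uword data)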
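
The suspend/resume behaviour of a process node can be illustrated with a small cooperative task that runs on its own heap-allocated stack. This is only a sketch under stated assumptions: it uses POSIX ucontext (makecontext/swapcontext) instead of VPP's setjmp/longjmp-based mechanism, and every name in it (process_wait_for_event, signal_event, run_demo) is hypothetical rather than part of the VPP API.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <ucontext.h>

#define STACK_BYTES (64 * 1024)

static ucontext_t scheduler_ctx, process_ctx;
static int pending_event = -1;	/* -1 means no event is pending */
static int events_handled = 0;

/* Suspend the process until the scheduler resumes it with an event. */
static int
process_wait_for_event (void)
{
  swapcontext (&process_ctx, &scheduler_ctx);
  int ev = pending_event;
  pending_event = -1;
  return ev;
}

/* Body of the cooperative "process": wait, handle, wait again. */
static void
process_main (void)
{
  for (;;)
    {
      int ev = process_wait_for_event ();
      printf ("process: handled event %d\n", ev);
      events_handled++;
    }
}

/* Post an event and run the process until it suspends again. */
static void
signal_event (int ev)
{
  pending_event = ev;
  swapcontext (&scheduler_ctx, &process_ctx);
}

/* Start the process on a heap stack, send two events, return the count. */
static int
run_demo (void)
{
  void *stack = malloc (STACK_BYTES);
  assert (stack != NULL);

  getcontext (&process_ctx);
  process_ctx.uc_stack.ss_sp = stack;
  process_ctx.uc_stack.ss_size = STACK_BYTES;
  process_ctx.uc_link = &scheduler_ctx;
  makecontext (&process_ctx, process_main, 0);

  /* First resume: the process runs until its first wait. */
  swapcontext (&scheduler_ctx, &process_ctx);

  signal_event (1);
  signal_event (2);

  free (stack);			/* the process is suspended and never resumed */
  return events_handled;
}
```

Calling run_demo () from main drives the process through two events. The scheduler and the process share one OS thread and only switch at explicit wait and signal points, which is the property the process node type relies on.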

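Packet flow through the nodes:

  1. An input node polls the receive queues of an interface (or is driven by interrupts) and obtains a batch of packets.
  2. It then groups these packets by the function of their next node into a vector, also called a frame.
  3. For example, the input node collects all IPv4 packets and passes them to the ip4-input node;
  4. it collects all IPv6 packets and passes them to the ip6-input node.
  5. When the ip6-input node is scheduled, it takes this frame of packets and processes it with a dual-loop or quad-loop, prefetching packets into the CPU cache for best performance. Fewer cache misses mean the CPU cache is used effectively. Once ip6-input has processed every packet in the current frame, it hands the packets to different downstream nodes. For example, a packet that fails validation goes to the error-drop node, while a normal packet goes to the ip6-lookup node.
  6. A frame of packets passes through successive graph nodes in turn until the interface-output node transmits it.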
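
The dual-loop idea in step 5 can be sketched in plain C. This is an illustration only: demo_pkt_t, classify, process_frame and the NEXT_* values are hypothetical names, the validity check is a stand-in for real header validation, and __builtin_prefetch (a GCC/Clang builtin) takes the place of VPP's own prefetch helpers.

```c
#include <assert.h>
#include <stdint.h>

enum { NEXT_DROP = 0, NEXT_LOOKUP = 1 };

/* Hypothetical, heavily reduced packet header. */
typedef struct
{
  uint8_t version;
  uint8_t hop_limit;
} demo_pkt_t;

/* Stand-in check: valid IPv6 packets go on to lookup, others are dropped. */
static inline uint16_t
classify (const demo_pkt_t * p)
{
  return (p->version == 6 && p->hop_limit > 0) ? NEXT_LOOKUP : NEXT_DROP;
}

/* Process one frame: two packets per iteration, prefetching the next two. */
static void
process_frame (demo_pkt_t ** pkts, uint16_t * next, uint32_t n)
{
  uint32_t i = 0;

  assert (pkts != 0 && next != 0);

  /* Dual loop: needs 4 packets left so that the prefetch targets exist. */
  while (n - i >= 4)
    {
      __builtin_prefetch (pkts[i + 2]);
      __builtin_prefetch (pkts[i + 3]);

      next[i] = classify (pkts[i]);
      next[i + 1] = classify (pkts[i + 1]);
      i += 2;
    }

  /* Single loop for the remaining packets. */
  while (i < n)
    {
      next[i] = classify (pkts[i]);
      i++;
    }
}

static uint16_t demo_next[7];

/* Build a 7-packet frame with two bad packets; return the lookup count. */
static int
demo_run (void)
{
  demo_pkt_t store[7];
  demo_pkt_t *pkts[7];
  int lookups = 0;

  for (int i = 0; i < 7; i++)
    {
      store[i].version = 6;
      store[i].hop_limit = 64;
      pkts[i] = &store[i];
    }
  store[2].version = 4;		/* wrong version, dropped */
  store[5].hop_limit = 0;	/* expired hop limit, dropped */

  process_frame (pkts, demo_next, 7);

  for (int i = 0; i < 7; i++)
    lookups += (demo_next[i] == NEXT_LOOKUP);
  return lookups;
}
```

The per-packet work is identical in both loops; the dual loop only changes how many packets are in flight per iteration, so that memory loads for later packets overlap with work on the current ones.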

 

VPP graph node processing logic

Steps 1 to 4: [figures missing]

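Advantages of processing one frame of packets at a time, network function by network function:

  • From a software engineering standpoint, each graph node is independent and autonomous.
  • From a performance standpoint, the main benefit is better use of the CPU instruction cache (i-cache). The first packet of a frame loads the current node's instructions into the i-cache, and the remaining packets of the frame then use them "for free". VPP also exploits the superscalar design of the CPU, interleaving packet memory loads with packet processing so that the CPU pipeline is used more efficiently.
  • VPP also makes full use of the CPU's speculative execution. Speculatively reusing forwarding objects across packets (such as adjacencies and route lookup tables), and preloading packet contents into the CPU's local data cache (d-cache) for the next loop iteration, are hardware-aware techniques that let VPP exploit finer-grained parallelism.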

 

Analysis of the VLIB_INIT_FUNCTION macro

An example:

    static clib_error_t *
    dpdk_init (vlib_main_t * vm)
    {
      dpdk_main_t *dm = &dpdk_main;
      clib_error_t *error = 0;

      /* verify that structs are cacheline aligned */
      STATIC_ASSERT (offsetof (dpdk_device_t, cacheline0) == 0,
                     "Cache line marker must be 1st element in dpdk_device_t");
      STATIC_ASSERT (offsetof (dpdk_device_t, cacheline1) ==
                     CLIB_CACHE_LINE_BYTES,
                     "Data in cache line 0 is bigger than cache line size");
      STATIC_ASSERT (offsetof (frame_queue_trace_t, cacheline0) == 0,
                     "Cache line marker must be 1st element in frame_queue_trace_t");
      STATIC_ASSERT (RTE_CACHE_LINE_SIZE == 1 << CLIB_LOG2_CACHE_LINE_BYTES,
                     "DPDK RTE CACHE LINE SIZE does not match with 1<<CLIB_LOG2_CACHE_LINE_BYTES");

      dpdk_cli_reference ();

      dm->vlib_main = vm;
      dm->vnet_main = vnet_get_main ();
      dm->conf = &dpdk_config_main;

      dm->conf->nchannels = 4;
      vec_add1 (dm->conf->eal_init_args, (u8 *) "vnet");
      vec_add1 (dm->conf->eal_init_args, (u8 *) "--in-memory");

      /* Default vlib_buffer_t flags, DISABLES tcp/udp checksumming... */
      dm->buffer_flags_template = (VLIB_BUFFER_TOTAL_LENGTH_VALID |
                                   VLIB_BUFFER_EXT_HDR_VALID |
                                   VNET_BUFFER_F_L4_CHECKSUM_COMPUTED |
                                   VNET_BUFFER_F_L4_CHECKSUM_CORRECT);

      dm->stat_poll_interval = DPDK_STATS_POLL_INTERVAL;
      dm->link_state_poll_interval = DPDK_LINK_POLL_INTERVAL;

      dm->log_default = vlib_log_register_class ("dpdk", 0);

      return error;
    }

VLIB_INIT_FUNCTION (dpdk_init);

The VLIB_INIT_FUNCTION macro expands as shown below; the registration itself is done by the VLIB_DECLARE_INIT_FUNCTION macro:

    #ifndef CLIB_MARCH_VARIANT
    #define VLIB_DECLARE_INIT_FUNCTION(x, tag) \
    vlib_init_function_t * _VLIB_INIT_FUNCTION_SYMBOL (x, tag) = x; \
    static void __vlib_add_##tag##_function_##x (void) \
        __attribute__((__constructor__)) ; \
    static _vlib_init_function_list_elt_t _vlib_init_function_##tag_##x; \
    static void __vlib_add_##tag##_function_##x (void) \
    { \
      vlib_main_t * vm = vlib_get_main(); \
      _vlib_init_function_##tag_##x.next_init_function \
        = vm->tag##_function_registrations; \
      vm->tag##_function_registrations = &_vlib_init_function_##tag_##x; \
      _vlib_init_function_##tag_##x.f = &x; \
      _vlib_init_function_##tag_##x.name = #x; \
    } \
    static void __vlib_rm_##tag##_function_##x (void) \
        __attribute__((__destructor__)) ; \
    static void __vlib_rm_##tag##_function_##x (void) \
    { \
      vlib_main_t * vm = vlib_get_main(); \
      _vlib_init_function_list_elt_t *this, *prev; \
      this = vm->tag##_function_registrations; \
      if (this == 0) \
        return; \
      if (this->f == &x) \
        { \
          vm->tag##_function_registrations = this->next_init_function; \
          return; \
        } \
      prev = this; \
      this = this->next_init_function; \
      while (this) \
        { \
          if (this->f == &x) \
            { \
              prev->next_init_function = \
                this->next_init_function; \
              return; \
            } \
          prev = this; \
          this = this->next_init_function; \
        } \
    } \
    static _vlib_init_function_list_elt_t _vlib_init_function_##tag_##x
    #else
    /* create unused pointer to silence compiler warnings and get whole
       function optimized out */
    #define VLIB_DECLARE_INIT_FUNCTION(x, tag) \
    static __clib_unused void * __clib_unused_##tag##_##x = x
    #endif

    #define VLIB_INIT_FUNCTION(x) VLIB_DECLARE_INIT_FUNCTION(x,init)

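The x in VLIB_INIT_FUNCTION is the function being registered. It is passed on to VLIB_DECLARE_INIT_FUNCTION together with a second argument, init, so the list head used is vm->init_function_registrations.

VLIB_DECLARE_INIT_FUNCTION consists mainly of two automatically generated functions: one registers the init function and the other removes it. The first carries __attribute__((__constructor__)) and the second __attribute__((__destructor__)), so neither has to be called explicitly.

The registration part comes first. The _VLIB_INIT_FUNCTION_SYMBOL macro builds a symbol name, and a variable of type vlib_init_function_t * is declared under that name and initialized to x. The constructor then pushes a list element holding the function pointer and its name onto the head of the registration list.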
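
The registration pattern can be tried out in isolation with a much smaller macro. The following is a sketch, not VPP's code: DEMO_INIT_FUNCTION, init_elt_t, alpha_init and beta_init are hypothetical names, the list head is a plain global rather than a vlib_main_t field, and the removal half is left out. It relies on the GCC/Clang constructor attribute.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

typedef int (init_fn_t) (void);

/* One node of the singly linked registration list. */
typedef struct init_elt
{
  struct init_elt *next;
  init_fn_t *f;
  const char *name;
} init_elt_t;

/* Head of the list; zero-initialized before any constructor runs. */
static init_elt_t *init_registrations;

/* Emit a list element plus a constructor that pushes it on the list. */
#define DEMO_INIT_FUNCTION(x)                                   \
  static init_elt_t demo_init_elt_##x;                          \
  static void __attribute__ ((constructor))                     \
  demo_add_init_function_##x (void)                             \
  {                                                             \
    demo_init_elt_##x.next = init_registrations;                \
    init_registrations = &demo_init_elt_##x;                    \
    demo_init_elt_##x.f = &x;                                   \
    demo_init_elt_##x.name = #x;                                \
  }

static int alpha_init (void) { return 0; }
static int beta_init (void) { return 0; }

DEMO_INIT_FUNCTION (alpha_init)
DEMO_INIT_FUNCTION (beta_init)

/* Number of functions currently on the list. */
static int
count_registered (void)
{
  int n = 0;
  for (init_elt_t * i = init_registrations; i != NULL; i = i->next)
    n++;
  return n;
}

/* Return 1 if a function with this name was registered, else 0. */
static int
is_registered (const char *name)
{
  assert (name != NULL);
  for (init_elt_t * i = init_registrations; i != NULL; i = i->next)
    if (strcmp (i->name, name) == 0)
      return 1;
  return 0;
}
```

By the time main starts, both functions are already on the list even though nothing called a registration routine. Because each constructor pushes onto the head, the order of the list depends on the order in which the constructors run, so code that walks the list should not assume a particular order.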

Initialization call flow

When VPP starts, the functions registered on the init_function_registrations list are called along the following path to complete initialization:

main -> vlib_unix_main -> clib_calljmp (thread0, (uword) vm, (void *) ...) -> vlib_main -> vlib_call_all_init_functions (vm) -> vlib_call_init_exit_functions -> call_init_exit_functions_internal

    static inline clib_error_t *
    call_init_exit_functions_internal (vlib_main_t * vm,
                                       _vlib_init_function_list_elt_t ** headp,
                                       int call_once, int do_sort)
    {
      clib_error_t *error = 0;
      _vlib_init_function_list_elt_t *i;

      if (do_sort && (error = vlib_sort_init_exit_functions (headp)))
        return (error);

      i = *headp;
      while (i)
        {
          if (call_once && !hash_get (vm->init_functions_called, i->f))
            {
              if (call_once)
                hash_set1 (vm->init_functions_called, i->f);
              error = i->f (vm);
              if (error)
                return error;
            }
          i = i->next_init_function;
        }
      return error;
    }
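
The call-once walk over the list can be sketched on its own. This is a simplified illustration with hypothetical names: a per-element called flag replaces the vm->init_functions_called hash, errors are plain int values instead of clib_error_t pointers, and sorting is left out.

```c
#include <assert.h>
#include <stddef.h>

typedef int (init_fn_t) (void);

typedef struct init_elt
{
  struct init_elt *next;
  init_fn_t *f;
  int called;			/* stands in for the "already called" hash */
} init_elt_t;

static int first_calls, failing_calls, last_calls;

static int first_init (void) { first_calls++; return 0; }
static int failing_init (void) { failing_calls++; return -1; }
static int last_init (void) { last_calls++; return 0; }

/* List order: first_init -> failing_init -> last_init. */
static init_elt_t last_elt = { NULL, last_init, 0 };
static init_elt_t failing_elt = { &last_elt, failing_init, 0 };
static init_elt_t first_elt = { &failing_elt, first_init, 0 };

/* Call every function not called before; stop at the first error. */
static int
call_init_functions_once (init_elt_t * head)
{
  for (init_elt_t * i = head; i != NULL; i = i->next)
    {
      assert (i->f != NULL);
      if (i->called)
	continue;
      i->called = 1;		/* mark before calling, as the original does */
      int error = i->f ();
      if (error)
	return error;
    }
  return 0;
}
```

The first walk stops at failing_init and returns its error, so last_init is not reached. A second walk skips the two functions already marked as called and runs only last_init, which shows that a function is marked even when it fails.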

 

dpdk_device_input

    static_always_inline u32
    dpdk_device_input (vlib_main_t * vm, dpdk_main_t * dm, dpdk_device_t * xd,
                       vlib_node_runtime_t * node, u32 thread_index, u16 queue_id)
    {
      uword n_rx_packets = 0, n_rx_bytes;
      u32 n_left, n_trace;
      u32 *buffers;
      u32 next_index = VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT;
      struct rte_mbuf **mb;
      vlib_buffer_t *b0;
      u16 *next;
      u16 or_flags;
      u32 n;
      int single_next = 0;

      dpdk_per_thread_data_t *ptd = vec_elt_at_index (dm->per_thread_data,
                                                      thread_index);
      vlib_buffer_t *bt = &ptd->buffer_template;

      if ((xd->flags & DPDK_DEVICE_FLAG_ADMIN_UP) == 0)
        return 0;

      /* get up to DPDK_RX_BURST_SZ buffers from PMD */
      while (n_rx_packets < DPDK_RX_BURST_SZ)
        {
          n = rte_eth_rx_burst (xd->port_id, queue_id,
                                ptd->mbufs + n_rx_packets,
                                DPDK_RX_BURST_SZ - n_rx_packets);
          n_rx_packets += n;

          if (n < 32)
            break;
        }

      if (n_rx_packets == 0)
        return 0;

      /* Update buffer template */
      vnet_buffer (bt)->sw_if_index[VLIB_RX] = xd->sw_if_index;
      bt->error = node->errors[DPDK_ERROR_NONE];
      /* as DPDK is allocating empty buffers from mempool provided before interface
         start for each queue, it is safe to store this in the template */
      bt->buffer_pool_index = xd->buffer_pool_for_queue[queue_id];
      bt->ref_count = 1;
      vnet_buffer (bt)->feature_arc_index = 0;
      bt->current_config_index = 0;

      /* receive burst of packets from DPDK PMD */
      if (PREDICT_FALSE (xd->per_interface_next_index != ~0))
        next_index = xd->per_interface_next_index;

      /* as all packets belong to the same interface feature arc lookup
         can be done once and result stored in the buffer template */
      if (PREDICT_FALSE (vnet_device_input_have_features (xd->sw_if_index)))
        vnet_feature_start_device_input_x1 (xd->sw_if_index, &next_index, bt);

      if (xd->flags & DPDK_DEVICE_FLAG_MAYBE_MULTISEG)
        n_rx_bytes = dpdk_process_rx_burst (vm, ptd, n_rx_packets, 1, &or_flags);
      else
        n_rx_bytes = dpdk_process_rx_burst (vm, ptd, n_rx_packets, 0, &or_flags);

      if (PREDICT_FALSE (or_flags & PKT_RX_FDIR))
        {
          /* some packets will need to go to different next nodes */
          for (n = 0; n < n_rx_packets; n++)
            ptd->next[n] = next_index;

          /* flow offload - process if rx flow offload enabled and at least one
             packet is marked */
          if (PREDICT_FALSE ((xd->flags & DPDK_DEVICE_FLAG_RX_FLOW_OFFLOAD) &&
                             (or_flags & PKT_RX_FDIR)))
            dpdk_process_flow_offload (xd, ptd, n_rx_packets);

          /* enqueue buffers to the next node */
          vlib_get_buffer_indices_with_offset (vm, (void **) ptd->mbufs,
                                               ptd->buffers, n_rx_packets,
                                               sizeof (struct rte_mbuf));

          vlib_buffer_enqueue_to_next (vm, node, ptd->buffers, ptd->next,
                                       n_rx_packets);
        }
      else
        {
          u32 *to_next, n_left_to_next;

          vlib_get_new_next_frame (vm, node, next_index, to_next, n_left_to_next);
          vlib_get_buffer_indices_with_offset (vm, (void **) ptd->mbufs, to_next,
                                               n_rx_packets,
                                               sizeof (struct rte_mbuf));

          if (PREDICT_TRUE (next_index == VNET_DEVICE_INPUT_NEXT_ETHERNET_INPUT))
            {
              vlib_next_frame_t *nf;
              vlib_frame_t *f;
              ethernet_input_frame_t *ef;
              nf = vlib_node_runtime_get_next_frame (vm, node, next_index);
              f = vlib_get_frame (vm, nf->frame);
              f->flags = ETH_INPUT_FRAME_F_SINGLE_SW_IF_IDX;

              ef = vlib_frame_scalar_args (f);
              ef->sw_if_index = xd->sw_if_index;
              ef->hw_if_index = xd->hw_if_index;

              /* if PMD supports ip4 checksum check and there are no packets
                 marked as ip4 checksum bad we can notify ethernet input so it
                 can send packets to ip4-input-no-checksum node */
              if (xd->flags & DPDK_DEVICE_FLAG_RX_IP4_CKSUM &&
                  (or_flags & PKT_RX_IP_CKSUM_BAD) == 0)
                f->flags |= ETH_INPUT_FRAME_F_IP4_CKSUM_OK;
              vlib_frame_no_append (f);
            }
          n_left_to_next -= n_rx_packets;
          vlib_put_next_frame (vm, node, next_index, n_left_to_next);
          single_next = 1;
        }

      /* packet trace if enabled */
      if (PREDICT_FALSE ((n_trace = vlib_get_trace_count (vm, node))))
        {
          if (single_next)
            vlib_get_buffer_indices_with_offset (vm, (void **) ptd->mbufs,
                                                 ptd->buffers, n_rx_packets,
                                                 sizeof (struct rte_mbuf));

          n_left = n_rx_packets;
          buffers = ptd->buffers;
          mb = ptd->mbufs;
          next = ptd->next;

          while (n_trace && n_left)
            {
              b0 = vlib_get_buffer (vm, buffers[0]);
              if (single_next == 0)
                next_index = next[0];
              vlib_trace_buffer (vm, node, next_index, b0, /* follow_chain */ 0);

              dpdk_rx_trace_t *t0 = vlib_add_trace (vm, node, b0, sizeof t0[0]);
              t0->queue_index = queue_id;
              t0->device_index = xd->device_index;
              t0->buffer_index = vlib_get_buffer_index (vm, b0);

              clib_memcpy_fast (&t0->mb, mb[0], sizeof t0->mb);
              clib_memcpy_fast (&t0->buffer, b0,
                                sizeof b0[0] - sizeof b0->pre_data);
              clib_memcpy_fast (t0->buffer.pre_data, b0->data,
                                sizeof t0->buffer.pre_data);
              clib_memcpy_fast (&t0->data, mb[0]->buf_addr + mb[0]->data_off,
                                sizeof t0->data);
              n_trace--;
              n_left--;
              buffers++;
              mb++;
              next++;
            }
          vlib_set_trace_count (vm, node, n_trace);
        }

      vlib_increment_combined_counter
        (vnet_get_main ()->interface_main.combined_sw_if_counters
         + VNET_INTERFACE_COUNTER_RX, thread_index, xd->sw_if_index,
         n_rx_packets, n_rx_bytes);

      vnet_device_increment_rx_packets (thread_index, n_rx_packets);

      return n_rx_packets;
    }
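
The receive loop at the top of dpdk_device_input keeps asking the driver for packets until the burst buffer is full or the driver returns a short batch. Its termination behaviour can be checked with a stub in place of the driver. Everything here is an assumption made for illustration: DEMO_RX_BURST_SZ is set to 256 arbitrarily, and fake_rx_burst is a hypothetical stand-in for rte_eth_rx_burst that returns at most 32 packets per call.

```c
#include <assert.h>
#include <stdint.h>

#define DEMO_RX_BURST_SZ 256	/* assumed value, for illustration only */
#define DEMO_PMD_MAX_BATCH 32	/* assumed per-call cap of the fake driver */

static uint32_t fake_queue_depth;	/* packets waiting in the fake queue */
static uint32_t fake_rx_calls;	/* how many times the stub was polled */

/* Hypothetical stand-in for rte_eth_rx_burst(): hands out up to `max`
   packets, limited by the queue depth and by DEMO_PMD_MAX_BATCH. */
static uint32_t
fake_rx_burst (uint32_t max)
{
  uint32_t n = max;

  assert (max > 0);
  if (n > DEMO_PMD_MAX_BATCH)
    n = DEMO_PMD_MAX_BATCH;
  if (n > fake_queue_depth)
    n = fake_queue_depth;

  fake_queue_depth -= n;
  fake_rx_calls++;
  return n;
}

/* Same loop shape as in dpdk_device_input: fill the burst buffer, but
   stop early once the driver returns fewer than 32 packets. */
static uint32_t
demo_receive (uint32_t queued)
{
  uint32_t n_rx_packets = 0, n;

  fake_queue_depth = queued;
  fake_rx_calls = 0;

  while (n_rx_packets < DEMO_RX_BURST_SZ)
    {
      n = fake_rx_burst (DEMO_RX_BURST_SZ - n_rx_packets);
      n_rx_packets += n;

      if (n < 32)
	break;
    }
  return n_rx_packets;
}
```

With 100 packets queued the stub returns 32, 32, 32 and then 4, and the short batch ends the loop. With 300 queued the loop stops at the buffer limit and leaves the rest for the next poll.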

 

