一. 主函數分析
1 /* 命令行解析 2 * 參數輸入 ./l2fwd -c 0x3 -n 4 -- -p 3 -q 1 3 * -c 為十六進制的分配的邏輯內核數量 4 * -n 為十進制的內存通道數量,EAL參數和程序參數用--分開 5 * -q 為分配給每個核心的收發隊列數量(端口數量) 6 * -p為十六進制的分配的端口數 7 * -t 為可選默認10s打印時間間隔參數 8 */ 9 int main(int argc, char **argv) 10 { 11 struct lcore_queue_conf *qconf; 12 int ret; 13 uint16_t nb_ports; 14 uint16_t nb_ports_available = 0; 15 uint16_t portid, last_port; 16 unsigned lcore_id, rx_lcore_id; 17 unsigned nb_ports_in_mask = 0; 18 unsigned int nb_lcores = 0; 19 unsigned int nb_mbufs; 20 21 /* init EAL */ 22 /* 初始化EAL參數,並解析參數,系統函數getopt以及getopt_long, 23 * 這些處理命令行參數的函數,處理到“--”時就會停止,分割參 24 */ 25 ret = rte_eal_init(argc, argv); 26 if (ret < 0) 27 rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n"); 28 //argc減去EAL參數的同時,argv加上EAL的參數,保證解析程序參數的時候已經跳過了EAL參數 29 argc -= ret; 30 argv += ret; 31 32 force_quit = false; 33 signal(SIGINT, signal_handler); 34 signal(SIGTERM, signal_handler); 35 36 /* parse application arguments (after the EAL ones) */ 37 //解析l2fwd程序參數 38 ret = l2fwd_parse_args(argc, argv); 39 if (ret < 0) 40 rte_exit(EXIT_FAILURE, "Invalid L2FWD arguments\n"); 41 42 printf("MAC updating %s\n", mac_updating ? 
"enabled" : "disabled"); 43 44 /* convert to number of cycles */ 45 //-t參數,打印時間間隔 46 timer_period *= rte_get_timer_hz(); 47 48 nb_ports = rte_eth_dev_count_avail(); 49 if (nb_ports == 0) 50 rte_exit(EXIT_FAILURE, "No Ethernet ports - bye\n"); 51 52 /* check port mask to possible port mask */ 53 /* 54 * DPDK運行時創建的大頁內存中,創建報文內存池, 55 * 其中socket不是套接字,是numa框架中的socket, 56 * 每個socket都有數個node,每個node右包括數個core。 57 * 每個socket都有自己的內存,每個socket里的處理器訪問自己內存的速度最快, 58 * 訪問其他socket的內存則較慢。 59 */ 60 if (l2fwd_enabled_port_mask & ~((1 << nb_ports) - 1)) 61 rte_exit(EXIT_FAILURE, "Invalid portmask; possible (0x%x)\n", 62 (1 << nb_ports) - 1); 63 64 /* reset l2fwd_dst_ports */ 65 //設置二層轉發目的端口 66 for (portid = 0; portid < RTE_MAX_ETHPORTS; portid++) 67 l2fwd_dst_ports[portid] = 0; 68 //初始化所有的目的端口為0 69 last_port = 0; 70 71 /* 72 * Each logical core is assigned a dedicated TX queue on each port. 73 */ 74 RTE_ETH_FOREACH_DEV(portid) { 75 /* skip ports that are not enabled */ 76 /* l2fwd_enabled_port_mask 可用端口位掩碼 77 * 跳過未分配或是不可用端口。 78 * 可用端口位掩碼表示,左數第n位如果為1,表示端口n可用,如果左數第n位如果為0,表示端口n不可用。 79 * 要得到第x位為1還是0,我們的方法是將1左移x位,得到一個只在x位為1,其他位都為0的數,再與位掩碼相與。 80 * 結果為1,那么第x位為1,結果位0,那么第x位為0. 
81 */ 82 if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) 83 continue; 84 //此處,當輸入端口數,即nb_ports為1時,dst_port[0] = 0; 85 //此處,當輸入端口數,即nb_ports為2時,dst_port[0] = 0,dst_port[2] = 1,dst_port[1] = 2; 86 //此處,當輸入端口數,即nb_ports為3時,dst_port[0] = 0,dst_port[2] = 1,dst_port[1] = 2; 87 //此處,當輸入端口數,即nb_ports為4時,....dst_port[4] = 3,dst_port[3] = 4; 88 89 if (nb_ports_in_mask % 2) { 90 l2fwd_dst_ports[portid] = last_port; 91 l2fwd_dst_ports[last_port] = portid; 92 } 93 else 94 last_port = portid; 95 96 nb_ports_in_mask++; 97 } 98 if (nb_ports_in_mask % 2) { 99 printf("Notice: odd number of ports in portmask.\n"); 100 l2fwd_dst_ports[last_port] = last_port; 101 } 102 103 rx_lcore_id = 0; 104 qconf = NULL; 105 106 /* Initialize the port/queue configuration of each logical core */ 107 RTE_ETH_FOREACH_DEV(portid) { 108 /* skip ports that are not enabled */ 109 if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) 110 continue; 111 112 /* get the lcore_id for this port */ 113 //l2fwd_rx_queue_per_lcore即參數-q 114 while (rte_lcore_is_enabled(rx_lcore_id) == 0 || 115 lcore_queue_conf[rx_lcore_id].n_rx_port == 116 l2fwd_rx_queue_per_lcore) { 117 rx_lcore_id++; 118 if (rx_lcore_id >= RTE_MAX_LCORE) 119 rte_exit(EXIT_FAILURE, "Not enough cores\n"); 120 } 121 122 if (qconf != &lcore_queue_conf[rx_lcore_id]) { 123 /* Assigned a new logical core in the loop above. 
*/ 124 qconf = &lcore_queue_conf[rx_lcore_id]; 125 nb_lcores++; 126 } 127 128 qconf->rx_port_list[qconf->n_rx_port] = portid; 129 qconf->n_rx_port++; 130 printf("Lcore %u: RX port %u\n", rx_lcore_id, portid); 131 } 132 133 nb_mbufs = RTE_MAX(nb_ports * (nb_rxd + nb_txd + MAX_PKT_BURST + 134 nb_lcores * MEMPOOL_CACHE_SIZE), 8192U); 135 136 /* create the mbuf pool */ 137 l2fwd_pktmbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", nb_mbufs, 138 MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, 139 rte_socket_id()); 140 if (l2fwd_pktmbuf_pool == NULL) 141 rte_exit(EXIT_FAILURE, "Cannot init mbuf pool\n"); 142 143 /* Initialise each port */ 144 RTE_ETH_FOREACH_DEV(portid) { 145 struct rte_eth_rxconf rxq_conf; 146 struct rte_eth_txconf txq_conf; 147 struct rte_eth_conf local_port_conf = port_conf; 148 struct rte_eth_dev_info dev_info; 149 150 /* skip ports that are not enabled */ 151 if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) { 152 printf("Skipping disabled port %u\n", portid); 153 continue; 154 } 155 nb_ports_available++; 156 157 /* init port */ 158 printf("Initializing port %u... 
", portid); 159 //清除讀寫緩沖區 160 fflush(stdout); 161 162 //配置端口,將一些配置寫進設備dev的一些字段,以及檢查設備支持什么類型的中斷、支持的包大小 163 ret = rte_eth_dev_info_get(portid, &dev_info); 164 if (ret != 0) 165 rte_exit(EXIT_FAILURE, 166 "Error during getting device (port %u) info: %s\n", 167 portid, strerror(-ret)); 168 169 if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE) 170 local_port_conf.txmode.offloads |= 171 DEV_TX_OFFLOAD_MBUF_FAST_FREE; 172 ret = rte_eth_dev_configure(portid, 1, 1, &local_port_conf); 173 if (ret < 0) 174 rte_exit(EXIT_FAILURE, "Cannot configure device: err=%d, port=%u\n", 175 ret, portid); 176 177 ret = rte_eth_dev_adjust_nb_rx_tx_desc(portid, &nb_rxd, 178 &nb_txd); 179 if (ret < 0) 180 rte_exit(EXIT_FAILURE, 181 "Cannot adjust number of descriptors: err=%d, port=%u\n", 182 ret, portid); 183 184 //獲取設備的MAC地址,存入l2fwd_ports_eth_addr[]數組,后續打印MAC地址 185 ret = rte_eth_macaddr_get(portid, 186 &l2fwd_ports_eth_addr[portid]); 187 if (ret < 0) 188 rte_exit(EXIT_FAILURE, 189 "Cannot get MAC address: err=%d, port=%u\n", 190 ret, portid); 191 192 /* init one RX queue */ 193 //清除讀寫緩沖區 194 fflush(stdout); 195 rxq_conf = dev_info.default_rxconf; 196 rxq_conf.offloads = local_port_conf.rxmode.offloads; 197 //設置接收隊列,nb_rxd指收取隊列的大小,最大能夠存儲mbuf的數量 198 ret = rte_eth_rx_queue_setup(portid, 0, nb_rxd, 199 rte_eth_dev_socket_id(portid), 200 &rxq_conf, 201 l2fwd_pktmbuf_pool); 202 if (ret < 0) 203 rte_exit(EXIT_FAILURE, "rte_eth_rx_queue_setup:err=%d, port=%u\n", 204 ret, portid); 205 206 /* init one TX queue on each port */ 207 fflush(stdout); 208 txq_conf = dev_info.default_txconf; 209 txq_conf.offloads = local_port_conf.txmode.offloads; 210 //初始化一個發送隊列,nb_txd指發送隊列的大小,最大能夠存儲mbuf的數量 211 ret = rte_eth_tx_queue_setup(portid, 0, nb_txd, 212 rte_eth_dev_socket_id(portid), 213 &txq_conf); 214 if (ret < 0) 215 rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup:err=%d, port=%u\n", 216 ret, portid); 217 218 /* Initialize TX buffers */ 219 //為每個端口分配接收緩沖區,根據numa架構的socket就近分配 220 tx_buffer[portid] = 
rte_zmalloc_socket("tx_buffer", 221 RTE_ETH_TX_BUFFER_SIZE(MAX_PKT_BURST), 0, 222 rte_eth_dev_socket_id(portid)); 223 if (tx_buffer[portid] == NULL) 224 rte_exit(EXIT_FAILURE, "Cannot allocate buffer for tx on port %u\n", 225 portid); 226 227 rte_eth_tx_buffer_init(tx_buffer[portid], MAX_PKT_BURST); 228 229 ret = rte_eth_tx_buffer_set_err_callback(tx_buffer[portid], 230 rte_eth_tx_buffer_count_callback, 231 &port_statistics[portid].dropped); 232 if (ret < 0) 233 rte_exit(EXIT_FAILURE, 234 "Cannot set error callback for tx buffer on port %u\n", 235 portid); 236 237 ret = rte_eth_dev_set_ptypes(portid, RTE_PTYPE_UNKNOWN, NULL, 238 0); 239 if (ret < 0) 240 printf("Port %u, Failed to disable Ptype parsing\n", 241 portid); 242 /* Start device */ 243 //啟動端口 244 ret = rte_eth_dev_start(portid); 245 if (ret < 0) 246 rte_exit(EXIT_FAILURE, "rte_eth_dev_start:err=%d, port=%u\n", 247 ret, portid); 248 249 printf("done: \n"); 250 251 ret = rte_eth_promiscuous_enable(portid); 252 if (ret != 0) 253 rte_exit(EXIT_FAILURE, 254 "rte_eth_promiscuous_enable:err=%s, port=%u\n", 255 rte_strerror(-ret), portid); 256 257 printf("Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\n\n", 258 portid, 259 l2fwd_ports_eth_addr[portid].addr_bytes[0], 260 l2fwd_ports_eth_addr[portid].addr_bytes[1], 261 l2fwd_ports_eth_addr[portid].addr_bytes[2], 262 l2fwd_ports_eth_addr[portid].addr_bytes[3], 263 l2fwd_ports_eth_addr[portid].addr_bytes[4], 264 l2fwd_ports_eth_addr[portid].addr_bytes[5]); 265 266 /* initialize port stats */ 267 //初始化端口數據,就是后面要打印的,接收、發送、drop的包數 268 memset(&port_statistics, 0, sizeof(port_statistics)); 269 } 270 271 if (!nb_ports_available) { 272 rte_exit(EXIT_FAILURE, 273 "All available ports are disabled. 
Please set portmask.\n"); 274 } 275 276 277 //檢查每個端口的連接狀態 278 check_all_ports_link_status(l2fwd_enabled_port_mask); 279 280 ret = 0; 281 /* launch per-lcore init on every lcore */ 282 //在每個邏輯內核上啟動線程,開始轉發,l2fwd_launch_one_lcore實際上運行的是l2fwd_main_loop 283 rte_eal_mp_remote_launch(l2fwd_launch_one_lcore, NULL, CALL_MASTER); 284 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 285 if (rte_eal_wait_lcore(lcore_id) < 0) { 286 ret = -1; 287 break; 288 } 289 } 290 291 RTE_ETH_FOREACH_DEV(portid) { 292 if ((l2fwd_enabled_port_mask & (1 << portid)) == 0) 293 continue; 294 printf("Closing port %d...", portid); 295 rte_eth_dev_stop(portid); 296 rte_eth_dev_close(portid); 297 printf(" Done\n"); 298 } 299 printf("Bye...\n"); 300 301 return ret; 302 }
程序的主要流程如下:

二. 二層轉發和普通的端口轉發區別:
特點 | L2fwd | basicfwd |
---|---|---|
端口數量 | 兩者都用端口掩碼來指定,L2fwd支持奇數個 | 只能是偶數個 |
lcore數量 | 多個,每個lcore最多負責 -q 個port(-q 1 時每個lcore負責一個port) | 一個lcore,執行類似repeater的程序 |
轉發邏輯 | 轉發時會改寫MAC地址 | 只能是 0<-->1,2<-->3 這樣的 pair 互相轉發 |
Tx_buffer | 有發包緩存隊列,收的包會緩存到發包隊列里,一段時間后或者隊列滿后才會轉發 | 沒有發包緩存,Rx收到包后直接Tx出去 |
三. 任務分發
每個邏輯核在任務分發后會執行如下的循環,直到退出:
40 /* 41 * Check that every SLAVE lcores are in WAIT state, then call 42 * rte_eal_remote_launch() for all of them. If call_master is true 43 * (set to CALL_MASTER), also call the function on the master lcore. 44 */ 45 int 46 rte_eal_mp_remote_launch(int (*f)(void *), void *arg, 47 enum rte_rmt_call_master_t call_master) 48 { 49 int lcore_id; 50 int master = rte_get_master_lcore(); 51 52 /* check state of lcores */ 53 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 54 if (lcore_config[lcore_id].state != WAIT) 55 return -EBUSY; 56 } 57 58 /* send messages to cores */ 59 RTE_LCORE_FOREACH_SLAVE(lcore_id) { 60 rte_eal_remote_launch(f, arg, lcore_id); 61 } 62 63 if (call_master == CALL_MASTER) { 64 lcore_config[master].ret = f(arg); 65 lcore_config[master].state = FINISHED; 66 } 67 68 return 0; 69 }
rte_eal_mp_remote_launch(l2fwd_launch_one_lcore, NULL, CALL_MASTER)
/*
 * Per-lcore entry point passed to rte_eal_mp_remote_launch(): runs the
 * forwarding loop and reports success once the loop returns. The argument
 * is not used.
 */
static int
l2fwd_launch_one_lcore(__attribute__((unused)) void *dummy)
{
	l2fwd_main_loop();

	return 0;
}
1 /* main processing loop */ 2 static void 3 l2fwd_main_loop(void) 4 { 5 struct rte_mbuf *pkts_burst[MAX_PKT_BURST]; 6 struct rte_mbuf *m; 7 int sent; 8 unsigned lcore_id; 9 uint64_t prev_tsc, diff_tsc, cur_tsc, timer_tsc; 10 unsigned i, j, portid, nb_rx; 11 struct lcore_queue_conf *qconf; 12 const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) / US_PER_S * 13 BURST_TX_DRAIN_US; 14 struct rte_eth_dev_tx_buffer *buffer; 15 16 prev_tsc = 0; 17 timer_tsc = 0; 18 19 //獲取自己的lcore_id 20 lcore_id = rte_lcore_id(); 21 qconf = &lcore_queue_conf[lcore_id]; 22 23 //分配后多余的lcore,無事可做,orz 24 if (qconf->n_rx_port == 0) { 25 RTE_LOG(INFO, L2FWD, "lcore %u has nothing to do\n", lcore_id); 26 return; 27 } 28 29 //有事做的核,很開心的進入了主循環~ 30 RTE_LOG(INFO, L2FWD, "entering main loop on lcore %u\n", lcore_id); 31 32 for (i = 0; i < qconf->n_rx_port; i++) { 33 34 portid = qconf->rx_port_list[i]; 35 RTE_LOG(INFO, L2FWD, " -- lcoreid=%u portid=%u\n", lcore_id, 36 portid); 37 38 } 39 40 //直到發生了強制退出,在這里就是ctrl+c或者kill了這個進程 41 while (!force_quit) { 42 43 cur_tsc = rte_rdtsc(); 44 45 /* 46 * TX burst queue drain 47 */ 48 //計算時間片 49 diff_tsc = cur_tsc - prev_tsc; 50 //過了100us,把發送buffer里的報文發出去 51 if (unlikely(diff_tsc > drain_tsc)) { 52 53 for (i = 0; i < qconf->n_rx_port; i++) { 54 55 portid = l2fwd_dst_ports[qconf->rx_port_list[i]]; 56 buffer = tx_buffer[portid]; 57 58 sent = rte_eth_tx_buffer_flush(portid, 0, buffer); 59 if (sent) 60 port_statistics[portid].tx += sent; 61 62 } 63 64 /* if timer is enabled */ 65 //到了時間片了打印各端口的數據 66 if (timer_period > 0) { 67 68 /* advance the timer */ 69 timer_tsc += diff_tsc; 70 71 /* if timer has reached its timeout */ 72 if (unlikely(timer_tsc >= timer_period)) { 73 74 /* do this only on master core */ 75 if (lcore_id == rte_get_master_lcore()) { 76 //打印讓master主線程來做 77 print_stats(); 78 /* reset the timer */ 79 timer_tsc = 0; 80 } 81 } 82 } 83 84 prev_tsc = cur_tsc; 85 } 86 87 /* 88 * Read packet from RX queues 89 */ 90 //沒有到發送時間片的話,讀接收隊列里的報文 91 for (i = 
0; i < qconf->n_rx_port; i++) { 92 93 portid = qconf->rx_port_list[i]; 94 nb_rx = rte_eth_rx_burst(portid, 0, 95 pkts_burst, MAX_PKT_BURST); 96 97 //計數,收到的報文數 98 port_statistics[portid].rx += nb_rx; 99 100 for (j = 0; j < nb_rx; j++) { 101 m = pkts_burst[j]; 102 rte_prefetch0(rte_pktmbuf_mtod(m, void *)); 103 //updating mac地址以及目的端口發送buffer滿了的話,嘗試發送 104 l2fwd_simple_forward(m, portid); 105 } 106 } 107 } 108 }
流程圖:

四. 測試實驗
* 參數輸入 ./l2fwd -c 0x3 -n 4 -- -p 3 -q 1
* -c 為十六進制的邏輯核掩碼(coremask),例如 0x3 表示使用 lcore 0 和 lcore 1
* -n 為十進制的內存通道數量,EAL參數和程序參數用--分開
* -q 為分配給每個邏輯核的收包隊列數量(即每個邏輯核負責的端口數量)
* -p 為十六進制的端口掩碼(portmask),例如 3 表示啟用端口 0 和端口 1
* -t 為可選參數,統計信息的打印時間間隔(秒),默認10s