參考文獻:dpdk中的librte_malloc庫
《深入淺出DPDK》
一. librte_malloc 庫
dpdk中的librte_malloc庫提供了能夠分配任意大小內存的API。
該庫的目標是提供類似malloc的函數從hugepage中分配內存,以及幫助應用程序移植。通常情況下,這種類型的分配不應該在數據平面處理,因為其比基於內存池的分配更慢,並且在分配和釋放時會使用鎖。
1.1 Cookies
如果在配置文件中打開CONFIG_RTE_MALLOC_DEBUG,
分配的內存會包含覆蓋保護區域,以識別緩沖區溢出問題。
1.2 對齊與NUMA Constraints
rte_malloc()函數包含一個align參數,用來要求內存區域對齊到該值的倍數(必須是2的倍數)。
在支持NUMA的系統中,調用rte_malloc()函數時,會在調用該函數的進程所在的socket上分配內存。
同時該庫也提供了一組API,使用戶可以直接在指定的NUMA socket上分配內存,
或者在另一個core所在的NUMA socket上分配內存。
1.3 用例
應用程序在初始化時使用類似malloc這樣的函數時,可以使用該庫。
要在運行時分配/釋放內存數據,如果應用程序對速度有要求,
請用內存池庫代替本庫。
如果要使用一塊需要知道物理地址的內存塊,如硬件設備使用的內存塊,
則應該使用memory zone。
1.4 數據結構
在malloc庫的內部使用兩種數據結構類型:
struct malloc_heap: 用來管理每個socket上的空閑空間
struct malloc_elem: 分配的基本元素,由庫內部管理的空閑空間。
1.4.1 struct malloc_heap
該結構體用來管理每個socket上的空閑空間。
在庫的內部,每個NUMA node上包含一個 heap結構體,
使我們可以根據線程運行所在的NUMA node,在對應的結點分配內存。
雖然不能保存一定會在指定的結點上分配內存,但比總在某個固定的的結點或隨機結點分配要好。
heap的關鍵成員變量和成員函數描述如下:
mz_count: 保存本結點已經為heap內存分配的memory zone的數量。該值的唯一用途就是與numa_socket值組合為每個memory zone生成一個唯一的名字。
lock: 該變量用來做對heap訪問的同步。考慮到heap中的空閑空間是由一個list管理的,所以我們需要一個鎖來防止兩個線程同時訪問該list。
free_head: 該變量該malloc heap的free nodes list中的第一個元素。
注意: malloc_heap結構體不會管理已經分配的memzones,這么做是毫無意義的,因為它們不會被釋放。
也不會管理使用中的內存塊,因為除非它們被釋放,否則是不會再次接觸到這些內存塊的。
在釋放時,指向這些內存塊的指針會作為free()函數的參數。
1.4.1.2 struct malloc_elem結構體
malloc_elem結構體被用作memzone中各種內存塊的頭部結構。
有三種不同的用法:
1、分配或釋放內存塊時的頭部 - 普通情況
2、在內存塊中作為padding頭部
3、作為memzone結尾處的標記
下文描述了結構中最重要的部分以及用法。
注意:如果某種用法不屬於上面描述的三種中的任何一種,則認為對應的變量是未定義的。
例如,只有當"state"和"pad"兩個變量的值是有效值是,才認為其是一個padding header。
head:該指針是已經分配的內存塊中指向heap結構的反向引用,即指向對應的heap。
普通內存塊在釋放時會使用該指針,將當前釋放的內存塊添加到heap的free list中
prev:該指針指向memzone中當前內存塊緊前面的內存塊的header element/block。
當釋放一個內存塊時,該指針用來引用前一個內存塊,看其是否也需要釋放。
如果需要,則兩塊內存組合成一塊更大的內存塊。
next_free:該指針用來將未分配的內存塊鏈接到一起。
同樣,該變量只在普通內存塊中使用,在malloc()函數中找到一塊符合需求的內存塊來分配,
並且在調用free()函數將新釋放的內存添加到free-list中。
state:該變量可以是以下三個值之一:“Free”, “Busy”或“Pad”。
前兩個用業表示普通內存塊的分配狀態,
第三個用來表示在start-of-block padding的結尾處的元素結構體是一個dummy結構體。
(例如,由於強制對齊,內存塊中數據的開始處不在內存塊中。???)
在這種情況下,pad header用來定位實際分配的元素header。
對於end-of-memzone結構體,該值總是“busy”,
以確保在釋放時沒有元素為了整合成一個更大的內存塊,而在memzone的結尾外面查找其它內存塊。
pad:該變量保存內存塊開始處的padding區域的長度。
如果是普通內存塊header,該值會被加到header的結尾處的地址,以給出數據區域的正確地址。
例如,在調用malloc函數時傳回的值。
在padding中的dummy header的內部,該值也會被保存,
and is subtracted from the address of the dummy header to yield the address of the actual block header.
size:表示數據內存塊的大小,包含header自身。對於end-of-memzone結構,該值為0,雖然從不會檢查該值。
對於被釋放的普通內存塊,該值用來代替“next”指針,用來計算下一個內存塊所在的地址。
(因此如果下一個內存塊也是free的,兩個內存塊可以整合成一個)。
1.4.2 內存分配
應用程序調用類似malloc的函數時,malloc函數首先會根據調用線程索引lcore_config結構,
以及根據該線程確定其所在的NUMA結點。
即用來索引malloc_head結構數組,之后以該數組為參數調用heap_alloc()函數,
同時作為參數的還有要分配的大小,類型和對齊。
heap_alloc()函數會掃描heap的free_list,並嘗試找到一個合適大小的內存塊來存儲數據,同時強制對齊。
如果沒有找到合適大小的內存塊,例如,第一次在某結點上調用malloc函數時free-list是空的,
則會創建一個新的memzone並配置為heap元素,其會將一個dummy結構放置到memzone的結尾處,
作為一個標記,防止訪問超出這塊內存之外(由於該標記被置為“BUSY”,malloc庫永遠無法將這塊內存分配出去)。
同時在memzone的開始處放置一個合適的element header。這個header標記了memzone中的所有空間,
bar the sentinel value at the end,end, as a single free heap element, and it is then added to the free_list for the heap.
新的memzone配置好之后,會重新對heap的free-list進行描述,這次描述會找到新添加的合適大小的元素,
將其作為memzone中保留內存的大小,至少是調用函數中指定的大小的數據內存塊加上對齊,
至少是Intel DPDK運行時配置中指定的最小大小。
找到一個合適大小的空閑元素之后,會計算返回到用戶的指針,包含提供給用戶的空閑內存塊結尾處的空間。
緊跟着這塊內存的cache-line被填充一個struct malloc_elem頭:
如果內存塊中余下的空間比較小,如<=128字節,就會使用一個pad header,余下的空間就浪費了。
不過,如果余下的空間大於128字節,則這塊空閑內存塊就被分成兩份,
一個新的,合適的malloc_elem頭被放到返回的數據空間之前。
從已經存在的元素的結尾分配內存的好處是,在這種情況下,不需要調整free list——
free list中已經存在的元素已經調整過尺寸指針了,后面element的“prev”指針已經重新指向這個新創建的element了。
1.4.3 釋放內存
要釋放內存,需要將指向數據區域起始地址的指針傳遞給free函數。
函數會從指針中減去malloc_elem結構的大小以獲取內存塊的element header。
如果header的類型是“PAD”,則再從指針中減去pad的長度。
從該element指針中,可以獲取到指向堆的來源和需要釋放到哪里的指針,
以及指向前一個元素的,並且通過size變量,可以計算下一個元素的指針。
之后也會檢查后面的和前面的元素,看其是否也需要被釋放。
這意味着永遠不會發生兩個空閑內存塊相鄰的情況,這樣的內存塊總是會被整合成一個更大的內存塊。
二. 源碼分析
DPDK以兩種方式對外提供內存管理方法,一個是rte_mempool,主要用於網卡數據包的收發;一個是rte_malloc,主要為應用程序提供內存使用接口。這里我們主要講一下rte_malloc函數。
rte_malloc實現的大體流程如下圖所示。

下面我們逐個函數分析。
1 /* 2 * Allocate memory on default heap. 3 */ 4 void * 5 rte_malloc(const char *type, size_t size, unsigned align) 6 { 7 return rte_malloc_socket(type, size, align, SOCKET_ID_ANY); 8 }
這個函數沒什么可說的,直接調用rte_malloc_socket,但注意傳入的socketid參數為SOCKET_ID_ANY。
rte_malloc_socket
從這個函數的入口檢查可以看出,如果傳入的分配內存大小size為0或對其align不是2次方的倍數就返回NULL。
1 void * 2 rte_malloc_socket(const char *type, size_t size, unsigned align, int socket_arg) 3 { 4 struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config; 5 int socket, i; 6 void *ret; 7 8 /* return NULL if size is 0 or alignment is not power-of-2 */ 9 if (size == 0 || (align && !rte_is_power_of_2(align))) 10 return NULL; 11 12 if (!rte_eal_has_hugepages()) 13 socket_arg = SOCKET_ID_ANY; 14 /*如果傳入的socket參數為SOCKET_ID_ANY ,則會先嘗試在當前socket上分配內存*/ 15 if (socket_arg == SOCKET_ID_ANY) 16 socket = malloc_get_numa_socket(); /*獲取當前socket_id*/ 17 else 18 socket = socket_arg; 19 20 /* Check socket parameter */ 21 if (socket >= RTE_MAX_NUMA_NODES) 22 return NULL; 23 /*嘗試在當前socket上分配內存,如果分配成功則返回*/ 24 ret = malloc_heap_alloc(&mcfg->malloc_heaps[socket], type, 25 size, 0, align == 0 ? 1 : align, 0); 26 if (ret != NULL || socket_arg != SOCKET_ID_ANY) 27 return ret; 28 /*嘗試在其他socket上分配內存,直到分配成功或者所有socket都嘗試失敗*/ 29 /* try other heaps */ 30 for (i = 0; i < RTE_MAX_NUMA_NODES; i++) { 31 /* we already tried this one */ 32 if (i == socket) 33 continue; 34 35 ret = malloc_heap_alloc(&mcfg->malloc_heaps[i], type, 36 size, 0, align == 0 ? 1 : align, 0); 37 if (ret != NULL) 38 return ret; 39 } 40 41 return NULL; 42 }
malloc_heap_alloc
這個函數用來模擬從heap中(也就是struct malloc_heap)分配內存,其調用邏輯圖如下:
1 void * 2 malloc_heap_alloc(struct malloc_heap *heap, 3 const char *type __attribute__((unused)), size_t size, unsigned flags, 4 size_t align, size_t bound) 5 { 6 struct malloc_elem *elem; 7 /*將size調整為cache line對齊*/ 8 size = RTE_CACHE_LINE_ROUNDUP(size); 9 align = RTE_CACHE_LINE_ROUNDUP(align); 10 11 rte_spinlock_lock(&heap->lock); 12 /*找到合適的malloc_elem結構*/ 13 elem = find_suitable_element(heap, size, flags, align, bound); 14 if (elem != NULL) { 15 elem = malloc_elem_alloc(elem, size, align, bound); 16 /* increase heap's count of allocated elements */ 17 heap->alloc_count++; /*計數加一*/ 18 } 19 rte_spinlock_unlock(&heap->lock); 20 21 return elem == NULL ? NULL : (void *)(&elem[1]); 22 }
注意最后的返回值,返回的是elem[1]的地址,而不是elem的地址。elem[1]是什么呢?其實就是elem+1。說的直觀點,rte_malloc其實就是分配了一個內存塊,也可以說是分配了一個malloc_elem,這個malloc_elem作為這個內存塊的一部分(存放在開頭),相當於這個內存塊的描述符,真正可以使用的內存是malloc_elem之后的內存區域。
如下圖所示。
在補一張內存初始化中講到的數據結構關系圖。

下面看下find_suitable_element函數是如何找到合適的malloc_elem的。
l find_suitable_element
1 static struct malloc_elem * 2 find_suitable_element(struct malloc_heap *heap, size_t size, 3 unsigned flags, size_t align, size_t bound) 4 { 5 size_t idx; 6 struct malloc_elem *elem, *alt_elem = NULL; 7 /*根據申請內存的大小,在struct malloc_heap->free_head數組中找到合適的idx*/ 8 for (idx = malloc_elem_free_list_index(size); 9 idx < RTE_HEAP_NUM_FREELISTS; idx++) { 10 /*在heap->free_head[idx]鏈表中找到合適的malloc_elem*/ 11 for (elem = LIST_FIRST(&heap->free_head[idx]); 12 !!elem; elem = LIST_NEXT(elem, free_list)) { 13 if (malloc_elem_can_hold(elem, size, align, bound)) { 14 if (check_hugepage_sz(flags, elem->ms->hugepage_sz)) 15 return elem; 16 if (alt_elem == NULL) 17 alt_elem = elem; 18 } 19 } 20 } 21 22 if ((alt_elem != NULL) && (flags & RTE_MEMZONE_SIZE_HINT_ONLY)) 23 return alt_elem; 24 25 return NULL; 26 }
我們知道malloc_elem的組織結構是個二維的鏈表,如下圖所示。所以第一步要找到合適的一維鏈表。也就是在struct malloc_heap->free_head數組中找到合適的idx。

我們在前面介紹過,struct malloc_heap->free_head數組的下標和數組中malloc_elem的大小有類似如下對應關系。所以malloc_elem_free_list_index就是返回能夠滿足申請大小size的最小的idx。
heap->free_head[0] - (0 , 2^8]
heap->free_head[1] - (2^8 , 2^10]
heap->free_head[2] - (2^10 ,2^12]
heap->free_head[3] - (2^12, 2^14]
heap->free_head[4] - (2^14, MAX_SIZE]
之后嘗試heap->free_head[idx]上的malloc_elem分配內存,如果分配失敗,再嘗試更大一點的(idx++)。
下面malloc_elem_can_hold負責在heap->free_head[idx]找到一個合適的malloc_elem。而其內部只是調用了elem_start_pt。
l elem_start_pt
1 static void * 2 elem_start_pt(struct malloc_elem *elem, size_t size, unsigned align, 3 size_t bound) 4 { 5 const size_t bmask = ~(bound - 1); 6 /*在debug模式下MALLOC_ELEM_TRAILER_LEN為cacheline大小,正常為0*/ 7 uintptr_t end_pt = (uintptr_t)elem + 8 elem->size - MALLOC_ELEM_TRAILER_LEN; 9 uintptr_t new_data_start = RTE_ALIGN_FLOOR((end_pt - size), align); 10 uintptr_t new_elem_start; 11 12 /* check boundary */ 13 if ((new_data_start & bmask) != ((end_pt - 1) & bmask)) { 14 end_pt = RTE_ALIGN_FLOOR(end_pt, bound); 15 new_data_start = RTE_ALIGN_FLOOR((end_pt - size), align); 16 if (((end_pt - 1) & bmask) != (new_data_start & bmask)) 17 return NULL; 18 } 19 20 new_elem_start = new_data_start - MALLOC_ELEM_HEADER_LEN; 21 22 /* if the new start point is before the exist start, it won't fit */ 23 return (new_elem_start < (uintptr_t)elem) ? NULL : (void *)new_elem_start; 24 }
代碼中的幾個指針如下如所示,其本質就是在當前malloc_elem中嘗試按照size分配一個新的malloc_elem,看下其起始地址是否越界。如果不越界就將當前malloc_elem返回(不是新的malloc_elem,這時還沒有真的分配新malloc_elem)。

找到合適的malloc_elem后,就調用malloc_elem_alloc從此malloc_elem分配新的滿足size大小的malloc_elem。
l malloc_elem_alloc
1 struct malloc_elem * 2 malloc_elem_alloc(struct malloc_elem *elem, size_t size, unsigned align, 3 size_t bound) 4 { 5 struct malloc_elem *new_elem = elem_start_pt(elem, size, align, bound); 6 const size_t old_elem_size = (uintptr_t)new_elem - (uintptr_t)elem; 7 /*trailer_size就是align-MALLOC_ELEM_TRAILER_LEN的大小,而MALLOC_ELEM_TRAILER_LEN在debug下為cacheline,否則為0*/ 8 const size_t trailer_size = elem->size - old_elem_size - size - 9 MALLOC_ELEM_OVERHEAD; 10 /*將老的elem從鏈表中刪除*/ 11 elem_free_list_remove(elem); 12 13 if (trailer_size > MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) { 14 /* split it, too much free space after elem */ 15 struct malloc_elem *new_free_elem = 16 RTE_PTR_ADD(new_elem, size + MALLOC_ELEM_OVERHEAD); 17 18 split_elem(elem, new_free_elem); 19 malloc_elem_free_list_insert(new_free_elem); 20 } 21 22 /*如果old_elem_size太小,就將老的elem狀態設置為ELEM_BUSY*/ 23 if (old_elem_size < MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) { 24 /* don't split it, pad the element instead */ 25 elem->state = ELEM_BUSY; 26 elem->pad = old_elem_size; 27 28 /* put a dummy header in padding, to point to real element header */ 29 if (elem->pad > 0){ /* pad will be at least 64-bytes, as everything 30 * is cache-line aligned */ 31 new_elem->pad = elem->pad; 32 new_elem->state = ELEM_PAD; 33 new_elem->size = elem->size - elem->pad;/*elem->size -old_elem_size*/ 34 set_header(new_elem); 35 } 36 37 return new_elem; 38 } 39 40 /* we are going to split the element in two. The original element 41 * remains free, and the new element is the one allocated. 42 * Re-insert original element, in case its new size makes it 43 * belong on a different list. 44 */ 45 /*如果old_elem_size足夠大則將原有的elem分隔成兩個elem,分別設置elem,new_elem的size*/ 46 split_elem(elem, new_elem); 47 new_elem->state = ELEM_BUSY;/*設置new_elem的狀態*/ 48 malloc_elem_free_list_insert(elem);/*根據原有的elem調整后的size再找到合適的idx,將其插入heap->free_head[idx]*/ 49 50 return new_elem; 51 }
elem分裂前后對比如下圖所示:

分裂前