DPDK rte_malloc


參考文獻:dpdk中的librte_malloc

    《深入淺出DPDK》 

 

一.   librte_malloc 庫  

  dpdk中的librte_malloc庫提供了能夠分配任意大小內存的API。

  該庫的目標是提供類似malloc的函數從hugepage中分配內存,以及幫助應用程序移植。通常情況下,這種類型的分配不應該在數據平面處理,因為其比基於內存池的分配更慢,並且在分配和釋放時會使用鎖。

  

1.1 Cookies

如果在配置文件中打開CONFIG_RTE_MALLOC_DEBUG,

分配的內存會包含覆蓋保護區域,以識別緩沖區溢出問題。

 

1.2 對齊與NUMA Constraints

rte_malloc()函數包含一個align參數,用來要求內存區域對齊到該值的倍數(必須是2的倍數)。

在支持NUMA的系統中,調用rte_malloc()函數時,會在調用該函數的進程所在的socket上分配內存。

同時該庫也提供了一組API,使用戶可以直接在指定的NUMA socket上分配內存,

或者在另一個core所在的NUMA socket上分配內存。

 

1.3 用例

應用程序在初始化時使用類似malloc這樣的函數時,可以使用該庫。

要在運行時分配/釋放內存數據,如果應用程序對速度有要求,

請用內存池庫代替本庫。

如果要使用一塊需要知道物理地址的內存塊,如硬件設備使用的內存塊,

則應該使用memory zone。

 

1.4 數據結構

在malloc庫的內部使用兩種數據結構類型:

struct malloc_heap: 用來管理每個socket上的空閑空間

struct malloc_elem: 分配的基本元素,由庫內部管理的空閑空間。

 

1.4.1 struct malloc_heap

該結構體用來管理每個socket上的空閑空間。

在庫的內部,每個NUMA node上包含一個 heap結構體,

使我們可以根據線程運行所在的NUMA node,在對應的結點分配內存。

雖然不能保存一定會在指定的結點上分配內存,但比總在某個固定的的結點或隨機結點分配要好。

heap的關鍵成員變量和成員函數描述如下:

mz_count: 保存本結點已經為heap內存分配的memory zone的數量。該值的唯一用途就是與numa_socket值組合為每個memory zone生成一個唯一的名字。

lock: 該變量用來做對heap訪問的同步。考慮到heap中的空閑空間是由一個list管理的,所以我們需要一個鎖來防止兩個線程同時訪問該list。

free_head: 該變量該malloc heap的free nodes list中的第一個元素。

 

注意: malloc_heap結構體不會管理已經分配的memzones,這么做是毫無意義的,因為它們不會被釋放。

也不會管理使用中的內存塊,因為除非它們被釋放,否則是不會再次接觸到這些內存塊的。

在釋放時,指向這些內存塊的指針會作為free()函數的參數。

1.4.1.2 struct malloc_elem結構體

malloc_elem結構體被用作memzone中各種內存塊的頭部結構。

有三種不同的用法:

1、分配或釋放內存塊時的頭部 - 普通情況

2、在內存塊中作為padding頭部

3、作為memzone結尾處的標記

下文描述了結構中最重要的部分以及用法。

 

注意:如果某種用法不屬於上面描述的三種中的任何一種,則認為對應的變量是未定義的。

例如,只有當"state"和"pad"兩個變量的值是有效值是,才認為其是一個padding header。

head:該指針是已經分配的內存塊中指向heap結構的反向引用,即指向對應的heap。

        普通內存塊在釋放時會使用該指針,將當前釋放的內存塊添加到heap的free list中

prev:該指針指向memzone中當前內存塊緊前面的內存塊的header element/block。

        當釋放一個內存塊時,該指針用來引用前一個內存塊,看其是否也需要釋放。

        如果需要,則兩塊內存組合成一塊更大的內存塊。

next_free:該指針用來將未分配的內存塊鏈接到一起。

        同樣,該變量只在普通內存塊中使用,在malloc()函數中找到一塊符合需求的內存塊來分配,

        並且在調用free()函數將新釋放的內存添加到free-list中。

state:該變量可以是以下三個值之一:“Free”, “Busy”或“Pad”。

        前兩個用業表示普通內存塊的分配狀態,

        第三個用來表示在start-of-block padding的結尾處的元素結構體是一個dummy結構體。

        (例如,由於強制對齊,內存塊中數據的開始處不在內存塊中。???)

        在這種情況下,pad header用來定位實際分配的元素header。

        對於end-of-memzone結構體,該值總是“busy”,

        以確保在釋放時沒有元素為了整合成一個更大的內存塊,而在memzone的結尾外面查找其它內存塊。

pad:該變量保存內存塊開始處的padding區域的長度。

        如果是普通內存塊header,該值會被加到header的結尾處的地址,以給出數據區域的正確地址。

        例如,在調用malloc函數時傳回的值。

        在padding中的dummy header的內部,該值也會被保存,

        and is subtracted from the address of the dummy header to yield  the address of the actual block header.

size:表示數據內存塊的大小,包含header自身。對於end-of-memzone結構,該值為0,雖然從不會檢查該值。

        對於被釋放的普通內存塊,該值用來代替“next”指針,用來計算下一個內存塊所在的地址。

        (因此如果下一個內存塊也是free的,兩個內存塊可以整合成一個)。

 

1.4.2 內存分配

應用程序調用類似malloc的函數時,malloc函數首先會根據調用線程索引lcore_config結構,

以及根據該線程確定其所在的NUMA結點。

即用來索引malloc_head結構數組,之后以該數組為參數調用heap_alloc()函數,

同時作為參數的還有要分配的大小,類型和對齊。

heap_alloc()函數會掃描heap的free_list,並嘗試找到一個合適大小的內存塊來存儲數據,同時強制對齊。

如果沒有找到合適大小的內存塊,例如,第一次在某結點上調用malloc函數時free-list是空的,

則會創建一個新的memzone並配置為heap元素,其會將一個dummy結構放置到memzone的結尾處,

作為一個標記,防止訪問超出這塊內存之外(由於該標記被置為“BUSY”,malloc庫永遠無法將這塊內存分配出去)。

同時在memzone的開始處放置一個合適的element header。這個header標記了memzone中的所有空間,

bar the sentinel value at the end,end, as a single free  heap element, and it is then added to the free_list for the heap.

 

新的memzone配置好之后,會重新對heap的free-list進行描述,這次描述會找到新添加的合適大小的元素,

將其作為memzone中保留內存的大小,至少是調用函數中指定的大小的數據內存塊加上對齊,

至少是Intel DPDK運行時配置中指定的最小大小。

 

找到一個合適大小的空閑元素之后,會計算返回到用戶的指針,包含提供給用戶的空閑內存塊結尾處的空間。

緊跟着這塊內存的cache-line被填充一個struct malloc_elem頭:

如果內存塊中余下的空間比較小,如<=128字節,就會使用一個pad header,余下的空間就浪費了。

不過,如果余下的空間大於128字節,則這塊空閑內存塊就被分成兩份,

一個新的,合適的malloc_elem頭被放到返回的數據空間之前。

從已經存在的元素的結尾分配內存的好處是,在這種情況下,不需要調整free list——

free list中已經存在的元素已經調整過尺寸指針了,后面element的“prev”指針已經重新指向這個新創建的element了。

1.4.3 釋放內存

要釋放內存,需要將指向數據區域起始地址的指針傳遞給free函數。

函數會從指針中減去malloc_elem結構的大小以獲取內存塊的element header。

如果header的類型是“PAD”,則再從指針中減去pad的長度。

 

從該element指針中,可以獲取到指向堆的來源和需要釋放到哪里的指針,

以及指向前一個元素的,並且通過size變量,可以計算下一個元素的指針。

之后也會檢查后面的和前面的元素,看其是否也需要被釋放。

這意味着永遠不會發生兩個空閑內存塊相鄰的情況,這樣的內存塊總是會被整合成一個更大的內存塊。

 

二. 源碼分析

    DPDK以兩種方式對外提供內存管理方法,一個是rte_mempool,主要用於網卡數據包的收發;一個是rte_malloc,主要為應用程序提供內存使用接口。這里我們主要講一下rte_malloc函數。

rte_malloc實現的大體流程如下圖所示。


 

    下面我們逐個函數分析。

1      /*
2      * Allocate memory on default heap.
3      */
4     void *
5     rte_malloc(const char *type, size_t size, unsigned align)
6     {
7              return rte_malloc_socket(type, size, align, SOCKET_ID_ANY);
8     }

 

    這個函數沒什么可說的,直接調用rte_malloc_socket,但注意傳入的socketid參數為SOCKET_ID_ANY

 

rte_malloc_socket

    從這個函數的入口檢查可以看出,如果傳入的分配內存大小size為0或對其align不是2次方的倍數就返回NULL

 1      void *
 2     rte_malloc_socket(const char *type, size_t size, unsigned align, int socket_arg)
 3     {
 4              struct rte_mem_config *mcfg = rte_eal_get_configuration()->mem_config;
 5              int socket, i;
 6              void *ret;
 7      
 8              /* return NULL if size is 0 or alignment is not power-of-2 */
 9              if (size == 0 || (align && !rte_is_power_of_2(align)))
10                        return NULL;
11      
12              if (!rte_eal_has_hugepages())
13                        socket_arg = SOCKET_ID_ANY;
14         /*如果傳入的socket參數為SOCKET_ID_ANY ,則會先嘗試在當前socket上分配內存*/
15              if (socket_arg == SOCKET_ID_ANY)
16                        socket = malloc_get_numa_socket(); /*獲取當前socket_id*/
17              else
18                        socket = socket_arg;
19      
20              /* Check socket parameter */
21              if (socket >= RTE_MAX_NUMA_NODES)
22                        return NULL;
23         /*嘗試在當前socket上分配內存,如果分配成功則返回*/
24              ret = malloc_heap_alloc(&mcfg->malloc_heaps[socket], type,
25                                          size, 0, align == 0 ? 1 : align, 0);
26              if (ret != NULL || socket_arg != SOCKET_ID_ANY)
27                        return ret;
28         /*嘗試在其他socket上分配內存,直到分配成功或者所有socket都嘗試失敗*/
29              /* try other heaps */
30              for (i = 0; i < RTE_MAX_NUMA_NODES; i++) {
31                        /* we already tried this one */
32                        if (i == socket)
33                                 continue;
34      
35                        ret = malloc_heap_alloc(&mcfg->malloc_heaps[i], type,
36                                                    size, 0, align == 0 ? 1 : align, 0);
37                        if (ret != NULL)
38                                 return ret;
39              }
40      
41              return NULL;
42     }

 

  malloc_heap_alloc

這個函數用來模擬從heap中(也就是struct malloc_heap)分配內存,其調用邏輯圖如下:


 

 

 1      void *
 2     malloc_heap_alloc(struct malloc_heap *heap,
 3                        const char *type __attribute__((unused)), size_t size, unsigned flags,
 4                        size_t align, size_t bound)
 5     {
 6              struct malloc_elem *elem;
 7         /*將size調整為cache line對齊*/
 8              size = RTE_CACHE_LINE_ROUNDUP(size);
 9              align = RTE_CACHE_LINE_ROUNDUP(align);
10      
11              rte_spinlock_lock(&heap->lock);
12         /*找到合適的malloc_elem結構*/
13              elem = find_suitable_element(heap, size, flags, align, bound);
14              if (elem != NULL) {
15                        elem = malloc_elem_alloc(elem, size, align, bound);
16                        /* increase heap's count of allocated elements */
17                        heap->alloc_count++; /*計數加一*/
18              }
19              rte_spinlock_unlock(&heap->lock);
20      
21              return elem == NULL ? NULL : (void *)(&elem[1]);
22     }

 

 

 

     注意最后的返回值,返回的是elem[1]的地址,而不是elem的地址。elem[1]是什么呢?其實就是elem+1。說的直觀點,rte_malloc其實就是分配了一個內存塊,也可以說是分配了一個malloc_elem,這個malloc_elem作為這個內存塊的一部分(存放在開頭),相當於這個內存塊的描述符,真正可以使用的內存是malloc_elem之后的內存區域

  如下圖所示。

在補一張內存初始化中講到的數據結構關系圖。

   

 

     下面看下find_suitable_element函數是如何找到合適的malloc_elem的。

l find_suitable_element

 

 1      static struct malloc_elem *
 2     find_suitable_element(struct malloc_heap *heap, size_t size,
 3                        unsigned flags, size_t align, size_t bound)
 4     {
 5              size_t idx;
 6              struct malloc_elem *elem, *alt_elem = NULL;
 7         /*根據申請內存的大小,在struct malloc_heap->free_head數組中找到合適的idx*/
 8              for (idx = malloc_elem_free_list_index(size);
 9                                 idx < RTE_HEAP_NUM_FREELISTS; idx++) {
10                        /*在heap->free_head[idx]鏈表中找到合適的malloc_elem*/
11                        for (elem = LIST_FIRST(&heap->free_head[idx]);
12                                          !!elem; elem = LIST_NEXT(elem, free_list)) {
13                                 if (malloc_elem_can_hold(elem, size, align, bound)) {
14                                          if (check_hugepage_sz(flags, elem->ms->hugepage_sz))
15                                                    return elem;
16                                          if (alt_elem == NULL)
17                                                    alt_elem = elem;
18                                 }
19                        }
20              }
21      
22              if ((alt_elem != NULL) && (flags & RTE_MEMZONE_SIZE_HINT_ONLY))
23                        return alt_elem;
24      
25              return NULL;
26     }

 

我們知道malloc_elem的組織結構是個二維的鏈表,如下圖所示。所以第一步要找到合適的一維鏈表。也就是在struct malloc_heap->free_head數組中找到合適的idx

    我們在前面介紹過,struct malloc_heap->free_head數組的下標和數組中malloc_elem的大小有類似如下對應關系。所以malloc_elem_free_list_index就是返回能夠滿足申請大小size的最小的idx

heap->free_head[0] - (0   , 2^8]

heap->free_head[1] - (2^8 , 2^10]

heap->free_head[2] - (2^10 ,2^12]

heap->free_head[3] - (2^12, 2^14]

heap->free_head[4] - (2^14, MAX_SIZE]

之后嘗試heap->free_head[idx]上的malloc_elem分配內存,如果分配失敗,再嘗試更大一點的(idx++)

下面malloc_elem_can_hold負責在heap->free_head[idx]找到一個合適的malloc_elem。而其內部只是調用了elem_start_pt

l  elem_start_pt

 

 1      static void *
 2     elem_start_pt(struct malloc_elem *elem, size_t size, unsigned align,
 3                        size_t bound)
 4     {
 5              const size_t bmask = ~(bound - 1);
 6              /*在debug模式下MALLOC_ELEM_TRAILER_LEN為cacheline大小,正常為0*/
 7              uintptr_t end_pt = (uintptr_t)elem +
 8                                 elem->size - MALLOC_ELEM_TRAILER_LEN;
 9              uintptr_t new_data_start = RTE_ALIGN_FLOOR((end_pt - size), align);
10              uintptr_t new_elem_start;
11      
12              /* check boundary */
13              if ((new_data_start & bmask) != ((end_pt - 1) & bmask)) {
14                        end_pt = RTE_ALIGN_FLOOR(end_pt, bound);
15                        new_data_start = RTE_ALIGN_FLOOR((end_pt - size), align);
16                        if (((end_pt - 1) & bmask) != (new_data_start & bmask))
17                                 return NULL;
18              }
19      
20              new_elem_start = new_data_start - MALLOC_ELEM_HEADER_LEN;
21      
22              /* if the new start point is before the exist start, it won't fit */
23              return (new_elem_start < (uintptr_t)elem) ? NULL : (void *)new_elem_start;
24     }

 

代碼中的幾個指針如下如所示,其本質就是在當前malloc_elem中嘗試按照size分配一個新的malloc_elem,看下其起始地址是否越界。如果不越界就將當前malloc_elem返回(不是新的malloc_elem,這時還沒有真的分配新malloc_elem)。

找到合適的malloc_elem后,就調用malloc_elem_alloc從此malloc_elem分配新的滿足size大小的malloc_elem

l  malloc_elem_alloc

 

 1      struct malloc_elem *
 2     malloc_elem_alloc(struct malloc_elem *elem, size_t size, unsigned align,
 3                        size_t bound)
 4     {
 5              struct malloc_elem *new_elem = elem_start_pt(elem, size, align, bound);
 6              const size_t old_elem_size = (uintptr_t)new_elem - (uintptr_t)elem;
 7              /*trailer_size就是align-MALLOC_ELEM_TRAILER_LEN的大小,而MALLOC_ELEM_TRAILER_LEN在debug下為cacheline,否則為0*/
 8              const size_t trailer_size = elem->size - old_elem_size - size -
 9                        MALLOC_ELEM_OVERHEAD;
10         /*將老的elem從鏈表中刪除*/
11              elem_free_list_remove(elem);
12      
13              if (trailer_size > MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
14                        /* split it, too much free space after elem */
15                        struct malloc_elem *new_free_elem =
16                                          RTE_PTR_ADD(new_elem, size + MALLOC_ELEM_OVERHEAD);
17      
18                        split_elem(elem, new_free_elem);
19                        malloc_elem_free_list_insert(new_free_elem);
20              }
21      
22         /*如果old_elem_size太小,就將老的elem狀態設置為ELEM_BUSY*/
23              if (old_elem_size < MALLOC_ELEM_OVERHEAD + MIN_DATA_SIZE) {
24                        /* don't split it, pad the element instead */
25                        elem->state = ELEM_BUSY;
26                        elem->pad = old_elem_size;
27      
28                        /* put a dummy header in padding, to point to real element header */
29                        if (elem->pad > 0){ /* pad will be at least 64-bytes, as everything
30                                             * is cache-line aligned */
31                                 new_elem->pad = elem->pad;
32                                 new_elem->state = ELEM_PAD;
33                                 new_elem->size = elem->size - elem->pad;/*elem->size -old_elem_size*/
34                                 set_header(new_elem);
35                        }
36      
37                        return new_elem;
38              }
39      
40              /* we are going to split the element in two. The original element
41               * remains free, and the new element is the one allocated.
42               * Re-insert original element, in case its new size makes it
43               * belong on a different list.
44               */
45              /*如果old_elem_size足夠大則將原有的elem分隔成兩個elem,分別設置elem,new_elem的size*/
46              split_elem(elem, new_elem);
47              new_elem->state = ELEM_BUSY;/*設置new_elem的狀態*/
48              malloc_elem_free_list_insert(elem);/*根據原有的elem調整后的size再找到合適的idx,將其插入heap->free_head[idx]*/
49      
50              return new_elem;
51     }

 

elem分裂前后對比如下圖所示:

 

分裂前

 


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM