大家好,我是雨樂!
在之前的文章中,我們分析了glibc內存管理相關的內容,里面的是不是邏輯復雜😁,畢竟咱們用幾十行代碼完成的功能,glibc要用上百乃至上千行代碼來實現,畢竟它的受眾太多了,需要考慮跨平台,各種邊界條件等。
其實,glibc的內存分配庫ptmalloc也可以看做是一個內存池,出於性能考慮,每次內存申請都是先從ptmalloc中進行分配,如果沒有合適的則通過系統分配函數進行申請;在釋放的時候,也是將被釋放內存先方式內存池中,內存池根據一定的策略,來決定是否進行shrink以歸還OS。
那么,現一個內存池?我們該怎么實現呢?今天,借助這篇文章,我們一起來設計和實現一個內存池(文末附有github地址)。
背景
首先需要說明的是,該內存池是筆者在10年前完成的,下面先說下當時此項目的背景。
09年,在某所的時候,參與了某個國家級項目,該項目是防DDOS攻擊相關,因此更多的是跟IP相關,所以每次分配和釋放內存都是固定大小,經過測試,性能不是很滿意,所以,經過代碼分析以及性能攻擊分析,發現里面有大量的malloc/free,所以,當時就決定是否從malloc/free入手,能否優化整個項目的性能。
所以,決定實現一個Memory Pool,在做了調研以及研究了相關論文后,決定實現一個內存池,先試試水,所幸運的是,性能確實比glibc自帶的malloc/free要高,所以也就應用於項目上了。
❝本文所講的Memory Pool為C語言實現,旨在讓大家都能看懂,看明白(至少能夠完全理解本文所講的Memory Pool的實現原理)。
❞
概念
首先,我們介紹下什么是內存池?
❝預先在內存中申請一定數量的內存塊留作備用,當有新的內存需求時,就先從內存池中分配內存返回,在釋放的時候,將內存返回給內存池而不是OS,在下次申請的時候,重新返回
❞
那么為什么要有內存池呢?這就需要從傳統內存分配的特點來進行分析,傳統內存分配釋放的優點無非就是 通用性強,應用廣泛,但是傳統的內存分配、釋放在某些特定的項目中,其不一定是最優、效率最高的方案。
傳統內存分配、釋放的缺點總結如下:
1、調用malloc/new,系統需要根據“最先匹配”、“最優匹配”或其他算法在內存空閑塊表中查找一塊空閑內存,調用free/delete,系統可能需要合並空閑內存塊,這些會產生額外開銷
2、頻繁的在堆上申請和釋放內存必然需要大量時間,降低了程序的運行效率。對於一個需要頻繁申請和釋放內存的程序來說,頻繁調用new/malloc申請內存,delete/free釋放內存都需要花費系統時間,頻繁的調用必然會降低程序的運行效率。
3、經常申請小塊內存,會將物理內存“切”得很碎,導致內存碎片。申請內存的順序並不是釋放內存的順序,因此頻繁申請小塊內存必然會導致內存碎片,造成“有內存但是申請不到大塊內存”的現象。
內存分配
從上圖中,可以看出,應用程序會調用glibc運行時庫的malloc函數進行內存申請,而malloc函數則會根據具體申請的內存塊大小,根據實際情況最終從sys_brk或者sys_mmap_pgoff系統調用申請內存,而大家都知道,跟os打交道,_性能損失_是毋庸置疑的。
其次,glibc作為通用的運行時庫,malloc/free需要滿足各種場景需求,比如申請的字節大小不一,多線程訪問等。
沒有比傳統malloc/free性能更優的方案呢?
答案是:有。
在程序啟動的時候,我們預分配特定數量的固定大小的塊,這樣每次申請的時候,就從預分配的塊中獲取,釋放的時候,將其放入預分配塊中以備下次復用,這就是所謂的_內存池技術_,每個內存池對應特定場景,這樣的話,較傳統的傳統的malloc/free少了很多復雜邏輯,性能顯然會提升不少。
結合傳統malloc/free的缺點,我們總結下使用內存池方案的優點:
1、比malloc/free進行內存申請/釋放的方式快
2、不會產生或很少產生堆碎片
3、可避免內存泄漏
分類
根據分配出去的字節大小是否固定,分為 固定大小內存池 和 可變大小內存池 兩類。
而可變大小內存池,可分配任意大小的內存池,比如ptmalloc、jemalloc以及google的tcmalloc。
固定大小內存池,顧名思義,每次申請和釋放的內存大小都是固定的。每次分配出去的內存塊大小都是程序預先定義的值,而在釋放內存塊時候,則簡單的掛回內存池鏈表即可。
❝本文主要講的是固定大小的內存池。
❞
原理
內存池,重點在”池“字上,之所以稱之為內存池,是在真正使用之前,先預分配一定數量、大小預設的塊,如果有新的內存需求時候,就從內存池中根據申請的內存大小,分配一個內存塊,若當前內存塊已經被完全分配出去,則繼續申請一大塊,然后進行分配。
當進行內存塊釋放的時候,則將其歸還內存池,后面如果再有申請的話,則將其重新分配出去。
內存池結構圖
上圖是本文所要設計的結構圖,下面在具體的設計之前,我們先講下本內存池的原理:
- 創建並初始化頭結點MemoryPool
- 通過MemoryPool進行內存分配,如果發現MemoryPool所指向的第一塊MemoryBlock或者現有MemoryPool沒有空閑內存塊,則創建一個新的MemoryBlock初始化之后將其插入MemoryPool的頭
- 在內存分配的時候,遍歷MemoryPool中的單鏈表MemoryBlock,根據地址判斷所要釋放的內存屬於哪個MemoryBlock,然后根據偏移設置MemoryBlock的第一塊空閑塊索引,同時將空閑塊個數+1
上述只是一個簡單的邏輯講解,比較宏觀,下面我們將通過圖解和代碼的方式來進行講解。
設計
在上圖中,我們畫出了內存池的結構圖,從圖中,可以看出,有兩個結構變量,分別為MemoryPool和MemoryBlock。
下面我們將從數據結構和接口兩個部分出發,詳細講解內存池的設計。
數據結構
MemoryBlock
本文中所講述的內存塊的分配和釋放都是通過該結構進行操作,下面是MemoryBlock的示例圖:
MemoryBlock
在上圖中,Header存儲該MemoryBlock的內存塊情況,比如可用的內存塊索引、當前MemoryBlock中可用內存塊的個數等等。
定義如下所示:
struct MemoryBlock {
unsigned int size;
unsigned int free_size;
unsigned int first_free;
struct MemoryBlock *next;
char a_data[0];
};
其中:
- size為MemoryBlock下內存塊的個數
- free_size為MemoryBlock下空閑內存塊的個數
- first_free為MemoryBlock中第一個空閑塊的索引
- next指向下一個MemoryBlock
- a_data是一個柔性數組
❝柔性數組即數組大小待定的數組, C語言中結構體的最后一個元素可以是大小未知的數組,也就是所謂的0長度,所以我們可以用結構體來創建柔性數組。
它的主要用途是為了滿足需要變長度的結構體,為了解決使用數組時內存的冗余和數組的越界問題。
❞
MemoryPool
MemoryPool為內存池的頭,里面定義了該內存池的信息,比如本內存池分配的固定對象的大小,第一個MemoryBlock等
struct MemoryPool {
unsigned int obj_size;
unsigned int init_size;
unsigned int grow_size;
MemoryBlock *first_block;
};
其中:
- obj_size為內存池分配的固定內存塊的大小
- init_size初始化內存池時候創建的內存塊的個數
- grow_size當初始化內存塊使用完后,再次申請內存塊時候的個數
- first_block指向第一個MemoryBlock
接口
memory_pool_create
MemoryPool *memory_pool_create(unsigned int init_size,
unsigned int grow_size,
unsigned int size);
本函數用來創建一個MemoryPool,並對其進行初始化,下面是參數說明:
- init_size 表示第一個MemoryBlock中創建塊的個數
- grow_size 表示當MemoryPool中沒有空閑塊可用,則創建一個新的MemoryBlock時其塊的個數
- size 為塊的大小(即每次分配相同大小的固定size)
memory_alloc
void *memory_alloc(MemoryPool *mp);
本函數用了從mp中申請一塊內存返回
- mp 為MemoryPool類型指針,即內存池的頭
- 如果內存分配失敗,則返回NULL
memory_free
void* memory_free(MemoryPool *mp, void *pfree);
本函數用來釋放內存
- mp 為MemoryPool類型指針,即內存池的頭
- pfree 為要釋放的內存
free_memory_pool
void free_memory_pool(MemoryPool *mp);
本函數用來釋放內存池
實現
在講解整個實現之前,我們先看先內存池的詳細結構圖。
初始化內存池
MemoryPool是整個內存池的入口結構,該函數主要是用來創建MemoryPool對象,並使用參數對其內部的成員變量進行初始化。
函數定義如下:
MemoryPool *memory_pool_create(unsigned int init_size, unsigned int grow_size, unsigned int size)
{
MemoryPool *mp;
mp = (MemoryPool*)malloc(sizeof(MemoryPool));
mp->first_block = NULL;
mp->init_size = init_size;
mp->grow_size = grow_size;
if(size < sizeof(unsigned int))
mp->obj_size = sizeof(unsigned int);
mp->obj_size = (size + (MEMPOOL_ALIGNMENT-1)) & ~(MEMPOOL_ALIGNMENT-1);
return mp;
}
內存分配
void *memory_alloc(MemoryPool *mp) {
unsigned int i;
unsigned int length;
if(mp->first_block == NULL) {
MemoryBlock *mb;
length = (mp->init_size)*(mp->obj_size) + sizeof(MemoryBlock);
mb = malloc(length);
if(mb == NULL) {
perror("memory allocate failed!\n");
return NULL;
}
/* init the first block */
mb->next = NULL;
mb->free_size = mp->init_size - 1;
mb->first_free = 1;
mb->size = mp->init_size*mp->obj_size;
mp->first_block = mb;
char *data = mb->a_data;
/* set the mark */
for(i=1; iinit_size; ++i) {
*(unsigned long *)data = i;
data += mp->obj_size;
}
return (void *)mb->a_data;
}
MemoryBlock *pm_block = mp->first_block;
while((pm_block != NULL) && (pm_block->free_size == 0)) {
pm_block = pm_block->next;
}
if(pm_block != NULL) {
char *pfree = pm_block->a_data + pm_block->first_free * mp->obj_size;
pm_block->first_free = *((unsigned long *)pfree);
pm_block->free_size--;
return (void *)pfree;
} else {
if(mp->grow_size == 0)
return NULL;
MemoryBlock *new_block = (MemoryBlock *)malloc((mp->grow_size)*(mp->obj_size) + sizeof(MemoryBlock));
if(new_block == NULL)
return NULL;
char *data = new_block->a_data;
for(i=1; igrow_size; ++i) {
*(unsigned long *)data = i;
data += mp->obj_size;
}
new_block->size = mp->grow_size*mp->obj_size;
new_block->free_size = mp->grow_size-1;
new_block->first_free = 1;
new_block->next = mp->first_block;
mp->first_block = new_block;
return (void *)new_block->a_data;
}
}
內存塊主要在MemoryBlock結構中,也就是說申請的內存,都是從MemoryBlock中進行獲取,流程如下:
- 獲取MemoryPool中的first_block指針
- 如果該指針為空,則創建一個MemoryBlock,first_block指向新建的MemoryBlock,並返回
- 否則,從first_block進行單鏈表遍歷,查找第一個free_size不為0的MemoryBlock,如果找到,則對該MemoryBlock的相關參數進行設置,然后返回內存塊
- 否則,創建一個新的MemoryBlock,進行初始化分配之后,將其插入到鏈表的頭部(這樣做的目的是為了方便下次分配效率,即減小了鏈表的遍歷)
在上述代碼中,需要注意的是第30-33行或者67-70行,這兩行的功能一樣,都是對新申請的內存塊進行初始化,這幾行的意思,是要將空閑塊連接起來,但是,並沒有使用傳統意義上的鏈表方式,而是通過index方式進行連接,具體如下圖所示:
在上圖中,第0塊空閑塊的下一個空閑塊索引為1,而第1塊空閑塊的索引為2,依次類推,形成了如下鏈表方式
❝1->2->3->4->5
❞
內存分配流程圖如下所示:
內存釋放
void* memory_free(MemoryPool *mp, void *pfree) {
if(mp->first_block == NULL) {
return;
}
MemoryBlock *pm_block = mp->first_block;
MemoryBlock *pm_pre_block = mp->first_block;
/* research the MemoryBlock which the pfree in */
while(pm_block && ((unsigned long)pfree < (unsigned long)pm_block->a_data ||
(unsigned long)pfree>((unsigned long)pm_block->a_data+pm_block->size))) {
//pm_pre_block = pm_block;
pm_block = pm_block->next;
if(pm_block == NULL) {
return pfree;
}
}
unsigned int offset = pfree -(void*) pm_block->a_data;
if((offset&(mp->obj_size -1)) > 0) {
return pfree;
}
pm_block->free_size++;
*((unsigned int *)pfree) = pm_block->first_free;
pm_block->first_free=(unsigned int)(offset/mp->obj_size);
return NULL;
}
內存釋放過程如下:
-
判斷當前MemoryPool的first_block指針是否為空,如果為空,則返回
-
否則,遍歷MemoryBlock鏈表,根據所釋放的指針參數判斷是否在某一個MemoryBlock中
-
如果找到,則對MemoryBlock中的各個參數進行操作,然后返回
-
否則,沒有合適的MemoryBlock,則表明該被釋放的指針不在內存池中,返回
-
在上述代碼中,需要注意第20-29行。
- 第20行,求出被釋放的內存塊在MemoryBlock中的偏移
- 第22行,判斷是否能被整除,即是否在這個內存塊中,算是個double check
- 第26行,將該MemoryBlock中的空閑塊個數加1
- 第27-29行,類似於鏈表的插入,將新釋放的內存塊的索引放入鏈表頭,而其內部的指向下一個可用內存塊
現在舉個例子,以便於理解,假設在一開始有5個空閑塊,其中前三個空閑塊都分配出去了,那么此時,空閑塊鏈表如下
❝4->5,其中first_free = 4
❞
然后在某一個時刻,第1塊釋放了,那么釋放歸還之后,如下
❝1->4->5,其中first_free = 1
❞
內存釋放流程圖如下:
內存釋放
釋放內存池
void free_memory_pool(MemoryPool *mp) {
MemoryBlock *mb = mp->first_block;
if(mb != NULL) {
while(mb->next != NULL) {
s_memory_block *delete_block = mb;
mb = mb->next;
free(delete_block);
}
free(mb);
}
free(mp);
}
上圖是一個完整的分配和釋放示意圖,下面,我結合代碼來分析:
- (a)步,創建了一個MemoryPool結構體
- obj_size = 4代表本內存池分配的內存塊大小為4
- init_size = 5代表創建內存池的時候,第一塊MemoryBlock的空閑內存塊個數為5
- grow_size = 5代表當申請內存的時候,如果沒有空閑內存,則創建的新的MemoryBlock的空閑內存塊個數為5
- (b)步,分配出去一塊內存
- 此時,free_size即該MemoryBlock中可用空閑塊個數為4
- first_free = 1,代表將內存塊分配出去之后,下一個可用的內存塊的index為1
- (c)步,分配出去一塊內存
- 此時,free_size即該MemoryBlock中可用空閑塊個數為3
- first_free = 2,代表將內存塊分配出去之后,下一個可用的內存塊的index為2
- (d)步,分配出去一塊內存
- 此時,free_size即該MemoryBlock中可用空閑塊個數為2
- first_free = 3,代表將內存塊分配出去之后,下一個可用的內存塊的index為3
- (e)步,分配出去一塊內存
- 此時,free_size即該MemoryBlock中可用空閑塊個數為1
- first_free = 4,代表將內存塊分配出去之后,下一個可用的內存塊的index為4
- (f)步,釋放第1個內存塊
- 將free_size進行+1操作
- fire_free值為此次釋放的內存塊的索引,而釋放的內存塊的索引里面的值則為之前first_free的值(此處釋放用的前差法)
- (g)步,釋放第3個內存塊
- 將free_size進行+1操作
- fire_free值為此次釋放的內存塊的索引,而釋放的內存塊的索引里面的值則為之前first_free的值(此處釋放用的前差法)
- (h)步,釋放第3個內存塊
- 將free_size進行+1操作
- fire_free值為此次釋放的內存塊的索引,而釋放的內存塊的索引里面的值則為之前first_free的值(此處釋放用的前差法)
測試
測試代碼如下:
#include "memory_pool.h"
#include <sys/time.h>
#include
#include
int main() {
MemoryPool *mp = memory_pool_create(8);
struct timeval start;
struct timeval end;
int t[] = {20000, 40000, 80000, 100000, 120000, 140000, 160000, 180000, 200000};
int s = sizeof(t)/sizeof(int);
for (int i = 0; i < s; ++i) {
gettimeofday(&start, NULL);
for (int j = 0; j < t[i]; ++j) {
void *p = memory_alloc(mp);
memory_free(mp, p);
//
//void *p = malloc(8);
//free(p);
}
gettimeofday(&end, NULL);
long cost = 1000000 * (end.tv_sec - start.tv_sec) +
end.tv_usec - start.tv_usec;
printf("%ld\n",cost);
}
free_memory_pool(mp);
return 0;
}
數據對比如下:
從上圖可以看出,pool的分配效率高於傳統的malloc方式,性能提高接近100%
❝本測試結果僅針對當時的項目,對其他測試case不具有普遍性
❞
擴展
在文章前面,我們有提過本內存池是_單線程、固定大小的_,但是往往這種還是不能滿足要求,如下幾個場景
- 單線程多固定大小
- 多線程固定大小
- 多線程多固定大小
❝多固定大小,指的是提前預支需要申請的內存大小
❞
單線程多固定大小: 針對此場景,由於已經預知了所申請的size,所以可以針對每個size創建一個內存池
多線程固定大小:針對此場景,有以下兩個方案
- 使用ThreadLocalCache
- 每個線程創建一個內存池
- 使用加鎖,操作全局唯一內存池(每次加鎖解鎖耗時100ns左右)
多線程多固定大小:針對此場景,可以結合上述兩個方案,即
- 使用ThreadCache,每個線程內創建多固定大小的內存池
- 每個線程內創建一個多固定大小的內存池
- 使用加鎖,操作全局唯一內存池(每次加鎖解鎖耗時100ns左右)
❝上述幾種方案,僅僅是在使用固定大小內存池基礎上進行的擴展,具體的方案,需要根據具體情況來具體分析
❞
結語
本文主要講了固定大小內存池的實現方式,因為實現方案的局限性,此內存池設計方案僅適用於每次申請都是特定大小的場景。雖然在擴展部分做了部分思維發散,但因為未做充分的數據對比,所以僅限於思維擴散。
目前,開源的內存分配庫很多,比較優秀的有谷歌的tcmalloc以及微軟的mimalloc,大家可以根據自己項目的需求場景,選擇合適的內存分配庫。
今天的文章就到這里,下期見。
❝本文所講的內存池源碼地址:
https://github.com/namelij/fixedsize_memorypool
別忘了給個star哦😁
❞
作者:高性能架構探索
掃描下方二維碼關注公眾號【高性能架構探索】,回復【pdf】免費獲取計算機必備經典書籍