想實現個循環緩沖區(Circular Buffer),搜了些資料多數是基於循環隊列的實現方式。使用一個變量存放緩沖區中的數據長度或者空出來一個空間來判斷緩沖區是否滿了。偶然間看到分析Linux內核的循環緩沖隊列kfifo
的實現,確實極其巧妙。kfifo
主要有以下特點:
- 保證緩沖空間的大小為2的次冪,不是的向上取整為2的次冪。
- 使用無符號整數保存輸入(in)和輸出(out)的位置,在輸入輸出時不對in和out的值進行模運算,而讓其自然溢出,並能夠保證
in-out
的結果為緩沖區中已存放的數據長度,這也是最能體現kfifo
實現技巧的地方; - 使用內存屏障(Memory Barrier)技術,實現單消費者和單生產者對
kfifo
的無鎖並發訪問,多個消費者、生產者的並發訪問還是需要加鎖的。
本文主要以下三個部分:
- 關於2的次冪問題,判斷是不是2的次冪以及向上取整為2的次冪
- Linux內核中
kfifo
的實現及簡要分析 - 根據
kfifo
實現的循環緩沖區,並進行一些測試
關於內存屏障的本文不作過多分析,可以參考WikiMemory Barrier。另外,本文所涉及的整數都默認為無符號整數,不再做一一說明。
1. 2的次冪
- 判斷一個數是不是2的次冪
kfifo
要保證其緩存空間的大小為2的次冪,如果不是則向上取整為2的次冪。其對於2的次冪的判斷方式也是很巧妙的。如果一個整數n是2的次冪,則二進制模式必然是1000...
,而n-1的二進制模式則是0111...
,也就是說n和n-1的每個二進制位都不相同,例如:8(1000)和7(0111);n不是2的次冪,則n和n-1的二進制必然有相同的位都為1的情況,例如:7(0111)和6(0110)。這樣就可以根據n & (n-1)
的結果來判斷整數n是不是2的次冪,實現如下:
/*
判斷n是否是2的冪
若n為2的次冪, 則 n & (n-1) == 0,也就是n和n-1的各個位都不相同。例如 8(1000)和7(0111)
若n不是2的次冪, 則 n & (n-1) != 0,也就是n和n-1的各個位肯定有相同的,例如7(0111)和6(0110)
*/
static inline bool is_power_of_2(uint32_t n)
{
return (n != 0 && ((n & (n - 1)) == 0));
}
- 將數字向上取整為2的次冪
如果設定的緩沖區大小不是2的次冪,則向上取整為2的次冪,例如:設定為5,則向上取為8。上面提到整數n是2的次冪,則其二進制模式為100...
,故如果正數k不是n的次冪,只需找到其最高的有效位1所在的位置(從1開始計數)pos,然后1 << pos
即可將k向上取整為2的次冪。實現如下:
static inline uint32_t roundup_power_of_2(uint32_t a)
{
if (a == 0)
return 0;
uint32_t position = 0;
for (int i = a; i != 0; i >>= 1)
position++;
return static_cast<uint32_t>(1 << position);
}
2. Linux實現kfifo及分析
Linux內核中kfifo
實現技巧,主要集中在放入數據的put
方法和取數據的get
方法。代碼如下:
unsigned int __kfifo_put(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->size - fifo->in + fifo->out);
/*
* Ensure that we sample the fifo->out index -before- we
* start putting bytes into the kfifo.
*/
smp_mb();
/* first put the data starting from fifo->in to buffer end */
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer */
memcpy(fifo->buffer, buffer + l, len - l);
/*
* Ensure that we add the bytes to the kfifo -before-
* we update the fifo->in index.
*/
smp_wmb();
fifo->in += len;
return len;
}
unsigned int __kfifo_get(struct kfifo *fifo,unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->in - fifo->out);
/*
* Ensure that we sample the fifo->in index -before- we
* start removing bytes from the kfifo.
*/
smp_rmb();
/* first get the data from fifo->out until the end of the buffer */
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, fifo->buffer, len - l);
/*
* Ensure that we remove the bytes from the kfifo -before-
* we update the fifo->out index.
*/
smp_mb();
fifo->out += len;
return len;
}
put
返回實際保存到緩沖區中的數據長度,get
返回的是實際取到的數據長度。在上面代碼中,需要注意到在寫入、取出時候的兩次min
運算。關於kfifo
的分析,已有很多資料了,也可參考眉目傳情之匠心獨運的kfifo 。
Linux內核實現的kfifo的有以下特點:
- 使用內存屏障 Memory Barrier
- 初始化緩沖區空間時要保證緩沖區的大小為2的次冪
- 使用無符號整數保存in和out(輸入輸出的指針),並且在放入取出數據的時候不做模運算,讓其自然溢出。
優點:
-
實現單消費者和單生產者的無鎖並發訪問。多消費者和多生產者的時候還是需要加鎖的。
-
使用與運算
in & (size-1)
代替模運算 -
在更新in或者out的值時不做模運算,而是讓其自動溢出。這應該是kfifo實現最牛叉的地方了,利用溢出后的值參與運算,並且能夠保證結果的正確。溢出運算保證了以下幾點:
- in - out為緩沖區中的數據長度
- size - in + out 為緩沖區中空閑空間
- in == out時緩沖區為空
- size == (in - out)時緩沖區滿了
3.模仿kfifo
實現的循環緩沖
主要是模仿其無符號溢出的運算方法,並沒有利用內存屏障實現單生產者和單消費者的無鎖並發訪問。初始化及輸入輸出的代碼如下:
struct kfifo{
uint8_t *buffer;
uint32_t in; // 輸入指針
uint32_t out; // 輸出指針
uint32_t size; // 緩沖區大小,必須為2的次冪
kfifo(uint32_t _size)
{
if (!is_power_of_2(_size))
_size = roundup_power_of_2(_size);
buffer = new uint8_t[_size];
in = 0;
out = 0;
size = _size;
}
// 返回實際寫入緩沖區中的數據
uint32_t put(const uint8_t *data, uint32_t len)
{
// 當前緩沖區空閑空間
len = min(len,size - in + out);
// 當前in位置到buffer末尾的長度
auto l = min(len, size - (in & (size - 1)));
// 首先復制數據到[in,buffer的末尾]
memcpy(buffer + (in & (size - 1)), data, l);
// 復制剩余的數據(如果有)到[buffer的起始位置,...]
memcpy(buffer, data + l, len - l);
in += len; // 直接加,不作模運算。當溢出時,從buffer的開始位置重新開始
return len;
}
// 返回實際讀取的數據長度
uint32_t get(uint8_t *data, uint32_t len)
{
// 緩沖區中的數據長度
len = min(len, in - out);
// 首先從[out,buffer end]讀取數據
auto l = min(len, size - (out & (size - 1)));
memcpy(data, buffer + (out & (size - 1)), l);
// 從[buffer start,...]讀取數據
memcpy(data + l, buffer, len - l);
out += len; // 直接加,不錯模運算。溢出后,從buffer的起始位置重新開始
return len;
}
在初始化緩沖空間的時候要驗證size
是否為2的次冪,如果不是則向上取整為2的次冪。下面着重分析下在放入取出數據時對指針in
和out
的處理,以及在溢出后怎么能夠保證in - out
仍然為緩沖區中的已有的數據長度。
put和get方法詳解
在向緩沖區中put數據的時候,需要兩個參數:要put的數據指針data
和期望能夠put的數據長度len
,返回值是實際存放到緩沖區中的數據長度(當緩沖區中空間不足時該值小於len
)。下面詳細的解釋下put
中每個語句的作用。
put
函數中的第一句是len = min(len,size - in + out)
計算實際向緩沖區中寫入數據的大小。如果想要寫入的數據len
大於緩沖區中的空閑空間size - in + out
,則只填充滿緩沖空間。
因為是循環緩沖區,所以其空閑空間有兩部分:從in到緩沖空間的末尾->[in,buffer end]和緩沖空間的起始位置到out->[buffer start,out]。
auto l = min(len, size - (in & (size - 1)));
這個是判斷[in,buffer end]這部分空間是否足夠寫入數據memcpy(buffer + (in & (size - 1)), data, l);
向[in,buffer end]這部分空間寫入數據memcpy(buffer, data + l, len - l);
如果數據還沒有寫完,則向[buffer start,out]這部分空間寫入數據。in += len
更新in,不做模運算,讓其自然溢出。
get
和put很類似,首先判斷是否有足夠的數據取出;在取數據時首先從out取到buffer的末尾,如果不夠則從buffer的開始位置取;最后更新out時也是不做模運算,讓其溢出。看參看上面put的語句解釋,這里就不再多說。
無符號溢出運算
kfifo
之所以如次的簡潔,很大一部分要歸功於其in和out的溢出運算。這里就解釋下在溢出的情況下,如何保證in - out
仍然為緩沖區中的數據長度。首先來看圖:
-
緩沖區為空
-
put 一堆數據后
-
get 一堆數據后
-
put的數據長度超過in到buffer末尾的長度,有一部分從put到buffer的起始位置
以上圖片引用自linux內核數據結構之kfifo,其對
kfifo
的分析也很詳細。
前三種情況下從圖中可以很清晰的看出in - out
為緩沖區中的已有的數據長度,但是最后一種發現in跑到了out的前面,這時候in - out
不是應該為負的么,怎么能是數據長度?這正是kfifo
的高明之處,in和out都是無符號整數,那么在in < out 時in - out
就是負數,把這個負數當作無符號來看時,其值仍然是緩沖區中的數據長度。這和in累加到溢出的情況基本一致,這里放在一起說。
這里使用8位無符號整數來保存in和out,方便溢出。這里假設out = 100,in = 255,size = 256,如下圖
/*
--------------------------------------
| | | |
--------------------------------------
out = 100 in = 250
這時緩沖區中已有的數據為:in - out = 150,空閑空間為:size - (in - out) = 106
向緩沖區中put10個數據后
--------------------------------------
| | | |
--------------------------------------
in out
這時候 in + 10 = 260 溢出變為in = 4;這是 in - out = 4 - 100 = -96,仍然溢出-96十六進制為`0xA0`,將其直接轉換為有符號數`0xA0 = 160`,在沒put之前的數據為150,put10個后,緩沖區中的數據剛好為160,剛好為溢出計算結果。
*/
進行上述運算的前提是,size必須為2的次冪。假如size = 257,則上述的運行就不會成功。
測試實例
上面描述都是基於運算推導的,下面據結合本文中的代碼進行下驗證。
測試代碼如下:設置空間大小為128,in和out為8位無符號整數
int main()
{
uint8_t output[512] = { 0 };
uint8_t data[256] = { 0 };
for (int i = 0; i < 256; i++)
data[i] = i;
kfifo fifo(128);
fifo.put(data, 100);
fifo.get(output, 50);
fifo.put(data, 30);
auto c = fifo.put(data + 10, 92);
cout << "Empty:" << fifo.isEmpty() << endl;
cout << "Left Space:" << fifo.left() << endl;
cout << "Length:" << fifo.length() << endl;
uint8_t a = fifo.size - fifo.in + fifo.out;
uint8_t b = fifo.in - fifo.out;
cout << "=======================================" << endl;
fifo.get(output, 128);
cout << "Empty:" << fifo.isEmpty() << endl;
cout << "Left Space:" << fifo.left() << endl;
cout << "Length:" << fifo.length() << endl;
cout << "======================================" << endl;
fifo.put(output, 100);
cout << "Empty:" << fifo.isEmpty() << endl;
auto d = static_cast<uint8_t>(fifo.left());
auto e = static_cast<uint8_t>(fifo.length());
printf("Left Space:%d\n", d);
printf("Length:%d\n", e);
getchar();
return 0;
}
執行結果:
- 第一個輸出是將緩沖區填滿的狀態
- 第二個輸出是將緩沖區取空的狀態
- 第三個是in溢出的情況,具體來看看:
在第二個輸出將緩沖區取空的時候,in = out = 178。接着,向緩沖區put了100個數據,這時候in += 100
會溢出,溢出后in = 22。看輸出結果:put前緩沖區為空,put100個數據后,緩沖區的空閑空間為28,數據長度為100,是正確的。