環形緩沖區是生產者和消費者模型中常用的數據結構。生產者將數據放入數組的尾端,而消費者從數組的另一端移走數據,當達到數組的尾部時,生產者繞回到數組的頭部。如果只有一個生產者和一個消費者,那么就可以做到免鎖訪問環形緩沖區(Ring Buffer)。寫入索引只允許生產者訪問並修改,只要寫入者在更新索引之前將新的值保存到緩沖區中,則讀者將始終看到一致的數據結構。同理,讀取索引也只允許消費者訪問並修改。
環形緩沖區實現原理圖

如圖所示,當讀者和寫者指針相等時,表明緩沖區是空的,而只要寫入指針在讀取指針后面時,表明緩沖區已滿。
清單 9. 2.6.10 環形緩沖區實現代碼
/*
* __kfifo_put - puts some data into the FIFO, no locking version
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->size - fifo->in + fifo->out);
/* first put the data starting from fifo->in to buffer end */
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
/* then put the rest (if any) at the beginning of the buffer */
memcpy(fifo->buffer, buffer + l, len - l);
fifo->in += len;
return len;
}
/*
* __kfifo_get - gets some data from the FIFO, no locking version
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;
len = min(len, fifo->in - fifo->out);
/* first get the data from fifo->out until the end of the buffer */
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, fifo->buffer, len - l);
fifo->out += len;
return len;
}
需要注意的是
使用ring_buffer_get(kfifo_get)或者ring_buffer_put(kfifo_put)時,如果返回參數與傳入參數len不相等時,則操作失敗
我們定義一個
//注意student_info 共17字節 按照內存排列占24字節
typedef struct student_info
{
uint64_t stu_id; //8個字節
uint32_t age; //4字節
uint32_t score;//4字節
char sex;//1字節
}student_info;
我們建立一個環形緩沖區,里面只有64字節大小(雖然我們實際使用時大小遠大於此),向里面多次存入24字節student_info,看有什么反應
//打印學生信息
void print_student_info(const student_info *stu_info)
{
assert(stu_info);
printf("id:%lu\t",stu_info->stu_id);
printf("age:%u\t",stu_info->age);
printf("sex:%d\t",stu_info->sex);
printf("score:%u\n",stu_info->score);
}
student_info * get_student_info(time_t timer)
{
student_info *stu_info = (student_info *)malloc(sizeof(student_info));
srand(timer);
stu_info->stu_id = 10000 + rand() % 9999;
stu_info->age = rand() % 30;
stu_info->score = rand() % 101;
stu_info->sex=rand() % 2;
print_student_info(stu_info);
return stu_info;
}
void print_ring_buffer_len(struct ring_buffer *ring_buf)
{
//用於打印緩沖區長度
uint32_t ring_buf_len = 0;
//取得已經使用緩沖區長度 size-ring_buf_len為未使用緩沖區的長度
ring_buf_len=ring_buffer_len(ring_buf);
printf("no use ring_buf_len:%d\n",(ring_buf->size-ring_buf_len));
}
int main(int argc, char *argv[])
{
uint32_t size = 0;
//用於判斷存儲或者取得數據的字節數
uint32_t oklen = 0;
struct ring_buffer *ring_buf = NULL;
//64字節
size=BUFFER_SIZE;
ring_buf = ring_buffer_alloc(size);
printf("input student\n");
{
student_info *stu_info;
student_info stu_temp;
uint32_t student_len=sizeof(student_info);
printf("ring_buf_len:%d\n",ring_buf->size);
printf("student_len:%d\n",student_len);
//此時環形緩沖區沒有數據我們去取數據當然為空
memset(&stu_temp,0,student_len);
oklen=ring_buffer_get(ring_buf, (void *)(&stu_temp), student_len);
if(oklen==student_len)
{
printf("get student data\n");
}
else
{
printf("no student data\n");
}
printf("\n");
//第一次調用時用字節結束后還有64-24 =40字節
stu_info = get_student_info(976686458);
oklen = ring_buffer_put(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
printf("1 put student data success\n");
}
else
{
printf("1 put student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
//第二次調用時用字節結束后還有64-48 =16字節
stu_info = get_student_info(976686464);
oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
printf("2 put student data success\n");
}
else
{
printf("2 put student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
//第三次調用時需要用字節但只有字節失敗
//把字節都寫滿了
//驗證了在調用__kfifo_put函數或者__kfifo_get函數時,如果返回參數與傳入參數len不相等時,則操作失敗
stu_info = get_student_info(976686445);
oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
printf("3 put student data success\n");
}
else
{
printf("3 put student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
//第四次調用時需要用字節但無字節
////驗證了在調用__kfifo_put函數或者__kfifo_get函數時,如果返回參數與傳入參數len不相等時,則操作失敗
stu_info = get_student_info(976686421);
oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
printf("4 put student data success\n");
}
else
{
printf("4 put student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
//現在開始取學生數據里面保存了個學生數據我們取三次看效果
printf("output student\n");
printf("\n");
//第一次取得數據並打印
memset(stu_info,0,student_len);
oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
print_student_info(stu_info);
printf("1 get student data success\n");
}
else
{
printf("1 get student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
////第二次取得數據並打印
memset(stu_info,0,student_len);
oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
print_student_info(stu_info);
printf("2 get student data success\n");
}
else
{
printf("2 get student data failure\n");
}
print_ring_buffer_len(ring_buf);
printf("\n");
//第三次取得數據失敗
memset(stu_info,0,student_len);
oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);
if(oklen==student_len)
{
print_student_info(stu_info);
printf("3 get student data success\n");
}
else
{
printf("3 get student data failure\n");
}
print_ring_buffer_len(ring_buf);
}
return 1;
}

結論:在使用ring_buffer_get(kfifo_get)或者ring_buffer_put(kfifo_put)時,如果返回參數與傳入參數len不相等時,則操作失敗。代碼下載:tessc.rar(http://files.cnblogs.com/dragonsuc/tessc.rar)
需要注意的地方:
1.只有一個線程負責讀,另一個線程負責寫的時候,數據是線程安全的。上面的實現是基於這個原理實現的,當有多個線程讀或者多個線程寫的時候,不保證數據的正確性。
所以使用的時候,一個線程寫,一個線程讀。網絡應用中比較常用,就是開一個線程接口數據,然后把數據寫入隊列。然后開一個調度線程讀取網絡數據,然后分發到處理線程。
2.數據長度默認宏定義了一個長度,超過這個長度的時候,后續的數據會寫入失敗。
本文參考文章:

