轉自:http://blog.chinaunix.net/uid-28458801-id-4262445.html
操作系統:ubuntu10.04
前言:
在嵌入式開發中,只要是帶操作系統的,在其上開發產品應用,基本都需要用到多線程。
為了提高效率,盡可能的提高並發率。因此,線程之間的通信就是問題的核心。
根據當前產品需要,使用 環形緩沖區 解決。
一,環形緩沖區的實現
1,cbuf.h
點擊(此處)折疊或打開
- #ifndef __CBUF_H__
- #define __CBUF_H__
-
- #ifdef __cplusplus
- extern "C" {
- #endif
-
- /* Define to prevent recursive inclusion
- -------------------------------------*/
- #include "types.h"
- #include "thread.h"
-
-
- typedef struct _cbuf
- {
- int32_t size; /* 當前緩沖區中存放的數據的個數 */
- int32_t next_in; /* 緩沖區中下一個保存數據的位置 */
- int32_t next_out; /* 從緩沖區中取出下一個數據的位置 */
- int32_t capacity; /* 這個緩沖區的可保存的數據的總個數 */
- mutex_t mutex; /* Lock the structure */
- cond_t not_full; /* Full -> not full condition */
- cond_t not_empty; /* Empty -> not empty condition */
- void *data[CBUF_MAX];/* 緩沖區中保存的數據指針 */
- }cbuf_t;
-
-
- /* 初始化環形緩沖區 */
- extern int32_t cbuf_init(cbuf_t *c);
-
- /* 銷毀環形緩沖區 */
- extern void cbuf_destroy(cbuf_t *c);
-
- /* 壓入數據 */
- extern int32_t cbuf_enqueue(cbuf_t *c,void *data);
-
- /* 取出數據 */
- extern void* cbuf_dequeue(cbuf_t *c);
-
-
- /* 判斷緩沖區是否為滿 */
- extern bool cbuf_full(cbuf_t *c);
-
- /* 判斷緩沖區是否為空 */
- extern bool cbuf_empty(cbuf_t *c);
-
- /* 獲取緩沖區可存放的元素的總個數 */
- extern int32_t cbuf_capacity(cbuf_t *c);
-
-
- #ifdef __cplusplus
- }
- #endif
-
- #endif
- /* END OF FILE
- ---------------------------------------------------------------*/
2,cbuf.c
點擊(此處)折疊或打開
- #include "cbuf.h"
-
-
-
- /* 初始化環形緩沖區 */
- int32_t cbuf_init(cbuf_t *c)
- {
- int32_t ret = OPER_OK;
-
- if((ret = mutex_init(&c->mutex)) != OPER_OK)
- {
- #ifdef DEBUG_CBUF
- debug("cbuf init fail ! mutex init fail !\n");
- #endif
- return ret;
- }
-
- if((ret = cond_init(&c->not_full)) != OPER_OK)
- {
- #ifdef DEBUG_CBUF
- debug("cbuf init fail ! cond not full init fail !\n");
- #endif
- mutex_destroy(&c->mutex);
- return ret;
- }
-
- if((ret = cond_init(&c->not_empty)) != OPER_OK)
- {
- #ifdef DEBUG_CBUF
- debug("cbuf init fail ! cond not empty init fail !\n");
- #endif
- cond_destroy(&c->not_full);
- mutex_destroy(&c->mutex);
- return ret;
- }
-
- c->size = 0;
- c->next_in = 0;
- c->next_out = 0;
- c->capacity = CBUF_MAX;
-
- #ifdef DEBUG_CBUF
- debug("cbuf init success !\n");
- #endif
-
- return ret;
- }
-
-
- /* 銷毀環形緩沖區 */
- void cbuf_destroy(cbuf_t *c)
- {
- cond_destroy(&c->not_empty);
- cond_destroy(&c->not_full);
- mutex_destroy(&c->mutex);
-
- #ifdef DEBUG_CBUF
- debug("cbuf destroy success \n");
- #endif
- }
-
-
-
- /* 壓入數據 */
- int32_t cbuf_enqueue(cbuf_t *c,void *data)
- {
- int32_t ret = OPER_OK;
-
- if((ret = mutex_lock(&c->mutex)) != OPER_OK) return ret;
-
- /*
- * Wait while the buffer is full.
- */
- while(cbuf_full(c))
- {
- #ifdef DEBUG_CBUF
- debug("cbuf is full !!!\n");
- #endif
- cond_wait(&c->not_full,&c->mutex);
- }
-
- c->data[c->next_in++] = data;
- c->size++;
- c->next_in %= c->capacity;
-
- mutex_unlock(&c->mutex);
-
- /*
- * Let a waiting consumer know there is data.
- */
- cond_signal(&c->not_empty);
-
- #ifdef DEBUG_CBUF
- // debug("cbuf enqueue success ,data : %p\n",data);
- debug("enqueue\n");
- #endif
-
- return ret;
- }
-
-
-
- /* 取出數據 */
- void* cbuf_dequeue(cbuf_t *c)
- {
- void *data = NULL;
- int32_t ret = OPER_OK;
-
- if((ret = mutex_lock(&c->mutex)) != OPER_OK) return NULL;
-
- /*
- * Wait while there is nothing in the buffer
- */
- while(cbuf_empty(c))
- {
- #ifdef DEBUG_CBUF
- debug("cbuf is empty!!!\n");
- #endif
- cond_wait(&c->not_empty,&c->mutex);
- }
-
- data = c->data[c->next_out++];
- c->size--;
- c->next_out %= c->capacity;
-
- mutex_unlock(&c->mutex);
-
-
- /*
- * Let a waiting producer know there is room.
- * 取出了一個元素,又有空間來保存接下來需要存儲的元素
- */
- cond_signal(&c->not_full);
-
- #ifdef DEBUG_CBUF
- // debug("cbuf dequeue success ,data : %p\n",data);
- debug("dequeue\n");
- #endif
-
- return data;
- }
-
-
- /* 判斷緩沖區是否為滿 */
- bool cbuf_full(cbuf_t *c)
- {
- return (c->size == c->capacity);
- }
-
- /* 判斷緩沖區是否為空 */
- bool cbuf_empty(cbuf_t *c)
- {
- return (c->size == 0);
- }
-
- /* 獲取緩沖區可存放的元素的總個數 */
- int32_t cbuf_capacity(cbuf_t *c)
- {
- return c->capacity;
- }
二,輔助文件
為了提高程序的移植性,對線程相關進行封裝。
1,thread.h
點擊(此處)折疊或打開
- #ifndef __THREAD_H__
- #define __THREAD_H__
-
- #ifdef __cplusplus
- extern "C" {
- #endif
-
- /* Define to prevent recursive inclusion
- -------------------------------------*/
- #include "types.h"
-
-
-
-
-
- typedef struct _mutex
- {
- pthread_mutex_t mutex;
- }mutex_t;
-
-
- typedef struct _cond
- {
- pthread_cond_t cond;
- }cond_t;
-
-
- typedef pthread_t tid_t;
- typedef pthread_attr_t attr_t;
- typedef void* (* thread_fun_t)(void*);
-
-
- typedef struct _thread
- {
- tid_t tid;
- cond_t *cv;
- int32_t state;
- int32_t stack_size;
- attr_t attr;
- thread_fun_t fun;
- }thread_t;
-
-
-
- /* mutex */
- extern int32_t mutex_init(mutex_t *m);
- extern int32_t mutex_destroy(mutex_t *m);
- extern int32_t mutex_lock(mutex_t *m);
- extern int32_t mutex_unlock(mutex_t *m);
-
-
- /* cond */
- extern int32_t cond_init(cond_t *c);
- extern int32_t cond_destroy(cond_t *c);
- extern int32_t cond_signal(cond_t *c);
- extern int32_t cond_wait(cond_t *c,mutex_t *m);
-
-
-
- /* thread */
- /* 線程的創建,其屬性的設置等都封裝在里面 */
- extern int32_t thread_create(thread_t *t);
- //extern int32_t thread_init(thread_t *t);
-
- #define thread_join(t, p) pthread_join(t, p)
- #define thread_self() pthread_self()
- #define thread_sigmask pthread_sigmask
-
-
- #ifdef __cplusplus
- }
- #endif
-
- #endif
- /* END OF FILE
- ---------------------------------------------------------------*/
2,thread.c
點擊(此處)折疊或打開
- #include "thread.h"
-
-
-
-
- /* mutex */
- int32_t mutex_init(mutex_t *m)
- {
- int32_t ret = OPER_OK;
-
- if((ret = pthread_mutex_init(&m->mutex, NULL)) != 0)
- ret = -THREAD_MUTEX_INIT_ERROR;
-
- return ret;
- }
-
-
- int32_t mutex_destroy(mutex_t *m)
- {
- int32_t ret = OPER_OK;
-
- if((ret = pthread_mutex_destroy(&m->mutex)) != 0)
- ret = -MUTEX_DESTROY_ERROR;
-
- return ret;
- }
-
-
-
- int32_t mutex_lock(mutex_t *m)
- {
- int32_t ret = OPER_OK;
-
- if((ret = pthread_mutex_lock(&m->mutex)) != 0)
- ret = -THREAD_MUTEX_LOCK_ERROR;
-
- return ret;
- }
-
-
-
- int32_t mutex_unlock(mutex_t *m)
- {
- int32_t ret = OPER_OK;
-
- if((ret = pthread_mutex_unlock(&m->mutex)) != 0)
- ret = -THREAD_MUTEX_UNLOCK_ERROR;
-
- return ret;
- }
-
-
-
-
-
-
- /* cond */
- int32_t cond_init(cond_t *c)
- {
- int32_t ret = OPER_OK;
-
- if((ret = pthread_cond_init(&c->cond, NULL)) != 0)
- ret = -THREAD_COND_INIT_ERROR;
-
- return ret;
- }
-
-
-
- int32_t cond_destroy(cond_t *c)
- {
- int32_t ret = OPER_OK;
-
- if((ret = pthread_cond_destroy(&c->cond)) != 0)
- ret = -COND_DESTROY_ERROR;
-
- return ret;
- }
-
-
-
- int32_t cond_signal(cond_t *c)
- {
- int32_t ret = OPER_OK;
-
-
- if((ret = pthread_cond_signal(&c->cond)) != 0)
- ret = -COND_SIGNAL_ERROR;
-
- return ret;
- }
-
-
-
-
- int32_t cond_wait(cond_t *c,mutex_t *m)
- {
- int32_t ret = OPER_OK;
-
- if((ret = pthread_cond_wait(&c->cond, &m->mutex)) != 0)
- ret = -COND_WAIT_ERROR;
-
- return ret;
- }
三,測試
1,測試代碼
點擊(此處)折疊或打開
- /*
- * cbuf begin
- */
- #define OVER (-1)
-
- static cbuf_t cmd;
- static int line_1[200];
- static int line_2[200];
- //static int temp = 0;
-
- static bool line1_finish = false;
- static bool line2_finish = false;
-
- void* producer_1(void *data)
- {
- int32_t i = 0;
-
- for(i = 0; i < 200; i++)
- {
- line_1[i] = i+1000;
- cbuf_enqueue(&cmd, &line_1[i]);
-
- if(0 == (i % 9)) sleep(1);
- }
-
- line1_finish = true;
-
- return NULL;
- }
-
- void* producer_2(void *data)
- {
- int32_t i = 0;
-
- for(i = 0; i < 200; i++)
- {
- line_2[i] = i+20000;
- cbuf_enqueue(&cmd, &line_2[i]);
-
- if(0 == (i % 9)) sleep(1);
- }
-
- line2_finish = true;
-
- return NULL;
- }
-
-
- void* consumer(void *data)
- {
- int32_t *ptr = NULL;
-
- while(1)
- {
- ptr = cbuf_dequeue(&cmd);
- printf("%d\n",*ptr);
-
- if(cbuf_empty(&cmd) && line2_finish && line1_finish)
- {
- printf("quit\n");
- break;
- }
- }
-
- return NULL;
- }
-
-
- void test_cbuf_oper(void)
- {
- pthread_t l_1;
- pthread_t l_2;
- pthread_t c;
-
- cbuf_init(&cmd);
-
- pthread_create(&l_1,NULL,producer_1,0);
- pthread_create(&l_2,NULL,producer_2,0);
- pthread_create(&c,NULL,consumer,0);
-
- pthread_join(l_1,NULL);
- pthread_join(l_2,NULL);
- pthread_join(c,NULL);
-
- cbuf_destroy(&cmd);
- }
-
-
- void test_cbuf(void)
- {
- test_cbuf_oper();
- }
-
-
- /*
- * cbuf end
- */
2,測試結果
四,參考文件
1,《bareos-master》源碼
2,《nginx》源碼