在網絡服務器的設計中,經常使用多進程/多線程.這就涉及到在進程/線程間共享數據.
現在我們假設一個場景,一個進程/線程負責處理網絡收發,一個或多個進程/線程處理
收到的網絡數據包.
顯然,我們可以在每一對協作進程/線程間添加一個隊列,將數據添加到隊列中,以實現
兩個進程/線程的協作.
我們的消息隊列主要的設計目標有三個:
1)要可以使用在進程與進程和線程與線程之間.當在進程之間通信時,我們的消息隊列
將會被放在共享內存中.
2)避免使用鎖機制,其一方面原因是鎖的開銷較大,另一方面是因為,對於在共享內存中
使用消息隊列時.如果一個進程獲得鎖之后崩潰,另一個進程將得不到任何的通知.當它
要獲得鎖的時候,將會阻塞在永遠不會被釋放的鎖上(unix like的系統).
3)可向隊列中加入變長的消息,減少memcopy的次數.
基於以上目標,每一個消息隊列只能有一個寫者,一個讀者,以避免鎖的使用.
消息隊列用數組實現循環隊列.數組中的每個元素是以下結構:
template<int MsgSize> struct msgBlock { int next; //下一個塊的下標,如果是最后一個block,則==-1 int totalSize;//整個消息的大小,僅對消息中的第一個塊有效 int size; //本塊有效數據大小 char msg[MsgSize]; int getAvaSize(char *p) { return MsgSize - int(p - msg); } msgBlock():next(-1),totalSize(0),size(0){} };
每個完整的消息都是由1個或連續數個msgBlock組成.這樣就可以實現在消息隊列中傳遞
變長的消息包.
消息隊列的接口如下:
struct wPos { int blk; char *pos; wPos():blk(-1),pos(NULL){} }; template<int MsgSize = 1024,int QueueSize = 4096> //MsgSize,每個block的大小,QueueSize,最大消息數 class MsgQueue { public: template<typename T> int WriteNum(const T val); template<typename T> int ReWriteNum(wPos *pos,const T val); int WriteString(char *str); int WriteBin(void *bin,int len); wPos getWPos(); /* * brief: 向隊列提交一個完整的消息 */ void MsgPush(); /* * brief : 讀出一條完整的消息 */ int MsgPop(void *buf); //返回隊列中第一個msg的大小 int GetFirstMsgSize(); };
為了減少內存的考貝,可以通過write函數簇直接向消息隊列中寫入數值型,string,和二進制數據.
當所有的數據都寫完后,調用MsgPush將把這個消息包提交到隊列中.
通過MsgPop可以獲取一個完成的消息.還提供了rewrite接口,以修改已經寫入隊列的數值型數據.
以下是完整的程序:
#ifndef _MSGQUEUE_H #define _MSGQUEUE_H #include <assert.h> /* * brief : 消息隊列,作為線程/進程間通信的消息隊列,實現單讀單寫,無需加鎖. * 對於進程間通信,使用共享內存實現. * */ template<int MsgSize> struct msgBlock { int next; int totalSize;//整個消息的大小,僅對消息中的第一個塊有效 int size;//本塊有效數據大小 char msg[MsgSize]; int getAvaSize(char *p) { return MsgSize - int(p - msg); } msgBlock():next(-1),totalSize(0),size(0){} }; struct wPos { int blk; char *pos; wPos():blk(-1),pos(NULL){} }; template<int MsgSize = 1024,int QueueSize = 4096> class MsgQueue { public: MsgQueue():idx_read(0),idx_write(0),curblk(0),pCurWrite(msgQueue[0].msg),saveTotalSize(0){} template<typename T> int WriteNum(const T val) { return Write(&val,(int)sizeof(T)); } template<typename T> int ReWriteNum(wPos *pos,const T val) { return ReWrite(pos,&val,(int)sizeof(T)); } int WriteString(char *str) { return Write(str,(int)strlen(str)+1); } int WriteBin(void *bin,int len) { return Write(bin,len); } wPos getWPos() { wPos ret; if((curblk + 1)%QueueSize == idx_read) return ret; ret.blk = curblk; ret.pos = pCurWrite; return ret; } /* * brief: 一條完整的消息已經完成寫入隊列 */ void MsgPush() { if((idx_write+1)%QueueSize == idx_read) return; msgQueue[idx_write].totalSize = saveTotalSize; msgQueue[curblk].next = -1; idx_write = (curblk+1)%QueueSize; pCurWrite = msgQueue[idx_write].msg; curblk = idx_write; saveTotalSize = msgQueue[idx_write].size = 0; msgQueue[idx_write].next = -1; } /* * brief : 讀出一條完整的消息 */ int MsgPop(void *buf) { assert(buf); if(idx_read == idx_write) return 0; int tmp_cur = idx_read; char *pWrite = (char*)buf; int totalSize = msgQueue[tmp_cur].totalSize; for( ; ; ) { msgBlock<MsgSize> &curBlock = msgQueue[tmp_cur]; memcpy(pWrite,curBlock.msg,curBlock.size); pWrite += curBlock.size; tmp_cur = (tmp_cur+1)%QueueSize; if(curBlock.next == -1) break; } idx_read = tmp_cur; return totalSize; } //返回隊列中第一個msg的大小 int GetFirstMsgSize() { if(idx_read == idx_write) return 0; return msgQueue[idx_read].totalSize; } private: int ReWrite(wPos *pos,const void *buf,int size) { assert(buf); assert(size>0); int tSize = size; msgBlock<MsgSize> *msgBlk = &msgQueue[pos->blk]; char *pRead = (char *)buf; while(tSize) { int avaSize = msgBlk->getAvaSize(pos->pos); //當前block空間已經用完,需要使用第二個block的空間 if(avaSize == 0) { pos->blk = (pos->blk + 1) % QueueSize; msgBlk = &msgQueue[pos->blk]; avaSize = MsgSize; pos->pos = msgBlk->msg; } int writesize = avaSize > size ? size : avaSize; memcpy(pos->pos,pRead,writesize); pos->pos += writesize; pRead += writesize; tSize -= writesize; } return size; } /* * brief: 向隊列中寫入數據,這些數據只是消息中的一部分,當所有數據都寫完,調用MsgWrite. */ int Write(const void *buf,int size) { assert(buf); assert(size>0); //已經沒有空間可供寫入 if((curblk + 1)%QueueSize == idx_read) return 0; int tSize = size; msgBlock<MsgSize> *msgBlk = &msgQueue[curblk]; char *pRead = (char *)buf; while(tSize) { int avaSize = msgBlk->getAvaSize(pCurWrite); //當前block空間已經用完,需要使用第二個block的空間 if(avaSize == 0) { int next = (curblk + 1) % QueueSize; if(next == idx_read)//空間用完了 { curblk = idx_write; pCurWrite = msgQueue[idx_write].msg; saveTotalSize = 0; return 0; } msgBlk->next = curblk = next; msgBlk = &msgQueue[curblk]; avaSize = MsgSize; msgBlk->size = 0; pCurWrite = msgBlk->msg; } int writesize = avaSize > size ? size : avaSize; memcpy(pCurWrite,pRead,writesize); pCurWrite += writesize; pRead += writesize; saveTotalSize += writesize; msgBlk->size += writesize; tSize -= writesize; } return size; } private: int idx_read;//讀下標 int idx_write;//寫下標 char *pCurWrite;//當前寫指針 int curblk;//當前寫所在的塊 int saveTotalSize; msgBlock<MsgSize> msgQueue[QueueSize]; };