消息隊列的設計


 

在網絡服務器的設計中,經常使用多進程/多線程.這就涉及到在進程/線程間共享數據.

現在我們假設一個場景,一個進程/線程負責處理網絡收發,一個或多個進程/線程處理

收到的網絡數據包.

 

顯然,我們可以在每一對協作進程/線程間添加一個隊列,將數據添加到隊列中,以實現

兩個進程/線程的協作.

 

我們的消息隊列主要的設計目標有三個:

1)要可以使用在進程與進程和線程與線程之間.當在進程之間通信時,我們的消息隊列

將會被放在共享內存中.

 

2)避免使用鎖機制,其一方面原因是鎖的開銷較大,另一方面是因為,對於在共享內存中

使用消息隊列時.如果一個進程獲得鎖之后崩潰,另一個進程將得不到任何的通知.當它

要獲得鎖的時候,將會阻塞在永遠不會被釋放的鎖上(unix like的系統).

3)可向隊列中加入變長的消息,減少memcopy的次數.

 

基於以上目標,每一個消息隊列只能有一個寫者,一個讀者,以避免鎖的使用.

 

消息隊列用數組實現循環隊列.數組中的每個元素是以下結構:

template<int MsgSize>
struct msgBlock
{
    int next;     //下一個塊的下標,如果是最后一個block,則==-1
    int totalSize;//整個消息的大小,僅對消息中的第一個塊有效
    int size;     //本塊有效數據大小
    char msg[MsgSize];
    int getAvaSize(char *p)
    {
        return MsgSize - int(p - msg);
    }
    msgBlock():next(-1),totalSize(0),size(0){}
};

 

每個完整的消息都是由1個或連續數個msgBlock組成.這樣就可以實現在消息隊列中傳遞

變長的消息包.

  

消息隊列的接口如下: 

struct wPos
{
    int blk;
    char *pos;
    wPos():blk(-1),pos(NULL){}
};
template<int MsgSize = 1024,int QueueSize = 4096>  //MsgSize,每個block的大小,QueueSize,最大消息數
class MsgQueue
{
public:
    template<typename T>
    int WriteNum(const T val);
    template<typename T>
    int ReWriteNum(wPos *pos,const T val);
    int WriteString(char *str);
    int WriteBin(void *bin,int len);
    wPos getWPos();
    /*
    * brief: 向隊列提交一個完整的消息
    */
    void MsgPush();
    /*
    * brief : 讀出一條完整的消息
    */
    int MsgPop(void *buf);
    //返回隊列中第一個msg的大小
    int GetFirstMsgSize();
};

 

為了減少內存的考貝,可以通過write函數簇直接向消息隊列中寫入數值型,string,和二進制數據.

當所有的數據都寫完后,調用MsgPush將把這個消息包提交到隊列中.

 

通過MsgPop可以獲取一個完成的消息.還提供了rewrite接口,以修改已經寫入隊列的數值型數據.

以下是完整的程序:

#ifndef _MSGQUEUE_H
#define _MSGQUEUE_H
#include <assert.h>
/*
* brief : 消息隊列,作為線程/進程間通信的消息隊列,實現單讀單寫,無需加鎖.
*         對於進程間通信,使用共享內存實現.
*
*/
template<int MsgSize>
struct msgBlock
{
    int next;
    int totalSize;//整個消息的大小,僅對消息中的第一個塊有效
    int size;//本塊有效數據大小
    char msg[MsgSize];
    int getAvaSize(char *p)
    {
        return MsgSize - int(p - msg);
    }
    msgBlock():next(-1),totalSize(0),size(0){}
};
struct wPos
{
    int blk;
    char *pos;
    wPos():blk(-1),pos(NULL){}
};
template<int MsgSize = 1024,int QueueSize = 4096>
class MsgQueue
{
public:
    MsgQueue():idx_read(0),idx_write(0),curblk(0),pCurWrite(msgQueue[0].msg),saveTotalSize(0){}
    template<typename T>
    int WriteNum(const T val)
    {
        return Write(&val,(int)sizeof(T));
    }
    template<typename T>
    int ReWriteNum(wPos *pos,const T val)
    {
        return ReWrite(pos,&val,(int)sizeof(T));
    }
    int WriteString(char *str)
    {
        return Write(str,(int)strlen(str)+1);
    }
    int WriteBin(void *bin,int len)
    {
        return Write(bin,len);
    }
    wPos getWPos()
    {
        wPos ret;
        if((curblk + 1)%QueueSize == idx_read)
            return ret;
        ret.blk = curblk;
        ret.pos = pCurWrite;
        return ret;
    }
    /*
    * brief: 一條完整的消息已經完成寫入隊列
    */
    void MsgPush()
    {
        if((idx_write+1)%QueueSize == idx_read)
            return;
        msgQueue[idx_write].totalSize = saveTotalSize;
        msgQueue[curblk].next = -1;
        idx_write = (curblk+1)%QueueSize;
        pCurWrite = msgQueue[idx_write].msg;
        curblk = idx_write;
        saveTotalSize = msgQueue[idx_write].size = 0;
        msgQueue[idx_write].next = -1;
    }
    /*
    * brief : 讀出一條完整的消息
    */
    int MsgPop(void *buf)
    {
        assert(buf);
        if(idx_read == idx_write)
            return 0;
        int tmp_cur = idx_read;
        
        char *pWrite = (char*)buf;
        int totalSize = msgQueue[tmp_cur].totalSize;
        for( ; ; )
        {
            msgBlock<MsgSize> &curBlock = msgQueue[tmp_cur];
            memcpy(pWrite,curBlock.msg,curBlock.size);
            pWrite += curBlock.size;
            tmp_cur = (tmp_cur+1)%QueueSize;
            if(curBlock.next == -1)
                break;
        }
        idx_read = tmp_cur;
        return totalSize;
    }
    //返回隊列中第一個msg的大小
    int GetFirstMsgSize()
    {
        if(idx_read == idx_write)
            return 0;
        return msgQueue[idx_read].totalSize;
    }
private:
    int ReWrite(wPos *pos,const void *buf,int size)
    {
        assert(buf);
        assert(size>0);
        int tSize = size;
        msgBlock<MsgSize> *msgBlk = &msgQueue[pos->blk];    
        char *pRead = (char *)buf;
        while(tSize)
        {        
            int avaSize = msgBlk->getAvaSize(pos->pos);
            //當前block空間已經用完,需要使用第二個block的空間
            if(avaSize == 0)
            {
                pos->blk = (pos->blk + 1) % QueueSize;
                msgBlk = &msgQueue[pos->blk];
                avaSize = MsgSize;
                pos->pos = msgBlk->msg;
            }
            int writesize = avaSize > size ? size : avaSize;
            
            memcpy(pos->pos,pRead,writesize);
            pos->pos += writesize;
            pRead += writesize;
            tSize -= writesize;
        }
        
        return size;
    
    }
    /*
    * brief: 向隊列中寫入數據,這些數據只是消息中的一部分,當所有數據都寫完,調用MsgWrite.
    */
    int Write(const void *buf,int size)
    {
        assert(buf);
        assert(size>0);
        //已經沒有空間可供寫入
        if((curblk + 1)%QueueSize == idx_read)
            return 0;
        int tSize = size;
        msgBlock<MsgSize> *msgBlk = &msgQueue[curblk];    
        char *pRead = (char *)buf;
        while(tSize)
        {        
            int avaSize = msgBlk->getAvaSize(pCurWrite);
            //當前block空間已經用完,需要使用第二個block的空間
            if(avaSize == 0)
            {
                int next = (curblk + 1) % QueueSize; 
                if(next == idx_read)//空間用完了
                {
                    curblk = idx_write;
                    pCurWrite = msgQueue[idx_write].msg;
                    saveTotalSize = 0;
                    return 0;
                }
                msgBlk->next = curblk = next;
                msgBlk = &msgQueue[curblk];
                avaSize = MsgSize;
                msgBlk->size = 0;
                pCurWrite = msgBlk->msg;
            }
            int writesize = avaSize > size ? size : avaSize;
            
            memcpy(pCurWrite,pRead,writesize);
            pCurWrite += writesize;
            pRead += writesize;
            saveTotalSize += writesize;
            msgBlk->size += writesize;
            tSize -= writesize;
        }
        return size;
    }
private:
    int idx_read;//讀下標
    int idx_write;//寫下標
    
    char *pCurWrite;//當前寫指針
    int   curblk;//當前寫所在的塊
    int   saveTotalSize;
    msgBlock<MsgSize> msgQueue[QueueSize];
};

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM