message queue的設計


為了在各線程之間高效的傳遞消息,必須設計一種高效率的消息隊列,傳統的做法是mutex加queue,這種做法在每次執行push和pop時都要加鎖,

效率相對較低。其次還有使用循環隊列,可以做到完全無鎖,但只能實現1:1的消息傳遞。還有一些lock-free隊列的實現,但基於其實現的相對復雜

性,我不打算使用。

我的隊列設計是使用tls維護一個local list,每個線程執行push時,首先將元素放入屬於本線程的local list中,此時是無需加鎖的,然后檢查隊列中元素

的總數,如果發現總數超過一個閥值,則將local list中的所有元素一次性提交到share list中,此時需要加鎖,share list中的元素是對全局可見的。

當讀者執行pop操作時,首先從檢查自己的local list中是否有元素,如果有就返回一個,如果沒有則嘗試從share list中將所有元素同步到自己的local list

中.

local list和message queue的結構如下:

struct per_thread_struct
{
    list_node   next;
    struct double_link_node block;
    struct link_list *local_q;
    condition_t cond;
};

struct mq
{
    uint32_t           push_size;
    pthread_key_t      t_key;
    mutex_t            mtx;
    struct double_link blocks;
    struct link_list  *share_list;
    struct link_list  *local_lists;

};

對於push操作,提供了兩個接口:

void mq_push(mq_t,struct list_node*);
void mq_push_now(mq_t,struct list_node*);

mq_push將元素插入local list但只有當local list中的元素到達一定閥值時才會執行提交操作mq_sync_push.

而mq_push_now將元素插入local list之后馬上就會執行提交操作.

然后還有一個問題,如果local list中的元素較長時間內都達不到閥值,會導致消息傳遞的延時,所以提供了mq_force_sync函數,此函數的作用是

強制將執行一次提交操作,將local list中的所有元素提交到share list中去。producer線程可在其主循環內以固定的頻率執行mq_force_sync,將一個

時間循環內剩余未被提交的消息提交出去.

下面貼下測試代碼:

#include <stdio.h>
#include <stdlib.h>
#include "KendyNet.h"
#include "thread.h"
#include "SocketWrapper.h"
#include "atomic.h"
#include "SysTime.h"
#include "mq.h"

list_node *node_list1[5];
list_node *node_list2[5];
mq_t mq1;

void *Routine1(void *arg)
{
    int j = 0;
    for( ; ; )
    {
        int i = 0;
        for(; i < 10000000; ++i)
        {
            mq_push(mq1,&node_list1[j][i]);
        }
        mq_force_sync(mq1);
        j = (j + 1)%5; 
        sleepms(100);

    }
}

void *Routine3(void *arg)
{
    int j = 0;
    for( ; ; )
    {
        int i = 0;
        for(; i < 10000000; ++i)
        {
            mq_push(mq1,&node_list2[j][i]);
        }
        mq_force_sync(mq1);
        j = (j + 1)%5; 
        sleepms(100);

    }
}

void *Routine2(void *arg)
{
    uint64_t count = 0;
    uint32_t tick = GetCurrentMs();
    for( ; ; )
    {
        list_node *n = mq_pop(mq1,50);
        if(n)
        {
            ++count;
        }
        uint32_t now = GetCurrentMs();
        if(now - tick > 1000)
        {
            printf("recv:%d\n",(count*1000)/(now-tick));
            tick = now;
            count = 0;
        }
    }
}


int main()
{
    int i = 0;
    for( ; i < 5; ++i)
    {
        node_list1[i] = calloc(10000000,sizeof(list_node));
        node_list2[i] = calloc(10000000,sizeof(list_node));
    }
    mq1 = create_mq(4096);
    init_system_time(10);
    thread_t t1 = create_thread(0);
    start_run(t1,Routine1,NULL);

    thread_t t3 = create_thread(0);
    start_run(t3,Routine3,NULL);    

    thread_t t2 = create_thread(0);
    start_run(t2,Routine2,NULL);

    getchar();

    return 0;
}

因為主要是測試mq的效率,所以預先生成了1億個消息,分為兩個寫者一個讀者,兩個寫者循環不斷的發消息,每發送1000W休眠一小會.

讀者僅僅是從mq中pop一個消息出來,然后更新統計值.在我的i5 2.93雙核台式機上運行rhel 6虛擬機,每秒pop出來的消息數量大概在8000W上下。

這個數據足已滿足任何高性能的應用需求了.

https://github.com/sniperHW/KendyNet/blob/master/src/kn_msgque.c


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM