為了在各線程之間高效的傳遞消息,必須設計一種高效率的消息隊列,傳統的做法是mutex加queue,這種做法在每次執行push和pop時都要加鎖,
效率相對較低。其次還有使用循環隊列,可以做到完全無鎖,但只能實現1:1的消息傳遞。還有一些lock-free隊列的實現,但基於其實現的相對復雜
性,我不打算使用。
我的隊列設計是使用tls維護一個local list,每個線程執行push時,首先將元素放入屬於本線程的local list中,此時是無需加鎖的,然后檢查隊列中元素
的總數,如果發現總數超過一個閥值,則將local list中的所有元素一次性提交到share list中,此時需要加鎖,share list中的元素是對全局可見的。
當讀者執行pop操作時,首先從檢查自己的local list中是否有元素,如果有就返回一個,如果沒有則嘗試從share list中將所有元素同步到自己的local list
中.
local list和message queue的結構如下:
struct per_thread_struct { list_node next; struct double_link_node block; struct link_list *local_q; condition_t cond; }; struct mq { uint32_t push_size; pthread_key_t t_key; mutex_t mtx; struct double_link blocks; struct link_list *share_list; struct link_list *local_lists; };
對於push操作,提供了兩個接口:
void mq_push(mq_t,struct list_node*); void mq_push_now(mq_t,struct list_node*);
mq_push將元素插入local list但只有當local list中的元素到達一定閥值時才會執行提交操作mq_sync_push.
而mq_push_now將元素插入local list之后馬上就會執行提交操作.
然后還有一個問題,如果local list中的元素較長時間內都達不到閥值,會導致消息傳遞的延時,所以提供了mq_force_sync函數,此函數的作用是
強制將執行一次提交操作,將local list中的所有元素提交到share list中去。producer線程可在其主循環內以固定的頻率執行mq_force_sync,將一個
時間循環內剩余未被提交的消息提交出去.
下面貼下測試代碼:
#include <stdio.h> #include <stdlib.h> #include "KendyNet.h" #include "thread.h" #include "SocketWrapper.h" #include "atomic.h" #include "SysTime.h" #include "mq.h" list_node *node_list1[5]; list_node *node_list2[5]; mq_t mq1; void *Routine1(void *arg) { int j = 0; for( ; ; ) { int i = 0; for(; i < 10000000; ++i) { mq_push(mq1,&node_list1[j][i]); } mq_force_sync(mq1); j = (j + 1)%5; sleepms(100); } } void *Routine3(void *arg) { int j = 0; for( ; ; ) { int i = 0; for(; i < 10000000; ++i) { mq_push(mq1,&node_list2[j][i]); } mq_force_sync(mq1); j = (j + 1)%5; sleepms(100); } } void *Routine2(void *arg) { uint64_t count = 0; uint32_t tick = GetCurrentMs(); for( ; ; ) { list_node *n = mq_pop(mq1,50); if(n) { ++count; } uint32_t now = GetCurrentMs(); if(now - tick > 1000) { printf("recv:%d\n",(count*1000)/(now-tick)); tick = now; count = 0; } } } int main() { int i = 0; for( ; i < 5; ++i) { node_list1[i] = calloc(10000000,sizeof(list_node)); node_list2[i] = calloc(10000000,sizeof(list_node)); } mq1 = create_mq(4096); init_system_time(10); thread_t t1 = create_thread(0); start_run(t1,Routine1,NULL); thread_t t3 = create_thread(0); start_run(t3,Routine3,NULL); thread_t t2 = create_thread(0); start_run(t2,Routine2,NULL); getchar(); return 0; }
因為主要是測試mq的效率,所以預先生成了1億個消息,分為兩個寫者一個讀者,兩個寫者循環不斷的發消息,每發送1000W休眠一小會.
讀者僅僅是從mq中pop一個消息出來,然后更新統計值.在我的i5 2.93雙核台式機上運行rhel 6虛擬機,每秒pop出來的消息數量大概在8000W上下。
這個數據足已滿足任何高性能的應用需求了.
https://github.com/sniperHW/KendyNet/blob/master/src/kn_msgque.c
