引言:前面的一系列文章都在說了事件模型,也就是簡單的做一個介紹,然后貼出了一些代碼作為Demo,上次說到了在反應堆中的超時管理。今天就來說說關於利用最小堆來管理超時的問題。
NOTICE:判斷一個事件是否超時的方法是, 事件超時時間 減 當前時間 ,如果大於零,說明沒有超時,如果小於零,說明該事件超時了。
一般做法是怎樣來管理所有的超時事件呢?以前的用法都是利用鏈表來保存所有的超時事件,輪訓查看是否事件超時,若超時就采取相應的措施,比如移除事件等。后來的Nginx和libevent采取了更好的措施,Nginx是利用紅黑樹來管理,而libevent是利用最小堆來管理。
今天的主題是最小堆,所以下面的內容都是圍繞最小堆來展開。
這里說說最小堆的好處,然后接着說明為什么要利用最小堆來管理超時事件。
最小堆的好處:
(1)最小堆的設計簡單,易於實現
(2)插入和刪除操作都是在log(n)基礎上完成
(3)獲取最小值是O(1)
為什么要利用最小堆:
大家都知道在IO多路復用中比如 select() 和 epoll() 都有一個超時的最大時間,也就是說,如果沒有監聽到事件,最大的阻塞時間就是這參數。
當我們在Reactor中加入了一個超時事件的時候,我們就利用這個最小堆中的根節點的時間減去當前時間作為select/epoll超時的最大值。所以,當select/epoll返回的時候得到函數執行的消耗的時間,查看最小堆的根是否超時(NOTICE),如果沒有超時,我們繼續正常步驟,如果超時就采取相應措施,比如超時事件的回調函數等,然后繼續查看新的堆結構的根節點,直到處理不超時的情況出現,因為堆是自適應的,所以根部必然是值最小的,如果這個事件沒有超時,就可以停止超時事件的處理 了。
所以總體來看,每次都會找到最小堆中最小的元素作為比較,事件復雜度是O(1)
最小堆的性質:
所有父節點總是小於或等於所有的子節點(遞歸定義),看做二叉樹的話是完全二叉樹
看看最小堆的實現:
結構表示:
typedef struct heap_node_s { int index; int key ; }mc_minheap_node_t ; typedef struct min_heap_s { mc_minheap_node_t *node_list ; int headindex ; int lastindex ; size_t max_num ; }mc_minheap_t;
幾個宏:
#define MAX_INT 0x0fffffff #define PARENT(i) ((i)/(2)) #define LEFTCHILD(i) ((i)*(2)) #define RIGHTCHILD(i) (((i)*(2))+(1))
插入:
每次插入到堆的末尾,然后逐層比較,直到大於父節點。
int mc_minheap_insert( mc_minheap_t *mh ,int key ) { if( mh == NULL ) return -1; int lastindex = mh->lastindex; mh->node_list[lastindex].key = key ; mh->node_list[lastindex].index = lastindex ; int tindex = lastindex ; //heap top if( lastindex == 1 ) { mh->lastindex++ ; return 1; } else { if( lastindex >= mh->max_num -1 ) return -1; while( tindex != 1 ) { if( key < mh->node_list[PARENT( tindex )].key ) { swap_node( &(mh->node_list[tindex]), &(mh->node_list[PARENT( tindex )]) ); } tindex = PARENT( tindex ); } } mh->lastindex++ ; return 1; }
刪除:
刪除的時候,把需要刪除的節點與堆的最后一個節點交換,刪除這個節點。然后交換后的這個節點與子節點中較小的一個比較,如果大於它就交換,如果小於就結束,直到沒有子節點。
int mc_minheap_rm( mc_minheap_t *mh ) { if( mh == NULL ) return -1; int ret = mh->node_list[1].key ; mh->node_list[1].key = MAX_INT ; mc_minheap_node_t * pnode = &(mh->node_list[1]) ; mc_minheap_node_t *minnode ; swap_node( pnode , &(mh->node_list[mh->lastindex])); int i = 1 ; for( ; i < mh->lastindex ; i++ ) { minnode = mh->node_list[LEFTCHILD(i)].key <= mh->node_list[RIGHTCHILD(i)].key ? &(mh->node_list[LEFTCHILD(i)]) : &(mh->node_list[RIGHTCHILD(i)]); if( i == mh->max_num -1 ) break ; if( minnode->key == MAX_INT ) break ; if( pnode.key > minnode.key ) swap_node( pnode , minnode ); else break; pnode = minnode ; } mh->lastindex-- ; return ret ; } int mc_minheap_rm_index( mc_minheap_t *mh ,int index ) { if( mh == NULL ) return -1; if( index > mh->lastindex ) return -1; int ret = mh->node_list[index].key ; mh->node_list[index].key = MAX_INT ; mc_minheap_node_t * pnode = &(mh->node_list[index]) ; mc_minheap_node_t *minnode ; swap_node( pnode , &(mh->node_list[mh->lastindex])); int i = index ; for( ; i < mh->lastindex ; i++ ) { minnode = mh->node_list[LEFTCHILD(i)].key <= mh->node_list[RIGHTCHILD(i)].key ? &(mh->node_list[LEFTCHILD(i)]) : &(mh->node_list[RIGHTCHILD(i)]); if( i == mh->max_num -1 ) break ; if( minnode->key == MAX_INT ) break ; if( pnode.key > minnode.key ) swap_node( pnode , minnode ); else break; pnode = minnode ; } mh->lastindex-- ; return ret ; }
初始化和交換節點:
mc_minheap_t * mc_minheap_ini(int nodenum ) { int i = 0 ; if( nodenum <= 0 ) return NULL; mc_minheap_t * mh = ( mc_minheap_t *)malloc( sizeof( mc_minheap_t )); if( mh == NULL ) { return NULL; } mh->node_list = (mc_minheap_node_t *)malloc( sizeof(mc_minheap_node_t )*nodenum ); mh->headindex = 1 ; mh->lastindex= 1 ; for( i = 0 ; i < nodenum ; i++ ) { mh->node_list[i].key = MAX_INT ; mh->node_list[i].index = i+1; } mh->max_num = nodenum ; return mh; } static int swap_node( mc_minheap_node_t * n1 , mc_minheap_node_t *n2 ) { if( n1 == NULL || n2 == NULL ) return -1; mc_minheap_node_t temp; temp.key = n1->key ; n1->key = n2->key; n2->key = temp.key ; }
總結:最小堆的設計方式比較簡單,但是功能不錯,有一個缺點就是初始化的時候需要固定大小的節點個數,如果超時事件過多,需要采取一定的措施來保證堆的代碼質量。或許可以采用多個對的方式,然后比較每一個堆的最小值,這樣也就是O(N)復雜度,N= 堆的個數,這里是我的一廂情願罷了..
文章如有錯誤請指正,我會在收到留言的第一時間修改。歡迎交流