前面的文章介紹了一個基於微線程的調度器框架,並測試了使用微線程實現遠程調用的效率。
本文將微線程和網絡事件框架結合起來,在微線程中處理所有的事件(網絡消息,用戶定時器事件),
這樣,在事件回調函數中可以放心的發起遠程調用而不用擔心阻塞整個線程,只要還有未被阻塞的
微線程,就可以切換到那個微線程上去執行,繼續等待在事件隊列上處理新到的事件.
typedef struct coronet { netservice_t nets; msg_loop_t msgl; sche_t coro_sche; uint32_t last_check_timer; TimingWheel_t timer_ms;//精度50ms TimingWheel_t timer_s; //精度1s TimingWheel_t timer_m; //精度1分鍾 }*coronet_t;
首先看下事件框架的定義,事件框架結構封裝了一個微線程調度器,一個網絡服務引擎,一個消息分發器,
還有3種不同精度的用戶定時器.
使用者創建coronet后調用coronet_init_net綁定事件回調函數,之后調用coronet_init_coro創建微線程池,
然后調用coronet_run就可以進入事件循環.
下面看下微線程的主處理函數,這個函數要作為coronet_init_coro的第三個參數傳入,跟線程的主函數有點類似,
微線程運行之后就會進入這個主函數,如下面的實現,這個主函數只是在一個循環中不斷的提取消息並執行,如果
沒有消息則會執行50ms的可被打斷休眠(如果網絡層有事件到達,休眠會被打斷).
void* uthread_main_function(void *arg) { coronet_t coron = (coronet_t)arg; while(0 == isstop) { //等待事件的到來並處理,如果沒有事件休眠50ms peek_msg(coron,50); } }
下面開看下peek_msg的實現:
void peek_msg(coronet_t coron,uint32_t ms) { assert(coron); assert(coron->nets); assert(coron->msgl); assert(coron->coro_sche); coro_t co = get_current_coro(); //首先檢查是否有超時的coro,如果有喚醒,被喚醒后的coro會被投入到active_list_1 check_time_out(coron->coro_sche,GetCurrentMs()); //查看active_list_1中是否有coro等待執行,如果有,優先先執行coro _sche_next_1(coron->coro_sche,co); //檢查用戶定時器,如果有超時事件會觸發一個消息並投遞到消息隊列中,后面的msg_loop_once會提取並執行 coronet_check_user_timer(coron); //等待消息的到來 msg_loop_once(coron->msgl,coron->nets,ms); }
這里先介紹下微線程調度器中的兩個激活隊列:active_list_1,active_list_2.激活隊列分成兩個優先級1為最高,2
為最低.其中2級隊列存放所有普通的激活隊列,例如微線程被創建之后,就會被投入到2級激活隊列中。1級優先隊列
存放的是優先級更高的微線程,例如rpc調用超時,sleep超時,還有收到rpc回應的微線程.調度器選擇下一個被處理
的微線程時首先會從1級隊列中提取,當1級隊列為空時才調度2級隊列中的微線程.
好了回到peek_msg,首先是檢查是否有超時的微線程,然后執行_sche_next_1,_sche_next_1會嘗試調度1級隊列中的
微線程,如果1級隊列中沒有可以調度的對象,則調用msg_loop_once繼續處理事件.
(在這里必須介紹下_sche_next_1的調度策略,它僅僅查看1級隊列中是否有可調度對象,如果有則調度運行,沒有立即返回,
這樣做大大減少了不必要的微線程切換)
下面再來看下rpc的處理:
void process_rpc_return(rpacket_t r) { coro_t co = get_current_coro(); coro_t co_wakeup = (coro_t)rpacket_read_uint32(r); if(rpk_check(co_wakeup,r)) { co_wakeup->rpc_response = r; co_wakeup->_goback = co; //直接跳過去執行co_wakeup set_current_coro(co_wakeup); uthread_switch(co->ut,co_wakeup->ut,co); } }
process_rpc_return在網絡包處理回調函數中被調用,當發現一個網絡包代表了rpc回應時,就可以調用process_rpc_return
來處理這個rpc回應.這里的處理是設置一些成員變量,然后直接切換到需要被喚醒的微線程的上下文中繼續執行.這里着重介紹
_goback成員,_goback成員被設置成從哪個微線程切換到目標微線程的。當目標微線程在執行的過程中,需要主動把執行權切換
出去的時候會首先切換到_goback所指向的微線程去.
項目地址如下:
https://github.com/sniperHW/kendylib/blob/master/netframework/coronet.h
