異步網絡消息處理框架


最近一段時間將原來寫的kendynet網絡框架重寫了大部分的代碼,讓提供的接口更清晰,對用戶更友好。

整個框架的架構分層3層:

1)單線程,基於原始數據流的網絡接口,在這一層上,沒有提供封包的處理,定時器事件等等。使用者可以在此之上按自己的需求做進一步的封裝。

2)單線程,提供connection,封包處理,接收發送超時處理。

3)網絡邏輯分離的異步網絡框架,抽象出三個主要的類型:asynnet_t,sock_ident和msgdisp_t.

asynnet_t:網絡處理引擎,使用者創建實例的時候可以傳入pollercount參數,其中每一個poller都會在單獨的線程中運行.

sock_ident:邏輯層操作的套接口封裝,可以安全的在多線程環境下使用.

msgdisp_t:消息分離器,每個分離器有一個對應的消息隊列用於接收從網絡線程傳遞過來的消息.

msgdisp_t提供了兩種使用模式:

第一種:典型的線程池模式。在這種模式下,可以創建一個消息分離器,多個邏輯線程對這個消息分離器調用

msg_loop.

第二種:共用網絡層模式。在一個進程中啟動N個線程,每個線程運行一個不同服務,所有這些服務共用網絡通信層.

在這種情況下,網絡消息需要路由到正確的服務那里.可以每個線程都創建一個消息分離器,各線程在自己的消息分離器上

調用msg_loop處理只屬於自己的消息.

下面是一個異步網絡服務器的示例:

#include <stdio.h>
#include <stdlib.h>
#include "core/msgdisp.h"
#include "testcommon.h"

uint32_t recvsize = 0;
uint32_t recvcount = 0;

///int32_t asynnet_bind(msgdisp_t disp,sock_ident sock,void *ud,int8_t raw,uint32_t send_timeout,uint32_t recv_timeout)
void asynconnect(msgdisp_t disp,sock_ident sock,const char *ip,int32_t port)
{
    printf("asynconnect\n");
    disp->bind(disp,sock,1,0,30*1000);
}

void asynconnected(msgdisp_t disp,sock_ident sock,const char *ip,int32_t port)
{
    printf("asynconnected\n");
   ++client_count;
}

void asyndisconnected(msgdisp_t disp,sock_ident sock,const char *ip,int32_t port,uint32_t err)
{
    --client_count;
}


int32_t asynprocesspacket(msgdisp_t disp,sock_ident sock,rpacket_t rpk)
{
    recvsize += rpk_len(rpk);
    recvcount++;
    asyn_send(sock,wpk_create_by_other((struct packet*)rpk));
    return 1;
}


void asynconnectfailed(msgdisp_t disp,const char *ip,int32_t port,uint32_t reason)
{

}


int main(int argc,char **argv)
{
    setup_signal_handler();
    InitNetSystem();
    asynnet_t asynet = asynnet_new(1);
    msgdisp_t  disp = new_msgdisp(asynet,
                                  asynconnect,
                                  asynconnected,
                                  asyndisconnected,
                                  asynprocesspacket,
                                  asynconnectfailed);
    int32_t err = 0;
    disp->listen(disp,argv[1],atoi(argv[2]),&err);
    uint32_t tick,now;
    tick = now = GetSystemMs();
    while(!stop){
        msg_loop(disp,50);
        now = GetSystemMs();
        if(now - tick > 1000)
        {
            uint32_t elapse = now-tick;
            recvsize = (recvsize/elapse)/1000;
            printf("client_count:%d,recvsize:%d,recvcount:%d\n",client_count,recvsize,recvcount);
            tick = now;
            packet_send_count = 0;
            recvcount = 0;
            recvsize = 0;
        }
    }
    CleanNetSystem();
    return 0;
}

項目代碼在:https://github.com/sniperHW/luanet

目前只實現了對linux,tcp的網絡支持,后續將會先完善這部分代碼,並在此之上提供基於user level thread的RPC支持.

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM