網游內存數據庫的設計(2)


續第一篇,前兩天對核心存儲做了些修改,以前只打算與關系數據庫的行與表做對應,value類型只能使array或list,

現在把7種基本類型也加入到value支持的類型中,以使得數據庫更通用.

當然,這都不是本文的核心,本篇主要介紹一個測試前端,以及測試的遠程調用協議.

先貼出測試前端的服務器代碼:

#include "netservice.h"
#include "msg_loop.h"
#include "datasocket.h"
#include "SysTime.h"
#include "db_protocal.h"

atomic_32_t wpacket_count = 0;
atomic_32_t rpacket_count = 0;
atomic_32_t buf_count = 0;

global_table_t gtb;
void server_process_packet(datasocket_t s,rpacket_t r)
{
    //執行操作並返回結果
    cache_protocal_t p;
    uint32_t coro_id = rpacket_read_uint32(r);
    uint8_t type = rpacket_read_uint8(r);
    switch(type)
    {
        case CACHE_GET:
            p = create_get();
            break;
        case CACHE_SET:
            p = create_set();
            break;
        case CACHE_DEL:
            p = create_del();
            break;            
    }
    wpacket_t ret = p->execute(gtb,r,coro_id);
    if(NULL != ret)
        data_send(s,ret);
    destroy_protocal(&p);
}

void process_new_connection(datasocket_t s)
{
    printf("w:%u,r:%u,b:%u\n",wpacket_count,rpacket_count,buf_count);
}

void process_connection_disconnect(datasocket_t s,int32_t reason)
{
    release_datasocket(&s);
    printf("w:%u,r:%u,b:%u\n",wpacket_count,rpacket_count,buf_count);
}

void process_send_block(datasocket_t s)
{
    //發送阻塞,直接關閉
    close_datasocket(s);
}


const char *ip;
uint32_t port;
int main(int argc,char **argv)
{
    init_net_service();
    ip = argv[1];
    port = atoi(argv[2]);
    netservice_t n = create_net_service(1);
    gtb = global_table_create(65536);
    
    int32_t i = 0;
    char key[64];
    for( ; i < 1000000; ++i)
    {
        basetype_t a = basetype_create_int32(i);
        snprintf(key,64,"test%d",i);
        a = global_table_insert(gtb,key,a,global_hash(key));
        if(!a)
            printf("error 1\n");
        basetype_release(&a);        
    }
    
    net_add_listener(n,ip,port);
    msg_loop_t m = create_msg_loop(server_process_packet,process_new_connection,process_connection_disconnect,process_send_block);
    while(1)
    {
        msg_loop_once(m,n,100);    
    }

    return 0;
}

前端的網絡模塊使用了在上一篇中介紹的網絡框架,啟動時先插入100W條32位整型的記錄,然后進入消息循環,不斷的處理從客戶端發過來的操作請求.

目前只添加了三個協議,分別是獲取:CACHE_GET;添加/修改:CACHE_SET;刪除:CACHE_DEL.

服務器處理協議並將結果返回給客戶端.

然后是測試客戶端:

#include "db_protocal.h"
#include "dbtype.h"
#include <stdio.h>
#include "SocketWrapper.h"
#include "SysTime.h"
#include "KendyNet.h"
#include "Connector.h"
#include "Connection.h"
#include "common_define.h"
#include "netservice.h"
#include "msg_loop.h"
#include "co_sche.h"

sche_t g_sche = NULL;
uint32_t call_count = 0;

atomic_32_t wpacket_count = 0;
atomic_32_t rpacket_count = 0;
atomic_32_t buf_count = 0;
datasocket_t db_s;

int8_t test_select(const char *key,int32_t i)
{
    coro_t co = get_current_coro();
    wpacket_t wpk = get_wpacket(64);
    wpacket_write_uint32(wpk,(int32_t)co);
    wpacket_write_uint8(wpk,CACHE_GET);//ÉèÖÃ
    wpacket_write_string(wpk,key);
    data_send(db_s,wpk);
    coro_block(co);
    int8_t ret = rpacket_read_uint8(co->rpc_response);
    rpacket_read_uint8(co->rpc_response);
    int32_t val = rpacket_read_uint32(co->rpc_response);
    if(val != i)
        printf("error\n");
    //printf("begin\n");
    rpacket_destroy(&co->rpc_response);
    //printf("end\n");
    return ret;
}

void *test_coro_fun2(void *arg)
{
    coro_t co = get_current_coro();
    while(1)
    {
        char key[64];
        int32_t i = rand()%1000000;
        snprintf(key,64,"test%d",100);        
        if(0 == test_select(key,100))
            ++call_count;
    }
}


void server_process_packet(datasocket_t s,rpacket_t r)
{
    coro_t co = (coro_t)rpacket_read_uint32(r);
    co->rpc_response = rpacket_create_by_rpacket(r);
    coro_wakeup(co);
}

void process_new_connection(datasocket_t s)
{
    printf("connect server\n");
    db_s = s;
    g_sche = sche_create(20000,65536,NULL,NULL);
    int i = 0;
    for(; i < 20000; ++i)
    {
        sche_spawn(g_sche,test_coro_fun2,NULL);
    }
}

void process_connection_disconnect(datasocket_t s,int32_t reason)
{
    release_datasocket(&s);
}

void process_send_block(datasocket_t s)
{
    //·¢ËÍ×èÈû,Ö±½Ó¹Ø±Õ
    close_datasocket(s);
}

int main(int argc,char **argv)
{
    init_net_service();
    const char *ip = argv[1];
    uint32_t port = atoi(argv[2]);
    netservice_t n = create_net_service(1);
    net_connect(n,ip,port);
    msg_loop_t m = create_msg_loop(server_process_packet,process_new_connection,process_connection_disconnect,process_send_block);
    uint32_t tick = GetSystemMs();
    while(1)
    {
        msg_loop_once(m,n,1);
        uint32_t now = GetSystemMs();
        if(now - tick > 1000)
        {
            printf("call_count:%u\n",(call_count*1000)/(now-tick));
            tick = now;
            call_count = 0;
        }
        if(g_sche)
            sche_schedule(g_sche);            
    }
    return 0;
}

操作接口使用用戶級線程實現,以支持同步調用接口,用戶級線程發出請求后就阻塞自己,直到結果返回時才被喚醒:

關鍵部分在test_select,把自己的coro地址作為id打包到協議中,發往服務器,然后調用coro_block阻塞。服務器返回的數據包

中也帶了對應的coro_id,以通知客戶端的調度系統該喚醒哪個coro.coro被喚醒后從結果包中讀取操作結果和數據,返回給上層調用者.

從測試結果來看,啟動1W個coro的客戶端,每秒平均能執行50W次的操作。對於一個萬人在線的MMORPG游戲來說應該已經是夠用的了。

如果還是不夠,可以通過表空間的划分,啟動多個內存數據庫進程來服務請求。

 項目地址:https://github.com/sniperHW/kendylib/tree/master/dbcache


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM