單線程寫多線程讀安全的結構體


大型網絡游戲服務器的邏輯大多采用單線程設計,典型的就是一個線程處理一個區域(地圖),跨區域通過跳轉實現,這樣,不同區域的對象在邏輯上是不發生交互的。

這樣在一台服務器上開啟N個線程就可以處理N個區域。但一個線程處理一個區域畢竟有其瓶頸,如果一個區域內擠進了過多的玩家就會導致為那個區域服務的線程

不負重和,表現就是那個區域中的玩家發現操作響應變得不及時.

最近一段時間在思考如何能並行的利用多進程多機器為同一片區域服務,這樣可以通過多開幾個進程提升單區域的承載量。一種方式是類似bigworld全分布式設計,

同一區域中的對象可以分布在N個進程/機器中。以一個玩家為例,在一個進程中其是主動對象,所有對這個玩家對象的屬性操作都必須在主動對象上執行.在其它進

程中的對象要跟這個玩家交互,例如戰斗,所以這個玩家對象必須在其主動對象所在進程以外有副本對象,這樣在另外一個進程中的對象可以根據這個玩家的副本

對象的屬性值計算戰斗傷害,抵抗等等,然后將屬性變更請求發回給玩家所在的主動對象,執行實際的屬性變更.以這種方式,主動對象的屬性變更必須通知到所有

的副本對象。由於主動對象的屬性變更后需要通知所有的副本對象,當這個區域中對象數量變得很大時,屬性變更帶來的通信量將會非常大.

現在換個思路,結合多線程和多進程,在一個進程中啟動多個線程,主動對象位於一個線程中,屬性數據在多個線程中可見,只有主動對象所在線程能修改那個對象

的屬性,其它線程只能讀取,這樣,同一個進程中的多個線程就免除了屬性同步的需要.

對於32/64位的基本屬性,其讀寫本身就是原子的,所以可以安全的實現一個線程寫,N個線程讀.但對於一些結構型的屬性,最典型的就是坐標,由x,y,z三個分量

構成,對其讀/寫都不是原子的.為了實現原子的讀寫坐標,最簡單的做法就是在SetPos/GetPos中加鎖。但再想一想,我們要讀取的只是一份完整,正確的坐標數據,

 

卻可以容忍其不一定是最新的數據。所以,下面實現了一個無鎖的算法實現安全的對結構體的1寫N讀。

#include "atomic.h"
volatile int get_count;
volatile int set_count;
volatile int miss_count;

struct atomic_st
{
    volatile int32_t version;
    char data[];
};

struct atomic_type
{
    uint32_t g_version;
    int32_t index;
    volatile struct atomic_st *ptr;
    int32_t data_size;
    struct atomic_st* array[2];    
};

struct atomic_type *create_atomic_type(uint32_t size);
void destroy_atomic_type(struct atomic_type **_at);


#define GET_ATOMIC_ST(NAME,TYPE)\
TYPE NAME(struct atomic_type *at)\
{\
    TYPE ret;\
    while(1)\
    {\
        struct atomic_st *ptr_p = (struct atomic_st *)at->ptr;\
        int save_version = ptr_p->version;\
        if(ptr_p == at->ptr && save_version == ptr_p->version)\
        {\
            memcpy(ret.base.data,ptr_p->data,at->data_size);\
            __asm__ volatile("" : : : "memory");\
            if(ptr_p == at->ptr && save_version == ptr_p->version)\
                break;\
            ATOMIC_INCREASE(&miss_count);\
        }\
        else\
            ATOMIC_INCREASE(&miss_count);\
    }\
    ATOMIC_INCREASE(&get_count);\
    return ret;\
}

#define SET_ATOMIC_ST(NAME,TYPE)\
void NAME(struct atomic_type *at,TYPE p)\
{\
    struct atomic_st *new_p = at->array[at->index];\
    at->index = (at->index + 1)%2;\
    memcpy(new_p->data,p.base.data,at->data_size);\
    __asm__ volatile("" : : : "memory");\
    new_p->version = ++at->g_version;\
    __asm__ volatile("" : : : "memory");\
    at->ptr = new_p;\
    ATOMIC_INCREASE(&set_count);\
}
#include "util/thread.h"
#include "util/SysTime.h"
#include "util/atomic.h"
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "util/sync.h"
#include "util/atomic_st.h"

struct point
{
    struct atomic_st base;
    volatile int x;
    volatile int y;
    volatile int z;
};

GET_ATOMIC_ST(GetPoint,struct point);    
SET_ATOMIC_ST(SetPoint,struct point);    
    
struct atomic_type *g_points[1000];

void *SetRotine(void *arg)
{
    int idx = 0;
    int pos = 0;
    while(1)
    {
        struct point p;
        ++pos;
        p.x = p.y = p.z = pos+1;
        SetPoint(g_points[idx],p);
        idx = (idx + 1)%100;
    }
}

void *GetRoutine(void *arg)
{
    int idx = 0;
    while(1)
    {
        struct point ret = GetPoint(g_points[idx]);
        if(ret.x != ret.y || ret.x != ret.z || ret.y != ret.z)
        {
            printf("%d,%d,%d\n",ret.x,ret.y,ret.z);
            assert(0);
        }            
        idx = (idx + 1)%100;
    }
}

int main()
{
    struct point p;
    p.x = p.y = p.z = 1;
    int i = 0;
    for(; i < 1000; ++i)
    {
        g_points[i] = create_atomic_type(sizeof(p));
        SetPoint(g_points[i],p);
    }
    thread_t t1 = CREATE_THREAD_RUN(1,SetRotine,NULL);
    thread_t t2 = CREATE_THREAD_RUN(1,GetRoutine,(void*)1);    
    thread_t t3 = CREATE_THREAD_RUN(1,GetRoutine,(void*)1);    
    thread_t t4 = CREATE_THREAD_RUN(1,GetRoutine,(void*)1);    
    uint32_t tick = GetSystemMs();
    while(1)
    {
        uint32_t new_tick = GetSystemMs();
        if(new_tick - tick >= 1000)
        {
            printf("get:%d,set:%d,miss:%d\n",get_count,set_count,miss_count);
            get_count = set_count = miss_count = 0;
            tick = new_tick;
        }
        sleepms(50);
    }
}

 

 

上面是測試代碼,開啟1個寫線程和3個讀線程,對一個atomic_type進行爭搶測試,下面是測試數據的對比,先是無鎖方式的,然后是加鎖方式:

get:32231360,set:8129332,miss:4922677
get:30698439,set:7218725,miss:5229885
get:30248904,set:7256191,miss:5270275
get:30127294,set:7302881,miss:5312710
get:30450684,set:7325376,miss:5291387
get:30602374,set:7210568,miss:5226397
get:30542229,set:7231159,miss:5212140

get:6829928,set:1897596,miss:0
get:7005253,set:1897336,miss:0
get:7037773,set:1893310,miss:0
get:7121759,set:1907072,miss:0
get:7136176,set:1896144,miss:0
get:7118790,set:1914569,miss:0
get:7096869,set:1913391,miss:0

可以看到如果讀寫線程都在爭搶同一個atomic_type,無鎖方式比加鎖方式快了4倍左右.

下面是對100個atomic_type爭搶:

get:55162216,set:11529743,miss:199049
get:56026601,set:11259984,miss:205584
get:57597092,set:11386090,miss:213338
get:57144449,set:11237366,miss:208721
get:56939791,set:11119571,miss:209850
get:56983208,set:11180208,miss:209922
get:56720621,set:11169635,miss:208338

get:17567303,set:5803621,miss:0
get:17742257,set:5636563,miss:0
get:17702530,set:5621941,miss:0
get:17179876,set:5492159,miss:0
get:16825500,set:5371242,miss:0
get:17936650,set:5715384,miss:0
get:17912971,set:5743810,miss:0
get:17050807,set:5420599,miss:0

加鎖方式讀的效率只有無鎖方式的1/3左右,寫效率大概是無鎖方式的1/2.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM