基於共享內存和多重哈希實現分布式緩存系統


寫在前面:
前三篇文字<<基於MQTT協議談談物聯網開發-華佗寫代碼>>,<<基於MQTT協議實現Broker-華佗寫代碼>>,<<基於WebSocket實現Broker-華佗寫代碼>>主要敘述了MQTT協議的編解碼以及基於MQTT協議的一些常見應用場景,並以一個簡單的消息推送系統作為例子具體闡述了Mqtt Broker部分的實現,具體包括通過Mqtt,以及Mqtt Over WebSocket的方式打通各個端的數據通信,本篇文字繼續以消息推送系統作為例子,闡述其狀態系統的實現,具體的消息推送系統架構草圖,需要參考回看第一篇文字,不再贅述.

 

1.緩存技術:

常見緩存技術主要有redis,memcached等,甚至也可使用mongodb或者mq,leveldb或者rocksdb等作為緩存,不同技術都有其優缺點,以redis為例,其可以通過代理和集群,實現分布式緩存系統,可以根據業務需要選擇是否持久化,支持多種數據結構以及事務,但其服務器內存會是瓶頸,以及在處理一些復雜數據結構時,比如緩存用戶登錄狀態,包括用戶的Appid,設備token,是否切換為后台等等,這就可能需要不停地序列化和反序列化,比如redis的hash和sortedset結構等.再比如查找時,以sortedset為例,其底層是基於跳躍表來實現的,查找時間復雜度O(lgn),存儲的數據越多,平均查詢時間也會越長.所以,需要根據自身業務需求來選用合適技術,作為例子,這里基於共享內存和多重哈希來實現這樣一個分布式緩存系統.

 

2.狀態系統具體實現:

2.1狀態系統架構草圖:

 

 

2.2狀態系統實現細節說明:

(1)整體架構主要包括負載均衡器,服務發現,MQ隊列,緩存集群主從節點,DB;

(2)不同集群節點跨機房或者跨IDC部署,MQ隊列用於不同集群之間寫數據同步;

(3)每個集群節點包括一主多從節點,圖示以一主一從為例示意,主和從節點都包括gm模塊形成鏈表結構進行數據同步,具體可參考RabbitMQ鏡像隊列相關原理;

(4)主從節點都包括shm模塊,即共享內存模塊,用於存儲狀態數據;

(5)基於哈希結構存儲數據,以達O(1)時間復雜度,通過多重哈希和最老節點覆蓋寫機制,解決哈希沖突;

(6)用戶登錄存儲狀態數據,登出刪除狀態數據,每個用戶狀態都同時存儲其存取時間戳,用於過期淘汰或者擴容時做相關判斷;

(7)以用戶的Appid,Token等為例進行數據緩存;

(8)DB根據業務需要決定是否分庫分表;

(9)其他...

 

2.3狀態系統代碼實現(shm模塊,跟之前幾篇文字基於golang編寫不同,這里基於C/C++編寫):

2.3.1定義用戶狀態數據結構體,以及哈希結構體頭部,如下:

const std::string GROCACHE_SHMFILE= "/shm/HashTableShmGroCache"; //存儲的用戶數量,作為例子,這里設置為100,實際應用需要根據自身應用的用戶數量合理設置 const uint32_t GROCACHE_HASHNODE_CNT = 100;
//計算每個用戶狀態節點占用的字節數: 4+8+4+4+4+4+4+4+4+4+32 const uint32_t GROCACHE_HASHNODE_SIZE = 76;
const uint32_t GROCACHE_MAXROW_CNT = 4;
//定義用戶狀態數據結構體
#pragma pack (1) struct GroCacheNode { uint32_t hashKey; //計算哈希值 uint64_t tinyid; uint32_t appid; uint32_t settoken; uint32_t background; uint32_t sdkappid; uint32_t busiid; uint32_t unreadcnt; uint32_t accessTime; //數據存取時間 uint32_t tokenlen; uint8_t token[0]; };
//定義哈希結構體頭部
#pragma pack (1) struct GroCacheHeader { uint32_t hashMemSize; // hashMemSize uint32_t hashNodeSize; // size of each hash node uint32_t hashNodeCnt; // num of all hash node uint32_t rowCnt; // row`s num uint32_t rowsNodeCnt[GROCACHE_MAXROW_CNT]; // node num of each row's uint32_t rowsStartNode[GROCACHE_MAXROW_CNT];// each row's starting node index uint32_t maxUsedRow; // max row index (start at 0) in use uint32_t usedCnt; };

 

2.3.2根據預估用戶數量開辟共享內存大小,並計算多重哈希每一級存儲的節點數量比例,第一級哈希數組會存儲將近70%的數據節點:

//開辟並初始化共享內存,保存共享內存元數據
bool
MultiHashTable::initGC() { bool iRet = true; do { void* pShm = NULL; string shmFileName = GROCACHE_SHMFILE;
   //計算需要分配的共享內存大小,以字節為單位
uint32_t hashMemSize = sizeof(GroCacheHeader) + (GROCACHE_HASHNODE_CNT * GROCACHE_HASHNODE_SIZE); bool exist = false;
     //真正申請開辟共享內存
if(false == ShareMemory::Instance()->create(&pShm, shmFileName, hashMemSize, exist)) { iRet = false; break; } _grocacheHeader = (GroCacheHeader*)pShm; ...
  
if(false == exist) { printf("share memory first create, hashtable need initial\n"); memset(_grocacheHeader, 0, sizeof(*_grocacheHeader)); _grocacheHeader->hashMemSize = hashMemSize; _grocacheHeader->hashNodeSize = GROCACHE_HASHNODE_SIZE; _grocacheHeader->rowCnt = GROCACHE_MAXROW_CNT;
       //通過CalcNodeCntForEachRow函數計算每一級哈希存儲的節點數量,以及每一級哈希的起始索引位置 _grocacheHeader
->hashNodeCnt = CalcNodeCntForEachRow(GROCACHE_MAXROW_CNT, GROCACHE_HASHNODE_CNT, _grocacheHeader->rowsNodeCnt, _grocacheHeader->rowsStartNode); _grocacheHeader->maxUsedRow = 0; _grocacheHeader->usedCnt = 0; } //header+ht _grocacheHt = _grocacheHeader + 1; printf("_grocacheHeader hashMemSize:%u,hashNodeSize:%u,hashNodeCnt:%u,maxUsedRow:%u\n", _grocacheHeader->hashMemSize,_grocacheHeader->hashNodeSize,_grocacheHeader->hashNodeCnt,_grocacheHeader->maxUsedRow); }while(false); return iRet; }

 

2.3.3開辟共享內存,不同系統共享內存大小有限制,根據需要決定是否修改相關系統配置:

bool ShareMemory::create(void** pShm, const std::string& shmFileName, const uint32_t& shmSize, bool& exist, bool reset) { exist = false; printf("create shmFileName:%s, shmSize:%u\n", shmFileName.c_str(), shmSize); bool iRet = false; do { uint32_t shmKey = 0; if(false == getShmKey(shmFileName, shmKey)) { printf("create getShmKey failed,shmFileName:%s\n", shmFileName.c_str()); break; } int shmFlag = 0666 | IPC_CREAT; int shmId = shmget(shmKey, 0, 0); if(-1 == shmId) { //no existed shm, need to create a new one
            printf("create a new share memory shmSize:%d\n",shmSize); shmId = shmget(shmKey, shmSize, shmFlag); if(-1 == shmId) { printf("create shmget failed errno=%d , errmsg=%s\n", errno, strerror(errno)); break; } } ...
     if(reset) { memset(*pShm, 0, shmSize); exist = false; } iRet = true; }while(false); return iRet; }

 

3.測試例子:

#include <stdio.h> #include "MultiHashTable.h" using namespace std; int main() {
   //初始化用戶狀態節點 GroCacheNode gcSetNode; gcSetNode.appid
= 666; gcSetNode.settoken = 1; gcSetNode.background = 1; gcSetNode.sdkappid = 999; gcSetNode.busiid = 9999; gcSetNode.unreadcnt = 0; string token = "user_device_token";   
  //模擬用戶登錄,存儲用戶狀態數據 uint64_t uin
= 888888888; uint32_t hashKey = 99; MultiHashTable::instance()->SetGroCache(hashKey, uin, gcSetNode, token);
//修改用戶推送消息未讀計數
bool iRet = false; uint32_t unreadcnt = 99; iRet = MultiHashTable::instance()->SetGroCacheUnReadCnt(hashKey, uin, unreadcnt); printf("[main]SetGroCacheUnReadCnt iRet:%d\n", iRet);
   //讀取用戶推送消息未讀計數 uint32_t getunreadcnt
= 0; iRet = MultiHashTable::instance()->GetGroCacheUnReadCnt(hashKey, uin, getunreadcnt); printf("[main]GetGroCacheUnReadCnt iRet:%d, getunreadcnt:%d\n", iRet, getunreadcnt); return 0; }

 

4.運行測試結果,如下圖所示,表明創建了共享內存文件,並計算了每一級哈希的節點數量,最后插入一條用戶狀態數據,並存取相關未讀計數值:

 

出於篇幅考慮,上述使用到的具體一些函數,如MaxPrime,CalcNodeCntForEachRow等,其具體實現就不一一列舉出來了,也有一些成員變量,用到了也不一一具體注釋了,主要通過架構圖和具體代碼關鍵路徑敘述實現的一些細節,闡明主要設計思路以及解決問題的主要矛盾,如有錯誤,懇請指出,轉載也請注明出處!!!

 

未完待續...

參考文字: RabbitMQ


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM