(二): 基於ZeroMQ的實時通訊平台


  基於ZeroMQ的實時通訊平台

  上篇:C++分布式實時應用框架 (Cpp Distributed Real-time Application Framework)----(一):整體介紹

 

版權聲明:本文版權及所用技術歸屬smartguys團隊所有,對於抄襲,非經同意轉載等行為保留法律追究的權利!

 

  通訊平台作為C++分布式實時應用框架(Cpp Distributed Real-time Application Framework)的最核心模塊,承擔了分布式實時框架的基礎通訊功能。通訊平台框架具備了基於Reactor模式的網絡通訊能力,並且依賴於ZeroMQ庫,因此支持非持久化的message queue的功能。基於配置文件來自動建立鏈接關系的功能,可以和狀態中心一起配合,實現無需重啟節點的動態擴容縮容等功能。強大的實時監控能力,可以實時上報每個通訊子節點的TPS和時延等關鍵性能數據。管控業務進程的能力,業務進程的心跳檢測,故障時自動重啟、保證系統正常運行。完善的平台工具,可以通過通訊平台向業務進程發送各種命令,如:調整日志級別,刷新業務參數,啟停業務進程等等。下面將逐一介紹通訊平台的功能細節。

  一、根據配置文件自動建立通訊鏈接拓撲關系

  常見的分布式系統通常將進程間、節點間的各種通訊關系寫死在業務代碼中,這是導致代碼復雜難以理解的原因。我們創新地將所有的通訊關系提取到AppInit.json配置文件中,業務代碼中不再包含任何與通訊連接相關的內容,使業務代碼可以更專注於業務處理,而不用分心於復雜的分布式節點通訊當中。下面我們將帶大家看下圖所示通訊關系的配置。

 

  OLC作為數據分發節點,給多個業務處理節點分發消息。業務處理節點內部由OCDis接收外部消息,轉發給內部的OCPro業務處理進程,並負責處理完后的回包。

OLC配置部分:

   "OLC" : {
      "AUTO_START" : "YES",
      "ENDPOINTS" : [
         {  // 用於與SmartMonitor建立心跳
            "name" : "MonitorSUB",   
            "zmq_socket_action" : "CONNECT",  // ZMQ的連接模式
            "zmq_socket_type" : "ZMQ_SUB"     // ZMQ的通訊模式
         },
         { // 下發消息給OCDis,這邊存在轉發功能,支持業務實現按條件轉發
            "downstream" : [ "OCDis2OLC"],
            "name" : "NE2OLC",                // 根據這個名字在業務代碼中實現轉發
            "zmq_socket_action" : "BIND",
            "zmq_socket_type" : "ZMQ_STREAM" 
         },
         { // OLC到OCDis的鏈路
            "name" : "OCDis2OLC",
            "statistics_on" : true,
            "zmq_socket_action" : "CONNECT",
            "zmq_socket_type" : "ZMQ_DEALER"
         },
         { // OCDis回OLC的鏈路,之所以來去分開,主要用於實現優雅啟停功能(啟停節點保證不丟消息)
            "name" : "OCDis2OLC_Backway",
            "statistics_on" : true,
            "zmq_socket_action" : "CONNECT",
            "zmq_socket_type" : "ZMQ_DEALER",
            "backway_pair" : "OCDis2OLC"
         },
         {  // 用於與SmartMonitor的命令消息鏈路
            "name" : "OLC2Monitor",
            "zmq_socket_action" : "CONNECT",
            "zmq_socket_type" : "ZMQ_DEALER"
         },
      ],
      "ENDPOINT_TO_MONITOR" : "OLC2Monitor",
      "INSTANCE_GROUP" : [
         {
            "instance_endpoints_address" : [
               {
                  "endpoint_name" : "NE2OLC",
                  "zmq_socket_address" : "tcp://*:6701"
               },
               {
                  "endpoint_name" : "OCDis2OLC",
                  "zmq_socket_address" : [
                     "tcp://127.0.0.1:7201"   // 跨機的IP地址與端口,配合狀態中心可實現自動管理,無需人工參與配置
                  ]
               },
               {
                  "endpoint_name" : "OCDis2OLC_Backway",
                  "zmq_socket_address" : [
                     "tcp://127.0.0.1:7202"
                  ]
               },
               {
                  "endpoint_name" : "OLC2Monitor",
                  "zmq_socket_address" : "ipc://Monitor2Business_IPC"
               },
               {
                  "endpoint_name" : "MonitorSUB",
                  "zmq_socket_address" : "ipc://MonitorPUB"
               }
            ],
            "instance_group_name" : "1"
         }
      ]
   },

 OLC程序:

static const char * ENDPOINT_NE2OLC = "NE2OLC";
static const char * ENDPOINT_OLC2OCDIS = "OCDis2OLC";
static const char * ENDPOINT_MONITORSUB = "MonitorSUB";

int main(int argc, char * argv[]) {

    SmartUtilities::Daemonize();
    OLCProxyServer server(argc, argv);

    if (!server.Initialize(logger))
        return -1;
  
// OLC與OCDis的消息處理 server.SetCallbackOnReceivingMessage(ENDPOINT_OLC2OCDIS, bind(&OLCProxyServer::ReceiveFromOCDis, &server, _1, _2, _3));

  // OLC與SmartMonitor的消息處理 server.SetCallbackOnReceivingMessage(ENDPOINT_MONITORSUB, bind(&OLCProxyServer::ReceiveFromMonitorSUB, &server, _1, _2, _3));
  // 解析消息包實現業務功能 server.SetPacketParserFunction(ENDPOINT_NE2OLC, bind(&OLCProxyServer::ParseStreamCCR, &server, _1, _2, _3));
  // 設置消息轉發具體規則 server.SetDownstreamSelector(ENDPOINT_NE2OLC, bind(&OLCProxyServer::StreamSelector, &server, _1, _2)); server.Run(); return 0; }

  二、在線更新鏈接拓撲能力

  通訊平台支持在線重新讀取更新的配置文件,更新網絡拓撲,自動建立新鏈接、斷開舊鏈接的能力。配合狀態中心可以實現無需重啟節點的動態擴容縮容等功能。

  

  三、SmartMonitor進程監控管理業務進程與SmartTool工具進程

  業務進程可以跟SmartMonitor建立通訊聯系,SmartMonitor可以檢測業務進程的心跳,以保證業務進程的可用。SmartMonitor通過AppCount.json來管理節點業務進程,實現統一啟停等功能。

{
  "OCPro": {
    "IN":  2,      // 業務進程可以有不同的種類,后面代表進程數
    "PS":  3,
    "SMS": 4,
  },
  "OCDis": 3,
  "SERVER_TYPE":"OCS"  // 節點的類型
}

  還可以通過SmartTool工具進程,來給業務進程發送各種命令,如:調整日志級別,刷新業務參數,啟停業務進程等等。

 

     1. 啟動平台

      SmartMonitor

 

      2. 停平台

      SmartTool stop all

      停指定進程(停止后會被SmartMonitor重新拉起)

      SmartTool stop OCPro 停止所有業務的OCPro進程

      SmartTool stop  OCPro.IN 停止IN業務的OCPro進程

      SmartTool stop 4829 停止PID為4829的進程

 

      3. 調整應用層、框架層日志級別

      其中,日志級別為error,warn,info,debug,trace

      SmartTool log 進程名 level=日志級別,flush=日志級別

      比如: SmartTool log  OCPro level=debug,flush=debug

  四、通訊平台性能數據 

 

 

 進程Z負載控制消息流量,進程A負責發、收消息,統計時延數據。進程B收到消息后負責回消息。

 

 性能瓶頸主要在A機,既要負責收發包,又要統計時延數據,還要控制流量。

 

未完待續...

 

 技術交流合作QQ群:436466587 歡迎討論交流


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM