基於ZeroMQ的實時通訊平台
上篇:C++分布式實時應用框架 (Cpp Distributed Real-time Application Framework)----(一):整體介紹
版權聲明:本文版權及所用技術歸屬smartguys團隊所有,對於抄襲,非經同意轉載等行為保留法律追究的權利!
通訊平台作為C++分布式實時應用框架(Cpp Distributed Real-time Application Framework)的最核心模塊,承擔了分布式實時框架的基礎通訊功能。通訊平台框架具備了基於Reactor模式的網絡通訊能力,並且依賴於ZeroMQ庫,因此支持非持久化的message queue的功能。基於配置文件來自動建立鏈接關系的功能,可以和狀態中心一起配合,實現無需重啟節點的動態擴容縮容等功能。強大的實時監控能力,可以實時上報每個通訊子節點的TPS和時延等關鍵性能數據。管控業務進程的能力,業務進程的心跳檢測,故障時自動重啟、保證系統正常運行。完善的平台工具,可以通過通訊平台向業務進程發送各種命令,如:調整日志級別,刷新業務參數,啟停業務進程等等。下面將逐一介紹通訊平台的功能細節。
一、根據配置文件自動建立通訊鏈接拓撲關系
常見的分布式系統通常將進程間、節點間的各種通訊關系寫死在業務代碼中,這是導致代碼復雜難以理解的原因。我們創新地將所有的通訊關系提取到AppInit.json配置文件中,業務代碼中不再包含任何與通訊連接相關的內容,使業務代碼可以更專注於業務處理,而不用分心於復雜的分布式節點通訊當中。下面我們將帶大家看下圖所示通訊關系的配置。
OLC作為數據分發節點,給多個業務處理節點分發消息。業務處理節點內部由OCDis接收外部消息,轉發給內部的OCPro業務處理進程,並負責處理完后的回包。
OLC配置部分:
"OLC" : { "AUTO_START" : "YES", "ENDPOINTS" : [ { // 用於與SmartMonitor建立心跳 "name" : "MonitorSUB", "zmq_socket_action" : "CONNECT", // ZMQ的連接模式 "zmq_socket_type" : "ZMQ_SUB" // ZMQ的通訊模式 }, { // 下發消息給OCDis,這邊存在轉發功能,支持業務實現按條件轉發 "downstream" : [ "OCDis2OLC"], "name" : "NE2OLC", // 根據這個名字在業務代碼中實現轉發 "zmq_socket_action" : "BIND", "zmq_socket_type" : "ZMQ_STREAM" }, { // OLC到OCDis的鏈路 "name" : "OCDis2OLC", "statistics_on" : true, "zmq_socket_action" : "CONNECT", "zmq_socket_type" : "ZMQ_DEALER" }, { // OCDis回OLC的鏈路,之所以來去分開,主要用於實現優雅啟停功能(啟停節點保證不丟消息) "name" : "OCDis2OLC_Backway", "statistics_on" : true, "zmq_socket_action" : "CONNECT", "zmq_socket_type" : "ZMQ_DEALER", "backway_pair" : "OCDis2OLC" }, { // 用於與SmartMonitor的命令消息鏈路 "name" : "OLC2Monitor", "zmq_socket_action" : "CONNECT", "zmq_socket_type" : "ZMQ_DEALER" }, ], "ENDPOINT_TO_MONITOR" : "OLC2Monitor", "INSTANCE_GROUP" : [ { "instance_endpoints_address" : [ { "endpoint_name" : "NE2OLC", "zmq_socket_address" : "tcp://*:6701" }, { "endpoint_name" : "OCDis2OLC", "zmq_socket_address" : [ "tcp://127.0.0.1:7201" // 跨機的IP地址與端口,配合狀態中心可實現自動管理,無需人工參與配置 ] }, { "endpoint_name" : "OCDis2OLC_Backway", "zmq_socket_address" : [ "tcp://127.0.0.1:7202" ] }, { "endpoint_name" : "OLC2Monitor", "zmq_socket_address" : "ipc://Monitor2Business_IPC" }, { "endpoint_name" : "MonitorSUB", "zmq_socket_address" : "ipc://MonitorPUB" } ], "instance_group_name" : "1" } ] },
OLC程序:
static const char * ENDPOINT_NE2OLC = "NE2OLC"; static const char * ENDPOINT_OLC2OCDIS = "OCDis2OLC"; static const char * ENDPOINT_MONITORSUB = "MonitorSUB"; int main(int argc, char * argv[]) { SmartUtilities::Daemonize(); OLCProxyServer server(argc, argv); if (!server.Initialize(logger)) return -1;
// OLC與OCDis的消息處理 server.SetCallbackOnReceivingMessage(ENDPOINT_OLC2OCDIS, bind(&OLCProxyServer::ReceiveFromOCDis, &server, _1, _2, _3));
// OLC與SmartMonitor的消息處理 server.SetCallbackOnReceivingMessage(ENDPOINT_MONITORSUB, bind(&OLCProxyServer::ReceiveFromMonitorSUB, &server, _1, _2, _3));
// 解析消息包實現業務功能 server.SetPacketParserFunction(ENDPOINT_NE2OLC, bind(&OLCProxyServer::ParseStreamCCR, &server, _1, _2, _3));
// 設置消息轉發具體規則 server.SetDownstreamSelector(ENDPOINT_NE2OLC, bind(&OLCProxyServer::StreamSelector, &server, _1, _2)); server.Run(); return 0; }
二、在線更新鏈接拓撲能力
通訊平台支持在線重新讀取更新的配置文件,更新網絡拓撲,自動建立新鏈接、斷開舊鏈接的能力。配合狀態中心可以實現無需重啟節點的動態擴容縮容等功能。
三、SmartMonitor進程監控管理業務進程與SmartTool工具進程
業務進程可以跟SmartMonitor建立通訊聯系,SmartMonitor可以檢測業務進程的心跳,以保證業務進程的可用。SmartMonitor通過AppCount.json來管理節點業務進程,實現統一啟停等功能。
{ "OCPro": { "IN": 2, // 業務進程可以有不同的種類,后面代表進程數 "PS": 3, "SMS": 4, }, "OCDis": 3, "SERVER_TYPE":"OCS" // 節點的類型 }
還可以通過SmartTool工具進程,來給業務進程發送各種命令,如:調整日志級別,刷新業務參數,啟停業務進程等等。
1. 啟動平台
SmartMonitor
2. 停平台
SmartTool stop all
停指定進程(停止后會被SmartMonitor重新拉起)
SmartTool stop OCPro 停止所有業務的OCPro進程
SmartTool stop OCPro.IN 停止IN業務的OCPro進程
SmartTool stop 4829 停止PID為4829的進程
3. 調整應用層、框架層日志級別
其中,日志級別為error,warn,info,debug,trace
SmartTool log 進程名 level=日志級別,flush=日志級別
比如: SmartTool log OCPro level=debug,flush=debug
四、通訊平台性能數據
進程Z負載控制消息流量,進程A負責發、收消息,統計時延數據。進程B收到消息后負責回消息。
性能瓶頸主要在A機,既要負責收發包,又要統計時延數據,還要控制流量。
未完待續...
技術交流合作QQ群:436466587 歡迎討論交流