github地址:https://github.com/wxzz/CSharpFlink
gitee地址:https://gitee.com/wxzz/CSharpFlink
1 概述及背景
我們有一個全國性質的面向工業的公有雲平台,通過專線或4G的鏈路方式實時向平台傳輸數據,每天處理1億條左右的數據量,為現場用戶提供實時的在線服務和離線數據分析服務。現在已經上線穩定運行有將近3年的時間。同時也為工業企業提供私有雲建設服務。
我們計划使用Flink作為雲平台后台的實時計算部分,基本實現數據點的聚合計算、表達式規則計算等業務,進一步實現機器學習或自定義復雜算法的需求。
我們經過將近一年左右時間的研究及開發,已經基本實現了聚合和邏輯等業務,但是感覺Flink比較重,並且應用和運維的水平要求比較高。
基於上述情況,我們自主使用NET 5.0開發一套CSharpFlink實時計算組件,支持自定義數據源、計算和存儲的基本要求。
2 應用場景
主要面向物聯網、工業互聯網私有雲或公有雲平台建設過程中的數據點實時聚合和表達式計算。應用場景包括:
(1)數據點的實時時間窗口范圍內聚合計算,例如:最大值、最小值、平均值、和值、眾數、方差、中位數等,可以自定義二次開發。
(2)數據點的歷史延遲窗口的一段時間范圍內數據補充或更新的重新計算。
(3)數據點的表達式計算,支持自定義C#腳本進行編輯,實時預警或數據深度加工處理。
(4)主從結構的分布式部署,主節點負責計算任務分發,工作節點負責任務計算及結果存儲。
3 框架特點
主要特點主要是根據我們多年的物聯網、工業項目經驗的提煉和總結,滿足實現應用場景,特點包括:
(1)使用最新的NET 5.0進行開發,完全跨平台。
(2)實時數據窗口范圍外的數據補發或更新的重新計算,例如:當前5秒的實時數據窗口,支持5秒以前的數據補充和更新,並且進行重新計算及更新到數據存儲單元。
(3)實時數據表達式計算支持定時計算或數據值改事件變觸發計算,滿足實時表達式或周期性計算。
(4)C#語言的二次開發,對接多種數據源,自定義算子和多種方式數據存儲等。
(5)單節點或分布式部署。
4 框架結構
框架結構組件的基本示意,如下圖:
5 代碼目錄說明
使用VS2019進行工程開發,工程解決方案文件為:CSharpFlink.sln,代碼目錄說明如下:
(1)Cache:主節點和工作節點計算任務本地緩存管理。
(2)Calculate:計算任務輸入、過程、輸出操作及管理。
(3)Channel:主節點和工作節點分布式部署模式的IO通訊操作。
(4)Common:操作公眾類庫。
(5)Config:全局配置文件操作。
(6)Execution:全局工程的執行環境入口。
(7)Expression:表達式計算任務操作。
(8)Log:日志操作及管理。
(9)Model:數據點元數據信息。
(10)Node:主節點和工作節點管理。
(11)Protocol:主節點和工作節點之間分布式部署之間交互的協議。
(12)Sink:計算任務計算結果存儲接口。
(13)Source:對接多種數據源接口,例如:mqtt、kafka、rabbitmq、數據庫等。
(14)Task:窗口或表達任務接口,主節點和工作節點任務操作及管理。
(15)Window:數據窗口任務操作。
(16)Worker:工作節點接口。
6 配置文件說明
配置文件默認為:cfg\global.cfg,可以自定義指定配置文件,參見:命令行操作說明。配置文件說明,如下:
(1)MaxDegreeOfParallelism:任務並行度,主節點生成任務、工作節點處理任務依賴這個參數。
(2)MasterListenPort:主節點偵聽端口,用於工作節點主動連接。
(3)MasterIp:主節點IP,用於工作節點主動連接。
(4)NodeType:節點運行模式,包括:Master、Slave和Both。
(5)RemoteInvokeInterval:遠程調用工作節點間隔時間,單位:毫秒。
(6)RepeatRemoteInvokeInterval:調用工作節點失敗后,重新調用工作節點間隔時間,單位:毫秒。
(7)SlaveExcuteCalculateInterval:工作節點執行計算任務間隔時間,單位:毫秒。
(8)MaxFrameLength:主節點和工作節點之間傳輸數據最大數據偵,單位:字節。
(9)WorkerPower:工作節點能力系數,大於1,會連續發送多個任務。
7 任務部署說明
二次開發參見:二次開發說明。開發好的任務,測試通過后,把程序集(.dll)復制到“tasks”目錄下,例如工程TestTask項目測試、編譯通過后,可以部署到“tasks”目錄下,運行“CSharpFlink”主程序會自動加載和調用。
可以自定義指定任務程序集,參見:命令行操作說明。
8 命令行操作說明
命令行運行“CSharpFlink”程序,支持自定義指定配置文件或任務程序集,說明如下:
-h 顯示命令行幫助。
-c 加載指定配置文件。 例如:CSharpFlink -c c:/my.cfg
-t 加載任務程序集。 例如:CSharpFlink -t c:/mytask.dll
例如:
dotnet CSharpFlink.dll -c c:/master.cfg -t c:/mytask.dll
9 部署說明
“release”目錄下是編譯好的程序,把“CSharpFlink v1.0”分別復制到不同的路徑下,分別修改“cfg\global.cfg”配置文件中“NodeType”參數為:Master和Slave,修改主節點程序“tasks\tasks.cfg”文件中的任務數,分別運行不同目錄下的“dotnet CSharpFlink.dll”。
“TestTask.dll”源代碼,參見:二次開發說明。
10 二次開發說明
二次開發主要針對數據源、計算過程和數據計算結果存儲,大致過程如下:
(1) 數據源對接,可以自定義對接mqtt、kafka、rabbitmq、數據庫等,需要繼承SourceFunction接口,參見:RandomSourceFunction.cs類。
(2) 數據計算過程,可以自定義數據處理或加工,需要繼承Calculate.Calculate接口,參見:聚合計算Avg.cs、表達式計算ExpressionCalculate.cs。通過AddWindowTask或AddExpressionTask函數參數進行實例化。
(3) 數據計算結果存儲,可以自定義存儲任何介質上,需要繼承SinkFunction接口,參見:SinkFunction.cs類。
11 應用事例展示
同一台電腦,CPU:4核 I5-7400 3.0GHz,內存:16G,1個主節點,5個工作節點,生成1000個數據點任務,隨機數據點時間窗口和計算算子,CPU使用率為:20%-30%,內存使用率:30%-40%,主節點CPU和內存使用情況:3%-5%、100MB-300MB, 工作節點CPU和內存使用情況:0.1%-2%、25MB-60MB。運行效果,如下圖:
物聯網&大數據技術 QQ群:54256083
物聯網&大數據合作 QQ群:727664080
聯系QQ:504547114
合作微信:wxzz0151
官方博客:https://www.cnblogs.com/lsjwq
iNeuOS工業互聯網操作系統 公眾號