感謝分享原文-http://bjbsair.com/2020-04-03/tech-info/29914.html
Kubeedge除了在kubernetes的方面做了各種異步通信通道,保障offline后的業務連續性之外;還定義了一系列的設備抽象,用來管理邊緣設備。而且,其v1.0版本正朝着邊緣端服務網格,以及函數式計算等方向發展。
官方文檔:https://docs.kubeedge.io/en/latest/
架構
整體架構圖比較明了,在不考慮edgesite的情況下,其架構分為了雲端和邊緣端。其實可以理解為kubernetes的管理側和kubelet節點側(對應edge端)。但是請注意,這里的場景是邊緣計算,意味着edge端的網絡環境難以保障。
雲邊通信
於是就衍生出了cloud端的cloud Hub與edge端的Edge Hub。這兩個模塊之間通過websocket或者quic通信,相當於建立了一條底層通信隧道,供k8s和其他應用通信。當然,使用什么協議通信不是重點,重點是如何保障當着之間的鏈路都無法保障的時候,業務不受到影響,這就是MetaManager的要解決的問題了。
- CloudHub前面提到cloud端的cloudHub就是一個隧道的server端,用於大量的edge端基於websocket或者quic協議連接上來;沒錯,這貨才是正兒八經的二傳手,每天就負責拉皮條。
- EdgeHub位於edge端運行,是隧道的client端,負責將接收到的信息轉發到各edge端的模塊處理;同時將來自個edge端模塊的消息通過隧道發送到cloud端。
邊緣端
- MetaManagerMetaManager模塊后端對應一個本地的數據庫(sqlLite),所有其他模塊需要與cloud端通信的內容都會被保存到本地DB中一份,當需要查詢數據時,如果本地DB中存在該數據,就會從本地獲取,這樣就避免了與cloud端之間頻繁的網絡交互;同時,在網絡中斷的情況下,本地的緩存的數據也能夠保障其穩定運行(比如你的智能汽車進入到沒有無線信號的隧道中),在通信恢復之后,重新同步數據。
- Edged之前提到過kubernetes的kubelet,它相當於k8s的核心。這塊其實簡單做了一些裁剪,去掉一些用不上的功能,然后就成為Edged模塊,該模塊就是保障cloud端下發的pod以及其對應的各種配置、存儲(后續會支持函數式計算)能夠在edge端穩定運行,並在異常之后提供自動檢測、故障恢復等能力。當然,由於k8s本身運行時的發展,該模塊對應支持各種CRI應該也比較容易。
- EventBus/ServiceBus/Mappper前面講到的模塊都與k8s直接或間接相關,接下來說下與設備(或者說真正IOT業務)相關的設備管理側。外部設備的接入當前支持MQTT和Rest-API,這里分別對應EventBus和ServiceBus。EventBus就是一個MQTT broker的客戶端,主要功能是將edge端各模塊通信的message與設備mapper上報到MQTT的event做轉換的組件;而ServiceBus就是對應當外部是Rest-API接入時的轉換組件。說到這里,就有必要提一下MQTT broker,其實搞互聯網的基本都用過類似於rabbitmq、activeMQ之類的消息中間件,其實他們就支持MQTT協議啦(可以理解為AMQP的精簡版)。IOT的各種設備可能直接支持MQTT,但有的只支持藍牙或者其他近場通信的協議。沒關系,Mappper可以實現將各種協議轉換為對MQTT的訂閱與發布,從而實現與edge端的通信。當然,ServiceBus對應就適用於支持http協議的服務了。
- DeviceTwinedge端最后就剩下一個DeviceTwin模塊了,要理解這個名詞,就得提一下數字孿生這個概念。這里來科幻一下,假設人類要實現乾坤大挪移,但是有點難度的是,這下是要把你移到火星上。怎么辦?這里有一個解決方案:在地球上通過掃描你的所有生物信息,生成擁有你完整生物特征的數據包之后,然后在地球上就把你毀滅了。再將描述你完整信息的數據包通過電波光速發送到火星上,讓火星的設備再使用接收到的生物特征造出一個你。是不是挺可行!_ 回過頭來,我們要說的數字孿生就是那個用來傳輸到火星的用於描述你所有生物特征的數據包;當然,這里對應就是接入設備信息。所以,DeviceTwin就是將這些信息保存到本地DB中,並處理基於cloud端的操作來修改device的某些屬性(也就是操作設備);同時,將設備基於eventBus上報的狀態信息同步到本地DB和cloud端的中間人。
雲端
Controller
然后再說controller,其實准確的說controller是包括了用於edge端與API-Server同步信息的edgeController與用於DeviceTwin與API-Server同步device CRD信息的deviceController組成。這兩個模塊相對也比較簡單,后面具體講解。
各個模塊實現
邊緣端
入口與beehive
beehive模塊在整個kubeedge中扮演了非常重要的作用,它實現了一套Module管理的接口,程序中各個模塊的啟動、運行、模塊間的通信等都是由其統一封裝管理。下圖是kubeedge的edge端代碼的main啟動流程,這里涉及到的modules就是由beehive提供。
可以看到,在初始化的時候,分別加載了各個edge端modules的init函數,用來注冊其modules到heehive框架中。然后在core.Run中遍歷啟動(StartModules)。
另外,值得提及的是,用於模塊間通信,發送message到group/module的功能,在beehive中,其實是通過channel來通信的。這也是golang推薦的goroutine間通信的方式。
EdgeHub
重點是啟動了兩個go routine,用來實現往兩個方向的消息接收和分發。這里go ehc.routeToEdge對應從隧道端點接收cloud端發往edge端的消息,然后調用ehc.dispatch解析出消息的目標module並基於heehive模塊module間消息的通信機制來轉發出去。
同理,go ehc.routeToCloud實現將edge端消息基於隧道轉發到cloud端的cloudHub模塊處理。當然,該模塊中實現了對同步消息的response等到超時處理的邏輯,當在未超時期間收到response消息,會轉發到消息發送端模塊。比較暴力的是,一旦發送消息到cloud失敗,該goroutine會退出,通知所有模塊,當前與cloud端是未連接狀態,然后重新發起連接。
metaManager在與cloud的連接斷開期間,會使用本地DB中的數據,不會發起往cloud端的查詢。
Edged
這塊基本是調用kubelet的代碼,實現較多的是啟動流程。另外,將之前kubelet的client作為fake的假接口,轉而將數據都通過metaClient來存儲數據到metaManager,從而代理之前直接訪問api-server的操作。這塊的學習可以參考之前分析kubelet架構的一篇文章Kubelet源碼架構簡介。
這里差異化的一塊代碼在e.syncPod的實現,通過讀取metaManager和EdgeController的pod任務列表,來執行對本地pod的操作。同時,這些pod關聯的configmap和secret也會隨着處理pod的過程而一並處理。對pod的操作也是基於一個操作類別的queue,比如e.podAddWorkerRun就啟動了一個用於消費添加pod的queue的goroutine。外部的封裝基本就這樣,內部完全通過引用kubelet原生的包來處理。
MetaManager
從代碼架構看起來,該模塊比較簡單,首先在外層按照一定周期給自己發送消息,觸發定時同步pod狀態到cloud端。另外,在mainLoop中啟動一個獨立的goroutine接收外部消息,並執行處理邏輯。
處理邏輯基於消息類型分類,分別包括:
- cloud端發起的增、刪、查、改
- edge端模塊發起的查詢請求(前面提到,當狀態為disconnect的時候不發起remote查詢)
- cloud端返回的查詢響應的結果
- edgeHub發來的用於更新與cloudHub見連接狀態的消息
- 自己給自己發送的,定期同步edge端pod狀態到cloud端的消息
- 函數式計算相關的消息
重點來說增刪查改,拿添加舉例。當接收到要添加某個資源時,會將資源解析出來,組織成為key、type、value的三元組,以一種類似於模擬NoSQL的方式保存到本地的SqlLite數據庫中。這樣保存的目的也是為了方便快速檢索和增刪。保存完之后,需要對應發送response消息到請求消息的源模塊。
EventBus與ServiceBus
- EventBus
eventBus用於對接MQTT Broker與beehive,MQTT broker有幾種啟動模式,從代碼實現的角度分為:
- 使用內嵌MQTT broker
- 使用外部MQTT broker
在內嵌MQTT broker模式下,eventBus啟動了golang實現的broker包gomqtt用來作為外部MQTT設備的接入,具體用法請參考其github項目首頁。兩種模式下eventBus都做了一些共性的操作,具體包括:
- 向broker訂閱關注的topic,如下:
SubTopics = []string{
"$hw/events/upload/#",
"$hw/events/device/+/state/update",
"$hw/events/device/+/twin/+",
"$hw/events/node/+/membership/get",
"SYS/dis/upload_records",
}
2.當接收到對應的event時,觸發回調函數onSubscribe
3.回調函數中,對event做了簡單的分類,分別發送到不同的目的地(DeviceTwin或EventHub)
所有 $hw/events/device/+/twin/+和 $hw/events/node/+/membership/gettopic 的event發送到DeviceTwin,其他的event直接發送到EventHub再同步到Cloud端。
當然,該部分也包括了創建客戶端,往MQTT broker發布events的接口,這里就不展開了。
- ServiceBus
ServiceBus啟動一個goroutine來接受來自beehive的消息,然后基於消息中帶的參數,通過調用http client將消息通過REST-API發送到本地127.0.0.1上的目標APP。這相當於一個客戶端,而APP是一個http Rest-API server,所有的操作和設備狀態都需要客戶端調用接口來下發和獲取。
DeviceTwin
DeviceTwin包含一下幾個部分的功能:
- 數據存儲方面,將設備數據存儲到本地存儲sqlLite,包括三張表:device、deviceAttr和deviceTwin。
- 處理其他模塊發送到twin module的消息,然后調用 dtc.distributeMsg來處理消息。在消息處理邏輯里面,消息被分為了四個類別,並分別發送到這四個類別的action執行處理(每一個類別又包含多個action):membershipdevicecommunicationtwin
由於這個部分和設備更緊密相關,為何要分着幾個類別,都是如何抽象,這塊的理解方面還不夠透徹,我們暫時只關注其主業務邏輯,官方文檔對這塊有比較詳細的描述devicetwin。
雲端
入口
這里重點關注在init中加載了cloudHub、controller(也就是edgeController)和devicecontroller三個部分。然后和edge端一樣,都是beehive的套路,調用StartModules來啟動所有的模塊。
CloudHub
handler.WebSocketHandler.ServeEvent在websocket server上接收新邊緣node的連接,然后為新node分配channel queue。再進一步將消息交給負責內容讀寫的邏輯處理。
channelq.NewChannelEventQueue為每一個邊緣node維護了一個對應的channel queue(這里默認有10個消息的緩存),然后調用go q.dispatchMessage 來接收由controller發送到clouHub的消息,基於消息內容解析其目的node,然后將消息發送到node對應的channel排隊處理。
clouHub的核心邏輯包括這兩部分,讀和寫:
- 前面講到,需要發送到邊緣node的消息會發送到了node對應的channel隊列上,這里通過handler.WebSocketHandler.EventWriteLoop在channel中讀取到,並負責基於隧道發送處理(這里也很多判斷,比如如果找不到對應的node節點,或者該node節點為offline狀態等都會終止發送)。
- 另一方面,handler.WebSocketHandler.EventReadLoop函數從隧道上讀取來自於edge端的消息,然后將消息發送到controller模塊處理(如果是keepalive的心跳消息直接忽略)。
如果cloudHub往node發送消息失敗,就會觸發EventHandler的CancelNode操作;如果結合edgeHub端的行為的話,我們知道edgeHub會重新發起到cloud端的新連接,然后重新走一遍同步流程。
Controller(EdgeController)
controller的核心邏輯是upstream和downstream。
- upstream接收由beehive發送到controller的消息,然后基於消息資源類型,通過go uc.dispatchMessage轉發到不同的的goroutine處理。這里包括nodeStatus、podStatus、queryConfigMap、querySecret、queryService、queryEndpoints等;各種類別的操作都是調用k8s的client代碼來將節點狀態寫到API-Server。
- downstream通過調用k8s client代碼來監聽各資源的變化情況,比如對於pod是通過 dc.podManager.Events來讀取消息,然后調用dc.messageLayer.Send將消息發送到edge端處理。這里也同upstream,包括pod、configmap、secret、nodes、services和endpoints這些資源。
DeviceController
deviceController同edgeController,只是其關心的資源不再是k8s的workload的子資源,而是為device定義的CRD,包括:device和deviceModel。由於主要邏輯都通edgeControler,這里不再做詳細介紹。感謝分享原文-http://bjbsair.com/2020-04-03/tech-info/29914.html
Kubeedge除了在kubernetes的方面做了各種異步通信通道,保障offline后的業務連續性之外;還定義了一系列的設備抽象,用來管理邊緣設備。而且,其v1.0版本正朝着邊緣端服務網格,以及函數式計算等方向發展。
官方文檔:https://docs.kubeedge.io/en/latest/
架構
整體架構圖比較明了,在不考慮edgesite的情況下,其架構分為了雲端和邊緣端。其實可以理解為kubernetes的管理側和kubelet節點側(對應edge端)。但是請注意,這里的場景是邊緣計算,意味着edge端的網絡環境難以保障。
雲邊通信
於是就衍生出了cloud端的cloud Hub與edge端的Edge Hub。這兩個模塊之間通過websocket或者quic通信,相當於建立了一條底層通信隧道,供k8s和其他應用通信。當然,使用什么協議通信不是重點,重點是如何保障當着之間的鏈路都無法保障的時候,業務不受到影響,這就是MetaManager的要解決的問題了。
- CloudHub前面提到cloud端的cloudHub就是一個隧道的server端,用於大量的edge端基於websocket或者quic協議連接上來;沒錯,這貨才是正兒八經的二傳手,每天就負責拉皮條。
- EdgeHub位於edge端運行,是隧道的client端,負責將接收到的信息轉發到各edge端的模塊處理;同時將來自個edge端模塊的消息通過隧道發送到cloud端。
邊緣端
- MetaManagerMetaManager模塊后端對應一個本地的數據庫(sqlLite),所有其他模塊需要與cloud端通信的內容都會被保存到本地DB中一份,當需要查詢數據時,如果本地DB中存在該數據,就會從本地獲取,這樣就避免了與cloud端之間頻繁的網絡交互;同時,在網絡中斷的情況下,本地的緩存的數據也能夠保障其穩定運行(比如你的智能汽車進入到沒有無線信號的隧道中),在通信恢復之后,重新同步數據。
- Edged之前提到過kubernetes的kubelet,它相當於k8s的核心。這塊其實簡單做了一些裁剪,去掉一些用不上的功能,然后就成為Edged模塊,該模塊就是保障cloud端下發的pod以及其對應的各種配置、存儲(后續會支持函數式計算)能夠在edge端穩定運行,並在異常之后提供自動檢測、故障恢復等能力。當然,由於k8s本身運行時的發展,該模塊對應支持各種CRI應該也比較容易。
- EventBus/ServiceBus/Mappper前面講到的模塊都與k8s直接或間接相關,接下來說下與設備(或者說真正IOT業務)相關的設備管理側。外部設備的接入當前支持MQTT和Rest-API,這里分別對應EventBus和ServiceBus。EventBus就是一個MQTT broker的客戶端,主要功能是將edge端各模塊通信的message與設備mapper上報到MQTT的event做轉換的組件;而ServiceBus就是對應當外部是Rest-API接入時的轉換組件。說到這里,就有必要提一下MQTT broker,其實搞互聯網的基本都用過類似於rabbitmq、activeMQ之類的消息中間件,其實他們就支持MQTT協議啦(可以理解為AMQP的精簡版)。IOT的各種設備可能直接支持MQTT,但有的只支持藍牙或者其他近場通信的協議。沒關系,Mappper可以實現將各種協議轉換為對MQTT的訂閱與發布,從而實現與edge端的通信。當然,ServiceBus對應就適用於支持http協議的服務了。
- DeviceTwinedge端最后就剩下一個DeviceTwin模塊了,要理解這個名詞,就得提一下數字孿生這個概念。這里來科幻一下,假設人類要實現乾坤大挪移,但是有點難度的是,這下是要把你移到火星上。怎么辦?這里有一個解決方案:在地球上通過掃描你的所有生物信息,生成擁有你完整生物特征的數據包之后,然后在地球上就把你毀滅了。再將描述你完整信息的數據包通過電波光速發送到火星上,讓火星的設備再使用接收到的生物特征造出一個你。是不是挺可行!_ 回過頭來,我們要說的數字孿生就是那個用來傳輸到火星的用於描述你所有生物特征的數據包;當然,這里對應就是接入設備信息。所以,DeviceTwin就是將這些信息保存到本地DB中,並處理基於cloud端的操作來修改device的某些屬性(也就是操作設備);同時,將設備基於eventBus上報的狀態信息同步到本地DB和cloud端的中間人。
雲端
Controller
然后再說controller,其實准確的說controller是包括了用於edge端與API-Server同步信息的edgeController與用於DeviceTwin與API-Server同步device CRD信息的deviceController組成。這兩個模塊相對也比較簡單,后面具體講解。
各個模塊實現
邊緣端
入口與beehive
beehive模塊在整個kubeedge中扮演了非常重要的作用,它實現了一套Module管理的接口,程序中各個模塊的啟動、運行、模塊間的通信等都是由其統一封裝管理。下圖是kubeedge的edge端代碼的main啟動流程,這里涉及到的modules就是由beehive提供。
可以看到,在初始化的時候,分別加載了各個edge端modules的init函數,用來注冊其modules到heehive框架中。然后在core.Run中遍歷啟動(StartModules)。
另外,值得提及的是,用於模塊間通信,發送message到group/module的功能,在beehive中,其實是通過channel來通信的。這也是golang推薦的goroutine間通信的方式。
EdgeHub
重點是啟動了兩個go routine,用來實現往兩個方向的消息接收和分發。這里go ehc.routeToEdge對應從隧道端點接收cloud端發往edge端的消息,然后調用ehc.dispatch解析出消息的目標module並基於heehive模塊module間消息的通信機制來轉發出去。
同理,go ehc.routeToCloud實現將edge端消息基於隧道轉發到cloud端的cloudHub模塊處理。當然,該模塊中實現了對同步消息的response等到超時處理的邏輯,當在未超時期間收到response消息,會轉發到消息發送端模塊。比較暴力的是,一旦發送消息到cloud失敗,該goroutine會退出,通知所有模塊,當前與cloud端是未連接狀態,然后重新發起連接。
metaManager在與cloud的連接斷開期間,會使用本地DB中的數據,不會發起往cloud端的查詢。
Edged
這塊基本是調用kubelet的代碼,實現較多的是啟動流程。另外,將之前kubelet的client作為fake的假接口,轉而將數據都通過metaClient來存儲數據到metaManager,從而代理之前直接訪問api-server的操作。這塊的學習可以參考之前分析kubelet架構的一篇文章Kubelet源碼架構簡介。
這里差異化的一塊代碼在e.syncPod的實現,通過讀取metaManager和EdgeController的pod任務列表,來執行對本地pod的操作。同時,這些pod關聯的configmap和secret也會隨着處理pod的過程而一並處理。對pod的操作也是基於一個操作類別的queue,比如e.podAddWorkerRun就啟動了一個用於消費添加pod的queue的goroutine。外部的封裝基本就這樣,內部完全通過引用kubelet原生的包來處理。
MetaManager
從代碼架構看起來,該模塊比較簡單,首先在外層按照一定周期給自己發送消息,觸發定時同步pod狀態到cloud端。另外,在mainLoop中啟動一個獨立的goroutine接收外部消息,並執行處理邏輯。
處理邏輯基於消息類型分類,分別包括:
- cloud端發起的增、刪、查、改
- edge端模塊發起的查詢請求(前面提到,當狀態為disconnect的時候不發起remote查詢)
- cloud端返回的查詢響應的結果
- edgeHub發來的用於更新與cloudHub見連接狀態的消息
- 自己給自己發送的,定期同步edge端pod狀態到cloud端的消息
- 函數式計算相關的消息
重點來說增刪查改,拿添加舉例。當接收到要添加某個資源時,會將資源解析出來,組織成為key、type、value的三元組,以一種類似於模擬NoSQL的方式保存到本地的SqlLite數據庫中。這樣保存的目的也是為了方便快速檢索和增刪。保存完之后,需要對應發送response消息到請求消息的源模塊。
EventBus與ServiceBus
- EventBus
eventBus用於對接MQTT Broker與beehive,MQTT broker有幾種啟動模式,從代碼實現的角度分為:
- 使用內嵌MQTT broker
- 使用外部MQTT broker
在內嵌MQTT broker模式下,eventBus啟動了golang實現的broker包gomqtt用來作為外部MQTT設備的接入,具體用法請參考其github項目首頁。兩種模式下eventBus都做了一些共性的操作,具體包括:
- 向broker訂閱關注的topic,如下:
SubTopics = []string{
"$hw/events/upload/#",
"$hw/events/device/+/state/update",
"$hw/events/device/+/twin/+",
"$hw/events/node/+/membership/get",
"SYS/dis/upload_records",
}
2.當接收到對應的event時,觸發回調函數onSubscribe
3.回調函數中,對event做了簡單的分類,分別發送到不同的目的地(DeviceTwin或EventHub)
所有 $hw/events/device/+/twin/+和 $hw/events/node/+/membership/gettopic 的event發送到DeviceTwin,其他的event直接發送到EventHub再同步到Cloud端。
當然,該部分也包括了創建客戶端,往MQTT broker發布events的接口,這里就不展開了。
- ServiceBus
ServiceBus啟動一個goroutine來接受來自beehive的消息,然后基於消息中帶的參數,通過調用http client將消息通過REST-API發送到本地127.0.0.1上的目標APP。這相當於一個客戶端,而APP是一個http Rest-API server,所有的操作和設備狀態都需要客戶端調用接口來下發和獲取。
DeviceTwin
DeviceTwin包含一下幾個部分的功能:
- 數據存儲方面,將設備數據存儲到本地存儲sqlLite,包括三張表:device、deviceAttr和deviceTwin。
- 處理其他模塊發送到twin module的消息,然后調用 dtc.distributeMsg來處理消息。在消息處理邏輯里面,消息被分為了四個類別,並分別發送到這四個類別的action執行處理(每一個類別又包含多個action):membershipdevicecommunicationtwin
由於這個部分和設備更緊密相關,為何要分着幾個類別,都是如何抽象,這塊的理解方面還不夠透徹,我們暫時只關注其主業務邏輯,官方文檔對這塊有比較詳細的描述devicetwin。
雲端
入口
這里重點關注在init中加載了cloudHub、controller(也就是edgeController)和devicecontroller三個部分。然后和edge端一樣,都是beehive的套路,調用StartModules來啟動所有的模塊。
CloudHub
handler.WebSocketHandler.ServeEvent在websocket server上接收新邊緣node的連接,然后為新node分配channel queue。再進一步將消息交給負責內容讀寫的邏輯處理。
channelq.NewChannelEventQueue為每一個邊緣node維護了一個對應的channel queue(這里默認有10個消息的緩存),然后調用go q.dispatchMessage 來接收由controller發送到clouHub的消息,基於消息內容解析其目的node,然后將消息發送到node對應的channel排隊處理。
clouHub的核心邏輯包括這兩部分,讀和寫:
- 前面講到,需要發送到邊緣node的消息會發送到了node對應的channel隊列上,這里通過handler.WebSocketHandler.EventWriteLoop在channel中讀取到,並負責基於隧道發送處理(這里也很多判斷,比如如果找不到對應的node節點,或者該node節點為offline狀態等都會終止發送)。
- 另一方面,handler.WebSocketHandler.EventReadLoop函數從隧道上讀取來自於edge端的消息,然后將消息發送到controller模塊處理(如果是keepalive的心跳消息直接忽略)。
如果cloudHub往node發送消息失敗,就會觸發EventHandler的CancelNode操作;如果結合edgeHub端的行為的話,我們知道edgeHub會重新發起到cloud端的新連接,然后重新走一遍同步流程。
Controller(EdgeController)
controller的核心邏輯是upstream和downstream。
- upstream接收由beehive發送到controller的消息,然后基於消息資源類型,通過go uc.dispatchMessage轉發到不同的的goroutine處理。這里包括nodeStatus、podStatus、queryConfigMap、querySecret、queryService、queryEndpoints等;各種類別的操作都是調用k8s的client代碼來將節點狀態寫到API-Server。
- downstream通過調用k8s client代碼來監聽各資源的變化情況,比如對於pod是通過 dc.podManager.Events來讀取消息,然后調用dc.messageLayer.Send將消息發送到edge端處理。這里也同upstream,包括pod、configmap、secret、nodes、services和endpoints這些資源。
DeviceController
deviceController同edgeController,只是其關心的資源不再是k8s的workload的子資源,而是為device定義的CRD,包括:device和deviceModel。由於主要邏輯都通edgeControler,這里不再做詳細介紹。