應用場景:
利用GitHub上的溫度傳感器的例子作為講解,實現從雲端獲取設備終端狀態及使用Java模擬設備數據。其實和官網給的視頻一樣,只需要將終端設備的數據轉換為支持MQTT協議傳輸的數據,雲端就可以拿到數據了。
需要使用的文件及技術
1.雲端:創建文件,及開啟cloudCore
vim device.yaml //案例中的設備配置,直接使用請刪除所有注釋
#apiVersion,該屬性定義了我們從k8s獲取改設備數據的url路徑
apiVersion: devices.kubeedge.io/v1alpha1
kind: Device
metadata:
name: temperature3
labels:
description: 'temperature3'
manufacturer: 'test'
spec:
deviceModelRef:
name: temperature3-model #與設備模板名稱進行綁定
nodeSelector:
nodeSelectorTerms:
- matchExpressions:
- key: ''
operator: In
values:
- sunsheen-edge #部署該設備的節點
# status中的屬性為我們可以定義的屬性,屬性名為propertyName的屬性與初始期望值
status:
twins:
- propertyName: temperatureState
desired:
metadata:
type: string
value: 'on'
- propertyName: temperature
desired:
metadata:
type: string
value: ''
vim devicemodel.yaml //設備模板文件,直接使用請刪除所有注釋。
#apiVersion與設備端保持一致
apiVersion: devices.kubeedge.io/v1alpha1
kind: DeviceModel
metadata:
name: temperature3-model
namespace: default
spec:
#屬性與設備的保持一致,這里可以設備權限,這里我們只能修改溫度狀態,無法控制實際溫度
properties:
- name: temperatureState
description: Temperature collected from the edge device
type:
string:
accessMode: ReadWrite
defaultValue: 'on'
- name: temperature
description: Temperature collected from the edge device
type:
string:
accessMode: ReadOnly
defaultValue: ''
vim deployment.yaml //使用deployment控制器(k8s內容), 創建POD,邊緣節點會自動去拉取鏡像(很慢,建議手動拉取,或配置私有鏡像倉庫)
apiVersion: apps/v1
kind: Deployment
metadata:
name: temperature3-mapper
labels:
app: temperature
spec:
replicas: 1
selector:
matchLabels:
app: temperature3
template:
metadata:
labels:
app: temperature3
spec:
hostNetwork: true
nodeSelector:
name: "sunsheen-edge"
containers:
- name: temperature3
image: kubeedge-mapper:v2.2 #需要部署的鏡像
imagePullPolicy: IfNotPresent
securityContext:
privileged: true
2.邊緣端:開啟 mosquitto,啟動edgeCore
mosquitto -d -p 1883 //邊緣端開啟mosquitto,用於傳輸消息
JAVA代碼模擬設備推送接收與推送消息:
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.13</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<!-- http請求 -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient-cache</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.0</version>
</dependency>
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @author wanchen.chen
* @ClassName KubeedageClient
* @Despriction: MQTTP 連接類,用於推送/訂閱 消息
* @date 2020/4/15 9:20
* @Version 1.0
**/
public class KubeedageClient {
private MqttMessage message;
private MqttClient client;
private MqttConnectOptions options;
private MqttTopic clientTopic;
private MqttTopic serverTopic;
//定義主題,document為雲端反饋的主題;update為邊緣向雲端推送的主題。temperature3為設備名稱,其他都固定。
private static String clientTopicStr ="$hw/events/device/temperature3/twin/update/document";
private static String serverTopicStr ="$hw/events/device/temperature3/twin/update";
private static final String url ="tcp://0.0.0.0:1883";
//我這里是要打包為鏡像部署,所有需要配置邊緣節點的用戶及密碼
private static final String userName ="xxx";
private static final String password ="xxx";
private ScheduledExecutorService scheduler;
public KubeedageClient(){
}
/**
* 初始化
*/
public void start() {
try {
// host為主機名,clientid即連接MQTT的客戶端ID,一般以唯一標識符表示,MemoryPersistence設置clientid的保存形式,默認為以內存保存
client = new MqttClient(url, "KubeEdgeClient", new MemoryPersistence());
// MQTT的連接設置
options = new MqttConnectOptions();
// 設置是否清空session,這里如果設置為false表示服務器會保留客戶端的連接記錄,這里設置為true表示每次連接到服務器都以新的身份連接
options.setCleanSession(true);
// 設置連接的用戶名
options.setUserName(userName);
// 設置連接的密碼
options.setPassword(password.toCharArray());
// 設置超時時間 單位為秒
options.setConnectionTimeout(10);
// 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制
options.setKeepAliveInterval(20);
// 設置回調
client.setCallback(new PushCallback());
clientTopic = client.getTopic(clientTopicStr);
serverTopic = client.getTopic(serverTopicStr);
//setWill方法,如果項目中需要知道客戶端是否掉線可以調用該方法。設置最終端口的通知消息
// options.setWill(clientTopoc, "close".getBytes(), 2, true);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 訂閱主題消息
*/
public void listerData(){
//訂閱消息
int[] Qos = {1};
String[] topic1 = {clientTopicStr};
try {
client.subscribe(topic1, Qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* push 消息到主題
* @param topic
* @param message
* @throws MqttPersistenceException
* @throws MqttException
*/
public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
MqttException {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
// System.out.println("message is published completely! "
// + token.isComplete());
}
/**
* 發送消息
* @param deviceInfo
*/
public void putData(String deviceInfo){
message = new MqttMessage();
message.setQos(2);
message.setRetained(true);
message.setPayload(deviceInfo.getBytes());
try {
publish(serverTopic,message);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* @author wanchen.chen
* @ClassName PushCallback 發布消息的回調類
* @Despriction: 必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。
* 每個客戶機標識都需要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存為實例數據。
* 在回調中,將它用來標識已經啟動了該回調的哪個實例。
* 必須在回調類中實現三個方法:
* @date 2020/4/15 9:17
* @Version 1.0
**/
public class PushCallback implements MqttCallback {
public void connectionLost(Throwable cause) {
// 連接丟失后,一般在這里面進行重連
System.out.println("連接斷開,可以做重連");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息會執行到這里面,消息只能被消費一次
System.out.println("接收消息主題 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收的消息為:"+str);
}
}
發送的消息結構:
.yaml文件,用於json數據結構
event_id: 0
timestamp: 0
twin:
temperature:
actual:
value: 0
metadata:
type: Updated
temperatureState:
actual:
value: height
metadata:
type: Updated
通過Java代碼將其轉換為JSON,將數據put進JSON中就可以發送了:
import org.yaml.snakeyaml.Yaml;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.net.URL;
import java.util.Map;
/**
* @author wanchen.chen
* @ClassName AnalysisYAML
* @Despriction: 解析YAML文件的內容
* @date 2020/4/28 9:32
* @Version 1.0
**/
public class AnalysisYAML {
/**
* 傳參解析
* @param urlStr
* @return
*/
public Map<String,Object> getYamlData(String urlStr){
URL url = AnalysisYAML.class.getClassLoader().getResource(urlStr);
return analysisData(url);
}
/**
* 默認解析
* @return
*/
public Map<String,Object> getYamlData(){
URL url = AnalysisYAML.class.getClassLoader().getResource("attribute.yaml");
return analysisData(url);
}
/**
* 獲取URL 解析內容
* @param url
* @return
*/
public Map<String,Object> analysisData(URL url){
InputStream input = null;
try {
input = new FileInputStream(url.getFile());
} catch (FileNotFoundException e) {
e.printStackTrace();
}
Yaml yaml = new Yaml();
Map<String,Object> map = (Map<String,Object>)yaml.load(input);
return map;
}
}
鏡像打包:
通過DockerFile將jar文件打包為鏡像:
FROM java:latest
RUN mkdir -p /usr
RUN mkdir -p /usr/local
COPY . /usr/local/
WORKDIR /usr/local
EXPOSE 8892
ENTRYPOINT ["java","-jar","xxx.jar"]
//在DockerFile 文件目錄下創建鏡像
docker build -t kubeedge-mapper:v2.0 .