總述
文檔介紹
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協議),是一種基於發布/訂閱(publish/subscribe)模式的"輕量級"通訊協議。
在軟件開發中,常使用MQTT協議進行消息廣播,因為MQTT是一個協議,所以我們需要搭建一個支持MQTT協議的服務器,使服務端和客戶端能夠通過這個MQTT服務器(broker)進行消息轉發、通信。
部署架構概述
MQTT單節點:192.168.244.84:1883
部署
安裝docker
查看內核版本(需要CentOS7或以上版本)
uname -r
把yum包更新到最新
sudo yum update
安裝需要的軟件包, yum-util 提供yum-config-manager功能,另外兩個是devicemapper驅動依賴的。
sudo yum install -y yum-utils device-mapper-persistent-data lvm2
設置yum源
sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
查看倉庫中docker版本
yum list docker-ce --showduplicates | sort -r
安裝docker
sudo yum install docker-ce
命令測試:
啟動Docker、設置開機啟動、停止Docker
sudo systemctl start docker
sudo systemctl enable docker
sudo systemctl stop docker
查看版本
docker version
使用一下確認是否啟動成功,使用search命令
docker search mysql
查看日志狀態成功日志
systemctl status docker.service
下載MQTT服務器emqx
EMQ X (Erlang/Enterprise/Elastic MQTT Broker) 是基於 Erlang/OTP 平台開發的開源物聯網 MQTT 消息服務器。官方有提供Docker版本,可以直接使用docker pull 進行下載。
它是目前MQTT服務器中,最優秀的產品之一,其優點有:
- 穩定承載大規模的 MQTT 客戶端連接,單服務器節點支持50萬到100萬連接。
- 分布式節點集群,快速低延時的消息路由,單集群支持1000萬規模的路由。
- 消息服務器內擴展,支持定制多種認證方式、高效存儲消息到后端數據庫。
- 完整物聯網協議支持,MQTT、MQTT-SN、CoAP、LwM2M、WebSocket 或私有協議支持。
下載命令:
docker pull emqx
啟動容器
docker run --name=mqtt --net=host --restart=always -d emqx-docker-v3.0-rc.2
啟動后,登錄mqtt管理頁面:http://{ip}:18083,確認部署成功。
默認賬號:admin
默認密碼:public
接下來我們可以通過編寫client端和server端的腳本,來模擬訂閱和發布。
client端測試腳本:
var mqtt = require('mqtt');
var client = mqtt.connect('mqtt://192.168.244.84:1883', {
username: "admin",
password: "public",
clientId: 'client9'
});
function getYYYYMMDDhhmmssByDate() {
let date = new Date();
let value = date.getFullYear() * 10000000000 +
(date.getMonth() + 1) * 100000000 +
date.getDate() * 1000000 +
date.getHours() * 10000 +
date.getMinutes() * 100 +
date.getSeconds();
return value;
};
client.on('connect', function() {
console.log("connect success");
client.subscribe('/server/task/roleId/1/update_task_data');
});
client.on('message', function(topic, message, packet) {
console.log(" ");
console.log("time: ", getYYYYMMDDhhmmssByDate());
var jsonStr = message.toString()
console.log("jsonStr: " + jsonStr);
console.log(" ");
});
server端腳本:
var mqtt = require('mqtt');
var client = mqtt.connect('mqtt://192.168.244.84:1883', {
username: 'admin',
password: 'public',
clientId: 'server1'
});
var jsonStr = JSON.stringify({ "event": "update_task_data", "data": { "arrTaskId": [], "arrTaskInst": [{ "id": 1, "roleId": 1, "moduleId": "0", "userId": 0, "taskId": 999, "finish": 0, "taskNodeId": 15, "taskNodeStatus": 0, "taskNodeStartTime": 20210811, "beforeNodeId": 14 }] } });
// 推送的頻道和數據
//client.publish("/server/task/roleId/1/update_task_data", jsonStr, { qos: 2, retain: false });
運行結果: