1 什么是MQTT?
mqtt (Message Queuing Telemetry Transport,消息隊列遙測傳輸)是 IBM 開發的一個即時通訊協議,有可能成為物聯網的重要組成部分。MQTT 是基於二進制消息的發布/訂閱編程模式的消息協議,如今已經成為 OASIS 規范,由於規范很簡單,非常適合需要低功耗和網絡帶寬有限的 IoT 場景。
2 MQTTnet
MQTTnet 是一個基於MQTT協議高度專業的.net庫,它同時提供MQTT client和MQTT server(broke),支持v3.1.0,v3.1.1和v5.0.0的標准MQTT協議.
3 MQTTnet支持范圍
.Net Standard 1.3+
.Net Core 1.1+
.Net Core App 1.1+
.Net Framework 4.5.2+(x86,x64,AnyCPU)
Mono 5.2+
Universal Windows Platform(UWP) 10.0.1024+(x86,x64,ARM,AnyCPU,Windwos 10 IoT Core)
Xamarin.Android 7.5+
Xamarin.iOS 10.14+
4 創建服務器
MQTT服務器以稱為"消息代理"(Broker),可以是一個應用程序或一台設備。它是位於消息發布者和訂閱者之間,它可以:
(1)接受來自客戶的網絡連接;
(2)接受客戶發布的應用信息;
(3)處理來自客戶端的訂閱和退訂請求;
(4)向訂閱的客戶轉發應用程序消息。
服務器創建一個控制台應用,可選>>控制台應用(.NET Core)創建新項目MqttNetServer,代碼如下:
1 sing MQTTnet; 2 using MQTTnet.Protocol; 3 using MQTTnet.Server; 4 using Newtonsoft.Json; 5 using System; 6 using System.Collections.Generic; 7 using System.IO; 8 using System.Reflection; 9 using System.Security.Cryptography.X509Certificates; 10 using System.Text; 11 using System.Threading.Tasks; 12 13 namespace MqttServerTest 14 { 15 class Program 16 { 17 public static IMqttServer mqttServer; 18 static void Main(string[] args) 19 { 20 mqttServer = new MQTTnet.MqttFactory().CreateMqttServer(); 21 mqttServer.UseClientConnectedHandler(e => 22 { 23 Console.WriteLine("***new connect:" + e.ClientId); 24 25 }); 26 mqttServer.UseClientDisconnectedHandler(e => 27 { 28 Console.WriteLine("*** disconnect:" + e.ClientId); 29 }); 30 31 //var options = new MqttServerOptions(); 32 //await mqttServer.StartAsync(options); 33 34 //var currentPath = Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location); 35 //var certificate = new X509Certificate2(Path.Combine(currentPath, "certificate.pfx"), "yourPassword", X509KeyStorageFlags.Exportable); 36 37 38 var optionsBuilder = new MqttServerOptionsBuilder() 39 .WithConnectionBacklog(100) 40 .WithDefaultEndpointPort(1884) 41 .WithConnectionValidator(c=> { 42 //c.SessionItems. 43 //if (c.ClientId.Length < 10) 44 //{ 45 // c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedIdentifierRejected; 46 // //c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid; 47 // return; 48 //} 49 //if (c.Username != "mySecretUser") 50 //{ 51 // c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; 52 // return; 53 //} 54 55 //if (c.Password != "mySecretPassword") 56 //{ 57 // c.ReturnCode = MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; 58 // return; 59 //} 60 61 c.ReturnCode = MqttConnectReturnCode.ConnectionAccepted; 62 Console.WriteLine("***connect validator:"+c.ClientId); 63 }) 64 //.WithEncryptionCertificate(certificate.Export(X509ContentType.Pfx)) 65 //.WithEncryptionSslProtocol(SslProtocols.Tls12) 66 .WithApplicationMessageInterceptor(context=> { 67 //if (context.ApplicationMessage.Topic == "my/custom/topic") 68 //{ 69 // context.ApplicationMessage.Payload = Encoding.UTF8.GetBytes("The server injected payload."); 70 //} 71 //// It is possible to disallow the sending of messages for a certain client id like this: 72 //if (context.ClientId != "Someone") 73 //{ 74 // context.AcceptPublish = false; 75 // return; 76 //} 77 // It is also possible to read the payload and extend it. For example by adding a timestamp in a JSON document. 78 // This is useful when the IoT device has no own clock and the creation time of the message might be important. 79 80 context.AcceptPublish = true; 81 Console.WriteLine("***Message:" + context.ApplicationMessage.Payload); 82 }) 83 .WithSubscriptionInterceptor(context=> 84 { 85 //if (context.TopicFilter.Topic.StartsWith("admin/foo/bar") && context.ClientId != "theAdmin") 86 //{ 87 // context.AcceptSubscription = false; 88 //} 89 90 //if (context.TopicFilter.Topic.StartsWith("the/secret/stuff") && context.ClientId != "Imperator") 91 //{ 92 // context.AcceptSubscription = false; 93 // context.CloseConnection = true; 94 //} 95 96 context.AcceptSubscription = true; 97 Console.WriteLine("***Subscript:" + context.TopicFilter); 98 }) 99 //.WithStorage(new RetainedMessageHandler()) 100 ; 101 var options = optionsBuilder.Build(); 102 103 //// Setting the options 104 //options.Storage=new RetainedMessageHandler(); 105 106 StartServer(options); 107 108 109 110 Console.WriteLine("Press any key to exit."); 111 Console.ReadLine(); 112 113 //await mqttServer.StopAsync(); 114 } 115 116 public static async void StartServer(IMqttServerOptions options) 117 { 118 await mqttServer.StartAsync(options); 119 } 120 121 } 122 123 // The implementation of the storage: 124 // This code uses the JSON library "Newtonsoft.Json". 125 public class RetainedMessageHandler : IMqttServerStorage 126 { 127 private const string Filename = "C:\\MQTT\\RetainedMessages.json"; 128 129 public Task SaveRetainedMessagesAsync(IList<MqttApplicationMessage> messages) 130 { 131 File.WriteAllText(Filename, JsonConvert.SerializeObject(messages)); 132 return Task.FromResult(0); 133 } 134 135 public Task<IList<MqttApplicationMessage>> LoadRetainedMessagesAsync() 136 { 137 IList<MqttApplicationMessage> retainedMessages; 138 if (File.Exists(Filename)) 139 { 140 var json = File.ReadAllText(Filename); 141 retainedMessages = JsonConvert.DeserializeObject<List<MqttApplicationMessage>>(json); 142 } 143 else 144 { 145 retainedMessages = new List<MqttApplicationMessage>(); 146 } 147 148 return Task.FromResult(retainedMessages); 149 } 150 } 151 152 }
代碼直接運行起來,就是一個簡單的Mqtt server。
5 創建xamarin APP
一個使用MQTT協議的應用程序或者設備,它總是建立到服務器的網絡連接。客戶端可以:
(1)發布其他客戶端可能會訂閱的信息;
(2)訂閱其它客戶端發布的消息;
(3)退訂或刪除應用程序的消息;
(4)斷開與服務器連接。
在VS中新建一個xamarin.Forms的移動應用,創建好后在Nuget上搜索mqttnet,添加對MQTTnet包的引用。更改代碼如下:
1 <?xml version="1.0" encoding="utf-8" ?> 2 <ContentPage xmlns="http://xamarin.com/schemas/2014/forms" 3 xmlns:x="http://schemas.microsoft.com/winfx/2009/xaml" 4 xmlns:local="clr-namespace:CatShell" 5 x:Class="CatShell.MainPage"> 6 7 <StackLayout> 8 <!-- Place new controls here --> 9 <Label Text="SubscribeTopic"/> 10 <Entry x:Name="txtSubTopic" Placeholder="Subscribe Topic" /> 11 <Button Text="BtnSubscribe" Clicked="SubButton_Clicked"/> 12 <Entry x:Name="txtReceiveMessage"/> 13 <Label Text="PublishTopic"/> 14 <Entry x:Name="txtPubTopic"/> 15 <Entry x:Name="txtSendMessage" /> 16 <Button Text="Publish" Clicked="PubButton_Clicked"/> 17 <Editor> 18 19 </Editor> 20 21 </StackLayout> 22 23 </ContentPage>
1 using MQTTnet; 2 using MQTTnet.Client; 3 using MQTTnet.Client.Options; 4 using System; 5 using System.Collections.Generic; 6 using System.Linq; 7 using System.Text; 8 using System.Threading; 9 using System.Threading.Tasks; 10 using Xamarin.Forms; 11 12 namespace CatShell 13 { 14 public partial class MainPage : ContentPage 15 { 16 public IMqttClient mqttClient; 17 public IMqttClientOptions options; 18 public MainPage() 19 { 20 InitializeComponent(); 21 InitMqttClient(); 22 ConnectMqttServer(); 23 } 24 25 public void InitMqttClient() 26 { 27 // Create a new MQTT client. 28 var factory = new MqttFactory(); 29 mqttClient = factory.CreateMqttClient(); 30 31 mqttClient.UseConnectedHandler(e => { 32 33 Device.BeginInvokeOnMainThread(() => 34 { 35 txtReceiveMessage.Text = txtReceiveMessage.Text + $">> connect success." + Environment.NewLine; 36 }); 37 }); 38 mqttClient.UseDisconnectedHandler(e => 39 { 40 Device.BeginInvokeOnMainThread(() => 41 { 42 txtReceiveMessage.Text = txtReceiveMessage.Text + $">> Disconnect." + Environment.NewLine; 43 }); 44 }); 45 mqttClient.UseApplicationMessageReceivedHandler(e => 46 { 47 Device.BeginInvokeOnMainThread(() => 48 { 49 txtReceiveMessage.Text = $">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}" + Environment.NewLine; 50 }); 51 }); 52 53 // Create TCP based options using the builder. 54 options = new MqttClientOptionsBuilder() 55 .WithClientId("Client4") 56 .WithTcpServer("10.100.1.247", 1884) // Use TCP connection, Port is opptinal 57 //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection. 58 //.WithCredentials("bud", "%spencer%") 59 //.WithTls() 60 //.WithTls(new MqttClientOptionsBuilderTlsParameters 61 //{ 62 // UseTls = true, 63 // CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) => 64 // { 65 // // TODO: Check conditions of certificate by using above parameters. 66 // return true; 67 // } 68 //}) 69 .WithCleanSession() 70 .Build(); 71 72 } 73 74 public async void ConnectMqttServer() 75 { 76 await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken 77 78 } 79 80 private async void SubButton_Clicked(object sender, EventArgs e) 81 { 82 83 string topic = txtSubTopic.Text.Trim(); 84 85 if (string.IsNullOrEmpty(topic)) 86 { 87 //MessageBox.Show("訂閱主題不能為空!"); 88 return; 89 } 90 91 if (!mqttClient.IsConnected) 92 { 93 //MessageBox.Show("MQTT客戶端尚未連接!"); 94 return; 95 } 96 97 // Subscribe to a topic 98 await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build()); 99 100 txtReceiveMessage.Text = txtReceiveMessage.Text + $"已訂閱[{topic}]主題" + Environment.NewLine; 101 txtSubTopic.IsReadOnly = false; 102 //BtnSubscribe.Enabled = false; 103 } 104 105 private async void PubButton_Clicked(object sender, EventArgs e) 106 { 107 string topic = txtPubTopic.Text.Trim(); 108 109 if (string.IsNullOrEmpty(topic)) 110 { 111 //MessageBox.Show("發布主題不能為空!"); 112 return; 113 } 114 115 string inputString = txtSendMessage.Text.Trim(); 116 117 PublishMessages(topic, inputString); 118 } 119 120 public async void PublishMessages(string topicMsg, string payloadMsg) 121 { 122 var message = new MqttApplicationMessageBuilder() 123 .WithTopic(topicMsg) 124 .WithPayload(payloadMsg) 125 .WithExactlyOnceQoS() 126 .WithRetainFlag() 127 .Build(); 128 129 await mqttClient.PublishAsync(message); 130 } 131 132 } 133 }
代碼運行起來,在APP上可以直接發信息。
6 創建winForm client(可選)
可以創建一個winForm來相互互動,在VS上新建一個windows窗體應用(.NET Framework),界面設計如下

后台代碼如下:
1 using MQTTnet; 2 using MQTTnet.Client.Options; 3 using MQTTnet.Client; 4 using System; 5 using System.Collections.Generic; 6 using System.ComponentModel; 7 using System.Data; 8 using System.Drawing; 9 using System.Linq; 10 using System.Net.Security; 11 using System.Security.Cryptography.X509Certificates; 12 using System.Text; 13 using System.Threading; 14 using System.Threading.Tasks; 15 using System.Windows.Forms; 16 17 namespace MqttClientWin 18 { 19 public partial class Form1 : Form 20 { 21 public IMqttClient mqttClient; 22 public IMqttClientOptions options; 23 public Form1() 24 { 25 InitializeComponent(); 26 InitMqttClient(); 27 ConnectMqttServer(); 28 } 29 30 public void InitMqttClient() 31 { 32 // Create a new MQTT client. 33 var factory = new MqttFactory(); 34 mqttClient = factory.CreateMqttClient(); 35 36 mqttClient.UseApplicationMessageReceivedHandler(e => 37 { 38 Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###"); 39 Console.WriteLine($"+ Topic = {e.ApplicationMessage.Topic}"); 40 Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); 41 Console.WriteLine($"+ QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); 42 Console.WriteLine($"+ Retain = {e.ApplicationMessage.Retain}"); 43 Console.WriteLine(); 44 45 this.Invoke(new Action(() => 46 { 47 txtReceiveMessage.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}"); 48 })); 49 50 //Task.Run(() => mqttClient.PublishAsync("hello/world")); 51 }); 52 53 mqttClient.UseConnectedHandler(async e => 54 { 55 Console.WriteLine("### CONNECTED WITH SERVER ###"); 56 57 //// Subscribe to a topic 58 //await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("my/topic").Build()); 59 60 //Console.WriteLine("### SUBSCRIBED ###"); 61 62 this.Invoke(new Action(() => 63 { 64 txtReceiveMessage.AppendText($">> connect success.{Environment.NewLine}"); 65 })); 66 }); 67 68 mqttClient.UseDisconnectedHandler(e => 69 { 70 this.Invoke(new Action(() => 71 { 72 txtReceiveMessage.AppendText($">> Disconnect .{Environment.NewLine}"); 73 })); 74 }); 75 76 // Create TCP based options using the builder. 77 options = new MqttClientOptionsBuilder() 78 .WithClientId("Client5") 79 .WithTcpServer("10.100.1.247", 1884) // Use TCP connection, Port is opptinal 80 //.WithWebSocketServer("broker.hivemq.com:8000/mqtt") // Use WebSocket connection. 81 //.WithCredentials("bud", "%spencer%") 82 //.WithTls() 83 //.WithTls(new MqttClientOptionsBuilderTlsParameters 84 //{ 85 // UseTls = true, 86 // CertificateValidationCallback = (X509Certificate x, X509Chain y, SslPolicyErrors z, IMqttClientOptions o) => 87 // { 88 // // TODO: Check conditions of certificate by using above parameters. 89 // return true; 90 // } 91 //}) 92 .WithCleanSession() 93 .Build(); 94 95 } 96 97 public async void ConnectMqttServer() 98 { 99 await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken 100 101 } 102 103 public void ReconnectMqttServer() 104 { 105 mqttClient.UseDisconnectedHandler(async e => 106 { 107 Console.WriteLine("### DISCONNECTED FROM SERVER ###"); 108 await Task.Delay(TimeSpan.FromSeconds(5)); 109 110 try 111 { 112 await mqttClient.ConnectAsync(options, CancellationToken.None); // Since 3.0.5 with CancellationToken 113 } 114 catch 115 { 116 Console.WriteLine("### RECONNECTING FAILED ###"); 117 } 118 }); 119 } 120 121 public async void PublishMessages(string topicMsg,string payloadMsg) 122 { 123 var message = new MqttApplicationMessageBuilder() 124 .WithTopic(topicMsg) 125 .WithPayload(payloadMsg) 126 .WithExactlyOnceQoS() 127 .WithRetainFlag() 128 .Build(); 129 130 await mqttClient.PublishAsync(message); 131 } 132 133 private async void BtnSubscribe_Click(object sender, EventArgs e) 134 { 135 string topic = txtSubTopic.Text.Trim(); 136 137 if (string.IsNullOrEmpty(topic)) 138 { 139 MessageBox.Show("訂閱主題不能為空!"); 140 return; 141 } 142 143 if (!mqttClient.IsConnected) 144 { 145 MessageBox.Show("MQTT客戶端尚未連接!"); 146 return; 147 } 148 149 // Subscribe to a topic 150 await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build()); 151 152 txtReceiveMessage.AppendText($"已訂閱[{topic}]主題" + Environment.NewLine); 153 txtSubTopic.Enabled = false; 154 BtnSubscribe.Enabled = false; 155 } 156 157 private void BtnPublish_Click(object sender, EventArgs e) 158 { 159 string topic = txtPubTopic.Text.Trim(); 160 161 if (string.IsNullOrEmpty(topic)) 162 { 163 MessageBox.Show("發布主題不能為空!"); 164 return; 165 } 166 167 string inputString = txtSendMessage.Text.Trim(); 168 169 PublishMessages(topic, inputString); 170 171 } 172 } 173 }
7 MQTT協議中的訂閱、主題、會話
一、訂閱(Subscription)
訂閱包含主題篩選器(Topic Filter)和最大服務質量(QoS)。訂閱會與一個會話(Session)關聯。一個會話可以包含多個訂閱。每一個會話中的每個訂閱都有一個不同的主題篩選器。
二、會話(Session)
每個客戶端與服務器建立連接后就是一個會話,客戶端和服務器之間有狀態交互。會話存在於一個網絡之間,也可能在客戶端和服務器之間跨越多個連續的網絡連接。
三、主題名(Topic Name)
連接到一個應用程序消息的標簽,該標簽與服務器的訂閱相匹配。服務器會將消息發送給訂閱所匹配標簽的每個客戶端。
四、主題篩選器(Topic Filter)
一個對主題名通配符篩選器,在訂閱表達式中使用,表示訂閱所匹配到的多個主題。
五、負載(Payload)
消息訂閱者所具體接收的內容。
8 MQTT協議中的方法
MQTT協議中定義了一些方法(也被稱為動作),來於表示對確定資源所進行操作。這個資源可以代表預先存在的數據或動態生成數據,這取決於服務器的實現。通常來說,資源指服務器上的文件或輸出。主要方法有:
(1)Connect。等待與服務器建立連接。
(2)Disconnect。等待MQTT客戶端完成所做的工作,並與服務器斷開TCP/IP會話。
(3)Subscribe。等待完成訂閱。
(4)UnSubscribe。等待服務器取消客戶端的一個或多個topics訂閱。
(5)Publish。MQTT客戶端發送消息請求,發送完成后返回應用程序線程。
