快速搭建MQTT服務器(MQTTnet和Apache Apollo)


前言

MQTT協議是IBM開發的一個即時通訊協議,有可能成為物聯網的重要組成部分,http://mqtt.org/。

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. For example, it has been used in sensors communicating to a broker via satellite link, over occasional dial-up connections with healthcare providers, and in a range of home automation and small device scenarios. It is also ideal for mobile applications because of its small size, low power usage, minimised data packets, and efficient distribution of information to one or many receivers

通過https://github.com/mqtt/mqtt.github.io/wiki/servers 找到官方推薦的服務端軟件,比如:Apache Apollo

通過https://github.com/mqtt/mqtt.github.io/wiki/libraries可以找到推薦的客戶端類庫,比如:Eclipse Paho Java

MQTTnet

MQTTnet 是MQTT協議的.NET 開源類庫。

MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server. The implementation is based on the documentation from http://mqtt.org/.

通過Nuget搜索MQTT找到了MQTTnet,它不是下載量最多的,也不在官方推薦列表中,主要是因為同時支持客戶端和服務端,所以開始下載試用,結果證明有坑,源碼在vs2015中不能打開,客戶端示例接收不到消息。

開源地址:https://github.com/chkr1011/MQTTnet

首先把官方的控制台程序改成winform的,界面如下:

復制代碼
 public partial class Form1 : Form
    {

        private MqttServer mqttServer = null;
        private MqttClient mqttClient = null;

        public Form1()
        {
            InitializeComponent();
        }

        private void button_啟動服務端_Click(object sender, EventArgs e)
        {

            MqttTrace.TraceMessagePublished += MqttTrace_TraceMessagePublished;
        

            if (this.mqttServer == null)
            {
                try
                {
                    var options = new MqttServerOptions
                    {
                        ConnectionValidator = p =>
                        {
                            if (p.ClientId == "SpecialClient")
                            {
                                if (p.Username != "USER" || p.Password != "PASS")
                                {
                                    return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                                }
                            }

                            return MqttConnectReturnCode.ConnectionAccepted;
                        }
                    };

                    mqttServer = new MqttServerFactory().CreateMqttServer(options);

                }
                catch (Exception ex)
                {
                    MessageBox.Show(ex.Message);
                    return;

                }
            }
            mqttServer.Start();

            this.txt_服務器.AppendText( $">> 啟動成功..." + Environment.NewLine);
        }

        private void MqttTrace_TraceMessagePublished(object sender, MqttTraceMessagePublishedEventArgs e)
        {
            this.Invoke(new Action(() =>
            {
                this.txt_服務器.AppendText($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}" + Environment.NewLine);
                if (e.Exception != null)
                {
                    this.txt_服務器.AppendText( e.Exception + Environment.NewLine);
                }
            }));
        }

        private void button_停止服務端_Click(object sender, EventArgs e)
        {
            if (mqttServer != null)
            {
                mqttServer.Stop();
            }
            this.txt_服務器.AppendText( $">> 停止成功" + Environment.NewLine);
        }

        private async void button_啟動客戶端_Click(object sender, EventArgs e)
        {
            if (this.mqttClient == null)
            {
                var options = new MqttClientOptions
                {
                    Server = "192.168.2.54",
                    ClientId = "zbl",
                    CleanSession = true
                };
                this.mqttClient = new MqttClientFactory().CreateMqttClient(options);
                this.mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                this.mqttClient.Connected += MqttClient_Connected;
                this.mqttClient.Disconnected += MqttClient_Disconnected;


            }
            try
            {
                await this.mqttClient.ConnectAsync();
            }
            catch(Exception ex)
            {
                this.txt_客戶端.AppendText( $"### CONNECTING FAILED ###" + Environment.NewLine);
            }


        }

        private void MqttClient_Connected(object sender, EventArgs e)
        {

            this.Invoke(new Action( async () =>
            {

                this.txt_客戶端.AppendText( $"### CONNECTED WITH SERVER ###" + Environment.NewLine);
                await this.mqttClient.SubscribeAsync(new List<TopicFilter>{
                        new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
                    });
                this.txt_客戶端.AppendText($"### SUBSCRIBED  ###" + Environment.NewLine);
            }));
        }

        private void MqttClient_Disconnected(object sender, EventArgs e)
        {
            this.Invoke(new Action(() =>
            {
                this.txt_客戶端.AppendText( $"### DISCONNECTED FROM SERVER ###" + Environment.NewLine);

            }));
        }

        private void MqttClient_ApplicationMessageReceived(object sender, MQTTnet.Core.MqttApplicationMessageReceivedEventArgs e)
        {
            this.Invoke(new Action(() =>
            {
                this.txt_客戶端.AppendText( $">> Topic:{e.ApplicationMessage.Topic} Payload:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} QoS:{e.ApplicationMessage.QualityOfServiceLevel}   Retain:{e.ApplicationMessage.Retain}" + Environment.NewLine);

            }));
        }

        private async void button_停止客戶端_Click(object sender, EventArgs e)
        {
            if (this.mqttClient != null)
            {
                await this.mqttClient.DisconnectAsync();
            }
            this.txt_客戶端.AppendText( $">> 停止成功" + Environment.NewLine);
        }

        private async void button_發送_Click(object sender, EventArgs e)
        {

            //var options = new MqttClientOptions
            //{
            //    Server = "localhost"
            //};
            //var client = new MqttClientFactory().CreateMqttClient(options);

            //await client.ConnectAsync();

            var applicationMessage = new MqttApplicationMessage(this.txt_topic.Text,
                Encoding.UTF8.GetBytes(this.txt_message.Text), MqttQualityOfServiceLevel.AtMostOnce, false);

            await this.mqttClient.PublishAsync(applicationMessage);

        }

        private void button_清空服務端_Click(object sender, EventArgs e)
        {
            this.txt_服務器.Text = "";
        }
    }
復制代碼

需要注意的是按照作者說明是

1
2
3
4
5
6
7
8
9
10
11
12
13
while  ( true )
{
     Console.ReadLine();
 
     var  applicationMessage =  new  MqttApplicationMessage(
         "A/B/C" ,
         Encoding.UTF8.GetBytes( "Hello World" ),
         MqttQualityOfServiceLevel.AtLeastOnce,
         false
     );
 
     await client.PublishAsync(applicationMessage);
}

 客戶端死活都收不到消息,改成 MqttQualityOfServiceLevel.AtMostOnce  就可以了,找問題時嘗試下載源碼調試因為vs2015打不開項目也折騰了一會。這個問題提交了issues,期待作者回復。

 

Apache Apollo

1.下載Apollo服務器,我這里用的是Binaries for Windows。下載后解壓到一個文件夾,注意路徑不要包含中文,安裝手冊

2.創建Broker Instance,命令行cd到bin目錄,執行/bin/apollo create mybroker,執行后就會在bin目錄下創建mybroker文件夾。

3.運行Broker Instance,命令行cd到mybroker/bin目錄,執行mybroker/bin/apollo-broker.cmd run

4.Web Administrator,地址 http://127.0.0.1:61680/ or https://127.0.0.1:61681/,默認賬號 admin,密碼 password

 

 參考

 

搭建了MQTT服務端之后需要在Esp8266模塊和手機App中分別實現客戶端功能,稍后待續。。。。

 

 
分類:  IoT


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM