通過集群的方式解決基於MQTT協議的RabbitMQ消息收發


在完成了基於AMQP協議的RabbitMQ消息收發后,我們要繼續實現基於MQTT協議的RabbitMQ消息收發。

由於C#的RabbitMQ.Client包中只實現了基於AMQP協議的消息收發功能的封裝,所以要實現基於MQTT協議的收發,我們要下載新的包。

在NuGet的解決方案中,我們選擇了簡單實用的M2Mqtt。

關於M2Mqtt的資料,可以參考: https://m2mqtt.wordpress.com/     https://github.com/eclipse/paho.mqtt.m2mqtt

 

消費者代碼:

using System;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

namespace MQTTDemo
{
    class Client
    {
        static void Main()
        {
            // create client instance 
            MqttClient client = new MqttClient("127.0.0.1");

            // register to message received 
            client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
            
            string clientId = Guid.NewGuid().ToString();
            client.Connect(clientId);

            client.Subscribe(new string[] { "test" }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
        }

        static void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
        {
            string msg = System.Text.Encoding.Default.GetString(e.Message);
            Console.WriteLine(msg);
        }
    }
}
View Code

生產者代碼:

using System;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

namespace MQTTServer
{
    class Server
    {
        static void Main()
        {
            // create client instance 
            MqttClient client = new MqttClient("127.0.0.1");

            string clientId = Guid.NewGuid().ToString();
            client.Connect(clientId);

            client.Publish("test", Encoding.UTF8.GetBytes("hello"), MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);

            Console.WriteLine("Publish!!!");

            Console.ReadKey();
            client.Disconnect();
        }
    }
}
View Code

消費者監聽的隊列名會基於產生的Guid進行前后封裝,“test”表示的是topic值,選擇QOS_LEVEL_AT_MOST_ONCE而不是QOS_LEVEL_EXACTLY_ONCE是因為測試發現QOS_LEVEL_EXACTLY_ONCE消息會被收到多次(我也不知道為啥)。

消費者監聽的隊列會在消費者程序結束后自動刪除,生產者不產生隊列。

 

在rabbitmq-plugins enable rabbitmq_mqtt之后,我們就可以愉快地通過MQTT收發消息了。

 

然而,我們發現只能通過127.0.0.1和localhost訪問RabbitMQ服務器,而本機IP訪問失敗。

 

查閱了大量資料后,我發現這是由於rabbitmq默認的config中有這么一段文字,所以我們之能在localhost中訪問服務器。

%% The default "guest" user is only permitted to access the server
%% via a loopback interface (e.g. localhost).
%% {loopback_users, [<<"guest">>]},

 

所以我們取消了{loopback_users, []}的注釋

%% Uncomment the following line if you want to allow access to the
%% guest user from anywhere on the network.
%% {loopback_users, []},

值得注意的是,由於我們在config中僅僅取消了一行注釋,所以這段代碼是整個代碼塊的最后一行。於是我們應該將句末的逗號一同去掉。

 

然而,我發現怎么更改默認啟動的rabbitmq對應的comfig文件,都無法成功地使用我更改后的config文件,察看了log發現用的是不存在的rabbitmq.conf文件。

修改成rabbitmq.conf后服務啟動失敗,所以我放棄了直接在默認啟動服務中更改。

 

由於之前配置過rabbitmq集群,所以我打算采用集群的方式解決問題。

 

操作可以參考https://www.cnblogs.com/lucifer1997/p/9324130.html,其中我將ClusterNode1改為了mqtt,同時在rabbitmq-mqtt.config中對{loopback_users, []}進行了更改。

 

如果要修改默認的mqtt用戶、密碼、虛擬用戶、交換機信息,可以參照http://www.rabbitmq.com/mqtt.html在rabbitmq-mqtt.config中進行修改。

 

在命令行操作之前先把原來開啟的rabbitmq_mqtt停用,避免兩個服務同時監聽1883端口導致報錯。 rabbitmq-plugins disable rabbitmq_mqtt

同時在操作了rabbitmq-plugins-mqtt enable rabbitmq_management之后執行rabbitmq-plugins-mqtt enable rabbitmq_mqtt。

 

如此就可以在集群后實現遠程MQTT收發,同時還可以實現AMQP與MQTT之間的收發。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM