1、創建控制台程序
環境
net core 3.1
M2MqttDotnetCore 版本1.0.0
2、配置連接信息進行連接
選擇ca根證書及客戶端pfx格式證書(pfx證書由pem格式及key格式證書合成)
設置ssl通訊版本(MqttSslProtocols.TLSv1_2)、
設置mqtt版本(MqttProtocolVersion.Version_3_1)
注冊證書驗證回調事件(cafileValidCallback)、
注冊消息接收觸發事件(messageReceive)、
注冊連接關閉事件(connectionClosed)、
注冊消息推送事件(msgPublished)
public static void MqttClientConnection(string clientId, string ip,int port, string name, string pwd)
{
X509Certificate2 caCert = new X509Certificate2("ca.crt");
X509Certificate2 clientCert= new X509Certificate2("client.pfx", certificate);
//單向SSL通信
client = new MqttClient(ip, port, true, caCert, clientCert, MqttSslProtocols.TLSv1_2, new RemoteCertificateValidationCallback(cafileValidCallback));
client.ProtocolVersion = MqttProtocolVersion.Version_3_1;
//消息接受
client.MqttMsgPublishReceived += new MqttClient.MqttMsgPublishEventHandler(messageReceive);
client.ConnectionClosed += new MqttClient.ConnectionClosedEventHandler(connectionClosed);
client.MqttMsgPublished += new MqttClient.MqttMsgPublishedEventHandler(msgPublished);
//連接Broker
client.Connect(clientId, name, pwd);
Console.WriteLine("連接成功");
}
3、消息接收事件
/// <summary>
/// 消息接收事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
static void messageReceive(object sender, MqttMsgPublishEventArgs e)
{
string msg = "接收消息[Topic:" + e.Topic + " Message:" + Encoding.Default.GetString(e.Message)+"]";
Console.WriteLine(msg);
}
4、連接斷開事件
/// <summary>
/// 連接斷開事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
static void connectionClosed(object sender, EventArgs e)
{
string msg = "連接已斷開";
Console.WriteLine(msg);
}
5、消息推送事件
/// <summary>
///
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
static void msgPublished(object sender, MqttMsgPublishedEventArgs e)
{
string msg = "連接斷開";
Console.WriteLine(msg);
}
6、證書校驗回調事件
/// <summary>
/// 證書校驗
/// </summary>
/// <param name="sender"></param>
/// <param name="certificate"></param>
/// <param name="chain"></param>
/// <param name="sslPolicyErrors"></param>
/// <returns></returns>
static bool cafileValidCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
string msg = "X509 鏈狀態:";
foreach (X509ChainStatus status in chain.ChainStatus)
{
msg += status.StatusInformation;
}
msg += "SSL策略問題:" + (int)sslPolicyErrors;
Console.WriteLine(msg);
if (sslPolicyErrors != SslPolicyErrors.None)
return false;
return true;
}
7、主題訂閱
/// <summary>
/// 主題訂閱
/// </summary>
/// <param name="topic">主題</param>
/// <param name="qosLevels">消息質量</param>
public static void MsgSubscribe(string[] topic,byte[] qosLevels)
{
if (client.IsConnected)
{
client.Subscribe(topic, qosLevels);
foreach (string item in topic)
{
Console.WriteLine("訂閱主題[" + item.ToString() + "]");
}
}
else
{
Console.WriteLine("未連接!");
}
}
8、消息推送
/// <summary>
/// 消息推送
/// </summary>
/// <param name="topic"></param>
/// <param name="msg"></param>
public static void MsgPublish(string topic,string pubmsg)
{
client.Publish(topic, Encoding.Default.GetBytes(pubmsg));
string msg = "推送消息[Topic:" + topic + " Message:" + pubmsg + "]";
Console.WriteLine(msg);
}
9、測試連接
string clientId = "test";
string ip = "127.0.0.1";
int port = 8883;
string name = "";
string pwd = "";
//連接
MqttClientConnection(clientId, ip, port, name, pwd);
10、測試訂閱
string[] topic = { "test" };
byte[] qosLevels = { 0, 1 };
//訂閱
MqttClientHelper.MsgSubscribe(topic, new byte[] { 1 });
11、測試發布
Task.Run(() =>
{
while (true)
{
try
{
//推送
string msg = "當前時間:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
MqttClientHelper.MsgPublish("test", msg);
Thread.Sleep(1000);
}
catch (Exception err)
{
Console.WriteLine(err.Message);
}
}
});