1、创建控制台程序
环境
net core 3.1
M2MqttDotnetCore 版本1.0.0
2、配置连接信息进行连接
选择ca根证书及客户端pfx格式证书(pfx证书由pem格式及key格式证书合成)
设置ssl通讯版本(MqttSslProtocols.TLSv1_2)、
设置mqtt版本(MqttProtocolVersion.Version_3_1)
注册证书验证回调事件(cafileValidCallback)、
注册消息接收触发事件(messageReceive)、
注册连接关闭事件(connectionClosed)、
注册消息推送事件(msgPublished)
public static void MqttClientConnection(string clientId, string ip,int port, string name, string pwd)
{
X509Certificate2 caCert = new X509Certificate2("ca.crt");
X509Certificate2 clientCert= new X509Certificate2("client.pfx", certificate);
//单向SSL通信
client = new MqttClient(ip, port, true, caCert, clientCert, MqttSslProtocols.TLSv1_2, new RemoteCertificateValidationCallback(cafileValidCallback));
client.ProtocolVersion = MqttProtocolVersion.Version_3_1;
//消息接受
client.MqttMsgPublishReceived += new MqttClient.MqttMsgPublishEventHandler(messageReceive);
client.ConnectionClosed += new MqttClient.ConnectionClosedEventHandler(connectionClosed);
client.MqttMsgPublished += new MqttClient.MqttMsgPublishedEventHandler(msgPublished);
//连接Broker
client.Connect(clientId, name, pwd);
Console.WriteLine("连接成功");
}
3、消息接收事件
/// <summary>
/// 消息接收事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
static void messageReceive(object sender, MqttMsgPublishEventArgs e)
{
string msg = "接收消息[Topic:" + e.Topic + " Message:" + Encoding.Default.GetString(e.Message)+"]";
Console.WriteLine(msg);
}
4、连接断开事件
/// <summary>
/// 连接断开事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
static void connectionClosed(object sender, EventArgs e)
{
string msg = "连接已断开";
Console.WriteLine(msg);
}
5、消息推送事件
/// <summary>
///
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
static void msgPublished(object sender, MqttMsgPublishedEventArgs e)
{
string msg = "连接断开";
Console.WriteLine(msg);
}
6、证书校验回调事件
/// <summary>
/// 证书校验
/// </summary>
/// <param name="sender"></param>
/// <param name="certificate"></param>
/// <param name="chain"></param>
/// <param name="sslPolicyErrors"></param>
/// <returns></returns>
static bool cafileValidCallback(object sender, X509Certificate certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors)
{
string msg = "X509 链状态:";
foreach (X509ChainStatus status in chain.ChainStatus)
{
msg += status.StatusInformation;
}
msg += "SSL策略问题:" + (int)sslPolicyErrors;
Console.WriteLine(msg);
if (sslPolicyErrors != SslPolicyErrors.None)
return false;
return true;
}
7、主题订阅
/// <summary>
/// 主题订阅
/// </summary>
/// <param name="topic">主题</param>
/// <param name="qosLevels">消息质量</param>
public static void MsgSubscribe(string[] topic,byte[] qosLevels)
{
if (client.IsConnected)
{
client.Subscribe(topic, qosLevels);
foreach (string item in topic)
{
Console.WriteLine("订阅主题[" + item.ToString() + "]");
}
}
else
{
Console.WriteLine("未连接!");
}
}
8、消息推送
/// <summary>
/// 消息推送
/// </summary>
/// <param name="topic"></param>
/// <param name="msg"></param>
public static void MsgPublish(string topic,string pubmsg)
{
client.Publish(topic, Encoding.Default.GetBytes(pubmsg));
string msg = "推送消息[Topic:" + topic + " Message:" + pubmsg + "]";
Console.WriteLine(msg);
}
9、测试连接
string clientId = "test";
string ip = "127.0.0.1";
int port = 8883;
string name = "";
string pwd = "";
//连接
MqttClientConnection(clientId, ip, port, name, pwd);
10、测试订阅
string[] topic = { "test" };
byte[] qosLevels = { 0, 1 };
//订阅
MqttClientHelper.MsgSubscribe(topic, new byte[] { 1 });
11、测试发布
Task.Run(() =>
{
while (true)
{
try
{
//推送
string msg = "当前时间:" + DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
MqttClientHelper.MsgPublish("test", msg);
Thread.Sleep(1000);
}
catch (Exception err)
{
Console.WriteLine(err.Message);
}
}
});