Http接口封裝Mqtt協議
前言
- .Net Core 3.1 WebApi
- 列出了mqtt客戶端的封裝目的是為了了解運作機制
1、封裝mqtt客戶端
- mqtt底層協議基於MQTTnet 版本2.8.5 github地址
- 實例化【單例注入AOC】
- 發布消息
- 訂閱消息
實例化
public MqttNetClient(MqttConfig _mqttConfig, EventHandler<MqttApplicationMessageReceivedEventArgs> receivedMessageHanddler,
ICache iCache)
{
mqttConfig = _mqttConfig;
_iCache=iCache;
var factory = new MqttFactory();
mqttClient = factory.CreateMqttClient() as MqttClient;
clientId = "MQTT:Cloud:Client:" + _mqttConfig.ClientIdPre + ":" + Guid.NewGuid();
//實例化一個MqttClientOptionsBulider
options = new MqttClientOptionsBuilder()
.WithTcpServer(_mqttConfig.Server, _mqttConfig.Port)
.WithCredentials(_mqttConfig.Username, _mqttConfig.Password)
.WithClientId(clientId)
.Build();
if (receivedMessageHanddler != null)
{
//是服務器接收到消息時觸發的事件,可用來響應特定消息
mqttClient.ApplicationMessageReceived +=receivedMessageHanddler;
}
//是客戶端連接成功時觸發的事件
mqttClient.Connected+=Connected;
//是客戶端斷開連接時觸發的事件
mqttClient.Disconnected+=Disconnected;
//連接服務器
mqttClient.ConnectAsync(options);
}
發布消息
public async Task PublishAsync(string topic, string content)
{
var message = new MqttApplicationMessageBuilder();
message.WithTopic(topic);
message.WithPayload(content);
message.WithAtMostOnceQoS();
message.WithRetainFlag(false);
await mqttClient.PublishAsync(message.Build());
}
訂閱消息
public async Task SubscribeAsync(string topic)
{
await mqttClient.SubscribeAsync(topic);
}
監聽mqtt回復消息
隨WebApi應用自啟動單例MqttClient去消費消息
- 初始化mqtt連接配置
- 主動訂閱Topic
- 消費mqtt消息handler
初始化mqtt連接配置&主動訂閱Topic代碼
var mqttConfig = new MqttConfig
{
Server = _config.Get("EMQServer"),//服務器IP
Port = int.Parse(_config.Get("EMQPort")),//端口
ClientIdPre = "IoT.MQTT"//IoT MQTT連接
};
mqttConfig.TopicList = new List<string>()
{
IoTSubscribeMqttTopicEnum.主動訂閱.GetEnumDescription()
};
_mqttNetClient = new MqttNetClient(mqttConfig, ReceivedMessageHandler, _cache);
消費mqtt消息handler
public static void ReceivedMessageHandler(object sender, MqttApplicationMessageReceivedEventArgs e)
{
var topic = e.ApplicationMessage.Topic;
var payload = EncryptUtil.Base64Decode(Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
//如果Topic是主動訂閱,則去訂閱http接口入參傳過來的回復Topic字段
if (topic.Equals(IoTSubscribeMqttTopicEnum.主動訂閱.GetEnumDescription()))
{
_mqttNetClient.SubscribeAsync(payload);
}
else
{
var mqttPayloadModel = JsonConvert.DeserializeObject<MqttPayloadModel<MqttContentModel>>(payload);
//插入緩存是為了在回復的時候用,通過MID保持一致
var cacheKey = $"{RedisKeyEnum.MQTT接收消息.GetEnumDescription()}:{mqttPayloadModel.Con.Mid}";
var _cache = _serviceProvider.GetService<ICache>();
_cache.InsertAsync(cacheKey, payload, 5 * 60);
}
}
2、封裝http接口
發布消息代碼
public async Task<BaseOutModel<PublishMessageOutModel>> PublishMessage(PublishMessageInModel publishInModel)
{
//如果回復Topic不為空,則需要等待mqtt返回消息,通過MID來保持一致
if (!String.IsNullOrEmpty(publishInModel.ReceiveTopic))
{
//先主動訂閱
_mqttNetClient.PublishMessageAsync(IoTSubscribeMqttTopicEnum.主動訂閱.GetEnumDescription(), EncryptUtil.Base64Encode(publishInModel.ReceiveTopic));
//發布消息
_mqttNetClient.PublishMessageAsync(publishInModel.Topic, publishInModel.Content);
var messageId= JsonConvert.DeserializeObject<MqttPayloadModel<MqttContentModel>>(EncryptUtil.Base64Decode(publishInModel.Content)).Con.Mid;
var cacheKey = $"{RedisKeyEnum.MQTT接收消息.GetEnumDescription()}:{messageId}";
//8秒為超時
var endTime = DateTime.Now.AddSeconds(8);
var received = false;
var cacheMsg = String.Empty;
while (!received)
{
cacheMsg = await _cache.GetAsync<String>(cacheKey);
if (cacheMsg != null || DateTime.Now > endTime)
{
received = true;
}
}
//緩存不為空,代表收到了mqtt客戶端的響應
if (cacheMsg != null)
{
//移除回復消息緩存
_cache.RemoveAsync(cacheKey);
return new BaseOutModel<PublishMessageOutModel>
{
Message = "發布消息成功,接收消息成功",
Data = new PublishMessageOutModel { Content = cacheMsg, ReceiveStatus = MqttReceiveStatusEnum.Yes.IntToEnum() }
};
}
return new BaseOutModel<PublishMessageOutModel>
{
Message = "發布消息成功,接收消息失敗",
Data = new PublishMessageOutModel { ReceiveStatus = MqttReceiveStatusEnum.No.IntToEnum() }
};
}
//如果Topic為空,則只需要發布消息就好
await _mqttNetClient.PublishMessageAsync(publishInModel.Topic, publishInModel.Content);
return new BaseOutModel<PublishMessageOutModel> { Message = "發布消息成功",Data=new PublishMessageOutModel { } };
}
接口請求入參
參數名 |
必選 |
類型 |
說明 |
Topic |
是 |
string |
發布的Topic |
Content |
是 |
string |
發布的消息體,需要Base64字符串 |
ReceiveTopic |
否 |
string |
接收消息Topic【只有不為空,才會去處理接收設備端mqtt的回復消息】 |
DeviceSn |
是 |
string |
設備編號 |
接口請求出參
參數名 |
類型 |
說明 |
Data - Content |
string |
回復消息的內容 |
Data - ReceiveStatus |
integer |
回復狀態【1 已回復,0 未回復 即超時】 |
Code |
integer |
返回狀態碼【200正常,0有錯誤】 |
Message |
string |
錯誤描述,Code為0使用 |
接口請求示例
1、在Startup中將HttpClient注入IOC
services.AddHttpClient("iotmqtt", x =>
{
x.BaseAddress= new Uri("http://iotmqtt.liyunzhi.com");
});
2、調用
//請求mqtt的消息體
var content = new
{
Met = "Reboot",
Con = new
{
Mid = Mid,
Ts = Ts,
Sign = Sign,
Cid = Cid
}
};
//請求入參模型
var iotPublishMessageModel = new
{
//請求Topic
Topic = "Device/Reboot",
//請求mqtt的消息體
Content = EncryptUtil.Base64Encode(JsonConvert.SerializeObject(content)),
//消息ID
MessageId = Mid,
//可選參數 需要消息回復則傳
ReceiveTopic = Mid+"Device/Reboot",
//設備號
DeviceSn = "123123123"
};
//
var client = _httpClientFactory.CreateClient("iotmqtt");
var httpResponseMessage = client.PostAsJsonAsync("Api/PublishMessage", iotPublishMessageModel).Result;
//如果當前方法用async異步修飾,請不要.Result 請await
//var httpResponseMessage =await client.PostAsJsonAsync("Api/PublishMessage", iotPublishMessageModel);
if (httpResponseMessage.IsSuccessStatusCode)
{
//成功,返回內容自己自行解析,解析參考接口請求出參
var data= httpResponseMessage.Content.ReadAsStringAsync().Result;
//如果當前方法用async異步修飾,請不要.Result 請await
//var data= await httpResponseMessage.Content.ReadAsStringAsync();
}
else
{
//這里是失敗的代碼了,即發送失敗了
}
3、Github開源地址