Http接口封裝Mqtt協議【C#】


Http接口封裝Mqtt協議

前言

  • .Net Core 3.1 WebApi
  • 列出了mqtt客戶端的封裝目的是為了了解運作機制

1、封裝mqtt客戶端

  • mqtt底層協議基於MQTTnet 版本2.8.5 github地址
  • 實例化【單例注入AOC】
  • 發布消息
  • 訂閱消息

實例化

public MqttNetClient(MqttConfig _mqttConfig, EventHandler<MqttApplicationMessageReceivedEventArgs> receivedMessageHanddler,
    ICache iCache)
{
    mqttConfig = _mqttConfig;
    _iCache=iCache;
    var factory = new MqttFactory();
    mqttClient = factory.CreateMqttClient() as MqttClient;
    clientId = "MQTT:Cloud:Client:" + _mqttConfig.ClientIdPre + ":" + Guid.NewGuid();
    //實例化一個MqttClientOptionsBulider
    options = new MqttClientOptionsBuilder()
        .WithTcpServer(_mqttConfig.Server, _mqttConfig.Port)
        .WithCredentials(_mqttConfig.Username, _mqttConfig.Password)
        .WithClientId(clientId)
        .Build();
    if (receivedMessageHanddler != null)
    {
        //是服務器接收到消息時觸發的事件,可用來響應特定消息
        mqttClient.ApplicationMessageReceived +=receivedMessageHanddler;
    }
    //是客戶端連接成功時觸發的事件
    mqttClient.Connected+=Connected;
    //是客戶端斷開連接時觸發的事件
    mqttClient.Disconnected+=Disconnected;
    //連接服務器
    mqttClient.ConnectAsync(options);
}

發布消息

public async Task PublishAsync(string topic, string content)
{
    var message = new MqttApplicationMessageBuilder();
    message.WithTopic(topic);
    message.WithPayload(content);
    message.WithAtMostOnceQoS();
    message.WithRetainFlag(false);
    await mqttClient.PublishAsync(message.Build());
}

訂閱消息

public async Task SubscribeAsync(string topic)
{
    await mqttClient.SubscribeAsync(topic);
}

監聽mqtt回復消息

隨WebApi應用自啟動單例MqttClient去消費消息

  • 初始化mqtt連接配置
  • 主動訂閱Topic
  • 消費mqtt消息handler

初始化mqtt連接配置&主動訂閱Topic代碼

var mqttConfig = new MqttConfig
{
    Server = _config.Get("EMQServer"),//服務器IP
    Port = int.Parse(_config.Get("EMQPort")),//端口
    ClientIdPre = "IoT.MQTT"//IoT MQTT連接
};

mqttConfig.TopicList = new List<string>()
{
    IoTSubscribeMqttTopicEnum.主動訂閱.GetEnumDescription()
};
_mqttNetClient = new MqttNetClient(mqttConfig, ReceivedMessageHandler, _cache);

消費mqtt消息handler

public static void ReceivedMessageHandler(object sender, MqttApplicationMessageReceivedEventArgs e)
{
    var topic = e.ApplicationMessage.Topic;
    var payload = EncryptUtil.Base64Decode(Encoding.UTF8.GetString(e.ApplicationMessage.Payload));

    //如果Topic是主動訂閱,則去訂閱http接口入參傳過來的回復Topic字段
    if (topic.Equals(IoTSubscribeMqttTopicEnum.主動訂閱.GetEnumDescription()))
    {
        _mqttNetClient.SubscribeAsync(payload);
    }
    else
    {
        var mqttPayloadModel = JsonConvert.DeserializeObject<MqttPayloadModel<MqttContentModel>>(payload);
        //插入緩存是為了在回復的時候用,通過MID保持一致
        var cacheKey = $"{RedisKeyEnum.MQTT接收消息.GetEnumDescription()}:{mqttPayloadModel.Con.Mid}";
        var _cache = _serviceProvider.GetService<ICache>();
        _cache.InsertAsync(cacheKey, payload, 5 * 60);
    }
}

2、封裝http接口

發布消息代碼

public async Task<BaseOutModel<PublishMessageOutModel>> PublishMessage(PublishMessageInModel publishInModel)
{
    //如果回復Topic不為空,則需要等待mqtt返回消息,通過MID來保持一致
    if (!String.IsNullOrEmpty(publishInModel.ReceiveTopic))
    {
        //先主動訂閱
        _mqttNetClient.PublishMessageAsync(IoTSubscribeMqttTopicEnum.主動訂閱.GetEnumDescription(), EncryptUtil.Base64Encode(publishInModel.ReceiveTopic));
        //發布消息
        _mqttNetClient.PublishMessageAsync(publishInModel.Topic, publishInModel.Content);
        var messageId= JsonConvert.DeserializeObject<MqttPayloadModel<MqttContentModel>>(EncryptUtil.Base64Decode(publishInModel.Content)).Con.Mid;
        var cacheKey = $"{RedisKeyEnum.MQTT接收消息.GetEnumDescription()}:{messageId}";
        //8秒為超時
        var endTime = DateTime.Now.AddSeconds(8);
        var received = false;

        var cacheMsg = String.Empty;
        while (!received)
        {
            cacheMsg = await _cache.GetAsync<String>(cacheKey);
            if (cacheMsg != null || DateTime.Now > endTime)
            {
                received = true;
            }
        }
        //緩存不為空,代表收到了mqtt客戶端的響應
        if (cacheMsg != null)
        {
            //移除回復消息緩存
            _cache.RemoveAsync(cacheKey);
            return new BaseOutModel<PublishMessageOutModel>
            {
                Message = "發布消息成功,接收消息成功",
                Data = new PublishMessageOutModel { Content = cacheMsg, ReceiveStatus = MqttReceiveStatusEnum.Yes.IntToEnum() }
            };
        }
        return new BaseOutModel<PublishMessageOutModel>
        {
            Message = "發布消息成功,接收消息失敗",
            Data = new PublishMessageOutModel { ReceiveStatus = MqttReceiveStatusEnum.No.IntToEnum() }
        };
    }
    //如果Topic為空,則只需要發布消息就好
    await _mqttNetClient.PublishMessageAsync(publishInModel.Topic, publishInModel.Content);
    return new BaseOutModel<PublishMessageOutModel> { Message = "發布消息成功",Data=new PublishMessageOutModel { } };
}

接口請求入參

參數名 必選 類型 說明
Topic string 發布的Topic
Content string 發布的消息體,需要Base64字符串
ReceiveTopic string 接收消息Topic【只有不為空,才會去處理接收設備端mqtt的回復消息】
DeviceSn string 設備編號

接口請求出參

參數名 類型 說明
Data - Content string 回復消息的內容
Data - ReceiveStatus integer 回復狀態【1 已回復,0 未回復 即超時】
Code integer 返回狀態碼【200正常,0有錯誤】
Message string 錯誤描述,Code為0使用

接口請求示例

1、在Startup中將HttpClient注入IOC

services.AddHttpClient("iotmqtt", x =>
{
    x.BaseAddress= new Uri("http://iotmqtt.liyunzhi.com");
});

2、調用

//請求mqtt的消息體
var content = new
{
    Met = "Reboot",
    Con = new
    {
        Mid = Mid,
        Ts = Ts,
        Sign = Sign,
        Cid = Cid
    }
};

//請求入參模型
var iotPublishMessageModel = new
{
    //請求Topic
    Topic = "Device/Reboot",
    //請求mqtt的消息體
    Content = EncryptUtil.Base64Encode(JsonConvert.SerializeObject(content)),
    //消息ID
    MessageId = Mid,
    //可選參數 需要消息回復則傳
    ReceiveTopic = Mid+"Device/Reboot",
    //設備號
    DeviceSn = "123123123"
};

//
var client = _httpClientFactory.CreateClient("iotmqtt");
var httpResponseMessage = client.PostAsJsonAsync("Api/PublishMessage", iotPublishMessageModel).Result;
//如果當前方法用async異步修飾,請不要.Result 請await
//var httpResponseMessage =await client.PostAsJsonAsync("Api/PublishMessage", iotPublishMessageModel);
if (httpResponseMessage.IsSuccessStatusCode)
{
    //成功,返回內容自己自行解析,解析參考接口請求出參
    var data= httpResponseMessage.Content.ReadAsStringAsync().Result;
    //如果當前方法用async異步修飾,請不要.Result 請await
   //var data= await httpResponseMessage.Content.ReadAsStringAsync();
}
else
{
    //這里是失敗的代碼了,即發送失敗了
}

3、Github開源地址


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM