ABP .net Core MQTT+signalr通訊


abp版本: 4.3.0.0

.net core 版本 2.2

1、Mqtt 

1.1 添加程序集:M2MqttDotnetCore(差點以為沒有.net core 的)

1.2 實現代碼:采用單例模式,並將服務器斷開和消息接收事件委托給外層

/// <summary>
/// Wraps a single M2Mqtt <see cref="MqttClient"/> and exposes it as a lazily created
/// singleton. Received messages and connection-closed notifications are forwarded to
/// the <see cref="ReceivedMsg"/> and <see cref="ClosedCon"/> delegates so that outer
/// layers can react to them.
/// </summary>
public class MqttClientService
{
    private readonly IConfiguration _config;
    private static volatile MqttClientService _instance = null;

    private static readonly object LockHelper = new object();

    /// <summary>
    /// Returns the shared instance, creating (and connecting) it on first use.
    /// </summary>
    /// <param name="config">Configuration read by the constructor; only used when the instance is first created.</param>
    /// <returns>The shared <see cref="MqttClientService"/>.</returns>
    public static MqttClientService CreateInstance(IConfiguration config)
    {
        if (_instance == null)
        {
            lock (LockHelper)
            {
                // Re-check inside the lock: another thread may have created it meanwhile.
                if (_instance == null)
                {
                    _instance = new MqttClientService(config);
                }
            }
        }
        return _instance;
    }

    /// <summary>
    /// The underlying MQTT client used for publishing and subscribing.
    /// </summary>
    public MqttClient SubscribeClient { get; set; }

    /// <summary>
    /// Invoked for every message the client receives.
    /// </summary>
    public Action<Object, MqttMsgPublishEventArgs> ReceivedMsg { get; set; }

    /// <summary>
    /// Invoked when the client reports that the connection was closed.
    /// </summary>
    public Action<object, EventArgs> ClosedCon;

    /// <summary>
    /// Creates the MQTT client from the "MqttService:ClientId", "MqttService:HostIP" and
    /// "MqttService:Port" configuration values, wires the event handlers and connects.
    /// </summary>
    /// <param name="config">Configuration that supplies the MQTT settings.</param>
    public MqttClientService(IConfiguration config)
    {
        _config = config;

        // Read the client id and broker endpoint from configuration.
        string clientId = _config["MqttService:ClientId"];
        string hostIp = _config["MqttService:HostIP"];
        string port = _config["MqttService:Port"];

        // Create the client instance (no TLS).
        SubscribeClient = new MqttClient(IPAddress.Parse(hostIp), int.Parse(port), false, new X509Certificate(), new X509Certificate(), MqttSslProtocols.None);

        // Forward received messages.
        SubscribeClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
        // Forward connection-closed notifications.
        SubscribeClient.ConnectionClosed += Client_ConnectionClosed;

        SubscribeClient.Connect(clientId);

        // Initial subscriptions could be set up here, e.g. by loading every device that
        // needs a subscription from the database:
        // SubscribeClient.Subscribe(new string[] { "avatar/uploaded" }, new byte[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });
    }

    /// <summary>
    /// Relays a received MQTT message to <see cref="ReceivedMsg"/>, if a handler is set.
    /// </summary>
    void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
    {
        ReceivedMsg?.Invoke(sender, e);
    }

    /// <summary>
    /// Relays a connection-closed notification to <see cref="ClosedCon"/>, if a handler is set.
    /// </summary>
    private void Client_ConnectionClosed(object sender, EventArgs e)
    {
        ClosedCon?.Invoke(sender, e);
    }

    /// <summary>
    /// Publishes a UTF-8 encoded message with QoS 0 (at most once), not retained.
    /// </summary>
    /// <param name="Topic">Topic to publish to.</param>
    /// <param name="Data">Message content.</param>
    public void Publish(string Topic, string Data)
    {
        SubscribeClient.Publish(Topic, Encoding.UTF8.GetBytes(Data), MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);
    }

    /// <summary>
    /// Subscribes to the given topics, each with QoS 2 (exactly once).
    /// </summary>
    /// <param name="Topic">Topics to subscribe to.</param>
    public void Subscribe(string[] Topic)
    {
        // The client expects one QoS level per topic; a fixed single-element array only
        // matched calls with exactly one topic.
        int topicCount = Topic?.Length ?? 0;
        byte[] qosLevels = new byte[topicCount];
        for (int i = 0; i < topicCount; i++)
        {
            qosLevels[i] = MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE;
        }

        SubscribeClient.Subscribe(Topic, qosLevels);
    }

    /// <summary>
    /// Cancels the subscription for the given topics.
    /// </summary>
    /// <param name="Topic">Topics to unsubscribe from.</param>
    public void Unsubscribe(string[] Topic)
    {
        SubscribeClient.Unsubscribe(Topic);
    }
}

  2、SignalR:繼承 AbpHubBase 並實現 ISingletonDependency(實現依賴注入的單例模式)。這里只是為了做測試,所以代碼有點丑

public class MqttHub : AbpHubBase, ISingletonDependency//AbpHubBase
    {
        // 使用GetByUserIdOrNull、GetAllClients和IsOnline方法 獲取在線用戶信息
        private readonly IOnlineClientManager _onlineClient;
        private readonly IConfiguration _config;
        /// <summary>
        /// Stores the injected configuration and online-client manager for later use.
        /// </summary>
        /// <param name="config">Application configuration passed on to the MQTT client service.</param>
        /// <param name="onlineClient">ABP online-client manager.</param>
        public MqttHub(IConfiguration config, IOnlineClientManager onlineClient)
        {
            _onlineClient = onlineClient;
            _config = config;
        }

        static List<MqttUserModel> userList = new List<MqttUserModel>();
        /// <summary>
        /// Hub method (name and parameters are application-defined): subscribes the shared
        /// MQTT client to a topic and makes sure this hub's handlers are registered on it.
        /// </summary>
        /// <param name="Topic">Topic to subscribe to.</param>
        public async Task SubscribeMessage(string Topic)
        {
            MqttClientService service = MqttClientService.CreateInstance(_config);

            // The service is shared, so a plain "+=" on every call stacks the same handler
            // again and each MQTT message is then broadcast multiple times. Removing first
            // is a no-op when the handler is absent and keeps exactly one registration
            // for this hub instance.
            service.ReceivedMsg -= MqttHelper_ReceivedMsg;
            service.ReceivedMsg += MqttHelper_ReceivedMsg;
            service.ClosedCon -= MqttHelper_ClosedCon;
            service.ClosedCon += MqttHelper_ClosedCon;

            await Task.Run(() => service.Subscribe(new string[] { Topic }));
        }
        /// <summary>
        /// Hub method: cancels the shared MQTT client's subscription to a topic.
        /// </summary>
        /// <param name="Topic">Topic to unsubscribe from.</param>
        public void Unsubscribe(string Topic)
        {
            MqttClientService service = MqttClientService.CreateInstance(_config);

            // Remove-then-add keeps a single registration per handler; a bare "+=" on
            // every call would stack duplicates on the shared service.
            service.ReceivedMsg -= MqttHelper_ReceivedMsg;
            service.ReceivedMsg += MqttHelper_ReceivedMsg;
            service.ClosedCon -= MqttHelper_ClosedCon;
            service.ClosedCon += MqttHelper_ClosedCon;

            service.Unsubscribe(new string[] { Topic });
        }
        /// <summary>
        /// Hub method: publishes a message to a topic through the shared MQTT client.
        /// </summary>
        /// <param name="Topic">Topic to publish to.</param>
        /// <param name="Data">Message content.</param>
        public void PublishMessage(string Topic, string Data)
        {
            MqttClientService service = MqttClientService.CreateInstance(_config);

            // Remove-then-add keeps a single registration per handler; a bare "+=" on
            // every call would stack duplicates on the shared service.
            service.ReceivedMsg -= MqttHelper_ReceivedMsg;
            service.ReceivedMsg += MqttHelper_ReceivedMsg;
            service.ClosedCon -= MqttHelper_ClosedCon;
            service.ClosedCon += MqttHelper_ClosedCon;

            service.Publish(Topic, Data);
        }
        /// <summary>
        /// Hub method: records the calling user as online (or refreshes the stored
        /// connection id), announces the user to all clients and greets the caller.
        /// </summary>
        public void AddOnlineUser()
        {
            // Identity comes from the current ABP session.
            var userId = AbpSession.GetUserId();
            // Connection id of the calling (browser-side) client.
            var connectionId = Context.ConnectionId;

            var user = userList.FirstOrDefault(x => x.Id == userId);
            if (user == null)
            {
                // Keep a reference to the new entry: the broadcast below reads
                // user.UserName, which previously dereferenced null for new users.
                user = new MqttUserModel
                {
                    ConnectionId = connectionId,
                    Id = userId,
                    UserName = AbpSession.GetUserName(),
                };
                userList.Add(user);
            }
            else
            {
                user.ConnectionId = connectionId;
            }

            Clients.All.SendAsync("getMessage", new { msg = "當前登錄用戶:" + user.UserName + "\r\n" });

            // Send the greeting to the caller only.
            Clients.Client(connectionId).SendAsync("getMessage", new { msg = "您好,歡迎登陸!" });
        }
        /// <summary>
        /// Handler for the MQTT service's connection-closed notification: tells every
        /// connected SignalR client that the server link was dropped.
        /// </summary>
        private void MqttHelper_ClosedCon(object sender, EventArgs e)
        {
            var payload = new { msg = "服務器已斷開鏈接\r\n" };
            Clients.All.SendAsync("getMessage", payload);
        }
        /// <summary>
        /// Handler for messages received from the MQTT server: decodes the payload as
        /// UTF-8 and forwards it to every connected SignalR client.
        /// </summary>
        private void MqttHelper_ReceivedMsg(object sender, MqttMsgPublishEventArgs e)
        {
            string text = System.Text.Encoding.UTF8.GetString(e.Message);

            // Clients.All targets every connected client, the caller included.
            // "getMessage" is just the client-side method name chosen by this application;
            // any name works as long as the clients listen for it.
            var payload = new { msg = text + "\r\n" };
            Clients.All.SendAsync("getMessage", payload);
        }

        /// <summary>
        /// Hub method: broadcasts a chat line, prefixed with the caller's session user id,
        /// to every connected client.
        /// </summary>
        /// <param name="message">Text to broadcast.</param>
        public void SendMessage(string message)
        {
            string line = string.Format("User {0}: {1}", AbpSession.UserId, message);
            Clients.All.SendAsync("getMessage", new { msg = line });
        }

        /// <summary>
        /// Runs the base connection handling, then writes a debug log entry with the
        /// id of the connection that was opened.
        /// </summary>
        /// <returns>A task that completes once the base handling has finished and the entry is logged.</returns>
        public override async Task OnConnectedAsync()
        {
            await base.OnConnectedAsync();

            string connectionId = Context.ConnectionId;
            Logger.Debug("A client connected to MyChatHub: " + connectionId);
        }

        /// <summary>
        /// Overrides the base disconnect handling: runs the base implementation, then
        /// writes a debug log entry with the id of the connection that was closed.
        /// </summary>
        /// <param name="exception">Exception passed in by the framework and forwarded to the base implementation.</param>
        /// <returns>A task that completes once the base handling has finished and the entry is logged.</returns>
        public override async Task OnDisconnectedAsync(Exception exception)
        {
            await base.OnDisconnectedAsync(exception);

            string connectionId = Context.ConnectionId;
            Logger.Debug("A client disconnected from MyChatHub: " + connectionId);
        }

  3、前端:

    <!-- Test page for the MQTT hub: a log area on the left, subscribe/publish controls on the right. -->
    <div class="layui-fluid">
        <div class="layui-col-sm3">
            <!-- Log area: the page script appends every incoming hub message here. -->
            <textarea id="TextArea1" rows="30" cols="900" style=" width:100%;height:100%;max-width:inherit;"></textarea>
        </div>
        <div class="layui-col-sm6">
            <h5>訂閱測試</h5>
            @*<p>服務器地址:</p>
                <p><input id="txtIP" type="text" /></p>*@
            <p>訂閱主題:</p>
            <!-- Topic used by the subscribe / unsubscribe buttons below. -->
            <p><input id="txtTopic" type="text" value="12/deviceStatus/12190101999" /></p>
            <p><input id="btnSubscribe" type="button" value="訂閱" /><input id="btnSubscribeNo" type="button" value="取消訂閱" /></p>
            <h5>發布測試</h5>
            <p>發布主題:</p>
            <!-- Topic and payload for the publish button. -->
            <p><input id="txtTopicPub" type="text" /></p>
            <p>發布內容</p>
            <p><textarea id="txtPublish" rows="10" cols="20" style=" width:100%;height:100%;max-width:inherit;"></textarea></p>
            <p><input id="btnPublic" type="button" value="發布" /></p>
            <!-- NOTE(review): no handler for btn1 is visible in the accompanying script; confirm whether it is still needed. -->
            <input type="button" id="btn1" value="提交" />
        </div>
    </div>

  4 JS

// Reference to the SignalR hub connection; stays null until the connection is created.
var chatHub = null;

// Appends text to the log textarea. A textarea's content is a form value, so it is
// read and written with .val() rather than .text().
var appendToLog = function (text) {
    var $log = $("#TextArea1");
    $log.val($log.val() + text);
};

// Invokes a hub method with the given arguments, but only once the connection exists;
// a click before that would otherwise fail on a null chatHub.
var invokeHub = function () {
    if (!chatHub) {
        abp.log.warn('mqttHub is not connected yet.');
        return;
    }
    return chatHub.invoke.apply(chatHub, arguments);
};

abp.signalr.startConnection(abp.appPath + 'signalr-mqttHub', function (connection) {
    chatHub = connection; // Save a reference to the hub

    connection.on('getMessage', function (message) { // Register for incoming messages
        appendToLog(message.msg);
    });
}).then(function (connection) {
    appendToLog("連接MyHub成功\r\n");
    abp.event.trigger('mqttHub.connected');
});

abp.event.on('mqttHub.connected', function () { // Register for connect event
    invokeHub('sendMessage', "Hi everybody, I'm connected to the chat!"); // Send a message to the server
});

$("#btnSubscribe").click(function () {
    invokeHub('subscribeMessage', $("#txtTopic").val());
});

$("#btnSubscribeNo").click(function () {
    invokeHub('unsubscribe', $("#txtTopic").val());
});

$("#btnPublic").click(function () {
    // The hub's publishMessage takes (topic, data); the topic was previously omitted,
    // so the payload was sent in the topic position and the call had too few arguments.
    invokeHub('publishMessage', $("#txtTopicPub").val(), $("#txtPublish").val());
});

  5.Startup Configure

 // Register the SignalR hub endpoints: ABP's common hub plus the MQTT test hub.
 app.UseSignalR(hubRoutes =>
            {
                hubRoutes.MapHub<AbpCommonHub>("/signalr");

                // The custom hub path is prefixed with '/signalr'.
                hubRoutes.MapHub<MqttHub>("/signalr-mqttHub");
            });

  原諒我寫得太匆忙,15分鍾居然沒有寫完,只能后面再補細節

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM