ABP 版本:4.3.0.0
.NET Core 版本:2.2
1、Mqtt
1.1 添加程序集:M2MqttDotnetCore(差點以為沒有.net core 的)
1.2 實現代碼:參考單例模式實現,並將服務器斷開事件和消息接收事件委託給外層處理
/// <summary>
/// Process-wide MQTT client wrapper. Connects to the broker described by the
/// <c>MqttService:ClientId</c>, <c>MqttService:HostIP</c> and <c>MqttService:Port</c>
/// configuration keys and forwards the client's "message received" and
/// "connection closed" events to the <see cref="ReceivedMsg"/> and
/// <see cref="ClosedCon"/> delegates.
/// </summary>
public class MqttClientService
{
    private readonly IConfiguration _config;
    private static volatile MqttClientService _instance = null;
    private static readonly object LockHelper = new object();

    /// <summary>
    /// Returns the shared instance, creating (and connecting) it on first use.
    /// </summary>
    /// <param name="config">Configuration used only when the instance is first created.</param>
    /// <returns>The single shared <see cref="MqttClientService"/>.</returns>
    public static MqttClientService CreateInstance(IConfiguration config)
    {
        if (_instance == null)
        {
            lock (LockHelper)
            {
                // Re-check inside the lock: another thread may have created it meanwhile.
                if (_instance == null)
                {
                    _instance = new MqttClientService(config);
                }
            }
        }

        return _instance;
    }

    /// <summary>
    /// The underlying MQTT client used for subscribing and publishing.
    /// </summary>
    public MqttClient SubscribeClient { get; set; }

    /// <summary>
    /// Invoked for every message the broker delivers to this client.
    /// </summary>
    public Action<Object, MqttMsgPublishEventArgs> ReceivedMsg { get; set; }

    /// <summary>
    /// Invoked when the connection to the broker is closed.
    /// </summary>
    public Action<object, EventArgs> ClosedCon;

    /// <summary>
    /// Creates the client, hooks its events and connects to the broker.
    /// </summary>
    /// <param name="config">Configuration containing the <c>MqttService</c> section.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
    public MqttClientService(IConfiguration config)
    {
        if (config == null)
        {
            throw new ArgumentNullException(nameof(config));
        }

        _config = config;

        // Read the client id and broker endpoint from configuration.
        string clientId = _config["MqttService:ClientId"];
        string hostIp = _config["MqttService:HostIP"];
        string port = _config["MqttService:Port"];

        // The port is machine-readable data, so parse it culture-independently.
        int brokerPort = int.Parse(port, System.Globalization.CultureInfo.InvariantCulture);

        // Create the client instance (plain TCP, no TLS).
        SubscribeClient = new MqttClient(
            IPAddress.Parse(hostIp),
            brokerPort,
            false,
            new X509Certificate(),
            new X509Certificate(),
            MqttSslProtocols.None);

        // Message-received event.
        SubscribeClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
        // Broker-disconnected event.
        SubscribeClient.ConnectionClosed += Client_ConnectionClosed;

        SubscribeClient.Connect(clientId);

        // Initial subscriptions could be set up here, e.g. by loading every device
        // that needs a subscription from the database.
    }

    /// <summary>
    /// Forwards a received message to <see cref="ReceivedMsg"/>, if anyone is listening.
    /// </summary>
    void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
    {
        ReceivedMsg?.Invoke(sender, e);
    }

    /// <summary>
    /// Forwards the connection-closed notification to <see cref="ClosedCon"/>.
    /// </summary>
    private void Client_ConnectionClosed(object sender, EventArgs e)
    {
        ClosedCon?.Invoke(sender, e);
    }

    /// <summary>
    /// Publishes a UTF-8 encoded message with QoS 0 (at most once), not retained.
    /// </summary>
    /// <param name="Topic">Topic to publish to.</param>
    /// <param name="Data">Message content.</param>
    public void Publish(string Topic, string Data)
    {
        SubscribeClient.Publish(Topic, Encoding.UTF8.GetBytes(Data), MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);
    }

    /// <summary>
    /// Subscribes to every given topic with QoS 2 (exactly once).
    /// </summary>
    /// <param name="Topic">Topics to subscribe to; must contain at least one entry.</param>
    /// <exception cref="ArgumentException"><paramref name="Topic"/> is null or empty.</exception>
    public void Subscribe(string[] Topic)
    {
        if (Topic == null || Topic.Length == 0)
        {
            throw new ArgumentException("At least one topic is required.", nameof(Topic));
        }

        // M2Mqtt expects one QoS level per topic, so the QoS array must have the
        // same length as the topic array (a fixed one-element array only works
        // for single-topic subscriptions).
        var qosLevels = new byte[Topic.Length];
        for (int i = 0; i < qosLevels.Length; i++)
        {
            qosLevels[i] = MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE;
        }

        SubscribeClient.Subscribe(Topic, qosLevels);
    }

    /// <summary>
    /// Cancels the subscriptions for the given topics.
    /// </summary>
    /// <param name="Topic">Topics to unsubscribe from.</param>
    public void Unsubscribe(string[] Topic)
    {
        SubscribeClient.Unsubscribe(Topic);
    }
}
2、SignalR:Hub 繼承 AbpHubBase 並實現 ISingletonDependency(通過依賴注入註冊為單例)。這裡只是為了做測試,所以代碼有點醜
/// <summary>
/// SignalR hub that bridges browser clients and the shared <see cref="MqttClientService"/>:
/// clients can subscribe to, unsubscribe from and publish to MQTT topics, and every
/// MQTT message received is broadcast to all connected clients as a "getMessage" call.
/// </summary>
/// <remarks>
/// NOTE(review): the MQTT callbacks use <c>Clients</c> outside of a hub invocation;
/// this relies on the hub being kept alive as a singleton. Confirm, or switch to
/// <c>IHubContext</c>, before using this beyond a test setup.
/// </remarks>
public class MqttHub : AbpHubBase, ISingletonDependency
{
    // Online users, shared by every hub invocation; guarded by UserListLock.
    private static readonly List<MqttUserModel> userList = new List<MqttUserModel>();
    private static readonly object UserListLock = new object();

    // Serializes handler registration on the shared MQTT service.
    private static readonly object HandlerLock = new object();

    // Offers GetByUserIdOrNull, GetAllClients and IsOnline for online-user lookups.
    private readonly IOnlineClientManager _onlineClient;
    private readonly IConfiguration _config;

    /// <summary>
    /// Creates the hub.
    /// </summary>
    /// <param name="config">Configuration passed to <see cref="MqttClientService.CreateInstance"/>.</param>
    /// <param name="onlineClient">ABP online-client manager.</param>
    public MqttHub(IConfiguration config, IOnlineClientManager onlineClient) : base()
    {
        _config = config;
        _onlineClient = onlineClient;
    }

    /// <summary>
    /// Returns true when <paramref name="handler"/> is already part of <paramref name="multicast"/>.
    /// </summary>
    private static bool IsAttached(Delegate multicast, Delegate handler)
    {
        return multicast != null && Array.IndexOf(multicast.GetInvocationList(), handler) >= 0;
    }

    /// <summary>
    /// Gets the shared MQTT service and makes sure this hub's callbacks are attached
    /// exactly once. Attaching on every call would broadcast each MQTT message once
    /// per previous call.
    /// </summary>
    private MqttClientService GetMqttService()
    {
        MqttClientService service = MqttClientService.CreateInstance(_config);
        Action<Object, MqttMsgPublishEventArgs> onMessage = MqttHelper_ReceivedMsg;
        Action<object, EventArgs> onClosed = MqttHelper_ClosedCon;

        lock (HandlerLock)
        {
            if (!IsAttached(service.ReceivedMsg, onMessage))
            {
                service.ReceivedMsg += onMessage;
            }

            if (!IsAttached(service.ClosedCon, onClosed))
            {
                service.ClosedCon += onClosed;
            }
        }

        return service;
    }

    /// <summary>
    /// Subscribes the shared MQTT client to a topic. Method name and parameters are
    /// what the browser client invokes.
    /// </summary>
    /// <param name="Topic">Topic to subscribe to.</param>
    public Task SubscribeMessage(string Topic)
    {
        GetMqttService().Subscribe(new string[] { Topic });
        return Task.CompletedTask;
    }

    /// <summary>
    /// Cancels the subscription for a topic.
    /// </summary>
    /// <param name="Topic">Topic to unsubscribe from.</param>
    public void Unsubscribe(string Topic)
    {
        GetMqttService().Unsubscribe(new string[] { Topic });
    }

    /// <summary>
    /// Publishes a message to a topic through the shared MQTT client.
    /// </summary>
    /// <param name="Topic">Topic to publish to.</param>
    /// <param name="Data">Message content.</param>
    public void PublishMessage(string Topic, string Data)
    {
        GetMqttService().Publish(Topic, Data);
    }

    /// <summary>
    /// Records the calling user as online (or refreshes the stored connection id),
    /// announces the user to everyone and greets the caller.
    /// </summary>
    public void AddOnlineUser()
    {
        // Taken directly from the current session.
        var userId = AbpSession.GetUserId();
        var connectionId = Context.ConnectionId; // connection id of the calling browser client
        MqttUserModel user;

        lock (UserListLock)
        {
            user = userList.FirstOrDefault(x => x.Id == userId);
            if (user == null)
            {
                // Keep the reference so the broadcast below can read UserName
                // (previously this path dereferenced null).
                user = new MqttUserModel
                {
                    ConnectionId = connectionId,
                    Id = userId,
                    UserName = AbpSession.GetUserName(),
                };
                userList.Add(user);
            }
            else
            {
                user.ConnectionId = connectionId;
            }
        }

        Clients.All.SendAsync("getMessage", new { msg = "當前登錄用戶:" + user.UserName + "\r\n" });
        // Addressed to the caller only.
        Clients.Client(connectionId).SendAsync("getMessage", new { msg = "您好,歡迎登陸!" });
    }

    /// <summary>
    /// Tells every client that the MQTT broker connection was closed.
    /// </summary>
    private void MqttHelper_ClosedCon(object sender, EventArgs e)
    {
        Clients.All.SendAsync("getMessage", new { msg = "服務器已斷開鏈接\r\n" });
    }

    /// <summary>
    /// Decodes an incoming MQTT payload as UTF-8 and broadcasts it to every
    /// connected client (including the sender) via "getMessage".
    /// </summary>
    private void MqttHelper_ReceivedMsg(object sender, MqttMsgPublishEventArgs e)
    {
        string text = System.Text.Encoding.UTF8.GetString(e.Message);
        // "getMessage" is just the client-side handler name chosen for this demo.
        Clients.All.SendAsync("getMessage", new { msg = text + "\r\n" });
    }

    /// <summary>
    /// Broadcasts a chat-style message prefixed with the caller's user id.
    /// </summary>
    /// <param name="message">Text to broadcast.</param>
    public void SendMessage(string message)
    {
        Clients.All.SendAsync("getMessage", new { msg = $"User {AbpSession.UserId}: {message}" });
    }

    /// <summary>
    /// Logs the connection id of a newly connected client.
    /// </summary>
    public async override Task OnConnectedAsync()
    {
        await base.OnConnectedAsync();
        Logger.Debug("A client connected to MyChatHub: " + Context.ConnectionId);
    }

    /// <summary>
    /// Logs the connection id of a client that disconnected.
    /// </summary>
    /// <param name="exception">Passed through to the base implementation.</param>
    public async override Task OnDisconnectedAsync(Exception exception)
    {
        await base.OnDisconnectedAsync(exception);
        Logger.Debug("A client disconnected from MyChatHub: " + Context.ConnectionId);
    }
}
3、前端:
@* MQTT test page: message log on the left, subscribe/publish controls on the right. *@
<div class="layui-fluid">
    @* Log of everything received through the hub. *@
    <div class="layui-col-sm3">
        <textarea id="TextArea1" rows="30" cols="900" style=" width:100%;height:100%;max-width:inherit;"></textarea>
    </div>
    <div class="layui-col-sm6">
        @* Subscribe / unsubscribe controls. *@
        <h5>訂閱測試</h5>
        @*<p>服務器地址:</p> <p><input id="txtIP" type="text" /></p>*@
        <p>訂閱主題:</p>
        <p><input id="txtTopic" type="text" value="12/deviceStatus/12190101999" /></p>
        <p><input id="btnSubscribe" type="button" value="訂閱" /><input id="btnSubscribeNo" type="button" value="取消訂閱" /></p>
        @* Publish controls. *@
        <h5>發布測試</h5>
        <p>發布主題:</p>
        <p><input id="txtTopicPub" type="text" /></p>
        <p>發布內容</p>
        <p><textarea id="txtPublish" rows="10" cols="20" style=" width:100%;height:100%;max-width:inherit;"></textarea></p>
        <p><input id="btnPublic" type="button" value="發布" /></p>
        <input type="button" id="btn1" value="提交" />
    </div>
</div>
4、JS
// SignalR hub connection; stays null until startConnection hands it over.
var chatHub = null;

// Appends text to the log textarea. A textarea's current content is its value,
// so .val() is used rather than .text().
function appendLog(text) {
    var $log = $("#TextArea1");
    $log.val($log.val() + text);
}

// Invokes a hub method with the given arguments and logs any failure.
// Does nothing but log when the connection has not been established yet.
function invokeHub(method) {
    if (!chatHub) {
        appendLog("Not connected to the hub yet\r\n");
        return;
    }
    chatHub.invoke.apply(chatHub, arguments).catch(function (err) {
        appendLog(method + " failed: " + err + "\r\n");
    });
}

// Register for the connect event before starting the connection.
abp.event.on('mqttHub.connected', function () {
    invokeHub('sendMessage', "Hi everybody, I'm connected to the chat!"); // Send a message to the server
});

abp.signalr.startConnection(abp.appPath + 'signalr-mqttHub', function (connection) {
    chatHub = connection; // Save a reference to the hub
    connection.on('getMessage', function (message) { // Register for incoming messages
        appendLog(message.msg + "");
    });
}).then(function (connection) {
    appendLog("連接MyHub成功\r\n");
    abp.event.trigger('mqttHub.connected');
});

$("#btnSubscribe").click(function () {
    invokeHub('subscribeMessage', $("#txtTopic").val());
});

$("#btnSubscribeNo").click(function () {
    invokeHub('unsubscribe', $("#txtTopic").val());
});

$("#btnPublic").click(function () {
    // The hub method is PublishMessage(Topic, Data): send the topic as well as the payload.
    invokeHub('publishMessage', $("#txtTopicPub").val(), $("#txtPublish").val());
});
5、Startup 的 Configure 方法
// Map the SignalR hubs: ABP's common hub and the MQTT test hub.
app.UseSignalR(hubRoutes =>
{
    hubRoutes.MapHub<AbpCommonHub>("/signalr");
    // Custom hub paths are prefixed with '/signalr'.
    hubRoutes.MapHub<MqttHub>("/signalr-mqttHub");
});
原諒我寫得太匆忙,15分鐘居然沒有寫完,只能後面再補細節