項目地址:https://github.com/hnlyf168/DotNet.Framework
Install-Package DotNetCN -Version 3.0.0
昨天晚上大致測試了下 ,490個客戶端(一個收一個發) 平均估計每個每秒60個包 使用mqtt協議 發送一個guid的字符串 服務器轉發每秒大約1.2-1.3w
cpu 占用:25% 一下
內存好像都在50m以下
1、協議簡介
MQTT是一個基於客戶端-服務器的消息發布/訂閱傳輸協議。MQTT協議是輕量、簡單、開放和易於實現的,這些特點使它適用范圍非常廣泛。在很多情況下,包括受限的環境中,如:機器與機器(M2M)通信和物聯網(IoT)。其在,通過衛星鏈路通信傳感器、偶爾撥號的醫療設備、智能家居、及一些小型化設備中已廣泛使用。
具體就不在這里記錄了,寫這個服務端和客戶端也只是為了更加深入的學習mqtt協議。
2、mqtt的幾種控制報文類型
名字 |
值 |
報文流動方向 |
描述 |
Reserved |
0 |
禁止 |
保留 |
CONNECT |
1 |
客戶端到服務端 |
客戶端請求連接服務端 |
CONNACK |
2 |
服務端到客戶端 |
連接報文確認 |
PUBLISH |
3 |
兩個方向都允許 |
發布消息 |
PUBACK |
4 |
兩個方向都允許 |
QoS 1消息發布收到確認 |
PUBREC |
5 |
兩個方向都允許 |
發布收到(保證交付第一步) |
PUBREL |
6 |
兩個方向都允許 |
發布釋放(保證交付第二步) |
PUBCOMP |
7 |
兩個方向都允許 |
QoS 2消息發布完成(保證交互第三步) |
SUBSCRIBE |
8 |
客戶端到服務端 |
客戶端訂閱請求 |
SUBACK |
9 |
服務端到客戶端 |
訂閱請求報文確認 |
UNSUBSCRIBE |
10 |
客戶端到服務端 |
客戶端取消訂閱請求 |
UNSUBACK |
11 |
服務端到客戶端 |
取消訂閱報文確認 |
PINGREQ |
12 |
客戶端到服務端 |
心跳請求 |
PINGRESP |
13 |
服務端到客戶端 |
心跳響應 |
DISCONNECT |
14 |
客戶端到服務端 |
客戶端斷開連接 |
Reserved |
15 |
禁止 |
保留 |
2.1、協議頭
每個MQTT控制報文都包含一個固定報頭,
固定報頭的格式
Bit |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
byte 1 |
MQTT控制報文的類型 |
用於指定控制報文類型的標志位 |
||||||
byte 2... |
|
剩余長度 |
剩余長度的計算方式:
剩余長度(Remaining Length)表示當前報文剩余部分的字節數,包括可變報頭和負載的數據。剩余長度不包括用於編碼剩余長度字段本身的字節數。
剩余長度字段使用一個變長度編碼方案,對小於128的值它使用單字節編碼。更大的值按下面的方式處理。低7位有效位用於編碼數據,最高有效位用於指示是否有更多的字節。因此每個字節可以編碼128個數值和一個延續位(continuation bit)。剩余長度字段最大4個字節
例如,十進制數64會被編碼為一個字節,數值是64,十六進制表示為0x40,。十進制數字
321(=65+2*128)被編碼為兩個字節,最低有效位在前。第一個字節是 65+128=193。注意最高位為1表示后面至少還有一個字節。第二個字節是2。
.net 計算方式代碼如下:
/// <summary> /// 獲取一個長度數據 /// </summary> /// <returns></returns> protected virtual Result<int> ReadLength() { var result = this.Socket.ReceiveBytes(1); if (!result.Success) { WriteLog("獲取mqtt 長度失敗"); return new Result<int>(result); } var msgType = result.Data[0]; var msgLength = msgType & 127;//取低7為的值,因為可變長度有效值只有低7位,第8位用來標識下一個字節是否屬於長度字節 var leftBit = 7; while (msgType >> 7 == 1)//判斷最高位是否為1,如果為1則說明后面的1個字節也是屬於長度字節 { result = this.Socket.ReceiveBytes(1); if (!result.Success) { WriteLog("獲取mqtt 長度失敗"); return new Result<int>(result); } msgType = result.Data[0]; msgLength = ((msgType & 127) << leftBit) | msgLength;// 因為mqtt 可變長度的字節是低位在前,所以新取到的長度要左移取到的次數*7位在|原來的長度。 leftBit += 7; } return msgLength; }
2.2、CONNECT – 連接服務端
協議格式
可看到所需要的參數,於是定義一個連接信息類來保存

/// <summary> /// mqtt 連接信息。 /// </summary> public class MQTTConnectInfo { /// <summary> /// 客戶端編號 /// </summary> public virtual string ClientId { get; set; } /// <summary> /// 用戶名 /// </summary> public virtual string UserName { get; set; } /// <summary> /// 用戶密碼 /// </summary> public virtual string Password { get; set; } /// <summary> /// 遺囑保留 /// </summary> public virtual bool WillRetain { get; set; } /// <summary> /// 遺囑QoS /// </summary> public virtual Qos WillQos { get; set; } /// <summary> /// 遺囑標志 /// </summary> public virtual bool WillFlag { get; set; } /// <summary> /// 是否清除對話。 /// </summary> public virtual bool CleanSession { get; set; } /// <summary> /// 保持連接 /// <para>警告:這里的單位是秒</para> /// </summary> public virtual ushort KeepAlive { get; set; } = 10; }
然后就是代碼按協議格式組裝好
代碼如下:

/// <summary> /// 獲取包完整的字節 /// </summary> /// <returns></returns> public override byte[] ToBytes() { var list = new List<byte>(); var mqttBytes = ProtocolName.ToBytes(Encoding.ASCII);//協議名稱:固定位MQTT list.Add((byte)(mqttBytes.Length >> 8)); list.Add((byte)(mqttBytes.Length & 255)); list.AddRange(mqttBytes); list.Add(Version);//協議版本 list.Add(ConnectFlag);//連接標識 list.Add((byte)(KeepAlive >> 8));//心跳值 list.Add((byte)(KeepAlive & 255)); var clientIdBytes = ClientId.ToBytes(Encoding.ASCII);//客戶端編號 list.Add((byte)(clientIdBytes.Length >> 8)); list.Add((byte)(clientIdBytes.Length & 255)); list.AddRange(clientIdBytes); if (HasUserName)//是否包含用戶名 { var userNameBytes = UserName.ToBytes(Encoding.ASCII); list.Add((byte)(userNameBytes.Length >> 8)); list.Add((byte)(userNameBytes.Length & 255)); list.AddRange(userNameBytes); } if (HasPassword)//是否包含用戶密碼 { var passwordBytes = Password.ToBytes(Encoding.ASCII); list.Add((byte)(passwordBytes.Length >> 8)); list.Add((byte)(passwordBytes.Length & 255)); list.AddRange(passwordBytes); } Data = list.ToArray(); list.Clear(); return base.ToBytes(); }
連接回復包格式:
|
描述 |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
||||||||
連接確認標志 |
Reserved 保留位 |
1 SP |
|
|
|
|
|
|
|
||||||||
byte 1 |
|
0 |
0 |
0 |
0 |
0 |
0 |
0 |
X |
||||||||
連接返回碼 |
|
|
|
|
|
|
|
|
|
||||||||
byte 2 |
|
X |
X |
X |
X |
X |
X |
X |
X |
||||||||
|
描述 |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
||||||||
連接確認標志 |
|
Reserved 保留位 |
|
|
1 SP |
||||||||||||
byte 1 |
|
0 |
0 |
0 |
0 |
0 |
0 |
0 |
X |
||||||||
|
|
連接返回碼 |
|
|
|
||||||||||||
byte 2 |
|
X |
X |
X |
X |
X |
X |
X |
X |
,猶豫代碼比較簡單這里就不貼了
2.3、PUBLISH – 發布消息
PUBLISH控制報文是指從客戶端向服務端或者服務端向客戶端傳輸一個應用消息
主題長度 16位 2字節 |
主題內容N |
如果QoS大於0 則有一個消息Id 16位 2字節 |
剩余的N 是消息的主題 |
|
描述 |
7 |
6 |
5 |
4 |
3 |
2 |
1 |
0 |
Topic Name 主題名 |
|
|
|
|
|
|
|
|
|
byte 1 |
Length MSB (0) |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
byte 2 |
Length LSB (3) |
0 |
0 |
0 |
0 |
0 |
0 |
1 |
1 |
byte 3 |
‘a’ (0x61) |
0 |
1 |
1 |
0 |
0 |
0 |
0 |
1 |
byte 4 |
‘/’ (0x2F) |
0 |
0 |
1 |
0 |
1 |
1 |
1 |
1 |
byte 5 |
‘b’ (0x62) |
0 |
1 |
1 |
0 |
0 |
0 |
1 |
0 |
報文標識符 |
|
|
|
|
|
|
|
|
|
byte 6 |
報文標識符 MSB (0) |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
0 |
byte 7 |
報文標識符 LSB (10) |
0 |
0 |
0 |
0 |
1 |
0 |
1 |
0 |
代碼按協議格式組裝如下:
/// <summary> /// 開始組裝包。 /// </summary> protected override void Packaging() { var topicBytes = Topic.ToBytes();//主題數據 Data = new byte[topicBytes.Length + BodyBytes.Length + (QoS > 0 ? 4 : 2)]; Data[0] = (byte)(topicBytes.Length >> 8); Data[1] = (byte)(topicBytes.Length & 255); topicBytes.CopyTo(Data, 2); if (QoS > 0) { Data[topicBytes.Length + 2] = (byte)(Identifier >> 8); Data[topicBytes.Length + 3] = (byte)(Identifier & 255); } BodyBytes.CopyTo(Data, Data.Length - BodyBytes.Length);//復制消息內容 topicBytes = null; }
后面的格式我就不在一一寫出來了 ,附上一個mqtt協議文檔
這里還有一個非常重要的自己寫的一個Socket 輔助類,主要實現高性能的讀取和發送消息,已整包的形式,這樣避免粘包等問題
/// <summary> /// Berkeley 套接字 輔助 /// </summary> public abstract class SocketClient<Package> where Package : IDataPackage { private Socket m_Socket; private Timer timerHeartbeat; private System.Net.EndPoint remoteEndPoint; /// <summary> /// 客戶端唯一標識 /// </summary> public virtual long Id { get; set; } /// <summary> /// Berkeley 套接字。 /// </summary> public virtual Socket Socket { get => m_Socket; protected set { m_Socket = value; remoteEndPoint = m_Socket.RemoteEndPoint; } } /// <summary> /// 客戶端的遠程信息。 /// </summary> public virtual System.Net.EndPoint RemoteEndPoint { get => remoteEndPoint; } /// <summary> /// 心跳線程 /// </summary> protected virtual Timer TimerHeartbeat { get => timerHeartbeat; } /// <summary> /// 心跳時間。 /// </summary> public virtual int KeepAlive { get; set; } = 180000; /// <summary> /// 初始化 /// </summary> /// <param name="socket"></param> protected SocketClient(Socket socket) { Socket = socket; } /// <summary> /// 初始化 /// </summary> protected SocketClient() { } /// <summary> /// 讀取一個完整的包。 /// </summary> /// <returns></returns> protected abstract DotNet.Result<Package> ReceivePackage(); /// <summary> /// 開始循環讀取消息。 /// </summary> public virtual void OnReceive() { while (!IsClose) { try { OnHeartbeatTimer(); var bytesResult = ReceivePackage(); if (bytesResult.Success) { OnNewDataPackage(bytesResult); } else { WriteLog($"接收包時錯誤,錯誤內容:{bytesResult.Message}"); if (bytesResult.Code == -1) { this.Close(); } } } catch (Exception ex) { WriteErrorLog($"接收包時異常", ex); } } Close(); } /// <summary> /// 當接收到 /// </summary> /// <param name="bytesResult"></param> protected virtual void OnNewDataPackage(Result<Package> bytesResult) { try { // 這里使用異步會有一個問題,就是如果一個客戶端while(true)在發消息,會導致服務器線程被一個客戶端占滿而無法處理其他的客戶端。 OnHandleDataPackage(bytesResult.Data); } catch (Exception ex) { WriteErrorLog($"客戶端處理包時報錯", ex); } } #if NET40 /// <summary> /// 啟用異步讀取 /// </summary> /// <returns></returns> public virtual Task OnReceiveAsync() { return Task.Factory.StartNew(OnReceive); } #else /// <summary> /// 啟用異步讀取 /// </summary> /// <returns></returns> public virtual async Task OnReceiveAsync() { await Task.Run(() => { OnReceive(); }); } #endif private bool m_IsClose; /// <summary> /// 是否已經關閉 /// </summary> public virtual bool IsClose => m_IsClose; /// <summary> /// 關閉連接,並退出當前線程 /// </summary> public virtual void Close(int timeout = 3) { lock (this) { if (!IsClose) { m_IsClose = true; WriteLog($"關閉連接"); OnClose(); //真正關閉,避免二次關閉 } } Socket?.Close(timeout); Socket?.Dispose(); timerHeartbeat?.Dispose(); } /// <summary> /// 關閉連接並退出。 /// </summary> protected abstract void OnClose(); /// <summary> /// 設置心跳計數器 /// </summary> protected virtual void OnHeartbeatTimer() { if (timerHeartbeat == null) { timerHeartbeat = new Timer(OnHeartbeatTimerCallback, this, KeepAlive, KeepAlive); } else { timerHeartbeat.Change(KeepAlive, KeepAlive); } } /// <summary> /// 心跳實際到達后觸發,改方法又心跳計數器執行。 /// </summary> /// <param name="state"></param> protected virtual void OnHeartbeatTimerCallback(object state) { WriteLog($"客戶端{KeepAlive}s未發包,已丟棄"); Close(); } /// <summary> /// 寫入日志。 /// </summary> /// <param name="text">日志內容</param> public virtual void WriteLog(string text) { // Log.WriteLog($" 連接{RemoteEndPoint}-{text}"); } /// <summary> /// 寫入錯誤信息到日志。 /// </summary> /// <param name="text">錯誤信息描述</param> /// <param name="exception">異常信息</param> public virtual void WriteErrorLog(string text, Exception exception = null) { // Log.WriteErrorLog($" 連接{RemoteEndPoint}-{text}", exception); } /// <summary> /// 寫入日志。 /// </summary> /// <param name="text">日志內容</param> /// <param name="args"></param> public virtual void WriteLog(string text, params object[] args) { WriteLog(string.Format(text, args)); } /// <summary> /// 開始處理接收的包 /// </summary> /// <param name="dataPackage"></param> protected abstract void OnHandleDataPackage(Package dataPackage); /// <summary> /// 發送數據 /// </summary> /// <param name="bytes"></param> public virtual Result SendBytes(byte[] bytes) { lock (this) { if (!IsClose) { try { Socket.Send(bytes); return true; } catch (Exception ex) { WriteErrorLog($"發送數據{bytes.ToBase64String()}", ex); if (!Socket.Connected) { Close(); } } } } return false; } }
其mqtt的Socket實現子類如下:
/// <summary> /// mqtt 服務器連接過來的客戶端。 /// </summary> public class MQTTSocketClient : DotNet.Net.SocketClient<MQTTDataPackage> { /// <summary> /// 表示mqtt服務器。 /// </summary> public virtual MQTTServer TcpServer { get; set; } /// <summary> /// 獲取一個值,該值指示客戶端是否發送過了連接協議包。 /// </summary> public virtual bool IsConnect { get; protected set; } private readonly List<TopicDataPackage> subscribeTopics = new List<TopicDataPackage>(); /// <summary> /// 訂閱主題。 /// </summary> public TopicDataPackage[] SubscribeTopics { get => subscribeTopics.ToArray(); } /// <summary> /// 當前消息序號 /// </summary> public virtual ushort Identifier { get; set; } /// <summary> /// 客戶端連接編號 /// </summary> public virtual string ClientId { get; set; } /// <summary> /// 客戶端唯一連接id /// </summary> public override long Id { get { if (long.TryParse(ClientId, out long id)) { base.Id = id; } else { base.Id = ClientId.GetHashCode(); } return base.Id; } set { ClientId = value.ToString(); } } /// <summary> /// 寫日志。 /// </summary> /// <param name="text"></param> public override void WriteLog(string text) { if (ClientId != null) { text = $"客戶端編號:{ClientId}:{text}"; } // base.WriteLog(text); } /// <summary> /// 使用<see cref="Socket"/>客戶端初始化。 /// </summary> /// <param name="socket"></param> public MQTTSocketClient(Socket socket) : base(socket) { } /// <summary> /// 關閉服務端連接 /// </summary> protected override void OnClose() { Console.WriteLine($"{ClientId}關閉連接"); } /// <summary> /// 處理收到的包 /// </summary> /// <param name="dataPackage"></param> protected override void OnHandleDataPackage(MQTTDataPackage dataPackage) { try { WriteLog($"收到{dataPackage.MessageType} 包, QoS level:{dataPackage.QoS}"); if (IsConnect && dataPackage.MessageType != MessageType.Connect) { WriteLog($"收到{dataPackage.MessageType} 包, QoS level:{dataPackage.QoS} ,但連接尚未登錄,被拋棄"); this.Close(); } switch (dataPackage.MessageType) { case MessageType.Connect: OnConnect(dataPackage); break; case MessageType.Subscribe: OnSubscribe(dataPackage); break; case MessageType.PingRequest: OnPingRequest(dataPackage); break; case MessageType.Publish: OnPublishPackage(dataPackage); break; case MessageType.UnSubscribe: OnUnSubscribe(dataPackage); break; case MessageType.Disconnect: this.Close(); break; } } catch (Exception ex) { } dataPackage = null; } #if NET40 /// <summary> /// 當收到發布消息 /// </summary> /// <param name="dataPackage"></param> protected virtual Task OnPublishPackage(MQTTDataPackage dataPackage) { return Task.Factory.StartNew(() => { #else /// <summary> /// 當收到發布消息 /// </summary> /// <param name="dataPackage"></param> /// <returns></returns> protected virtual async Task OnPublishPackage(MQTTDataPackage dataPackage) { await Task.Run(() => { #endif try { PublishDataPackage publishDataPackage = new PublishDataPackage(dataPackage); var result = OnPublish(publishDataPackage); if (dataPackage.QoS > 0) { var package = new MQTTDataPackage() { MessageType = MessageType.PublishAck, Data = new byte[3] { (byte)(publishDataPackage.Identifier >> 8), (byte)(publishDataPackage.Identifier & 255), 0 } }; if (dataPackage.QoS == 1) { if (!result.Success) { package.Data[2] = 1; } //SendPackage(package); } } } catch (Exception ex) { } }); } /// <summary> /// 當客戶發布消息。 /// </summary> /// <param name="message"></param> /// <returns></returns> protected virtual Result OnPublish(PublishDataPackage message) { WriteLog($"客戶端{message.ClientId}發布消息{message.Topic},QoS{message.QoS}。內容:{message.Text}"); try { foreach (var client in TcpServer.Clients) { foreach (var topic in client.SubscribeTopics) { if (MqttTopicFilterComparer.IsMatch(message.Topic, topic.Topic)) { var temp = message.Clone(); temp.QoS = 0;// Math.Min(message.QoS, topic.QoS);//mqtt協議規定,取訂閱主題和發送主題中最小的qos值。 client.Publish(temp); } } } } catch (Exception ex) { } return true; } /// <summary> /// 發布消息。 /// </summary> /// <param name="message">要發布的消息。</param> /// <returns></returns> public virtual Result Publish(PublishDataPackage message) { message.Identifier = ++Identifier; this.SendPackage(message);//目前不校驗,qos 直接發送 return true; } /// <summary> /// 當客戶端發送了ping 請求 /// </summary> /// <param name="dataPackage"></param> protected virtual void OnPingRequest(MQTTDataPackage dataPackage) { var package = new MQTTDataPackage() { MessageType = MessageType.PingResponse }; SendPackage(package); } /// <summary> /// 發生訂閱消息 /// </summary> /// <param name="dataPackage"></param> private void OnSubscribe(MQTTDataPackage dataPackage) { TopicDataPackage topicDataPackage = new TopicDataPackage(dataPackage); var result = OnSubscribe(topicDataPackage); var package = new SubscribeAckDataPackage() { Identifier = topicDataPackage.Identifier, Success = result.Success }; if (result.Success) { if (!subscribeTopics.Contains(topicDataPackage)) { subscribeTopics.Add(topicDataPackage); } package.ValidQos = Qos.QoS2;// } SendPackage(package); } /// <summary> /// 取消訂閱消息 /// </summary> /// <param name="dataPackage"></param> private void OnUnSubscribe(MQTTDataPackage dataPackage) { TopicDataPackage topicDataPackage = new TopicDataPackage(dataPackage); var result = OnUnSubscribe(topicDataPackage); if (result.Success) { if (subscribeTopics.Contains(topicDataPackage)) { subscribeTopics.Remove(topicDataPackage); } var package = new IdentifierAckDataPackage(MessageType.UnSubscribeAck) { Identifier = topicDataPackage.Identifier }; SendPackage(package); } } /// <summary> /// 當收到 取消訂閱主題消息時。 /// </summary> /// <param name="message"></param> /// <returns></returns> protected virtual Result OnUnSubscribe(TopicDataPackage message) { WriteLog($"客戶端{message.ClientId} 取消訂閱{message.Topic},QoS{message.QoS}"); return true; } /// <summary> /// 當收到訂閱主題消息時。 /// </summary> /// <param name="message"></param> /// <returns></returns> protected virtual Result OnSubscribe(TopicDataPackage message) { WriteLog($"客戶端{message.ClientId}訂閱{message.Topic},QoS{message.RequestedQoS}"); return true; } /// <summary> /// 當客戶端發送連接請求時。 /// </summary> /// <param name="dataPackage">連接請求的包</param> private void OnConnect(MQTTDataPackage dataPackage) { ConnectDataPackage connectDataPackage = new ConnectDataPackage(dataPackage); var result = OnClientConnect(connectDataPackage); var client = TcpServer.GetClientById(connectDataPackage.ClientId); if (client.Success) { client.Data.WriteLog($"新的客戶端連接{this.RemoteEndPoint}上線,舊連接關閉"); client.Data.Close(); } ClientId = connectDataPackage.ClientId; this.KeepAlive = Convert.ToInt32(connectDataPackage.KeepAlive * 1000 * 1.5); var package = new ConnectAckDataPackage() { Result = result }; SendPackage(package); } /// <summary> /// 發送一個標准的mqtt包到客戶端連接。 /// </summary> /// <param name="package"></param> public virtual void SendPackage(MQTTDataPackage package) { WriteLog($"發送{package.MessageType}包,QOS:{package.QoS}"); this.SendBytes(package.ToBytes()); } /// <summary> /// 當客戶端連接到服務驗證是否可以連接 /// </summary> /// <param name="message"></param> /// <returns></returns> protected virtual Result OnClientConnect(ConnectDataPackage message) { WriteLog($"客戶端{message.ProtocolName}連接,客戶端編號{message.ClientId},用戶名:{message.UserName},密碼:{message.Password},CeanSession:{message.CeanSession}"); return true; } /// <summary> /// 接收一個完整的包。 /// </summary> /// <returns></returns> protected override Result<MQTTDataPackage> ReceivePackage() { Result<byte[]> result; Result<MQTTDataPackage> resultPackage = new Result<MQTTDataPackage>() { Success = false }; MQTTDataPackage package = new MQTTDataPackage() { ClientId = ClientId, RemoteEndPoint = RemoteEndPoint }; result = this.Socket.ReceiveBytes(1); if (!result.Success) { WriteLog("獲取mqtt 頭 首字節失敗"); this.Close(); return resultPackage; } package.Header = result.Data[0]; var msgLengthResult = ReadLength(); if (!msgLengthResult.Success) { WriteLog(msgLengthResult.Message); return resultPackage; } result = this.Socket.ReceiveBytes(msgLengthResult.Data); if (!result.Success) { WriteLog($"獲取數據長度{msgLengthResult.Data}內容失敗"); return resultPackage; } package.Data = result.Data; resultPackage.Data = package; resultPackage.Success = true; resultPackage.Message = "獲取包成功"; return resultPackage; } /// <summary> /// 獲取一個長度數據 /// </summary> /// <returns></returns> protected virtual Result<int> ReadLength() { var result = this.Socket.ReceiveBytes(1); if (!result.Success) { WriteLog("獲取mqtt 長度失敗"); return new Result<int>(result); } var msgType = result.Data[0]; var msgLength = msgType & 127;//取低7為的值,因為可變長度有效值只有低7位,第8位用來標識下一個字節是否屬於長度字節 var leftBit = 7; while (msgType >> 7 == 1)//判斷最高位是否為1,如果為1則說明后面的1個字節也是屬於長度字節 { result = this.Socket.ReceiveBytes(1); if (!result.Success) { WriteLog("獲取mqtt 長度失敗"); return new Result<int>(result); } msgType = result.Data[0]; msgLength = ((msgType & 127) << leftBit) | msgLength;// 因為mqtt 可變長度的字節是低位在前,所以新取到的長度要左移取到的次數*7位在|原來的長度。 leftBit += 7; } return msgLength; } }