目前的.net 架構下缺乏高效的TCP消息發送組件,而這種組件是構建高性能分布式應用所必需的。為此我結合多年的底層開發經驗開發了一個.net 下的高效TCP消息發送組件。這個組件在異步發送時可以達到每秒160萬包,而相同大小的數據包用WCF的TCP模式OneWay 方式發送每秒只能達到5.6萬包。
項目首頁
功能介紹:
NTCPMSG 組件是基於 .net framework 的開源TCP 消息發送和接收組件。和.net framework 提供的 TcpClient 類比較,這個組件是以包的方式發送消息,不存在沾包的情況。最新發布的1.3.0.0 版本除提供發送 byte[] 數據外,還提供直接發送和接收對象,並提供多種類型的序列化方式供調用者選擇。發送模式上,NTCPMSG 提供異步發送和同步發送兩種方式。另外相對於 .net framework 自帶的 TCP 類,NTCPMSG 提供了 Connection, Disconnection, accept, Receive 等事件,方便調用者對消息發送和接收的各個階段進行控制。
相比很多通訊組件只能從客戶端發送消息到服務器然后接受服務器響應, NTCPMSG 還增加了直接從服務器推送消息給客戶端的功能。這個功能對於后台分布式應用的開發也是非常有用的,因為這些分布式應用相互往往是對等的。
NTCPMSG 主要類:
SingleConnection
這個類每次建立1個鏈接到服務器
主要的方法:
AsyncSend: 這個方法發送異步消息到服務器
SyncSend: 這個方法發送同步消息到服務器
Connect: 這個方法用於和服務器建立連接時調用。這個方法有個 autoConnect 參數,如果這個參數為true,那么SingleConnection 類在和服務器斷鏈后會嘗試自動與服務器建鏈。這個是個比較實用的功能,調用者不需要考慮服務器連接中斷后恢復的情況。
主要事件:
ConnectedEventHandler: 這個事件在建立鏈接成功時觸發
ErrorEventHandler: 這個事件在發生錯誤的情況下觸發
RemoteDisconnected: 這個事件在和服務器斷鏈時觸發
ReceiveEventHandler: 這個事件當接收到直接從服務器推送過來的消息時觸發。
SingleConnectionCable
這個類可以將多個鏈接綁定到一起和服務器通訊。有用TCP本身是基於滑動窗口協議的,單鏈路的發送速度受網絡時延的影響,這也是TCP比UDP慢的一個原因。但由於TCP可以提供比UDP可靠的通訊,大部分對可靠性要求較高的系統還是要采用TCP方式發送消息。為了克服網絡時延帶來的性能下降,通過多鏈路同時發送是一個很好的解決方案,因為每個鏈路都獨立維護一個窗口,鏈路之間是並行發送。實測下來,綁定多鏈路也確實比單鏈路的發送速度快很多。所以我一般推薦用 SingleConnectionCable 這個類連接服務器。
SingleConnectionCable 和 SingleConnection 的主要方法和事件是相同的,這里就不重復介紹了。
NTcpListener
這個類顧名思義是服務器側用於偵聽消息的類。
主要的方法:
AsyncSend: 這個方法推送異步消息到客戶端
SyncSend: 這個方法推送同步消息到客戶消息,目前版本還沒有實現。
Listen: 這個方法用於開始偵聽
Close: 這個方法用於關閉偵聽
主要事件:
Accepted: 這個事件在客戶端建立鏈接成功時觸發
ErrorReceived: 這個事件在發生錯誤的情況下觸發
RemoteDisconnected: 這個事件在和客戶端斷鏈時觸發
DataReceived: 這個事件當接收到直接從服務器推送過來的消息時觸發。
性能報告:
和 WCF 的 TCP 模式比較,相同環境下,發送4字節 byte[] ,異步方式下, NTCMSG 每秒大概發送160萬包,CPU占有率60%, WCF 每秒5.6萬包,CPU占有率為100%。同步方式下 NTCPMSG 每秒13萬包,WCF每秒是2萬包。
和 TCPClient 的異步發送比較(TCPClient 發送時不考慮沾包問題,就是連續發送 4字節的 byte[]) , TCPClient 每秒大概可以發送15萬包。為什么NTCPMSG 比TCPClient 發送的還要快?這是因為 NTCPMSG 底層做了組包,將多個小包組合成大包批量發送過去。有人會問這樣是不是會導致發送不實時呢? NTCPMSG 考慮到了這個實時性的問題,組包是在兼顧實時性的情況下完成的,也就是說如果在一個非常短的時間間隔內如果累計發送的包達到一定閥值才會組包發送,這就很好的解決的實時發送的問題。
下面圖表是 NTCPMSG 自身的性能測試報告:
測試環境: 2台 i5 筆記本,1Gbp 以太網(千兆以太網交換機),客戶端用SingleConnectionCable 連接。
異步發送測試
這個測試是發送指定大小的 byte[] 數據,客戶機上開兩個測試進程模擬兩台機器同時發送進行測試。
Asyncronious message testing |
|||
Size |
Packages per second |
Network usage |
Server CPU |
4 |
1,600,000 |
26.00% |
60.00% |
16 |
1,300,000 |
33.00% |
60.00% |
64 |
700,000 |
44.00% |
60.00% |
256 |
260,000 |
58.00% |
50.00% |
1024 |
70,000 |
60.00% |
30.00% |
8192 |
14,500 |
96.00% |
27.00% |
65536 |
1,830 |
97.00% |
30.00% |
同步發送測試
這個測試是發送指定大小的 byte[] 數據,客戶機上開四個測試進程(每個進程32個線程並行發送)模擬四台機器同時發送進行測試。
Syncronious message testing |
|||
Size |
Packages per second |
Network usage |
Server CPU |
4 |
130,000 |
4.50% |
35.00% |
16 |
120,000 |
5.50% |
43.00% |
64 |
118,000 |
10.00% |
38.00% |
256 |
90,000 |
20.00% |
60.00% |
1024 |
25,000 |
22.00% |
80.00% |
8192 |
9,000 |
50.00% |
50.00% |
65536 |
1,000 |
50.00% |
15.00% |
不同的序列化方式的測試比較
這個測試是發送一個測試消息對象,客戶機上開兩個測試進程模擬兩台機器同時異步發送進行測試。
Compare with different serialization |
|||
Serialization |
Packages per second |
Network usage |
Server CPU |
Bin |
20,000 |
1.56% |
25.00% |
Xml |
13,000 |
1.38% |
40.00% |
Json |
16,000 |
1.40% |
60.00% |
SimpleBin |
22,000 |
1.64% |
28.00% |
Struct |
120,000 |
14.00% |
32.00% |
Custom |
320,000 |
9.70% |
30.00% |
這里需要說明一下,為什么前幾種序列化方式效率不高,這是因為前幾種序列化用到了反射,這個反射極大的消耗了CPU資源。如果要高效的發送,建議用結構體方式發送或者自定義序列化發送。自定義序列化由於已知要發送的數據結構,不用到發射,發送效率是最高的。從這個測試看自定義序列化,每秒發送30萬包是,服務器的CPU才30%,理論上如果多客戶端發送,服務器每秒應該可以最大接收到100萬左右的消息。
項目結構
NTCPMSG.sln 這個是 VS2008 下的項目,編譯出來是 .net 2.0 的組件
NTCPMSG2010.sln 這個是 VS2010 下的項目,編譯出來是 .net 4.0 的組件
ClientTest
這個項目是測試代碼的客戶端客戶端部分。
ServerTest
這個是測試代碼的服務器部分
Example
這個是簡單的示例代碼。可以用於入門學習。
NTCPMSG
這個是組件的核心代碼。
示例代碼
客戶端
using System; using System.Collections.Generic; using System.Text; using System.Net; using NTCPMSG.Client; using NTCPMSG.Event; namespace Example { class Client { /// <summary> /// DataReceived event will be called back when server get message from client which connect to. /// </summary> /// <param name="sender"></param> /// <param name="args"></param> static void ReceiveEventHandler(object sender, ReceiveEventArgs args) { switch ((Event)args.Event) { case Event.PushMessage: //Get OneWay message from server if (args.Data != null) { try { Console.WriteLine(Encoding.UTF8.GetString(args.Data)); } catch (Exception e) { Console.WriteLine(e); } } break; } } public static void Run(string[] args) { Console.Write("Please input server IP Address [127.0.0.1]:"); string ipAddress = Console.ReadLine().Trim().ToLower(); if (ipAddress == "") { ipAddress = "127.0.0.1"; } try { //************** SingConnection Example ********************** Console.Write("Press any key to start single connection example"); Console.ReadKey(); Console.WriteLine(); //Create a SingleConnection instanace that will try to connect to host specified in //ipAddress and port (2500). SingleConnection client = new SingleConnection(new IPEndPoint(IPAddress.Parse(ipAddress), 2500)); client.ReceiveEventHandler += new EventHandler<ReceiveEventArgs>(ReceiveEventHandler); client.Connect(); Console.WriteLine("AsyncSend: Hello world! I am Single"); //Send an asynchronously message to server client.AsyncSend((UInt32)Event.OneWay, Encoding.UTF8.GetBytes("Hello world! I am Single")); int number = 0; try { Console.WriteLine("SyncSend {0}", number); //send a synchronously message to server //send a number with event: Event.Return to server and get the response from server //with the number increased. byte[] retData = client.SyncSend((UInt32)Event.Return, BitConverter.GetBytes(number)); number = BitConverter.ToInt32(retData, 0); Console.WriteLine("Get {0}", number); } catch (Exception e) { Console.WriteLine(e); } Console.WriteLine("Waitting for 10 seconds to finish simple connection example."); System.Threading.Thread.Sleep(10000); client.Close(); //************* SingleConnectionCable Example ***************** Console.Write("Press any key to start single connection cable example"); Console.ReadKey(); Console.WriteLine(); //Create a SingleConnectionCable instance that will try to connect to host specified in //ipAddress and port (2500). //by default, SingleConnectionCable will try to connect automatically and including 6 tcp connections. SingleConnectionCable clientCable = new SingleConnectionCable(new IPEndPoint(IPAddress.Parse(ipAddress), 2500)); clientCable.ReceiveEventHandler += new EventHandler<ReceiveEventArgs>(ReceiveEventHandler); clientCable.Connect(); Console.WriteLine("AsyncSend: Hello world! I am Cable {0}", clientCable.CableId); //Send a one way message to server clientCable.AsyncSend((UInt32)Event.OneWay, Encoding.UTF8.GetBytes(string.Format("Hello world! I am Cable {0}", clientCable.CableId))); //Send object with bin serialization (By Default) Console.WriteLine("Bin serialization"); clientCable.AsyncSend((UInt32)Event.Bin, "Bin serialization"); while (true) { Console.WriteLine("SyncSend {0}", number); try { //send a number with event: Event.Return to server and get the response from server //with the number increased. byte[] retData = clientCable.SyncSend((UInt32)Event.Return, BitConverter.GetBytes(number)); number = BitConverter.ToInt32(retData, 0); Console.WriteLine("Get {0}", number); } catch (Exception e) { Console.WriteLine(e); } Console.WriteLine("Quit when you press ESC. Else continue SyncSend."); //Quit when you press ESC if (Console.ReadKey().KeyChar == 0x1B) { clientCable.Close(); return; } } } catch (Exception e) { Console.WriteLine(e); Console.ReadLine(); } } } }
服務器
using System; using System.Collections.Generic; using System.Text; using System.Net; using NTCPMSG.Server; using NTCPMSG.Event; using NTCPMSG.Serialize; namespace Example { class Server { class BinMessageParse : MessageParse { public BinMessageParse() : base(new BinSerializer(), new BinSerializer()) { } public override object ProcessMessage(int SCBID, EndPoint RemoteIPEndPoint, NTCPMSG.MessageFlag Flag, ushort CableId, uint Channel, uint Event, object obj) { Console.WriteLine(obj); return null; } } static BinMessageParse _sBinMessageParse = new BinMessageParse(); /// <summary> /// DataReceived event will be called back when server get message from client which connect to. /// </summary> /// <param name="sender"></param> /// <param name="args"></param> static void ReceiveEventHandler(object sender, ReceiveEventArgs args) { switch ((Event)args.Event) { case Event.OneWay: //Get OneWay message from client if (args.Data != null) { try { if (args.CableId != 0) { Console.WriteLine("Get one way message from cable {0}", args.CableId); } else { Console.WriteLine("Get one way message from {0}", args.RemoteIPEndPoint); } Console.WriteLine(Encoding.UTF8.GetString(args.Data)); } catch (Exception e) { Console.WriteLine(e); } } break; case Event.Return: //Get return message from client if (args.Data != null) { try { int fromClient = BitConverter.ToInt32(args.Data, 0); args.ReturnData = BitConverter.GetBytes(++fromClient); } catch (Exception e) { Console.WriteLine(e); } } break; case Event.Bin: _sBinMessageParse.ReceiveEventHandler(sender, args); break; } } /// <summary> /// RemoteDisconnected event will be called back when specified client disconnected. /// </summary> /// <param name="sender"></param> /// <param name="args"></param> static void DisconnectEventHandler(object sender, DisconnectEventArgs args) { Console.WriteLine("Remote socket:{0} disconnected.", args.RemoteIPEndPoint); } /// <summary> /// Accepted event will be called back when specified client connected /// </summary> /// <param name="sender"></param> /// <param name="args"></param> static void AcceptedEventHandler(object sender, AcceptEventArgs args) { Console.WriteLine("Remote socket:{0} connected.", args.RemoteIPEndPoint); } public static void Run(string[] args) { NTCPMSG.Server.NTcpListener listener; //Create a tcp listener that listen 2500 TCP port. listener = new NTcpListener(new IPEndPoint(IPAddress.Any, 2500)); //DataReceived event will be called back when server get message from client which connect to. listener.DataReceived += new EventHandler<ReceiveEventArgs>(ReceiveEventHandler); //RemoteDisconnected event will be called back when specified client disconnected. listener.RemoteDisconnected += new EventHandler<DisconnectEventArgs>(DisconnectEventHandler); //Accepted event will be called back when specified client connected listener.Accepted += new EventHandler<AcceptEventArgs>(AcceptedEventHandler); //Start listening. //This function will not block current thread. listener.Listen(); Console.WriteLine("Listening..."); while (true) { System.Threading.Thread.Sleep(5 * 1000); //Push message to client example. foreach (IPEndPoint clientIpEndPoint in listener.GetRemoteEndPoints()) { bool successful = listener.AsyncSend(clientIpEndPoint, (uint)Event.PushMessage, Encoding.UTF8.GetBytes("I am from server!")); if (successful) { Console.WriteLine(string.Format("Push message to {0} successful!", clientIpEndPoint)); } else { Console.WriteLine(string.Format("Push message to {0} fail!", clientIpEndPoint)); } } foreach (UInt16 cableId in listener.GetCableIds()) { bool successful = listener.AsyncSend(cableId, (uint)Event.PushMessage, Encoding.UTF8.GetBytes(string.Format("Hi cable {0}!", cableId))); if (successful) { Console.WriteLine(string.Format("Push message to cable {0} successful!", cableId)); } else { Console.WriteLine(string.Format("Push message to cable {0} fail!", cableId)); } } } //System.Threading.Thread.Sleep(System.Threading.Timeout.Infinite); } } }