BeetleX
是.net core
平台下的一個開源TCP
通訊組件,它不僅使用簡便還提供了出色性能的支持,可以輕易讓你實現上百萬級別RPS吞吐的服務應用。組件所提供的基礎功能也非常完善,可以讓你輕易擴展自己的服務應用,以下提組件集成的功能:
-
完善的會話管理機制,可以根據連接狀態和相關日志
-
專門針對內存池實現的異步流讀寫,支持標准Stream的同並提供高效的性能
-
消息IO合並,廣播序列化合並等性能強化功能
-
提供簡潔的協議擴展規范,輕易實現http,websocket,mqtt等應用通訊協議
-
支持TLS,讓你構建的通訊服務更安全可靠
擴展的組件
以下是Beetlex
擴展的一些功能組件
- https://github.com/IKende/FastHttpApi
- https://github.com/IKende/Bumblebee
- https://github.com/IKende/BeetleX.Redis
- https://github.com/IKende/XRPC
- https://github.com/IKende/HttpClients
性能
一開始說組可以讓你現上百萬級別RPS吞吐的服務應用其實一點不假,BeetleX
的基礎性能有這樣的支撐能力;雖然組件不能說是.net core
上性能最好的,但在功能和綜合性能上絕對非常出色(詳細可以https://tfb-status.techempower.com/ 查看測試結果,可惜這個網站提交的.net core
組件比較少,大部都是基於aspcore的通訊模塊擴展).以下是JSON serialization
基礎輸出的一個測試結果(Plaintext
在官方的測試環境一直沒辦法跑起來....)
在測試中組件只落后於aspcore-rhtx
這是紅帽專門針對 .net core編寫的linux網絡驅動.
Single query
構建基礎TCP應用
組件在構建TCP服務的時候非常簡單,主要歸功於它提供了完善的Stream
讀寫功能,而這些功能讓你完全不用關心bytes的讀寫。基於Stream
的好處就是可以輕松和第三方序列化的組件進行整合。以下是簡單地構建一個Hello
服務。
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } public override void SessionReceive(IServer server, SessionReceiveEventArgs e) { var pipeStream = e.Stream.ToPipeStream(); if (pipeStream.TryReadLine(out string name)) { Console.WriteLine(name); pipeStream.WriteLine("hello " + name); e.Session.Stream.Flush(); } base.SessionReceive(server, e); } }
以上就是一個簡單的TCP
服務,讓以代碼正常運行需要引用Beetlex
最新版的組件可以在Nuget上找到。以上服務的功能很簡單當接收數據后嘗試從流中讀取一行字符,如果讀取成功則把內容寫入到流中提交返回。通過以上代碼是不是感覺寫個服務比較簡單(但是PipeStream
並不是線程安全的,所以不能涉及到多線程讀寫它)
協議處理規則
其實PipeStream
處理數據已經非常方便了,那為什么還需要制定一個協議處理規范呢?前面已經說了PipeStream
並不是線程安全的,很容易帶來使用上的風險,所以引入協議處理規則來進行一個安全約束的同時可以實現多線程消息處理。組件提供了這樣一個接口來規范消息的處理,接口如下:
public interface IPacket : IDisposable { EventHandler<EventArgs.PacketDecodeCompletedEventArgs> Completed { get; set; } IPacket Clone(); void Decode(ISession session, System.IO.Stream stream); void Encode(object data, ISession session, System.IO.Stream stream); byte[] Encode(object data, IServer server); ArraySegment<byte> Encode(object data, IServer server, byte[] buffer); }
如果你要處理消息對象,則需要實現以上接口(當然這個接口的實現不是必須的,只要把握好PipeStream
安全上的控制就好);但實現這接口來處理消息可以帶很多好處,可以多消息合並IO,廣播消息合並序列化等高效的功能。不過在不了解組件的情況實現這個接口的確也是有些難度的,所以組件提供了一個基礎的類FixedHeaderPacket
,它是一個抽像類用於描述有個消息頭長的信息流處理。
字符消息分包
接下來通過FixedHeaderPacket
來實現一個簡單的字符分包協議消息;主要在發送消息的時候添加一個大小頭用來描述消息的長度(這是在TCP中解決粘包的主要手段)。
public class StringPacket : BeetleX.Packets.FixedHeaderPacket { public override IPacket Clone() { return new StringPacket(); } protected override object OnReader(ISession session, PipeStream stream) { return stream.ReadString(CurrentSize); } protected override void OnWrite(ISession session, object data, PipeStream stream) { stream.Write((string)data); } }
通過FixedHeaderPacket
制定一個分包規則是非常簡單的,主要實現讀寫兩個方法。下面即可在服務中引用這個包作為TCP
數據流的分析規則:
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program,StringPacket>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } protected override void OnReceiveMessage(IServer server, ISession session, object message) { Console.WriteLine(message); server.Send($"hello {message}", session); } }
經過分析器包裝后,就再也不用流來處理數據了,可以直接進行對像的發送處理。
集成Protobuf
處理String
並不是友好的事情,畢竟沒有對象來得直觀和操作方便;以下是通過FixedHeaderPacket
擴展Protobuf
對象傳輸,以下是針對Protobuf
的規則擴展:
public class ProtobufPacket : BeetleX.Packets.FixedHeaderPacket { static ProtobufPacket() { TypeHeader.Register(typeof(ProtobufClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new ProtobufPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; return ProtoBuf.Meta.RuntimeTypeModel.Default.Deserialize(stream, null, type, size); } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); ProtoBuf.Meta.RuntimeTypeModel.Default.Serialize(stream, data); } }
使用規則分析器
class Program : ServerHandlerBase { private static IServer server; public static void Main(string[] args) { server = SocketFactory.CreateTcpServer<Program, Messages.ProtobufPacket>(); //server.Options.DefaultListen.Port =9090; //server.Options.DefaultListen.Host = "127.0.0.1"; server.Open(); Console.Read(); } protected override void OnReceiveMessage(IServer server, ISession session, object message) { ((Messages.Register)message).DateTime = DateTime.Now; server.Send(message, session); } }
不同序列化的擴展
既然有了一個Protobuf
作為樣本,那針對其他序列化的實現就比較簡單了
- json
public class JsonPacket : BeetleX.Packets.FixedHeaderPacket { static JsonPacket() { TypeHeader.Register(typeof(JsonClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new JsonPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; var buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(size); stream.Read(buffer, 0, size); try { return SpanJson.JsonSerializer.NonGeneric.Utf8.Deserialize(new ReadOnlySpan<byte>(buffer, 0, size), type); } finally { System.Buffers.ArrayPool<byte>.Shared.Return(buffer); } } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); var buffer = SpanJson.JsonSerializer.NonGeneric.Utf8.SerializeToArrayPool(data); try { stream.Write(buffer.Array, buffer.Offset, buffer.Count); } finally { System.Buffers.ArrayPool<byte>.Shared.Return(buffer.Array); } } }
- messagepack
public class MsgpackPacket : BeetleX.Packets.FixedHeaderPacket { static MsgpackPacket() { TypeHeader.Register(typeof(MsgpackClientPacket).Assembly); } public static BeetleX.Packets.CustomTypeHeader TypeHeader { get; set; } = new BeetleX.Packets.CustomTypeHeader(BeetleX.Packets.MessageIDType.INT); public override IPacket Clone() { return new MsgpackPacket(); } protected override object OnReader(ISession session, PipeStream stream) { Type type = TypeHeader.ReadType(stream); var size = CurrentSize - 4; return MessagePackSerializer.NonGeneric.Deserialize(type, stream, true); } protected override void OnWrite(ISession session, object data, PipeStream stream) { TypeHeader.WriteType(data, stream); MessagePackSerializer.NonGeneric.Serialize(data.GetType(), stream, data); } }