長久以來,.Net開發人員都非常羡慕Java有Netty這樣,高效,穩定又易用的網絡通信基礎框架。終於微軟的Azure團隊,使用C#實現的Netty的版本發布。不但使用了C#和.Net平台的技術特點,並且保留了Netty原來絕大部分的編程接口。讓我們在使用時,完全可以依照Netty官方的教程來學習和使用DotNetty應用程序。
DotNetty同時也是開源的,它的源代碼托管在Github上:https://github.com/azure/dotnetty
0x01 項目預覽
從github上下載最新的代碼到本地,使用VS2017或者VSCode打開下載好的代碼,可以看到如圖所示的代碼那結構,其中源碼部分有9個項目組成,其中
DotNetty.Common 是公共的類庫項目,包裝線程池,並行任務和常用幫助類的封裝
DotNetty.Transport 是DotNetty核心的實現
DotNetty.Buffers 是對內存緩沖區管理的封裝
DotNetty.Codes 是對編解碼是封裝,包括一些基礎基類的實現,我們在項目中自定義的協議,都要繼承該項目的特定基類和實現
DotNetty.Handlers 封裝了常用的管道處理器,比如Tls編解碼,超時機制,心跳檢查,日志等,如果項目中沒有用到可以不引用,不過一般都會用到
其他還有對Redis的編解碼,Mqtt的編解碼,Protobuf2/3的編解碼項目中可根據實際情況引用
很遺憾Http協議和Websocket協議還沒有實現。
0x02 快速開始-示例-回聲程序的實現
從上一步下載的代碼中,看到有一個sample目錄,有很多例子,都大同小異, 先來看這個最簡單的Echo服務的實現吧.
Echo服務,分為服務端和客戶端,服務端使用DotNetty框架啟動一個Socket服務,並等待客戶端鏈接,當客戶端鏈接並接收客戶端消息,並將接收到的消息原樣返回給客戶端。而客戶端同樣使用DotNetty框架啟動一個Socket客戶端服務,並鏈接到服務端,並發送一條Hello的字符串信息,並等待服務端返回。如此往復。
2.1 Echo Server
來一起看一下代碼吧,我把注釋都寫到代碼中:
static async Task RunServerAsync()
{
//設置輸出日志到Console
ExampleHelper.SetConsoleLogger();
// 主工作線程組,設置為1個線程
var bossGroup = new MultithreadEventLoopGroup(1);
// 工作線程組,默認為內核數*2的線程數
var workerGroup = new MultithreadEventLoopGroup();
X509Certificate2 tlsCertificate = null;
if (ServerSettings.IsSsl) //如果使用加密通道
{
tlsCertificate = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password");
}
try
{
//聲明一個服務端Bootstrap,每個Netty服務端程序,都由ServerBootstrap控制,
//通過鏈式的方式組裝需要的參數
var bootstrap = new ServerBootstrap();
bootstrap
.Group(bossGroup, workerGroup) // 設置主和工作線程組
.Channel<TcpServerSocketChannel>() // 設置通道模式為TcpSocket
.Option(ChannelOption.SoBacklog, 100) // 設置網絡IO參數等,這里可以設置很多參數,當然你對網絡調優和參數設置非常了解的話,你可以設置,或者就用默認參數吧
.Handler(new LoggingHandler("SRV-LSTN")) //在主線程組上設置一個打印日志的處理器
.ChildHandler(new ActionChannelInitializer<ISocketChannel>(channel =>
{ //工作線程連接器 是設置了一個管道,服務端主線程所有接收到的信息都會通過這個管道一層層往下傳輸
//同時所有出棧的消息 也要這個管道的所有處理器進行一步步處理
IChannelPipeline pipeline = channel.Pipeline;
if (tlsCertificate != null) //Tls的加解密
{
pipeline.AddLast("tls", TlsHandler.Server(tlsCertificate));
}
//日志攔截器
pipeline.AddLast(new LoggingHandler("SRV-CONN"));
//出棧消息,通過這個handler 在消息頂部加上消息的長度
pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
//入棧消息通過該Handler,解析消息的包長信息,並將正確的消息體發送給下一個處理Handler,該類比較常用,后面單獨說明
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
//業務handler ,這里是實際處理Echo業務的Handler
pipeline.AddLast("echo", new EchoServerHandler());
}));
// bootstrap綁定到指定端口的行為 就是服務端啟動服務,同樣的Serverbootstrap可以bind到多個端口
IChannel boundChannel = await bootstrap.BindAsync(ServerSettings.Port);
Console.ReadLine();
//關閉服務
await boundChannel.CloseAsync();
}
finally
{
//釋放工作組線程
await Task.WhenAll(
bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
}
}
來看下實際的業務代碼,比較簡單,也就是打印日志,並返回收到的字符串
public class EchoServerHandler : ChannelHandlerAdapter //管道處理基類,較常用
{
// 重寫基類的方法,當消息到達時觸發,這里收到消息后,在控制台輸出收到的內容,並原樣返回了客戶端
public override void ChannelRead(IChannelHandlerContext context, object message)
{
var buffer = message as IByteBuffer;
if (buffer != null)
{
Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8));
}
context.WriteAsync(message);//寫入輸出流
}
// 輸出到客戶端,也可以在上面的方法中直接調用WriteAndFlushAsync方法直接輸出
public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();
//捕獲 異常,並輸出到控制台后斷開鏈接,提示:客戶端意外斷開鏈接,也會觸發
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
Console.WriteLine("Exception: " + exception);
context.CloseAsync();
}
}
2.2 Echo Client
客戶端的代碼和服務端的代碼相差很少,體現了Netty統一的編程模型。有幾個不同點:
- 客戶端的Bootstrap不是ServerBootstrap了,
- 客戶端不需要主線程組,只有工作線程組,消息處理管道也建立在里主線程工作組的攔截通道上。
- 最后不是bind而是connect
static async Task RunClientAsync()
{
ExampleHelper.SetConsoleLogger();
var group = new MultithreadEventLoopGroup();
X509Certificate2 cert = null;
string targetHost = null;
if (ClientSettings.IsSsl)
{
cert = new X509Certificate2(Path.Combine(ExampleHelper.ProcessDirectory, "dotnetty.com.pfx"), "password");
targetHost = cert.GetNameInfo(X509NameType.DnsName, false);
}
try
{
var bootstrap = new Bootstrap();
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
if (cert != null)
{
pipeline.AddLast("tls", new TlsHandler(stream => new SslStream(stream, true, (sender, certificate, chain, errors) => true), new ClientTlsSettings(targetHost)));
}
pipeline.AddLast(new LoggingHandler());
pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));
pipeline.AddLast("echo", new EchoClientHandler());
}));
IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(ClientSettings.Host, ClientSettings.Port));
Console.ReadLine();
await clientChannel.CloseAsync();
}
finally
{
await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
}
}
業務代碼
// 代碼和服務端也相差不多,並且繼承了同樣的基類。
public class EchoClientHandler : ChannelHandlerAdapter
{
readonly IByteBuffer initialMessage;
public EchoClientHandler()
{
this.initialMessage = Unpooled.Buffer(ClientSettings.Size);
byte[] messageBytes = Encoding.UTF8.GetBytes("Hello world");
this.initialMessage.WriteBytes(messageBytes);
}
//重寫基類方法,當鏈接上服務器后,馬上發送Hello World消息到服務端
public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage);
public override void ChannelRead(IChannelHandlerContext context, object message)
{
var byteBuffer = message as IByteBuffer;
if (byteBuffer != null)
{
Console.WriteLine("Received from server: " + byteBuffer.ToString(Encoding.UTF8));
}
context.WriteAsync(message);
}
public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
Console.WriteLine("Exception: " + exception);
context.CloseAsync();
}
}
0x03 常用Handler和基類
從Echo服務的例子中,我們可以看到Netty程序不管時服務端還是客戶端都通過一個Bootstrap/ServerBootstrap來啟動Socket程序,並通過設置處理Handler管道來處理出入的消息,管道中常見的攔截器有加解密,日志記錄,編解碼,消息頭處理,業務處理等,實際業務中根據情況可以自行添加自己的業務邏輯,同時很多處理器代碼在服務端和客戶端是公用的,Netty本身已經提供了一些常用處理器和業務處理器的基類來簡化實際開發,我們一起看一下
3.1 TlsHandler
Netty支持Tls加密傳輸,TlsHandler類可以在開發人員無須關心加密傳輸時字節碼的變化,只關心自己的業務代碼即可。在管道處理的第一個配置該類即可
3.2 LengthFieldPrepender
這個handler 會在實際發送前在將數據的長度放置在數據前,本例中使用2個字節來存儲數據的長度。
3.3 LengthFieldBasedFrameDecoder
這個handler比較常用,會在解碼前用於解析數據,用於讀取數據包的頭信息,特別是包長,並等待數據達到包長后再交由下一個handler處理。
參數說明 以下是Amp協議的參數值,並注釋了意義
InitialBytesToStrip = 0, //讀取時需要跳過的字節數
LengthAdjustment = -5, //包實際長度的糾正,如果包長包括包頭和包體,則要減去Length之前的部分
LengthFieldLength = 4, //長度字段的字節數 整型為4個字節
LengthFieldOffset = 1, //長度屬性的起始(偏移)位
MaxFrameLength = int.MaxValue, // 最大包長
3.4 ChannelHandlerAdapter和SimpleChannelInboundHandler
業務處理的常用Handler基類,一般客戶端和服務端的業務處理handler 都要繼承這個這兩個類,其中SimpleChannelInboundHandler
3.5 IdleStateHandler 鏈接狀態檢查handler
這個handler一般用於檢查鏈接的狀態,比如寫超時,讀超時。在實際項目中一般在客戶端添加它,並用於發送心跳包。
以下是DotBPE在客戶端管道中 第一個添加IdleStateHandler 並設置觸發時間
var bootstrap = new Bootstrap();
bootstrap
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Option(ChannelOption.ConnectTimeout, TimeSpan.FromSeconds(3))
.Group(new MultithreadEventLoopGroup())
.Handler(new ActionChannelInitializer<ISocketChannel>(c =>
{
var pipeline = c.Pipeline;
pipeline.AddLast(new LoggingHandler("CLT-CONN"));
MessageMeta meta = _msgCodecs.GetMessageMeta();
// IdleStateHandler
pipeline.AddLast("timeout", new IdleStateHandler(0, 0, meta.HeartbeatInterval / 1000));
//消息前處理
pipeline.AddLast(
new LengthFieldBasedFrameDecoder(
meta.MaxFrameLength,
meta.LengthFieldOffset,
meta.LengthFieldLength,
meta.LengthAdjustment,
meta.InitialBytesToStrip
)
);
pipeline.AddLast(new ChannelDecodeHandler<TMessage>(_msgCodecs));
pipeline.AddLast(new ClientChannelHandlerAdapter<TMessage>(this));
}));
return bootstrap;
然后在業務處理handler中處理UserEventTriggered事件
//ChannelHandlerAdapter 重寫UserEventTriggered
public override void UserEventTriggered(IChannelHandlerContext context, object evt){
if(evt is IdleStateEvent){
var eventState = evt as IdleStateEvent;
if(eventState !=null){
this._bootstrap.SendHeartbeatAsync(context,eventState);
}
}
}
更多細節可以參考 《Netty 4.x 用戶指南》