DotNetty項目本身的示例很容易運行起來,但是具體到真實的應用場景,還是需要進一步理解DotNetty的通道處理細節,這樣才能夠在實際項目應用中處理具體的問題。
簡單的場景下會有以下幾個問題,第一,客戶端如何向服務器主動發送消息;第二,服務器如何向指定客戶端發送消息;第三,在哪里做報文的拆包和組包。我是帶着以上幾個問題進行分析的。
以上幾個問題,在下面的代碼中會有詳細的注釋,也許不是標准方案,但也是應對上述問題的一種解決途徑。看代碼:
public partial class FrmMain : Form { public static object synobj = new object(); public static Int64 count = 0; public static DateTime dt1 = DateTime.Now; public static DateTime dt2 = DateTime.Now.AddSeconds(1); private Timer t = new Timer(); private List<IChannel> listClients = new List<IChannel>(); public FrmMain() { InitializeComponent(); t.Interval = 1000; t.Tick += T_Tick; t.Start(); } private void T_Tick(object sender, EventArgs e) { this.Text = (count / (FrmMain.dt2 - FrmMain.dt1).TotalSeconds).ToString(); } /// <summary> /// 啟動服務器 /// </summary> private async void btnStartServer_Click(object sender, EventArgs e) { IEventLoopGroup mainGroup; IEventLoopGroup workerGroup; mainGroup = new MultithreadEventLoopGroup(1); workerGroup = new MultithreadEventLoopGroup(); var bootstrap = new ServerBootstrap(); bootstrap.Group(mainGroup, workerGroup); bootstrap.Channel<TcpServerSocketChannel>(); bootstrap .Option(ChannelOption.SoBacklog, 100) .Handler(new LoggingHandler("SRV-LSTN")) .ChildHandler(new ActionChannelInitializer<IChannel>(channel => { //每個客戶端的連接創建,都會執行,channel代表了具體的連接客戶端,以下過程為每個客戶端連接創建編解碼器。 //這里可以對channel進行統一管理,保存到列表當中,這樣在主程序(服務器)中就可以針對特定的客戶端(即channel)進行消息的發送。 IChannelPipeline pipeline = channel.Pipeline; listClients.Add(channel); pipeline.AddLast(new LoggingHandler("SRV-CONN")); pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); //pipeline.AddLast("heart", new IdleStateHandler(0, 0, 3000 / 1000)); pipeline.AddLast("echo", new EchoServerHandler()); })); dt1 = DateTime.Now; IChannel boundChannel = await bootstrap.BindAsync(5000); #region 模擬服務器向客戶端發送消息,前提是,客戶端連接后,要保存channel到列表。 //Task.Run(() => //{ // while (true) // { // for (int i = 0; i < listClients.Count; i++) // { // var t = listClients[i];//代表某個客戶端連接 // if (t == null) { return; } // var initialMessage = Unpooled.Buffer(256); // byte[] messageBytes = Encoding.UTF8.GetBytes("=======發送消息給客戶端======="); // initialMessage.WriteBytes(messageBytes); // t.WriteAndFlushAsync(initialMessage); // } // } //}); #endregion } /// <summary> /// 啟動客戶端 /// </summary> private async void btnStartClient_Click(object sender, EventArgs e) { List<IChannel> list = new List<IChannel>(); for (int i = 0; i < 1; i++) { var group = new MultithreadEventLoopGroup(); var bootstrap = new Bootstrap(); bootstrap .Group(group) .Channel<TcpSocketChannel>() .Option(ChannelOption.TcpNodelay, true) .Handler(new ActionChannelInitializer<ISocketChannel>(channel => { IChannelPipeline pipeline = channel.Pipeline; pipeline.AddLast(new LoggingHandler()); pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); //pipeline.AddLast("heart", new IdleStateHandler(0, 0, 3000 / 1000)); pipeline.AddLast("echo", new EchoClientHandler()); })); IChannel clientChannel = await bootstrap.ConnectAsync(textBox1.Text, 5000); //clientChannel為客戶端持有的連接對象,可以通過它主動向服務器發起請求,clientChannel.WriteAndFlushAsync() list.Add(clientChannel); } System.Threading.Thread.Sleep(1000); #region 模擬客戶端向服務器發送消息,前提是,客戶端鏈接后,要保存channel。 //list.ForEach(t => //{ // var initialMessage = Unpooled.Buffer(256); // byte[] messageBytes = Encoding.UTF8.GetBytes("====發送消息給服務器===="); // initialMessage.WriteBytes(messageBytes); // t.WriteAndFlushAsync(initialMessage); //}); #endregion } private void FrmMain_Load(object sender, EventArgs e) { //ConsoleLoggerProvider provider = new ConsoleLoggerProvider(new ConsoleLoggerSettings()); //InternalLoggerFactory.DefaultFactory.AddProvider(provider); } }
上面的代碼主要是找到了服務器和客戶端各自向對方發送數據的入口點,具體設計時可以對IChannel對象進行封裝和維護。那么,對於我們自定義協議,我們怎樣進行數據包的組包和拆包呢?答案就時上面代碼中的EchoServerHandler和EchoClientHandler兩個通道處理器對象。以服務器部分的代碼為例:
pipeline.AddLast(new LoggingHandler("SRV-CONN")); pipeline.AddLast("framing-enc", new LengthFieldPrepender(2)); pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2)); //pipeline.AddLast("heart", new IdleStateHandler(0, 0, 3000 / 1000)); pipeline.AddLast("echo", new EchoServerHandler());
服務中創建了以上四個IChannel接口對象,他們之間是什么關系呢?順序執行!接收和發送都按照AddLast的先后順序執行。如接收數據時,先做日志處理,再做解碼LengthFieldBasedFrameDecoder,最后做EchoServerHandler的自定義處理,因為是接收,所以不做編碼LengthFieldPrepender這個處理,這是DotNetty內部判斷的,LengthFieldPrepender是繼承了MessageToMessageEncoder的,MessageToMessageEncoder本身就代表了編碼操作,而接收數據不需要做編碼,所以這個操作會被略過。LengthFieldPrepender是在服務器發送數據時才做。
每個處理過程都接收上個處理過程的處理結果,比如EchoServerHandler接收到的數據,是LengthFieldBasedFrameDecoder處理完成后的輸出。演示程序的協議類型是頭部兩個字節代表數據包長度,后面是數據體,這樣在LengthFieldBasedFrameDecoder處理完成后,EchoServerHandler接收到的是不包含描述長度的兩個字節,只有數據體部分的數據,這樣我們就可以在自定義的EchoServerHandler中,進行數據體的拆包操作了。
EchoServerHandler和EchoClientHandler的代碼如下:
public class EchoServerHandler : ChannelHandlerAdapter { public override void ChannelRead(IChannelHandlerContext context, object message) { var buffer = message as IByteBuffer; if (buffer != null) { lock (FrmMain.synobj) { FrmMain.count++; } FrmMain.dt2 = DateTime.Now; Console.WriteLine(System.Threading.Thread.CurrentThread.ManagedThreadId + "Received from client: " + buffer.ToString(Encoding.UTF8) + "=" + FrmMain.count / (FrmMain.dt2 - FrmMain.dt1).TotalSeconds); } context.WriteAsync(message); } public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine("Exception: " + exception); context.CloseAsync(); } }
public class EchoClientHandler : ChannelHandlerAdapter { readonly IByteBuffer initialMessage; public EchoClientHandler() { this.initialMessage = Unpooled.Buffer(256); byte[] messageBytes = Encoding.UTF8.GetBytes("Hello world"); this.initialMessage.WriteBytes(messageBytes); } public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage); public override void ChannelRead(IChannelHandlerContext context, object message) { var byteBuffer = message as IByteBuffer; if (byteBuffer != null) { Console.WriteLine(System.Threading.Thread.CurrentThread.ManagedThreadId + "Received from server: " + byteBuffer.ToString(Encoding.UTF8)); } //System.Threading.Thread.Sleep(500); context.WriteAsync(message); } public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) { Console.WriteLine("Exception: " + exception); context.CloseAsync(); } }