1.Netty介紹
1.1為什么需要Netty
1.1.1不是所有的網絡框架都是一樣的
1.1.2Netty的功能非常豐富
框架組成
1.2異步設計
1.2.1Callbacks(回調)
簡單的回調
public interface Fetcher { void fetchData(FetchCallback callback); } public interface FetchCallback { void onData(Data data); void onError(Throwable cause); } public class Worker { public void doWork() { Fetcher fetcher = ... fetcher.fetchData(new FetchCallback() { @Override public void onData(Data data) { //獲取到數據 System.out.println("Data received: " + data); } @Override public void onError(Throwable cause) { //未獲取到數據 System.err.println("An error accour: " + cause.getMessage()); } }); } }
Fetcher.fetchData()方法需傳遞一個FetcherCallback類型的參數,當獲得數據或發生錯誤時被回調。對於每種情況都提供了統一的方法:FetcherCallback.onData(),將接收數據時被調用;FetcherCallback.onError(),發生錯誤時被調用
1.2.2Futures
ExecutorService executor = Executors.newCachedThreadPool(); Runnable task1 = new Runnable() { @Override public void run() { doSomeHeavyWork(); } //... } Callable<Interger> task2 = new Callable() { @Override public Integer call() { return doSomeHeavyWorkWithResul(); } //... } Future<?> future1 = executor.submit(task1); Future<Integer> future2 = executor.submit(task2); while(!future1.isDone()||!future2.isDone()){ ... // do something else ... }
Future的未來應用
public interface Fetcher { Future<Data> fetchData(); } public class Worker { public void doWork() { Fetcher fetcher = ... Future<Data> future = fetcher.fetchData(); try { while (!fetcher.isDone()) { //... // do something else } System.out.println("Data received: " + future.get()); } catch (Throwable cause) { System.err.println("An error accour: " + cause.getMessage()); } } }
1.3Java中的Blocking和non-blocking IO對比
1.3.1基於阻塞IO的EchoServer
public class PlainEchoServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port);//綁定端口 try { while (true) { final Socket clientSocket = socket.accept(); //阻塞,直到接受新的客戶端連接為止。 System.out.println("Accepted connection from " + clientSocket); new Thread(new Runnable() { //創建處理客戶端連接的新線程 @Override public void run() { try { BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true); while (true) { //從客戶端讀取數據並將其寫回 writer.println(reader.readLine()); writer.flush(); } } catch (IOException e) { e.printStackTrace(); try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } } }).start(); //開始執行程序 } } catch (IOException e) { e.printStackTrace(); } } }
1.3.2非阻塞IO基礎
ByteBuffer
將數據寫入ByteBuffer
調用ByteBuffer.flip()從寫模式切換到讀取模式
從ByteBuffer讀取數據
ByteBuffer.clear()清除所有數據
Bytebuffer.compact()清除已讀取數據
Channel inChannel = ....; ByteBuffer buf=ByteBuffer.allocate(48); int bytesRead=-1; do{ bytesRead=inChannel.read(buf); //將數據從通道讀取到ByteBuffer if(bytesRead!=-1){ buf.flip();//使緩沖區為讀做准備 while(buf.hasRemaining()){ System.out.print((char)buf.get()); //讀取ByteBuffer中的字節;每個get()操作都會將位置更新1 } buf.clear(); //讓ByteBuffer准備好再寫一遍 } }while(bytesRead!=-1); inChannel.close();
使用NIO選擇器
1.創建一個或多個選擇器,其中可以注冊打開的通道(套接字)。
2.注冊信道時,指定您感興趣偵聽的事件。以下四個可用事件(或操作/操作)為:接收、連接、讀取、等待
3.在注冊通道時,您可以調用Selector.select()方法來阻塞,直到發生這些事件之一。
4.當該方法解除阻塞時,您可以獲得所有SelectionKey實例(這些實例保存對已注冊通道和所選操作的引用)並執行一些操作。 您到底做了什么取決於哪個操作已經准備好了。SelectedKey可以在任何給定時間包含多個操作。
1.3.3基於NIO的EchoServer
public class PlainNioEchoServer { public void serve(int port) throws IOException { System.out.println("Listening for connections on port " + port); ServerSocketChannel serverChannel = ServerSocketChannel.open(); ServerSocket ss = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ss.bind(address);//將服務器綁定到端口 serverChannel.configureBlocking(false);//設置為非阻塞 Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT);//向選擇器注冊通道,以便對被接受的新客戶端連接感興趣 while (true) { try { selector.select();//阻塞,直到選定某物為止。 } catch (IOException ex) { ex.printStackTrace(); // handle in a proper way break; } Set readyKeys = selector.selectedKeys(); //獲取所有SelectedKey實例 Iterator iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = (SelectionKey) iterator.next(); iterator.remove();//從迭代器中刪除SelectedKey try { if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept();//接受客戶端連接 System.out.println("Accepted connection from " + client); client.configureBlocking(false);//設置為非阻塞 client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100));//注冊到選擇器的連接並設置ByteBuffer } if (key.isReadable()) {//檢查SelectedKey的閱讀 SocketChannel client = (SocketChannel) key.channel(); ByteBuffer output = (ByteBuffer) key.attachment(); client.read(output); //讀取數據到ByteBuffer } if (key.isWritable()) {//檢查SelectedKey的寫 SocketChannel client = (SocketChannel) key.channel(); ByteBuffer output = (ByteBuffer) key.attachment(); output.flip(); client.write(output); output.compact();//將數據從ByteBuffer寫入信道 } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { } } } } } }
1.3.4基於NIO.2的EchoServer
與最初的NIO實現不同,NIO.2允許您發出IO操作並提供所謂的完成處理程序
public class PlainNio2EchoServer { public void serve(int port) throws IOException { System.out.println("Listening for connections on port " + port); final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(port); serverChannel.bind(address); final CountDownLatch latch = new CountDownLatch(1); serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { //開始接受新的客戶端連接。一旦其中一個被接受,CompletionHandler就會被調用。 @Override public void completed(final AsynchronousSocketChannel channel, Object attachment) { serverChannel.accept(null, this); //再次接受新的客戶端連接 ByteBuffer buffer = ByteBuffer.allocate(100); channel.read(buffer, buffer, new EchoCompletionHandler(channel)); //觸發通道上的讀取操作,一旦讀取某個消息,將通知給定的PrimeTyHand處理程序。 } @Override public void failed(Throwable throwable, Object attachment) { try { serverChannel.close(); //關閉套接字錯誤 } catch (IOException e) { // ingnore on close } finally { latch.countDown(); } } }); try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private final AsynchronousSocketChannel channel; EchoCompletionHandler(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { //觸發通道上的寫操作,給定的CompletionHandler一寫就會被通知 @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { channel.write(buffer, buffer, this); //如果ByteBuffer中有東西,則再次觸發寫操作。 } else { buffer.compact(); channel.read(buffer, buffer, EchoCompletionHandler.this); //觸發通道上的讀取操作,一旦讀取某個消息,將通知給定的PrimeTyHand處理程序。 } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { // ingnore on close } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { // ingnore on close } } } }
1.4NIO的問題和Netty中是如何解決這些問題的
1.4.1 跨平台和兼容性問題
1.4.2擴展ByteBuffer.或者不擴展
1.4.3散射和聚集可能會泄漏
1.4.4解決著名的epoll空輪詢bug
1.5小結
2.第一個Netty程序
2.1搭建開發環境
2.2Netty客戶機和服務器概述
2.3編寫Echo服務器
2.3.1引導服務器
public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); //創建引導服務器 b.group(group); b.channel(NioServerSocketChannel.class);//指定nio傳輸、本地套接字地址。 b.localAddress(new InetSocketAddress(port)); b.childHandler(new ChannelInitializer<SocketChannel>() { //將處理程序添加到通道管道 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); //綁定服務器,等待服務器關閉,並釋放資源。 } }); ChannelFuture f = b.bind().sync(); //綁定服務器,然后等待綁定完成,對sync()方法的調用將導致阻塞,直到服務器綁定。 System.out.println(EchoServer.class.getName() + "ì started and listen on ì" + f.channel().localAddress());//應用程序將等到服務器通道關閉。 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { if (args.length != 1) { System.err.println("ìUsage:" + EchoServer.class.getSimpleName() + " < port > "); } int port = Integer.parseInt(args[0]); new EchoServer(port).start(); } }
2.3.2實現服務器/業務邏輯
@ChannelHandler.Sharable //使用@Sharable注釋,以便在各通道之間共享 public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println("Server received: " + msg); ctx.write(msg);//把收到的消息寫回去。請注意,這將不會將消息刷新到遠程對等端。 } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); //將所有以前的書面消息(掛起)刷新到遠程對等端,並在操作完成后關閉通道。 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); //異常日志 ctx.close(); //異常關閉通道 } }
2.3.3捕獲異常
2.4編寫回送客戶端
2.4.1引導客戶端
public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); //為客戶端創建引導程序 b.group(group); //指定EventLoopGroup來處理客戶端事件。使用NioEventLoopGroup,因為應該使用NIO-傳輸 b.channel(NioSocketChannel.class);//指定通道類型;為NIO-傳輸使用正確的通道類型 b.remoteAddress(new InetSocketAddress(host, port));//設置客戶端連接的InetSocketAddress b.handler(new ChannelInitializer<SocketChannel>() { //使用ChannelInitiators指定ChannelHandler,一旦連接建立並創建通道,就調用它 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler());//將EchoClientHandler添加到屬於通道的Channel管道。管道擁有所有通道的通道處理器 } }); ChannelFuture f = b.connect().sync(); //將客戶端連接到遠程對等端;等待sync()完成連接 f.channel().closeFuture().sync(); //等到ClientChannel關閉。這會擋住。 } finally { group.shutdownGracefully().sync(); //關閉引導程序和線程池;釋放所有資源 } } public static void main(String[] args) throws Exception { if (args.length != 2) { System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>"); return; } // Parse options. final String host = args[0]; final int port = Integer.parseInt(args[1]); new EchoClient(host, port).start(); } }
2.4.2實現客戶端邏輯
@ChannelHandler.Sharable //使用@Sharable注釋,因為它可以在通道之間共享 public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override public void channelActive(ChannelHandlerContext ctx) { ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); //現在寫入通道連接的消息 } @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println("Client received: " + ByteBufUtil.hexDump(in.readBytes(in.readableBytes()))); //以己轉儲的形式記錄接收到的消息 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {//日志異常和關閉通道 cause.printStackTrace(); ctx.close(); } }
2.5編譯和運行回送客戶端和服務器
2.5.1編譯服務器和客戶端
2.5.2運行服務器和客戶端
2.6小結
3.Netty核心概念
3.1Netty速成班
3.2通道、事件和輸入/輸出(IO)
EventLoops與EventLoopGroups的關系。
3.3引導:什么和為什么
3.4通道處理程序和數據流
3.4.1把它拼湊在一起,管道和處理程序
管道安排的示例。
3.5編碼器、解碼器和域邏輯:對處理程序的深入觀察
3.5.1Encodes,Deodes
3.5.2域邏輯
4.Transports(傳輸)
4.1案例研究:運輸遷移
4.1.1使用無網絡的I/O和NIO
public class PlainOioServer { public void serve(int port) throws IOException { final ServerSocket socket = new ServerSocket(port); try { while (true) { final Socket clientSocket = socket.accept(); System.out.println("Accepted connection from " + clientSocket); //創建新線程來處理連接 new Thread(() -> { OutputStream out; try { out = clientSocket.getOutputStream(); out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //向連接的客戶端寫入消息 out.flush(); clientSocket.close(); //一旦消息被寫入並刷新,就關閉連接。 } catch (IOException e) { e.printStackTrace(); try { clientSocket.close(); } catch (IOException ex) { // ignore on close } } }).start(); //啟動線程開始處理 } } catch (IOException e) { e.printStackTrace(); } } }
4.1.2沒有Netty的異步網絡
public class PlainNioServer { public void serve(int port) throws IOException { System.out.println("Listening for connections on port " + port); ServerSocketChannel serverChannel; Selector selector; serverChannel = ServerSocketChannel.open(); ServerSocket ss = serverChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ss.bind(address); serverChannel.configureBlocking(false); selector = Selector.open();//打開處理通道的選擇器 serverChannel.register(selector, SelectionKey.OP_ACCEPT); //將erverSocket注冊到選擇器,並指定它對新接受的客戶端感興趣。 final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes()); while (true) { try { selector.select(); //等待已准備好進行處理的新事件。這將阻止直到發生什么事情 } catch (IOException ex) { ex.printStackTrace(); // handle in a proper way break; } Set<SelectedKey> readyKeys = selector.selectedKeys(); //獲取接收事件的所有SelectionKey實例 Iterator<SelectedKey> iterator = readyKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { if (key.isAcceptable()) { //檢查事件是否是因為新客戶端准備接受 ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); System.out.println("Accepted connection from " + client); client.configureBlocking(false); client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); //接受客戶端並將其注冊到選擇器 } if (key.isWritable()) { //檢查事件是否因為套接字已准備好寫入數據 SocketChannel client = (SocketChannel) key.channel(); ByteBuffer buffer = (ByteBuffer) key.attachment(); while (buffer.hasRemaining()) { if (client.write(buffer) == 0) { //將數據寫入連接的客戶端。如果網絡飽和,這可能不會寫入所有數據。如果是這樣的話,它將撿起未寫入的數據,並在網絡再次可寫時將其寫入。 break; } } client.close(); } } catch (IOException ex) { key.cancel(); try { key.channel().close(); } catch (IOException cex) { } } } } } }
4.1.3在Netty中使用I/O和NIO
public class NettyOioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleaseableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8"))); EventLoopGroup group = new OioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group); b.channel(OioServerSocketChannel.class);//使用OioEventLoopGroupIto允許阻塞模式(舊-IO) b.localAddress(new InetSocketAddress(port)); b.childHandler(new ChannelInitializer<SocketChannel>() { //指定將為每個接受的連接調用的信道初始化器 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //添加ChannelHandler來攔截事件並允許對它們作出反應 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.write(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//向客戶端寫入消息,並在消息寫入后添加ChannelFutureListener以關閉連接 } }); } }); ChannelFuture f = b.bind().sync(); //綁定服務器以接受連接 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); //釋放所有資源 } } }
4.1.4實現異步支持
public class NettyNioServer { public void server(int port) throws Exception { final ByteBuf buf = Unpooled.unreleaseableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8"))); EventLoopGroup group = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(group); b.channel(NioServerSocketChannel.class);//使用OioEventLoopGroupIto允許阻塞模式(舊-IO) b.localAddress(new InetSocketAddress(port)); b.childHandler(new ChannelInitializer<SocketChannel>() { //指定將為每個接受的連接調用的信道初始化器 @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //添加ChannelHandler來攔截事件並允許對它們作出反應 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.write(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//向客戶端寫入消息,並在消息寫入后添加ChannelFutureListener以關閉連接 } }); } }); ChannelFuture f = b.bind().sync(); //綁定服務器以接受連接 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); //釋放所有資源 } } }
4.2傳輸API
通道接口層次結構
最重要的信道方法
eventLoop()返回分配給通道的EVELATIORE
pipeline()返回分配給通道的通道管道。
isActive()如果通道處於活動狀態,則返回該通道,這意味着它已連接到遠程對等端。
localAddress()返回綁定到本地的SocketAddress
remoteAddress()返回綁定遠程的SocketAddress
write()將數據寫入遠程對等程序。這些數據是通過管道傳遞的。
寫信給頻道
Channel channel = ... ByteBuf buf = Unpooled.copiedBuffer(..your data, CharsetUtil.UTF_8);//創建保存要寫入的數據的ByteBuf ChannelFuture cf = channel.write(buf);//寫數據 cf.addListener(new ChannelFutureListener() { //添加ChannelFutureListener,以便在寫入完成后得到通知 @Override public void operationComplete (ChannelFuture future){ if (future.isSuccess()) { //寫入操作完成,沒有錯誤。 System.out.println("Write successful"); } else { System.err.println("Write error"); //寫入操作已完成,但由於錯誤 future.cause().printStacktrace(); } } });
使用來自多個線程的通道
final Channel channel = ... final ByteBuf buf = Unpooled.copiedBuffer(..your data", CharsetUtil.UTF_8); //創建保存要寫入的數據的ByteBuf Runnable writer = new Runnable() { //創建Runnable將數據寫入通道 @Override public void run() { channel.write(buf.duplicate()); } }; Executor executor = Executors.newChachedThreadPool();//獲取對執行程序的引用,該執行器使用線程執行任務。 // write in one thread executor.execute(writer); //將寫任務交給Executor,以便在線程中執行。 // write in another thread executor.execute(writer); //將另一個寫任務交給Executor,以便在線程中執行。
4.3包括運輸
4.3.1NiO非阻塞I/O
選擇操作位集
OP_ACCEPT一旦新連接被接受並創建了一個通道,就會得到通知。
OP_CONNECT一旦連接嘗試完成,就會收到通知。
OP_READ一旦數據准備好從通道中讀取,就會得到通知。
OP_WRITE一旦有可能將更多的數據寫入通道,就會得到通知。大多數情況下,這是可能的,但可能不是因為OS套接字緩沖區已完全填滿。 您編寫得更快,遠程對等程序就可以處理它。
選擇器邏輯
4.3.2OIO舊阻塞I/O
4.3.3VM傳輸中的局部
4.3.4嵌入式傳輸
4.4何時使用各種運輸方式
低並發連接計數->OIO
高並發連接計數->NIO
低延時->OIO
基本模塊代碼->OIO
在同一個JVM中進行通信->Local
測試ChannelHandler實現->Embedded
5.Buffers(緩沖)
5.1緩沖API
5.2字節數據容器
5.2.1工作原理
5.2.2不同類型的ByteBuf
Heap Buffer(堆緩沖區)
ByteBuf heapBuf = ...; if (heapBuf.hasArray()) { //檢查ByteBuf是否由數組支持 byte[] array = heapBuf.array(); //獲取對數組的引用 int offset = heapBuf.arrayOffset() + heapBuf.position(); //計算其中第一個字節的偏移量 int length = heapBuf.readableBytes(); //獲取可讀字節的數量 YourImpl.method(array, offset, length); //使用數組、偏移量、長度作為參數的調用方法 }
Direct Buffer(直接緩沖區)
ByteBuf directBuf = ...; if (!directBuf.hasArray()){ //檢查ByteBuf是否不受數組支持,對於直接緩沖區,數組為false int length = directBuf.readableBytes(); //獲取可讀字節數 byte[] array = new byte[length]; //分配具有可讀字節長度的新數組 directBuf.getBytes(array); //將字節讀入數組 YourImpl.method(array, 0, array.length);//以數組、偏移量、長度為參數的Call方法 }
Composite Buffer(復合緩沖區)
編寫遺留的JDK ByteBuffer
//Use an array to composite them ByteBuffer[] message = new ByteBuffer[] { header, body }; // Use copy to merge both ByteBuffer message2 = ByteBuffer.allocate(header.remaining()+ body.remaining(); message2.put(header); message2.put(body); message2.flip();
CompositeByteBuf
CompositeByteBuf compBuf = ...; ByteBuf heapBuf = ...; ByteBuf directBuf = ...; compBuf.addComponent(heapBuf, directBuf); //將ByteBuf實例追加到復合 ..... compBuf.removeComponent(0); //在索引0 bytebuf remove(heapbuf這里) for (ByteBuf buf: compBuf) { //循環遍歷所有組合的ByteBuf System.out.println(buf.toString()); }
[計]存取數據
CompositeBuf compBuf = ...; if (!compBuf.hasArray()) { //檢查ByteBuf是否不受數組支持,這對於復合緩沖區來說是false int length = compBuf.readableBytes(); //獲取可讀字節的數量 byte[] array = new byte[length];//分配具有可讀字節長度的新數組 compBuf.getBytes(array); //將字節讀入數組 YourImpl.method(array, 0, array.length);//以數組、偏移量、長度為參數的Call方法 }
5.3 ByteBuf的字節操作
5.3.1 隨機訪問索引
ByteBuf buffer = ...; for (int i = 0; i < buffer.capacity(); i ++) { byte b = buffer.getByte(i); System.out.println((char) b); }
5.3.2 順序訪問索引
5.3.3Discardable bytes廢棄字節
5.3.4 可讀字節(實際內容)
ByteBuf buffer = ...; while (buffer.readable()) { System.out.println(buffer.readByte()); }
5.3.5 可寫字節Writable bytes
ByteBuf buffer = ...; while (buffer.writableBytes() >= 4) { buffer.writeInt(random.nextInt()); }
5.3.6 清除緩沖區索引Clearing the buffer indexs
5.3.7 搜索操作Search operations
5.3.8 標准和重置Mark and reset
5.3.9 衍生的緩沖區Derived buffers
Charset utf8 = Charset.forName("UTF-8"); ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); //創建ByteBuf,它包含給定字符串的字節 ByteBuf sliced = buf.slice(0, 14); //創建ByteBuf的新片段,從索引0開始,以索引14結束 System.out.println(sliced.toString(utf8); //包含在行動中的Netty buf.setByte(0, (byte) íJí); //更新索引0上的字節 assert buf.get(0) == sliced.get(0);//不會失敗,因為ByteBuf共享相同的內容,因此對其中一個的修改在另一個上也是可見的
5.3.10 讀/寫操作以及其他一些操作
5.4 ByteBufHolder
5.4.1 ByteBufAllocator
5.4.2 Unpooled
5.4.3 ByteBufUtil
6.ChannelHandler
6.1 ChannelPipeline
修改ChannelPipeline的方法
addFirst(...),添加ChannelHandler在ChannelPipeline的第一個位置
addBefore(...),在ChannelPipeline中指定的ChannelHandler名稱之前添加ChannelHandler
addAfter(...),在ChannelPipeline中指定的ChannelHandler名稱之后添加ChannelHandler
addLast(ChannelHandler...),在ChannelPipeline的末尾添加ChannelHandler
remove(...),刪除ChannelPipeline中指定的ChannelHandler
replace(...),替換ChannelPipeline中指定的ChannelHandler
6.2 ChannelHandlerContext
6.2.1 通知下一個ChannelHandler
ChannelHandlerContext、ChannelHandler、ChannelPipeline的關系
事件通過渠道
ChannelHandlerContext ctx = ..; Channel channel = ctx.channel(); //獲取屬於ChannelHandlerContext的通道的引用 channel.write(Unpooled.copiedBuffer("Netty in Action",CharsetUtil.UTF_8));//通過通道寫入緩沖器
信道管道事件
ChannelHandlerContext ctx = ..; ChannelPipeline pipeline = ctx.pipeline(); pipeline.write(Unpooled.copiedBuffer(ìNetty in Actionì,CharsetUtil.UTF_8));
通過Channel或ChannelPipeline的通知:
6.2.2 修改ChannelPipeline
6.3 狀態模型
6.4 ChannelHandler和其子類
6.4.1 ChannelHandler中的方法
6.4.2 ChannelInboundHandler
channelRegistered,ChannelHandlerContext的Channel被注冊到EventLoop;
channelUnregistered,ChannelHandlerContext的Channel從EventLoop中注銷
channelActive,ChannelHandlerContext的Channel已激活
channelInactive,ChannelHanderContxt的Channel結束生命周期
channelRead,從當前Channel的對端讀取消息
channelReadComplete,消息讀取完成后執行
userEventTriggered,一個用戶事件被處罰
channelWritabilityChanged,改變通道的可寫狀態,可以使用Channel.isWritable()檢查
exceptionCaught,重寫父類ChannelHandler的方法,處理異常
6.4.3 ChannelOutboundHandler
bind,Channel綁定本地地址
connect,Channel連接操作
disconnect,Channel斷開連接
close,關閉Channel
deregister,注銷Channel
read,讀取消息,實際是截獲ChannelHandlerContext.read()
write,寫操作,實際是通過ChannelPipeline寫消息,Channel.flush()屬性到實際通道
flush,刷新消息到通道
7.編解碼器Codec
7.1 編解碼器Codec
7.2 解碼器
7.2.1 ByteToMessageDecoder
7.2.2 ReplayingDecoder
讀取緩沖區的數據之前需要檢查緩沖區是否有足夠的字節,使用ReplayingDecoder就無需自己檢查;
7.2.3 MessageToMessageDecoder
7.3 編碼器
.
7.3.1 MessageToByteEncoder
7.3.2 MessageToMessageEncoder
7.4 編解碼器
7.4.1 byte-to-byte編解碼器
7.4.2 ByteToMessageCodec
7.4.3 MessageToMessageCodec
7.5 其他編解碼方式
7.5.1 CombinedChannelDuplexHandler
8.附帶的ChannelHandler和Codec
8.1 使用SSL/TLS創建安全的Netty程序
public class SslChannelInitializer extends ChannelInitializer<Channel> { private final SSLContext context; private final boolean client; private final boolean startTls; public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) { this.context = context; this.client = client; this.startTls = startTls; } @Override protected void initChannel(Channel ch) throws Exception { SSLEngine engine = context.createSSLEngine(); engine.setUseClientMode(client); ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls)); } }
setHandshakeTimeout(long handshakeTimeout, TimeUnit unit),設置握手超時時間,ChannelFuture將得到通知
setHandshakeTimeoutMillis(long handshakeTimeoutMillis),設置握手超時時間,ChannelFuture將得到通知
getHandshakeTimeoutMillis(),獲取握手超時時間值
setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit),設置關閉通知超時時間,若超時,ChannelFuture會關閉失敗
setHandshakeTimeoutMillis(long handshakeTimeoutMillis),設置關閉通知超時時間,若超時,ChannelFuture會關閉失敗
getCloseNotifyTimeoutMillis(),獲取關閉通知超時時間
handshakeFuture(),返回完成握手后的ChannelFuture
close(),發送關閉通知請求關閉和銷毀
8.2 使用Netty創建HTTP/HTTPS程序
8.2.1 Netty的HTTP編碼器,解碼器和編解碼器
HttpRequestEncoder,將HttpRequest或HttpContent編碼成ByteBuf
HttpRequestDecoder,將ByteBuf解碼成HttpRequest和HttpContent
HttpResponseEncoder,將HttpResponse或HttpContent編碼成ByteBuf
HttpResponseDecoder,將ByteBuf解碼成HttpResponse和HttpContent
8.2.2 HTTP消息聚合
public class HttpAggregatorInitializer extends ChannelInitializer<Channel> { private final boolean client; public HttpAggregatorInitializer(boolean client) { this.client = client; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (client) { pipeline.addLast("codec", new HttpClientCodec()); } else { pipeline.addLast("codec", new HttpServerCodec()); } pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024)); } }
8.2.3 HTTP壓縮
8.2.4 使用HTTPS
8.2.5 WebSocket
BinaryWebSocketFrame,包含二進制數據
TextWebSocketFrame,包含文本數據
ContinuationWebSocketFrame,包含二進制數據或文本數據,BinaryWebSocketFrame和TextWebSocketFrame的結合體
CloseWebSocketFrame,WebSocketFrame代表一個關閉請求,包含關閉狀態碼和短語
PingWebSocketFrame,WebSocketFrame要求PongWebSocketFrame發送數據
PongWebSocketFrame,WebSocketFrame要求PingWebSocketFrame響應
public class WebSocketServerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new HttpServerCodec(), new HttpObjectAggregator(65536), new WebSocketServerProtocolHandler("/websocket"), new TextFrameHandler(), new BinaryFrameHandler(), new ContinuationFrameHandler()); } public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // handler text frame } } public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception { //handler binary frame } } public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> { @Override protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception { //handler continuation frame } } }
8.2.6 SPDY
SPDY(讀作“SPeeDY”)是Google開發的基於TCP的應用層協議,用以最小化網絡延遲,提升網絡速度,優化用戶的網絡使用體驗。
8.3 處理空閑連接和超時
IdleStateHandler,當一個通道沒有進行讀寫或運行了一段時間后出發IdleStateEvent
ReadTimeoutHandler,在指定時間內沒有接收到任何數據將拋出ReadTimeoutException
WriteTimeoutHandler,在指定時間內有寫入數據將拋出WriteTimeoutException
public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)); pipeline.addLast(new HeartbeatHandler()); } public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter { private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer( "HEARTBEAT", CharsetUtil.UTF_8)); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { super.userEventTriggered(ctx, evt); } } } }
8.4 解碼分隔符和基於長度的協議
使用LineBasedFrameDecoder提取"\r\n"分隔幀:
/** * 處理換行分隔符消息 * * @author c.k */ public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1204), new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { // do something with the frame } } }
8.4.2 長度為基礎的協議
8.5 寫大數據
8.6 序列化數據
8.6.1 普通的JDK序列化
8.6.2 通過JBoss編組序列化
public class MarshallingInitializer extends ChannelInitializer<Channel> { private final MarshallerProvider marshallerProvider; private final UnmarshallerProvider unmarshallerProvider; public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) { this.marshallerProvider = marshallerProvider; this.unmarshallerProvider = unmarshallerProvider; } @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new MarshallingDecoder(unmarshallerProvider)) .addLast(new MarshallingEncoder(marshallerProvider)) .addLast(new ObjectHandler()); } public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> { @Override protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception { // do something } } }
8.6.3 使用ProtoBuf序列化
/** * 使用protobuf序列化數據,進行編碼解碼 * 注意:使用protobuf需要protobuf-java-jar * * @author Administrator */ public class ProtoBufInitializer extends ChannelInitializer<Channel> { private final MessageLite lite; public ProtoBufInitializer(MessageLite lite) { this.lite = lite; } @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()) .addLast(new ProtobufEncoder()) .addLast(new ProtobufDecoder(lite)) .addLast(new ObjectHandler()); } public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> { @Override protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception { // do something } } }
9.引導Netty應用程序
9.1 不同的引導類型
9.2 引導客戶端和無連接協議
9.2.1 引導客戶端的方法
group(...),設置EventLoopGroup,EventLoopGroup用來處理所有通道的IO事件
channel(...),設置通道類型
channelFactory(...),使用ChannelFactory來設置通道類型
localAddress(...),設置本地地址,也可以通過bind(...)或connect(...)
option(ChannelOption<T>, T),設置通道選項,若使用null,則刪除上一個設置的ChannelOption
attr(AttributeKey<T>, T),設置屬性到Channel,若值為null,則指定鍵的屬性被刪除
handler(ChannelHandler),設置ChannelHandler用於處理請求事件
clone(),深度復制Bootstrap,Bootstrap的配置相同
remoteAddress(...),設置連接地址
connect(...),連接遠程通道
bind(...),創建一個新的Channel並綁定
9.2.2 怎么引導客戶端
public class BootstrapingClient { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).handler(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("Received data"); msg.clear(); } }); ChannelFuture f = b.connect("1", 2048); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("connection finished"); } else { System.out.println("connection failed"); future.cause().printStackTrace(); } } }); } }
9.2.3 選擇兼容通道實現
9.3 使用ServerBootstrap引導服務器
9.3.1 引導服務器的方法
group(...),設置EventLoopGroup事件循環組
channel(...),設置通道類型
channelFactory(...),使用ChannelFactory來設置通道類型
localAddress(...),設置本地地址,也可以通過bind(...)或connect(...)
option(ChannelOption<T>, T),設置通道選項,若使用null,則刪除上一個設置的ChannelOption
childOption(ChannelOption<T>, T),設置子通道選項
attr(AttributeKey<T>, T),設置屬性到Channel,若值為null,則指定鍵的屬性被刪除
childAttr(AttributeKey<T>, T),設置子通道屬性
handler(ChannelHandler),設置ChannelHandler用於處理請求事件
childHandler(ChannelHandler),設置子ChannelHandler
clone(),深度復制ServerBootstrap,且配置相同
bind(...),創建一個新的Channel並綁定
9.3.2 怎么引導服務器
public class BootstrapingServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("Received data"); msg.clear(); } }); ChannelFuture f = b.bind(2048); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("Server bound"); } else { System.err.println("bound fail"); future.cause().printStackTrace(); } } }); } }
9.4 從Channel引導客戶端
public class BootstrapingFromChannel { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new SimpleChannelInboundHandler<ByteBuf>() { ChannelFuture connectFuture; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Bootstrap b = new Bootstrap(); b.channel(NioSocketChannel.class).handler( new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("Received data"); msg.clear(); } }); b.group(ctx.channel().eventLoop()); connectFuture = b.connect(new InetSocketAddress("1", 2048)); } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { if (connectFuture.isDone()) { // do something with the data } } }); ChannelFuture f = b.bind(2048); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("Server bound"); } else { System.err.println("bound fail"); future.cause().printStackTrace(); } } }); } }
9.5 添加多個ChannelHandler
public class InitChannelExample { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializerImpl()); ChannelFuture f = b.bind(2048).sync(); f.channel().closeFuture().sync(); } static final class ChannelInitializerImpl extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new HttpClientCodec()) .addLast(new HttpObjectAggregator(Integer.MAX_VALUE)); } } }
9.6 使用通道選項和屬性
public static void main(String[] args) { //創建屬性鍵對象 final AttributeKey<Integer> id = AttributeKey.valueOf("ID"); //客戶端引導對象 Bootstrap b = new Bootstrap(); //設置EventLoop,設置通道類型 b.group(new NioEventLoopGroup()).channel(NioSocketChannel.class) //設置ChannelHandler .handler(new SimpleChannelInboundHandler<ByteBuf>() { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { System.out.println("Reveived data"); msg.clear(); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { //通道注冊后執行,獲取屬性值 Integer idValue = ctx.channel().attr(id).get(); System.out.println(idValue); //do something with the idValue } }); //設置通道選項,在通道注冊后或被創建后設置 b.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); //設置通道屬性 b.attr(id, 123456); ChannelFuture f = b.connect("www.manning.com", 80); f.syncUninterruptibly(); }
10.單元測試代碼
10.1 General
writeInbound(Object...),寫一個消息到入站通道
writeOutbound(Object...),寫消息到出站通道
readInbound(),從EmbeddedChannel讀取入站消息,可能返回null
readOutbound(),從EmbeddedChannel讀取出站消息,可能返回null
finish(),標示EmbeddedChannel已結束,任何寫數據都會失敗
10.2 測試ChannelHandler
10.2.1 測試處理入站消息的handler
public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; public FixedLengthFrameDecoder(int frameLength) { if (frameLength <= 0) { throw new IllegalArgumentException( "frameLength must be a positive integer: " + frameLength); } this.frameLength = frameLength; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { while (in.readableBytes() >= frameLength) { ByteBuf buf = in.readBytes(frameLength); out.add(buf); } } }
測試
public class FixedLengthFrameDecoderTest { @Test public void testFramesDecoded() { ByteBuf buf = Unpooled.buffer(); for (int i = 0; i < 9; i++) { buf.writeByte(i); } ByteBuf input = buf.duplicate(); EmbeddedChannel channel = new EmbeddedChannel( new FixedLengthFrameDecoder(3)); // write bytes Assert.assertTrue(channel.writeInbound(input)); Assert.assertTrue(channel.finish()); // read message Assert.assertEquals(buf.readBytes(3), channel.readInbound()); Assert.assertEquals(buf.readBytes(3), channel.readInbound()); Assert.assertEquals(buf.readBytes(3), channel.readInbound()); Assert.assertNull(channel.readInbound()); } @Test public void testFramesDecoded2() { ByteBuf buf = Unpooled.buffer(); for (int i = 0; i < 9; i++) { buf.writeByte(i); } ByteBuf input = buf.duplicate(); EmbeddedChannel channel = new EmbeddedChannel( new FixedLengthFrameDecoder(3)); Assert.assertFalse(channel.writeInbound(input.readBytes(2))); Assert.assertTrue(channel.writeInbound(input.readBytes(7))); Assert.assertTrue(channel.finish()); Assert.assertEquals(buf.readBytes(3), channel.readInbound()); Assert.assertEquals(buf.readBytes(3), channel.readInbound()); Assert.assertEquals(buf.readBytes(3), channel.readInbound()); Assert.assertNull(channel.readInbound()); } }
10.2.2 測試處理出站消息的handler
解碼器
public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> { @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception { while (msg.readableBytes() >= 4) { int value = Math.abs(msg.readInt()); out.add(value); } } }
測試
public class AbsIntegerEncoderTest { @Test public void testEncoded() { //創建一個能容納10個int的ByteBuf ByteBuf buf = Unpooled.buffer(); for (int i = 1; i < 10; i++) { buf.writeInt(i * -1); } //創建EmbeddedChannel對象 EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder()); //將buf數據寫入出站EmbeddedChannel Assert.assertTrue(channel.writeOutbound(buf)); //標示EmbeddedChannel完成 Assert.assertTrue(channel.finish()); //讀取出站數據 ByteBuf output = (ByteBuf) channel.readOutbound(); for (int i = 1; i < 10; i++) { Assert.assertEquals(i, output.readInt()); } Assert.assertFalse(output.isReadable()); Assert.assertNull(channel.readOutbound()); } }
10.3 測試異常處理
解碼器
public class FrameChunkDecoder extends ByteToMessageDecoder { // 限制大小 private final int maxFrameSize; public FrameChunkDecoder(int maxFrameSize) { this.maxFrameSize = maxFrameSize; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { // 獲取可讀字節數 int readableBytes = in.readableBytes(); // 若可讀字節數大於限制值,清空字節並拋出異常 if (readableBytes > maxFrameSize) { in.clear(); throw new TooLongFrameException(); } // 讀取ByteBuf並放到List中 ByteBuf buf = in.readBytes(readableBytes); out.add(buf); } }
測試代碼
public class FrameChunkDecoderTest { @Test public void testFramesDecoded() { //創建ByteBuf並填充9字節數據 ByteBuf buf = Unpooled.buffer(); for (int i = 0; i < 9; i++) { buf.writeByte(i); } //復制一個ByteBuf ByteBuf input = buf.duplicate(); //創建EmbeddedChannel EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3)); //讀取2個字節寫入入站通道 Assert.assertTrue(channel.writeInbound(input.readBytes(2))); try { //讀取4個字節寫入入站通道 channel.writeInbound(input.readBytes(4)); Assert.fail(); } catch (TooLongFrameException e) { } //讀取3個字節寫入入站通道 Assert.assertTrue(channel.writeInbound(input.readBytes(3))); //標識完成 Assert.assertTrue(channel.finish()); //從EmbeddedChannel入去入站數據 Assert.assertEquals(buf.readBytes(2), channel.readInbound()); Assert.assertEquals(buf.skipBytes(4).readBytes(3), channel.readInbound()); } }
11.WebSocket
12.SPDY
13.通過UDP廣播事件
14..實現自定義編解碼器
14.1編解碼器的范圍
14.2實現memcached編解碼器
14.3了解memcached二進制協議
14.4 Netty編碼器和解碼器
15.選擇正確的線程模型
15.1線程模型概述
15.2事件循環
15.2.1使用事件循環
15.2.2 Netty 4的I/O業務
15.2.3 Netty 3的I/O業務
15.2.4 Nettys線程模型內部件
15.3為以后的執行安排任務
15.3.1使用普通Java API調度任務
15.3.2使用事件循環調度任務
15.3.3計划實施內部
15.4 I/O線程分配的詳細情況
16.用Eventloop注銷/重新注冊