Netty實戰


1.Netty介紹

  1.1為什么需要Netty

    1.1.1不是所有的網絡框架都是一樣的

    1.1.2Netty的功能非常豐富

      框架組成

  1.2異步設計

    1.2.1Callbacks(回調)

      簡單的回調

public interface Fetcher {
    void fetchData(FetchCallback callback);
}

public interface FetchCallback {
    void onData(Data data);

    void onError(Throwable cause);
}

public class Worker {
    public void doWork() {
        Fetcher fetcher = ...
        fetcher.fetchData(new FetchCallback() {
            @Override
            public void onData(Data data) { //獲取到數據
                System.out.println("Data received: " + data);
            }

            @Override
            public void onError(Throwable cause) { //未獲取到數據
                System.err.println("An error accour: " + cause.getMessage());
            }
        });
    }
}

      Fetcher.fetchData()方法需傳遞一個FetcherCallback類型的參數,當獲得數據或發生錯誤時被回調。對於每種情況都提供了統一的方法:FetcherCallback.onData(),將接收數據時被調用;FetcherCallback.onError(),發生錯誤時被調用

    1.2.2Futures

    ExecutorService executor = Executors.newCachedThreadPool();
    Runnable task1 = new Runnable() {
        @Override
        public void run() {
            doSomeHeavyWork();
        }
        //...
    }
    Callable<Interger> task2 = new Callable() {
        @Override
        public Integer call() {
            return doSomeHeavyWorkWithResul();
        }
        //...
    }
    Future<?> future1 = executor.submit(task1);
    Future<Integer> future2 = executor.submit(task2);
    while(!future1.isDone()||!future2.isDone()){
        ...
        // do something else
        ...
    }

       Future的未來應用

public interface Fetcher {
    Future<Data> fetchData();
}

public class Worker {
    public void doWork() {
        Fetcher fetcher = ...
        Future<Data> future = fetcher.fetchData();
        try {
            while (!fetcher.isDone()) {
                //...
                // do something else 
            }
            System.out.println("Data received: " + future.get());
        } catch (Throwable cause) {
            System.err.println("An error accour: " + cause.getMessage());
        }
    }
}

  1.3Java中的Blocking和non-blocking IO對比

     1.3.1基於阻塞IO的EchoServer

public class PlainEchoServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);//綁定端口
        try {
            while (true) {
                final Socket clientSocket = socket.accept(); //阻塞,直到接受新的客戶端連接為止。
                System.out.println("Accepted connection from " + clientSocket);
                new Thread(new Runnable() { //創建處理客戶端連接的新線程
                    @Override
                    public void run() {
                        try {
                            BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                            PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
                            while (true) { //從客戶端讀取數據並將其寫回
                                writer.println(reader.readLine());
                                writer.flush();
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start(); //開始執行程序
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

    1.3.2非阻塞IO基礎

      ByteBuffer

         將數據寫入ByteBuffer

        調用ByteBuffer.flip()從寫模式切換到讀取模式

        從ByteBuffer讀取數據

        ByteBuffer.clear()清除所有數據

        Bytebuffer.compact()清除已讀取數據

    Channel inChannel = ....;
    ByteBuffer buf=ByteBuffer.allocate(48);
    int bytesRead=-1;
    do{
        bytesRead=inChannel.read(buf); //將數據從通道讀取到ByteBuffer
        if(bytesRead!=-1){
            buf.flip();//使緩沖區為讀做准備
        while(buf.hasRemaining()){
            System.out.print((char)buf.get()); //讀取ByteBuffer中的字節;每個get()操作都會將位置更新1
        }
        buf.clear(); //讓ByteBuffer准備好再寫一遍
        }
    }while(bytesRead!=-1);
    inChannel.close();

      使用NIO選擇器

        1.創建一個或多個選擇器,其中可以注冊打開的通道(套接字)。

        2.注冊信道時,指定您感興趣偵聽的事件。以下四個可用事件(或操作/操作)為:接收、連接、讀取、等待

        3.在注冊通道時,您可以調用Selector.select()方法來阻塞,直到發生這些事件之一。

        4.當該方法解除阻塞時,您可以獲得所有SelectionKey實例(這些實例保存對已注冊通道和所選操作的引用)並執行一些操作。 您到底做了什么取決於哪個操作已經准備好了。SelectedKey可以在任何給定時間包含多個操作。

    1.3.3基於NIO的EchoServer

public class PlainNioEchoServer {
    public void serve(int port) throws IOException {
        System.out.println("Listening for connections on port " + port);
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);//將服務器綁定到端口
        serverChannel.configureBlocking(false);//設置為非阻塞
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);//向選擇器注冊通道,以便對被接受的新客戶端連接感興趣
        while (true) {
            try {
                selector.select();//阻塞,直到選定某物為止。
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle in a proper way
                break;
            }
            Set readyKeys = selector.selectedKeys(); //獲取所有SelectedKey實例
            Iterator iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = (SelectionKey) iterator.next();
                iterator.remove();//從迭代器中刪除SelectedKey
                try {
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();//接受客戶端連接
                        System.out.println("Accepted connection from " + client);
                        client.configureBlocking(false);//設置為非阻塞
                        client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, ByteBuffer.allocate(100));//注冊到選擇器的連接並設置ByteBuffer
                    }
                    if (key.isReadable()) {//檢查SelectedKey的閱讀
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        client.read(output); //讀取數據到ByteBuffer
                    }
                    if (key.isWritable()) {//檢查SelectedKey的寫
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        output.flip();
                        client.write(output);
                        output.compact();//將數據從ByteBuffer寫入信道
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                    }
                }
            }
        }
    }
}

    1.3.4基於NIO.2的EchoServer

      與最初的NIO實現不同,NIO.2允許您發出IO操作並提供所謂的完成處理程序

public class PlainNio2EchoServer {
    public void serve(int port) throws IOException {
        System.out.println("Listening for connections on port " + port);
        final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(port);
        serverChannel.bind(address);
        final CountDownLatch latch = new CountDownLatch(1);
        serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { //開始接受新的客戶端連接。一旦其中一個被接受,CompletionHandler就會被調用。
            @Override
            public void completed(final AsynchronousSocketChannel channel, Object attachment) {
                serverChannel.accept(null, this); //再次接受新的客戶端連接
                ByteBuffer buffer = ByteBuffer.allocate(100);
                channel.read(buffer, buffer, new EchoCompletionHandler(channel)); //觸發通道上的讀取操作,一旦讀取某個消息,將通知給定的PrimeTyHand處理程序。
            }

            @Override
            public void failed(Throwable throwable, Object attachment) {
                try {
                    serverChannel.close(); //關閉套接字錯誤
                } catch (IOException e) {
                    // ingnore on close
                } finally {
                    latch.countDown();
                }
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel channel;

        EchoCompletionHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            buffer.flip();
            channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { //觸發通道上的寫操作,給定的CompletionHandler一寫就會被通知

                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    if (buffer.hasRemaining()) {
                        channel.write(buffer, buffer, this); //如果ByteBuffer中有東西,則再次觸發寫操作。
                    } else {
                        buffer.compact();
                        channel.read(buffer, buffer, EchoCompletionHandler.this); //觸發通道上的讀取操作,一旦讀取某個消息,將通知給定的PrimeTyHand處理程序。
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        // ingnore on close
                    }
                }
            });
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                channel.close();
            } catch (IOException e) {
                // ingnore on close
            }
        }
    }
}

  1.4NIO的問題和Netty中是如何解決這些問題的

    1.4.1 跨平台和兼容性問題

    1.4.2擴展ByteBuffer.或者不擴展

    1.4.3散射和聚集可能會泄漏

    1.4.4解決著名的epoll空輪詢bug

  1.5小結

2.第一個Netty程序

  2.1搭建開發環境

  2.2Netty客戶機和服務器概述

  2.3編寫Echo服務器

    2.3.1引導服務器

public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); //創建引導服務器
            b.group(group);
            b.channel(NioServerSocketChannel.class);//指定nio傳輸、本地套接字地址。
            b.localAddress(new InetSocketAddress(port));
            b.childHandler(new ChannelInitializer<SocketChannel>() { //將處理程序添加到通道管道
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new EchoServerHandler()); //綁定服務器,等待服務器關閉,並釋放資源。
                }
            });
            ChannelFuture f = b.bind().sync(); //綁定服務器,然后等待綁定完成,對sync()方法的調用將導致阻塞,直到服務器綁定。
            System.out.println(EchoServer.class.getName() + "ì started and listen on ì" + f.channel().localAddress());//應用程序將等到服務器通道關閉。
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 1) {
            System.err.println("ìUsage:" + EchoServer.class.getSimpleName() + " < port > ");
        }
        int port = Integer.parseInt(args[0]);
        new EchoServer(port).start();
    }
}

    2.3.2實現服務器/業務邏輯

@ChannelHandler.Sharable //使用@Sharable注釋,以便在各通道之間共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Server received: " + msg);
        ctx.write(msg);//把收到的消息寫回去。請注意,這將不會將消息刷新到遠程對等端。
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); //將所有以前的書面消息(掛起)刷新到遠程對等端,並在操作完成后關閉通道。
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace(); //異常日志
        ctx.close(); //異常關閉通道
    }
}

    2.3.3捕獲異常

  2.4編寫回送客戶端

    2.4.1引導客戶端

public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap(); //為客戶端創建引導程序
            b.group(group); //指定EventLoopGroup來處理客戶端事件。使用NioEventLoopGroup,因為應該使用NIO-傳輸
            b.channel(NioSocketChannel.class);//指定通道類型;為NIO-傳輸使用正確的通道類型
            b.remoteAddress(new InetSocketAddress(host, port));//設置客戶端連接的InetSocketAddress
            b.handler(new ChannelInitializer<SocketChannel>() { //使用ChannelInitiators指定ChannelHandler,一旦連接建立並創建通道,就調用它
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new EchoClientHandler());//將EchoClientHandler添加到屬於通道的Channel管道。管道擁有所有通道的通道處理器
                }
            });
            ChannelFuture f = b.connect().sync(); //將客戶端連接到遠程對等端;等待sync()完成連接
            f.channel().closeFuture().sync(); //等到ClientChannel關閉。這會擋住。
        } finally {
            group.shutdownGracefully().sync(); //關閉引導程序和線程池;釋放所有資源
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>");
            return;
        }
        
        // Parse options.
        final String host = args[0];
        final int port = Integer.parseInt(args[1]);
        new EchoClient(host, port).start();
    }
}

    2.4.2實現客戶端邏輯

@ChannelHandler.Sharable //使用@Sharable注釋,因為它可以在通道之間共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.write(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); //現在寫入通道連接的消息
    }

    @Override
    public void channelRead0(ChannelHandlerContext ctx,
                             ByteBuf in) {
        System.out.println("Client received: " + ByteBufUtil.hexDump(in.readBytes(in.readableBytes()))); //以己轉儲的形式記錄接收到的消息
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {//日志異常和關閉通道
        cause.printStackTrace();
        ctx.close();
    }
}

  2.5編譯和運行回送客戶端和服務器

    2.5.1編譯服務器和客戶端

    2.5.2運行服務器和客戶端

  2.6小結

3.Netty核心概念

  3.1Netty速成班

  3.2通道、事件和輸入/輸出(IO)

    EventLoops與EventLoopGroups的關系。

  3.3引導:什么和為什么

  3.4通道處理程序和數據流

    3.4.1把它拼湊在一起,管道和處理程序

      管道安排的示例。

  3.5編碼器、解碼器和域邏輯:對處理程序的深入觀察

    3.5.1Encodes,Deodes

    3.5.2域邏輯

4.Transports(傳輸)

  4.1案例研究:運輸遷移

    4.1.1使用無網絡的I/O和NIO

public class PlainOioServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);
        try {
            while (true) {
                final Socket clientSocket = socket.accept(); 
                System.out.println("Accepted connection from " +
                        clientSocket);
                //創建新線程來處理連接
                new Thread(() -> {
                    OutputStream out;
                    try {
                        out = clientSocket.getOutputStream();
                        out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //向連接的客戶端寫入消息
                        out.flush();
                        clientSocket.close(); //一旦消息被寫入並刷新,就關閉連接。
                    } catch (IOException e) {
                        e.printStackTrace();
                        try {
                            clientSocket.close();
                        } catch (IOException ex) {
                            // ignore on close
                        }
                    }
                }).start(); //啟動線程開始處理
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

    4.1.2沒有Netty的異步網絡

public class PlainNioServer {
    public void serve(int port) throws IOException {
        System.out.println("Listening for connections on port " + port);
        ServerSocketChannel serverChannel;
        Selector selector;
        serverChannel = ServerSocketChannel.open();
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);
        serverChannel.configureBlocking(false);
        selector = Selector.open();//打開處理通道的選擇器
        serverChannel.register(selector, SelectionKey.OP_ACCEPT); //將erverSocket注冊到選擇器,並指定它對新接受的客戶端感興趣。
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        while (true) {
            try {
                selector.select(); //等待已准備好進行處理的新事件。這將阻止直到發生什么事情
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle in a proper way
                break;
            }
            Set<SelectedKey> readyKeys = selector.selectedKeys(); //獲取接收事件的所有SelectionKey實例
            Iterator<SelectedKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) { //檢查事件是否是因為新客戶端准備接受
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        System.out.println("Accepted connection from " + client);
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate()); //接受客戶端並將其注冊到選擇器
                    }
                    if (key.isWritable()) { //檢查事件是否因為套接字已准備好寫入數據
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) { //將數據寫入連接的客戶端。如果網絡飽和,這可能不會寫入所有數據。如果是這樣的話,它將撿起未寫入的數據,並在網絡再次可寫時將其寫入。
                                break;
                            }
                        }
                        client.close();
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                    }
                }
            }
        }
    }
}

    4.1.3在Netty中使用I/O和NIO

public class NettyOioServer {
    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleaseableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group);
            b.channel(OioServerSocketChannel.class);//使用OioEventLoopGroupIto允許阻塞模式(舊-IO)
            b.localAddress(new InetSocketAddress(port));
            b.childHandler(new ChannelInitializer<SocketChannel>() { //指定將為每個接受的連接調用的信道初始化器
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //添加ChannelHandler來攔截事件並允許對它們作出反應
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ctx.write(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//向客戶端寫入消息,並在消息寫入后添加ChannelFutureListener以關閉連接
                        }
                    });
                }
            });
            ChannelFuture f = b.bind().sync(); //綁定服務器以接受連接
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync(); //釋放所有資源
        }
    }
}

    4.1.4實現異步支持

public class NettyNioServer {
    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleaseableBuffer(Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group);
            b.channel(NioServerSocketChannel.class);//使用OioEventLoopGroupIto允許阻塞模式(舊-IO)
            b.localAddress(new InetSocketAddress(port));
            b.childHandler(new ChannelInitializer<SocketChannel>() { //指定將為每個接受的連接調用的信道初始化器
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //添加ChannelHandler來攔截事件並允許對它們作出反應
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) throws Exception {
                            ctx.write(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//向客戶端寫入消息,並在消息寫入后添加ChannelFutureListener以關閉連接
                        }
                    });
                }
            });
            ChannelFuture f = b.bind().sync(); //綁定服務器以接受連接
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync(); //釋放所有資源
        }
    }
}

  4.2傳輸API

    通道接口層次結構

  

    最重要的信道方法

      eventLoop()返回分配給通道的EVELATIORE

      pipeline()返回分配給通道的通道管道。

      isActive()如果通道處於活動狀態,則返回該通道,這意味着它已連接到遠程對等端。

      localAddress()返回綁定到本地的SocketAddress

      remoteAddress()返回綁定遠程的SocketAddress

      write()將數據寫入遠程對等程序。這些數據是通過管道傳遞的。

    寫信給頻道

    Channel channel = ...
    ByteBuf buf = Unpooled.copiedBuffer(..your data, CharsetUtil.UTF_8);//創建保存要寫入的數據的ByteBuf
    ChannelFuture cf = channel.write(buf);//寫數據
    cf.addListener(new ChannelFutureListener() { //添加ChannelFutureListener,以便在寫入完成后得到通知
        @Override
        public void operationComplete (ChannelFuture future){
            if (future.isSuccess()) { //寫入操作完成,沒有錯誤。
                System.out.println("Write successful");
            } else {
                System.err.println("Write error"); //寫入操作已完成,但由於錯誤
                future.cause().printStacktrace();
            }
        }
    });

    使用來自多個線程的通道

    final Channel channel = ...
    final ByteBuf buf = Unpooled.copiedBuffer(..your data", CharsetUtil.UTF_8); //創建保存要寫入的數據的ByteBuf
    Runnable writer = new Runnable() { //創建Runnable將數據寫入通道
        @Override
        public void run() {
            channel.write(buf.duplicate());
        }
    };
    Executor executor = Executors.newChachedThreadPool();//獲取對執行程序的引用,該執行器使用線程執行任務。
    // write in one thread
    executor.execute(writer); //將寫任務交給Executor,以便在線程中執行。
    // write in another thread
    executor.execute(writer); //將另一個寫任務交給Executor,以便在線程中執行。

  4.3包括運輸

    4.3.1NiO非阻塞I/O

      選擇操作位集

        OP_ACCEPT一旦新連接被接受並創建了一個通道,就會得到通知。

        OP_CONNECT一旦連接嘗試完成,就會收到通知。

        OP_READ一旦數據准備好從通道中讀取,就會得到通知。

        OP_WRITE一旦有可能將更多的數據寫入通道,就會得到通知。大多數情況下,這是可能的,但可能不是因為OS套接字緩沖區已完全填滿。 您編寫得更快,遠程對等程序就可以處理它。

      選擇器邏輯

 

    4.3.2OIO舊阻塞I/O

    4.3.3VM傳輸中的局部

    4.3.4嵌入式傳輸

  4.4何時使用各種運輸方式

    低並發連接計數->OIO

    高並發連接計數->NIO

    低延時->OIO

    基本模塊代碼->OIO

    在同一個JVM中進行通信->Local

    測試ChannelHandler實現->Embedded

5.Buffers(緩沖)

  5.1緩沖API

  5.2字節數據容器

    5.2.1工作原理

    5.2.2不同類型的ByteBuf

      Heap Buffer(堆緩沖區)

    ByteBuf heapBuf = ...;
    if (heapBuf.hasArray()) { //檢查ByteBuf是否由數組支持
        byte[] array = heapBuf.array(); //獲取對數組的引用
        int offset = heapBuf.arrayOffset() + heapBuf.position(); //計算其中第一個字節的偏移量
        int length = heapBuf.readableBytes(); //獲取可讀字節的數量
        YourImpl.method(array, offset, length); //使用數組、偏移量、長度作為參數的調用方法
    }

      Direct Buffer(直接緩沖區)

    ByteBuf directBuf = ...;
    if (!directBuf.hasArray()){ //檢查ByteBuf是否不受數組支持,對於直接緩沖區,數組為false
        int length = directBuf.readableBytes(); //獲取可讀字節數
        byte[] array = new byte[length]; //分配具有可讀字節長度的新數組
        directBuf.getBytes(array); //將字節讀入數組
        YourImpl.method(array, 0, array.length);//以數組、偏移量、長度為參數的Call方法
    }

      Composite Buffer(復合緩沖區)

        編寫遺留的JDK ByteBuffer

    //Use an array to composite them
    ByteBuffer[] message = new ByteBuffer[] { header, body }; 
    // Use copy to merge both 
    ByteBuffer message2 = ByteBuffer.allocate(header.remaining()+ body.remaining(); 
    message2.put(header); 
    message2.put(body); 
    message2.flip();

        CompositeByteBuf

    CompositeByteBuf compBuf = ...;
    ByteBuf heapBuf = ...;
    ByteBuf directBuf = ...;
    compBuf.addComponent(heapBuf, directBuf); //將ByteBuf實例追加到復合
            .....
            compBuf.removeComponent(0); //在索引0 bytebuf remove(heapbuf這里)
            for (ByteBuf buf: compBuf) { //循環遍歷所有組合的ByteBuf
                System.out.println(buf.toString());
            }

      [計]存取數據

    CompositeBuf compBuf = ...;
    if (!compBuf.hasArray()) { //檢查ByteBuf是否不受數組支持,這對於復合緩沖區來說是false
        int length = compBuf.readableBytes(); //獲取可讀字節的數量
        byte[] array = new byte[length];//分配具有可讀字節長度的新數組
        compBuf.getBytes(array); //將字節讀入數組
        YourImpl.method(array, 0, array.length);//以數組、偏移量、長度為參數的Call方法
    }

  5.3 ByteBuf的字節操作

    5.3.1 隨機訪問索引

    ByteBuf buffer = ...; 
    for (int i = 0; i < buffer.capacity(); i ++) {
        byte b = buffer.getByte(i);
        System.out.println((char) b);
    }

    5.3.2 順序訪問索引

    5.3.3Discardable bytes廢棄字節

    5.3.4 可讀字節(實際內容)

    ByteBuf buffer = ...; 
    while (buffer.readable()) {
        System.out.println(buffer.readByte());
    }

    5.3.5 可寫字節Writable bytes

    ByteBuf buffer = ...; 
    while (buffer.writableBytes() >= 4) {
        buffer.writeInt(random.nextInt()); 
    }

    5.3.6 清除緩沖區索引Clearing the buffer indexs

    5.3.7 搜索操作Search operations

    5.3.8 標准和重置Mark and reset

    5.3.9 衍生的緩沖區Derived buffers

    Charset utf8 = Charset.forName("UTF-8");
    ByteBuf buf = Unpooled.copiedBuffer("Netty in Action rocks!", utf8); //創建ByteBuf,它包含給定字符串的字節
    ByteBuf sliced = buf.slice(0, 14); //創建ByteBuf的新片段,從索引0開始,以索引14結束
    System.out.println(sliced.toString(utf8); //包含在行動中的Netty
    buf.setByte(0, (byte) íJí); //更新索引0上的字節
    assert buf.get(0) == sliced.get(0);//不會失敗,因為ByteBuf共享相同的內容,因此對其中一個的修改在另一個上也是可見的

    5.3.10 讀/寫操作以及其他一些操作

  5.4 ByteBufHolder

    5.4.1 ByteBufAllocator

    5.4.2 Unpooled

    5.4.3 ByteBufUtil

6.ChannelHandler

  6.1 ChannelPipeline

    修改ChannelPipeline的方法     

      addFirst(...),添加ChannelHandler在ChannelPipeline的第一個位置
      addBefore(...),在ChannelPipeline中指定的ChannelHandler名稱之前添加ChannelHandler
      addAfter(...),在ChannelPipeline中指定的ChannelHandler名稱之后添加ChannelHandler
      addLast(ChannelHandler...),在ChannelPipeline的末尾添加ChannelHandler
      remove(...),刪除ChannelPipeline中指定的ChannelHandler
      replace(...),替換ChannelPipeline中指定的ChannelHandler

  6.2 ChannelHandlerContext

    6.2.1 通知下一個ChannelHandler

      ChannelHandlerContext、ChannelHandler、ChannelPipeline的關系

      事件通過渠道

    ChannelHandlerContext ctx = ..;
    Channel channel = ctx.channel(); //獲取屬於ChannelHandlerContext的通道的引用
    channel.write(Unpooled.copiedBuffer("Netty in Action",CharsetUtil.UTF_8));//通過通道寫入緩沖器

      信道管道事件

    ChannelHandlerContext ctx = ..;
    ChannelPipeline pipeline = ctx.pipeline();
    pipeline.write(Unpooled.copiedBuffer(ìNetty in Actionì,CharsetUtil.UTF_8));

      通過Channel或ChannelPipeline的通知:

    6.2.2 修改ChannelPipeline

  6.3 狀態模型

  6.4 ChannelHandler和其子類

    6.4.1 ChannelHandler中的方法

    6.4.2 ChannelInboundHandler

      channelRegistered,ChannelHandlerContext的Channel被注冊到EventLoop;
      channelUnregistered,ChannelHandlerContext的Channel從EventLoop中注銷
      channelActive,ChannelHandlerContext的Channel已激活
      channelInactive,ChannelHanderContxt的Channel結束生命周期
      channelRead,從當前Channel的對端讀取消息
      channelReadComplete,消息讀取完成后執行
      userEventTriggered,一個用戶事件被處罰

      channelWritabilityChanged,改變通道的可寫狀態,可以使用Channel.isWritable()檢查
      exceptionCaught,重寫父類ChannelHandler的方法,處理異常

    6.4.3 ChannelOutboundHandler

      bind,Channel綁定本地地址
      connect,Channel連接操作
      disconnect,Channel斷開連接
      close,關閉Channel
      deregister,注銷Channel
      read,讀取消息,實際是截獲ChannelHandlerContext.read()
      write,寫操作,實際是通過ChannelPipeline寫消息,Channel.flush()屬性到實際通道
      flush,刷新消息到通道

7.編解碼器Codec

  7.1 編解碼器Codec

  7.2 解碼器

    7.2.1 ByteToMessageDecoder

    7.2.2 ReplayingDecoder      

      讀取緩沖區的數據之前需要檢查緩沖區是否有足夠的字節,使用ReplayingDecoder就無需自己檢查;

    7.2.3 MessageToMessageDecoder

  7.3 編碼器

.

    7.3.1 MessageToByteEncoder

    7.3.2 MessageToMessageEncoder

  7.4 編解碼器

    7.4.1 byte-to-byte編解碼器

    7.4.2 ByteToMessageCodec

    7.4.3 MessageToMessageCodec

  7.5 其他編解碼方式

    7.5.1 CombinedChannelDuplexHandler

8.附帶的ChannelHandler和Codec

  8.1 使用SSL/TLS創建安全的Netty程序

public class SslChannelInitializer extends ChannelInitializer<Channel> {
    private final SSLContext context;
    private final boolean client;
    private final boolean startTls;

    public SslChannelInitializer(SSLContext context, boolean client, boolean startTls) {
        this.context = context;
        this.client = client;
        this.startTls = startTls;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        SSLEngine engine = context.createSSLEngine();
        engine.setUseClientMode(client);
        ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
    }
}   

    setHandshakeTimeout(long handshakeTimeout, TimeUnit unit),設置握手超時時間,ChannelFuture將得到通知
    setHandshakeTimeoutMillis(long handshakeTimeoutMillis),設置握手超時時間,ChannelFuture將得到通知
    getHandshakeTimeoutMillis(),獲取握手超時時間值
    setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit),設置關閉通知超時時間,若超時,ChannelFuture會關閉失敗
    setHandshakeTimeoutMillis(long handshakeTimeoutMillis),設置關閉通知超時時間,若超時,ChannelFuture會關閉失敗
    getCloseNotifyTimeoutMillis(),獲取關閉通知超時時間
    handshakeFuture(),返回完成握手后的ChannelFuture
    close(),發送關閉通知請求關閉和銷毀

  8.2 使用Netty創建HTTP/HTTPS程序

    8.2.1 Netty的HTTP編碼器,解碼器和編解碼器

      HttpRequestEncoder,將HttpRequest或HttpContent編碼成ByteBuf
      HttpRequestDecoder,將ByteBuf解碼成HttpRequest和HttpContent
      HttpResponseEncoder,將HttpResponse或HttpContent編碼成ByteBuf
      HttpResponseDecoder,將ByteBuf解碼成HttpResponse和HttpContent

    8.2.2 HTTP消息聚合

public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {

    private final boolean client;

    public HttpAggregatorInitializer(boolean client) {
        this.client = client;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
        pipeline.addLast("aggegator", new HttpObjectAggregator(512 * 1024));
    }

}

    8.2.3 HTTP壓縮

    8.2.4 使用HTTPS

    8.2.5 WebSocket

      BinaryWebSocketFrame,包含二進制數據
      TextWebSocketFrame,包含文本數據
      ContinuationWebSocketFrame,包含二進制數據或文本數據,BinaryWebSocketFrame和TextWebSocketFrame的結合體
      CloseWebSocketFrame,WebSocketFrame代表一個關閉請求,包含關閉狀態碼和短語
      PingWebSocketFrame,WebSocketFrame要求PongWebSocketFrame發送數據
      PongWebSocketFrame,WebSocketFrame要求PingWebSocketFrame響應

public class WebSocketServerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new HttpServerCodec(),
                new HttpObjectAggregator(65536),
                new WebSocketServerProtocolHandler("/websocket"),
                new TextFrameHandler(),
                new BinaryFrameHandler(),
                new ContinuationFrameHandler());
    }

    public static final class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            // handler text frame
        }
    }

    public static final class BinaryFrameHandler extends SimpleChannelInboundHandler<BinaryWebSocketFrame> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BinaryWebSocketFrame msg) throws Exception {
            //handler binary frame
        }
    }

    public static final class ContinuationFrameHandler extends SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ContinuationWebSocketFrame msg) throws Exception {
            //handler continuation frame
        }
    }
}

    8.2.6 SPDY

      SPDY(讀作“SPeeDY”)是Google開發的基於TCP的應用層協議,用以最小化網絡延遲,提升網絡速度,優化用戶的網絡使用體驗。

  8.3 處理空閑連接和超時

    IdleStateHandler,當一個通道沒有進行讀寫或運行了一段時間后出發IdleStateEvent
    ReadTimeoutHandler,在指定時間內沒有接收到任何數據將拋出ReadTimeoutException
    WriteTimeoutHandler,在指定時間內有寫入數據將拋出WriteTimeoutException

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
        pipeline.addLast(new HeartbeatHandler());
    }

    public static final class HeartbeatHandler extends ChannelInboundHandlerAdapter {
        private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(
                "HEARTBEAT", CharsetUtil.UTF_8));

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

  8.4 解碼分隔符和基於長度的協議

    使用LineBasedFrameDecoder提取"\r\n"分隔幀:

/**
 * 處理換行分隔符消息
 *
 * @author c.k
 */
public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new LineBasedFrameDecoder(65 * 1204), new FrameHandler());
    }

    public static final class FrameHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // do something with the frame 
        }
    }
}

    8.4.2 長度為基礎的協議

  8.5 寫大數據

  8.6 序列化數據

    8.6.1 普通的JDK序列化

    8.6.2 通過JBoss編組序列化

public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;

    public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new MarshallingDecoder(unmarshallerProvider))
                .addLast(new MarshallingEncoder(marshallerProvider))
                .addLast(new ObjectHandler());
    }

    public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception {
            // do something 
        }
    }
}

    8.6.3 使用ProtoBuf序列化

/**
 * 使用protobuf序列化數據,進行編碼解碼
 * 注意:使用protobuf需要protobuf-java-jar
 *
 * @author Administrator
 */
public class ProtoBufInitializer extends ChannelInitializer<Channel> {

    private final MessageLite lite;

    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(new ProtobufVarint32FrameDecoder())
                .addLast(new ProtobufEncoder())
                .addLast(new ProtobufDecoder(lite))
                .addLast(new ObjectHandler());
    }

    public final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception {
            // do something 
        }
    }
}

9.引導Netty應用程序

  9.1 不同的引導類型

  9.2 引導客戶端和無連接協議

  9.2.1 引導客戶端的方法    

    group(...),設置EventLoopGroup,EventLoopGroup用來處理所有通道的IO事件
    channel(...),設置通道類型
    channelFactory(...),使用ChannelFactory來設置通道類型
    localAddress(...),設置本地地址,也可以通過bind(...)或connect(...)
    option(ChannelOption<T>, T),設置通道選項,若使用null,則刪除上一個設置的ChannelOption
    attr(AttributeKey<T>, T),設置屬性到Channel,若值為null,則指定鍵的屬性被刪除
    handler(ChannelHandler),設置ChannelHandler用於處理請求事件
    clone(),深度復制Bootstrap,Bootstrap的配置相同
    remoteAddress(...),設置連接地址
    connect(...),連接遠程通道
    bind(...),創建一個新的Channel並綁定

  9.2.2 怎么引導客戶端

public class BootstrapingClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class).handler(new SimpleChannelInboundHandler<ByteBuf>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                System.out.println("Received data");
                msg.clear();
            }
        });
        ChannelFuture f = b.connect("1", 2048);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("connection finished");
                } else {
                    System.out.println("connection failed");
                    future.cause().printStackTrace();
                }
            }
        });
    }
}

    9.2.3 選擇兼容通道實現

  9.3 使用ServerBootstrap引導服務器

    9.3.1 引導服務器的方法

      group(...),設置EventLoopGroup事件循環組
      channel(...),設置通道類型
      channelFactory(...),使用ChannelFactory來設置通道類型
      localAddress(...),設置本地地址,也可以通過bind(...)或connect(...)
      option(ChannelOption<T>, T),設置通道選項,若使用null,則刪除上一個設置的ChannelOption
      childOption(ChannelOption<T>, T),設置子通道選項
      attr(AttributeKey<T>, T),設置屬性到Channel,若值為null,則指定鍵的屬性被刪除
      childAttr(AttributeKey<T>, T),設置子通道屬性
      handler(ChannelHandler),設置ChannelHandler用於處理請求事件
      childHandler(ChannelHandler),設置子ChannelHandler
      clone(),深度復制ServerBootstrap,且配置相同
      bind(...),創建一個新的Channel並綁定

    9.3.2 怎么引導服務器

public class BootstrapingServer {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
                        System.out.println("Received data");
                        msg.clear();
                    }
                });
        ChannelFuture f = b.bind(2048);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("Server bound");
                } else {
                    System.err.println("bound fail");
                    future.cause().printStackTrace();
                }
            }
        });
    }
}

  9.4 從Channel引導客戶端

public class BootstrapingFromChannel {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
                    ChannelFuture connectFuture;

                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        Bootstrap b = new Bootstrap();
                        b.channel(NioSocketChannel.class).handler(
                                new SimpleChannelInboundHandler<ByteBuf>() {
                                    @Override
                                    protected void channelRead0(ChannelHandlerContext ctx,
                                                                ByteBuf msg) throws Exception {
                                        System.out.println("Received data");
                                        msg.clear();
                                    }
                                });
                        b.group(ctx.channel().eventLoop());
                        connectFuture = b.connect(new InetSocketAddress("1", 2048));
                    }

                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
                            throws Exception {
                        if (connectFuture.isDone()) {
                            // do something with the data 
                        }
                    }
                });
        ChannelFuture f = b.bind(2048);
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    System.out.println("Server bound");
                } else {
                    System.err.println("bound fail");
                    future.cause().printStackTrace();
                }
            }
        });
    }
}

  9.5 添加多個ChannelHandler

public class InitChannelExample {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializerImpl());
        ChannelFuture f = b.bind(2048).sync();
        f.channel().closeFuture().sync();
    }

    static final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new HttpClientCodec())
                    .addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
        }
    }
}

  9.6 使用通道選項和屬性

    public static void main(String[] args) {
    //創建屬性鍵對象 
        final AttributeKey<Integer> id = AttributeKey.valueOf("ID");
        //客戶端引導對象 
        Bootstrap b = new Bootstrap();
        //設置EventLoop,設置通道類型 
        b.group(new NioEventLoopGroup()).channel(NioSocketChannel.class)
                //設置ChannelHandler 
                .handler(new SimpleChannelInboundHandler<ByteBuf>() {
                    @Override
                    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg)
                            throws Exception {
                        System.out.println("Reveived data");
                        msg.clear();
                    }

                    @Override
                    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
                        //通道注冊后執行,獲取屬性值 
                        Integer idValue = ctx.channel().attr(id).get();
                        System.out.println(idValue);
                        //do something with the idValue 
                    }
                });
        //設置通道選項,在通道注冊后或被創建后設置 
        b.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
        //設置通道屬性 
        b.attr(id, 123456);
        ChannelFuture f = b.connect("www.manning.com", 80);
        f.syncUninterruptibly();
    }

10.單元測試代碼

  10.1 General

    writeInbound(Object...),寫一個消息到入站通道
    writeOutbound(Object...),寫消息到出站通道
    readInbound(),從EmbeddedChannel讀取入站消息,可能返回null
    readOutbound(),從EmbeddedChannel讀取出站消息,可能返回null
    finish(),標示EmbeddedChannel已結束,任何寫數據都會失敗

  10.2 測試ChannelHandler

    10.2.1 測試處理入站消息的handler

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {

    private final int frameLength;

    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException(
                    "frameLength must be a positive integer: " + frameLength);
        }
        this.frameLength = frameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) throws Exception {
        while (in.readableBytes() >= frameLength) {
            ByteBuf buf = in.readBytes(frameLength);
            out.add(buf);
        }
    }

}

    測試

public class FixedLengthFrameDecoderTest {

    @Test
    public void testFramesDecoded() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(
                new FixedLengthFrameDecoder(3));
        // write bytes 
        Assert.assertTrue(channel.writeInbound(input));
        Assert.assertTrue(channel.finish());
        // read message 
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertNull(channel.readInbound());
    }

    @Test
    public void testFramesDecoded2() {
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        ByteBuf input = buf.duplicate();
        EmbeddedChannel channel = new EmbeddedChannel(
                new FixedLengthFrameDecoder(3));
        Assert.assertFalse(channel.writeInbound(input.readBytes(2)));
        Assert.assertTrue(channel.writeInbound(input.readBytes(7)));
        Assert.assertTrue(channel.finish());
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertEquals(buf.readBytes(3), channel.readInbound());
        Assert.assertNull(channel.readInbound());
    }

}

    10.2.2 測試處理出站消息的handler

      解碼器

public class AbsIntegerEncoder extends MessageToMessageEncoder<ByteBuf> {
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg,
                          List<Object> out) throws Exception {
        while (msg.readableBytes() >= 4) {
            int value = Math.abs(msg.readInt());
            out.add(value);
        }
    }
}

      測試

public class AbsIntegerEncoderTest {

    @Test
    public void testEncoded() {
        //創建一個能容納10個int的ByteBuf 
        ByteBuf buf = Unpooled.buffer();
        for (int i = 1; i < 10; i++) {
            buf.writeInt(i * -1);
        }
        //創建EmbeddedChannel對象 
        EmbeddedChannel channel = new EmbeddedChannel(new AbsIntegerEncoder());
        //將buf數據寫入出站EmbeddedChannel 
        Assert.assertTrue(channel.writeOutbound(buf));
        //標示EmbeddedChannel完成 
        Assert.assertTrue(channel.finish());
        //讀取出站數據 
        ByteBuf output = (ByteBuf) channel.readOutbound();
        for (int i = 1; i < 10; i++) {
            Assert.assertEquals(i, output.readInt());
        }
        Assert.assertFalse(output.isReadable());
        Assert.assertNull(channel.readOutbound());
    }

}

  10.3 測試異常處理

    解碼器

public class FrameChunkDecoder extends ByteToMessageDecoder {

    // 限制大小 
    private final int maxFrameSize;

    public FrameChunkDecoder(int maxFrameSize) {
        this.maxFrameSize = maxFrameSize;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) throws Exception {
        // 獲取可讀字節數 
        int readableBytes = in.readableBytes();
        // 若可讀字節數大於限制值,清空字節並拋出異常 
        if (readableBytes > maxFrameSize) {
            in.clear();
            throw new TooLongFrameException();
        }
        // 讀取ByteBuf並放到List中 
        ByteBuf buf = in.readBytes(readableBytes);
        out.add(buf);
    }

}

    測試代碼

public class FrameChunkDecoderTest {

    @Test
    public void testFramesDecoded() {
        //創建ByteBuf並填充9字節數據 
        ByteBuf buf = Unpooled.buffer();
        for (int i = 0; i < 9; i++) {
            buf.writeByte(i);
        }
        //復制一個ByteBuf 
        ByteBuf input = buf.duplicate();
        //創建EmbeddedChannel 
        EmbeddedChannel channel = new EmbeddedChannel(new FrameChunkDecoder(3));
        //讀取2個字節寫入入站通道 
        Assert.assertTrue(channel.writeInbound(input.readBytes(2)));
        try {
            //讀取4個字節寫入入站通道 
            channel.writeInbound(input.readBytes(4));
            Assert.fail();
        } catch (TooLongFrameException e) {

        }
        //讀取3個字節寫入入站通道 
        Assert.assertTrue(channel.writeInbound(input.readBytes(3)));
        //標識完成 
        Assert.assertTrue(channel.finish());
        //從EmbeddedChannel入去入站數據 
        Assert.assertEquals(buf.readBytes(2), channel.readInbound());
        Assert.assertEquals(buf.skipBytes(4).readBytes(3),
                channel.readInbound());
    }

}

11.WebSocket

12.SPDY

13.通過UDP廣播事件

14..實現自定義編解碼器

  14.1編解碼器的范圍

  14.2實現memcached編解碼器

  14.3了解memcached二進制協議

  14.4 Netty編碼器和解碼器

15.選擇正確的線程模型

  15.1線程模型概述

  15.2事件循環

    15.2.1使用事件循環

    15.2.2 Netty 4的I/O業務

    15.2.3 Netty 3的I/O業務

    15.2.4 Nettys線程模型內部件

  15.3為以后的執行安排任務

    15.3.1使用普通Java API調度任務

    15.3.2使用事件循環調度任務

    15.3.3計划實施內部

    15.4 I/O線程分配的詳細情況

16.用Eventloop注銷/重新注冊


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM