Netty高性能網絡應用框架對標P7面試題分享v4.1.70.Final


概述

**本人博客網站 **IT小神 www.itxiaoshen.com

定義

Netty官網 https://netty.io/ 最新版本為4.1.70.Final

Netty是一個異步的、事件驅動網絡應用框架,用於快速開發可靠、可維護的高性能協議服務器和客戶端。簡單的說Netty是一個基於NIO的客戶、服務器端的編程框架,它可以大大簡化如TCP和UDP套接字的網絡編程.

Netty由JBOSS提供的一個Java開源框架,現為 Github上的獨立項目Netty從許多協議如FTP、SMTP、HTTP和各種基於二進制和文本的協議的實現精心設計從而兼顧實現了易於開發、性能、穩定性和靈活性。分為核心層、傳輸服務、協議支持。

架構

最下面一層是Netty最底層最核心的部分包括零拷貝、API庫、可擴展的事件模型;上面右邊橙色部分Protocol Support協議支持,包括Http協議、WebSocket、SSL(安全套接字協議)、谷歌Protobuf協議、zlib/gzip壓縮與解壓縮、Large File Transfer大文件傳輸等等;紅色的部分Transport Services傳輸服務,包括Socket、Datagram、Http Tunnel、In-VM Pipe等等。足以看出Netty的功能、協議、傳輸方式都比較全,比較強大。

image-20211117112056271

特性

Netty 是目前Java技術棧中最流行的、首選的 NIO 框架,性能和穩定性都有保障,社區比較活躍,基於 Netty 進行二次定制服務開發成本小,提供了簡單易用的API從網絡處理代碼中解耦業務邏輯,且已得到成百上千的商業及商用項目驗證,許多框架和開源組件的底層 rpc 都是使用的Netty,如Dubbo、Elasticsearch 、RocketMQ以及大數據Hadoop、Spark等等。下面是Netty官方描述的特性:

  • 設計
    • 提供各種傳輸類型的統一API,使用阻塞和非阻塞套接字時候使用的是同一個 API,只是需要設置的參數不一樣。
    • 基於靈活和可擴展的事件模型明確實現關注點的分離。
    • 高度可定制的線程模型-單線程,一個或多個線程池,如SEDA。
    • 真正的無連接數據報套接字支持(UDP,從3.1開始)。
  • 易用性
    • 完善的 Javadoc 文檔和用戶指南、示例代碼。
    • 不需要額外的依賴,JDK 5 (Netty 3.x)或 JDK 6 (Netty 4.x)就足夠了。
    • 注意:一些組件(如HTTP/2)可能有更多的要求。更多信息請參閱需求頁面。
  • 性能
    • 更好的吞吐量,更低的延遲。
    • 更少的資源消耗。
    • 最小化不必要的內存拷貝。
  • 安全性
    • 完整的SSL/TLS和StartTLS支持
  • 社區
    • 發布的更早和更頻繁。
    • 社區驅動,作者自2003年以來一直在編寫類似的框架,關注反饋。

為何5.x系列不推薦

從官方上看4.x版本是當前官方推薦,4.x版本目前也一直在維護中,3.x版本是比較舊的版本,跟4.x版本相比變化比較大,特別是API。5.x是被舍棄的版本,官方不再支持! Netty 5.0以前是發布alpha版,之前也有一部分書籍是基於Netty5來寫的,從作者在GitHub上的回復得出:使用ForkJoinPool增加了復雜性,並且沒有顯示出明顯的性能優勢。同時保持所有分支的同步是一項相當大的工作,在當前的master中沒有任何東西可以證明一個新的版本是合理的。

image-20211118182105813

NIO簡單理解

NIO不是Java獨有的概念,實質上是為IO多路復用技術;它是由操作系統提供的系統調用,早期操作系統調用select,poll,但是性能低下,后來漸漸演化成了Linux下的epoll和Mac里的kqueue,使用最為廣泛的是epoll;而Netty就是基於Java NIO技術封裝的一套框架。為什么要封裝,因為原生的Java NIO使用起來沒那么方便,而且還有臭名昭著的bug,Netty把它封裝之后,提供了一個易於操作的使用模式和接口,用戶使用起來也就便捷多了。

image-20211128121703416

關於NIO我們簡單說明一下:

  • 客戶端監聽(Listen)時,Accept是阻塞的,只有新連接來了,Accept才會返回,主線程才能繼。
  • 讀寫socket時,Read是阻塞的,只有請求消息來了,Read才能返回,子線程才能繼續處理。
  • 讀寫socket時,Write是阻塞的,只有客戶端把消息收了,Write才能返回,子線程才能繼續讀取下一個請求。
  • 傳統的BIO模式下,從頭到尾的所有線程都是阻塞的,這些線程就干等着,占用系統的資源,什么事也不干。
  • 那么NIO首先是做到非阻塞,采用的是事件機制,通過線程Accept、讀寫操作,請求處理等;如果什么事都沒得做也不會死循環,它會將線程休眠起來,直到下一個事件來了再繼續干活。

Netty入門示例

Netty 4.1.70 源碼官網下載地址 https://github.com/netty/netty/archive/refs/tags/netty-4.1.70.Final.tar.gz

Netty GitHub下載地址 https://github.com/netty/netty

下載Netty,在Netty源碼中example提供不同協議的樣本代碼示例,官網有有樣例wiki說明,非常方便使用,各位伙伴可以根據具體場景選擇使用。

image-20211126192621899

image-20211126203239657

  • echo:非常基本的客戶機和服務器。
  • discard:了解如何異步發送無限數據流,而不會淹沒寫緩沖區。
  • uptime:實現自動重連機制。
  • telnet:一個經典的基於線路的網絡應用程序。
  • securechat:一種基於tls的聊天服務器,由Telnet示例衍生而來。
  • objectecho:交換可序列化的Java對象。
  • factorial:使用自定義二進制協議編寫有狀態客戶機和服務器。
  • worldclock:快速協議原型與谷歌協議緩沖區集成。
  • http snoop:構建自己的非常輕量級的HTTP客戶機和服務器。
  • file:文件服務器,異步大文件流在HTTP。
  • http websocketx:使用Web Sockets向HTTP添加雙向全雙工通信通道
  • proxy:編寫一個高效的隧道代理服務器。
  • udt bytes:在類似tcp的字節流模式下使用[UDT]
  • udt message:在類似udp的消息傳遞模式下使用[UDT]
  • udt rendezvousBytes:對稱點對點會合連接模式下的字節流
  • udt rendezvous:對稱點對點交會連接模式下的消息流
<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.70.Final</version>
</dependency>

服務端示例代碼EchoServer和EchoServerHandler

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.itxs.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc()));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.itxs.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * Handler implementation for the echo server.
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //獲取客戶端發送過來的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客戶端" + ctx.channel().remoteAddress() + "發送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("歡迎來到Java Netty開源世界,讓我們一起學習吧!", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

客戶端示例代碼EchoServer和EchoServerHandler

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.itxs.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

/**
 * Sends one message when a connection is open and echoes back any received
 * data to the server.  Simply put, the echo client initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     if (sslCtx != null) {
                         p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                     }
                     //p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.itxs.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * Handler implementation for the echo client.  It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("大神你好,我想學習提升!", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服務端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

啟動服務端和客戶端,輸出如下

image-20211126202215504

image-20211126202253540

面試題

Netty與傳統JDK NIO的比較?

  • 傳統NIO缺點:
    • NIO的類庫和API繁雜,學習成本高,你需要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
    • 需要熟悉Java多線程編程。這是因為NIO編程涉及到Reactor模式,你必須對多線程和網絡編程非常熟悉才能寫出高質量的NIO程序;還需要考慮考慮斷連重連、半包讀寫、失敗緩存等問題處理。
    • 臭名昭著的epoll bug。它會導致Selector空輪詢,最終導致CPU 100%。JDK NIO的bug,直到JDK1.7版本依然沒得到根本性的解決。
  • Netty優點:
    • 異步事件驅動框架,可快速開發高性能的服務端和客戶端.
    • API使用簡單,學習成本低。封裝了JDK底層BIO和NIO模型,提供更加簡單易用安全的 API.
    • 功能強大,內置了多種解碼編碼器,自帶編解碼器解決拆包粘包問題,無需用戶困擾,支持多種協議,自帶各種協議棧。
    • 性能高,對比其他主流的NIO框架,Netty的性能最,Reactor線程模型支持高並發海量連接.
    • 社區活躍,發現BUG會及時修復,迭代版本周期短,不斷加入新的功能。
    • Dubbo、Elasticsearch都采用了Netty,質量得到驗證。

Netty Channel和Jdk Nio包Channel關系簡單一句話就是Netty包裝Jdk Channel的對象,並設置為非阻塞模式

Netty的線程模型?

Netty 線程模型是典型的 Reactor 模型結構,其中常用的 Reactor 線程模型有三種,分別為:Reactor 單線程模型、Reactor 多線程模型和主從 Reactor 多線程模型。而在 Netty 的線程模型並非固定不變,通過在啟動輔助類中創建不同的 EventLoopGroup 實例並通過適當的參數配置,就可以支持這三種 Reactor 線程模型。

  • Reactor 單線程模型:Reactor 單線程模型指的是所有的 IO 操作都在同一個 NIO 線程上面完成。作為 NIO 服務端接收客戶端的 TCP 連接,作為 NIO 客戶端向服務端發起 TCP 連接,讀取通信對端的請求或向通信對端發送消息請求或者應答消息。由於 Reactor 模式使用的是異步非阻塞 IO,所有的 IO 操作都不會導致阻塞,理論上一個線程可以獨立處理所有 IO 相關的操作。

image-20211125170837019

//單線程模型簡單代碼示例
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
 .channel(NioServerSocketChannel.class)
  • Reactor 多線程模型:對於一些小容量應用場景,可以使用單線程模型,但是對於高負載、大並發的應用卻不合適,需要對該模型進行改進,演進為 Reactor 多線程模型。Rector 多線程模型與單線程模型最大的區別就是有一組 NIO 線程處理 IO 操作;有專門一個 NIO 線程 -Acceptor 線程用於監聽服務端,接收客戶端的 TCP 連接請求;而 1 個 NIO 線程可以同時處理N條鏈路,但是 1 個鏈路只對應 1 個 NIO 線程,防止發生並發操作問題。網絡 IO 操作-讀、寫等由一個 NIO 線程池負責,線程池可以采用標准的 JDK 線程池實現,它包含一個任務隊列和 N 個可用的線程,由這些 NIO 線程負責消息的讀取、解碼、編碼和發送。

image-20211125202156912

//多線程模型簡單代碼示例,bossGroup中只有一個線程,而workerGroup中的線程是CPU核心數乘以2,那么就對應Reactor的多線程模型。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
  • 主從 Reactor 多線程模型:在並發極高的情況單獨一個 Acceptor 線程可能會存在性能不足問題,為了解決性能問題,產生主從 Reactor 多線程模型。
    • 主從 Reactor 線程模型的特點是:服務端用於接收客戶端連接的不再是 1 個單獨的 NIO 線程,而是一個獨立的 NIO 線程池。
    • Acceptor 接收到客戶端 TCP 連接請求處理完成后,將新創建的 SocketChannel 注冊到 IO 線程池(sub reactor 線程池)的某個 IO 線程上,由它負責 SocketChannel 的讀寫和編解碼工作。
    • Acceptor 線程池僅僅只用於客戶端的登陸、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端 subReactor 線程池的 IO 線程上,由 IO 線程負責后續的 IO 操作。

image-20211125202702340

//主從Reactor多線程模型簡單代碼示例
EventLoopGroup bossGroup = new NioEventLoopGroup(4);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)

Netty 中的 BossGroup 為什么使用線程池?

  • 當系統在運行過程中,如果頻繁的進行線程上下文切換,會帶來額外的性能損耗。多線程並發執行某個業務流程,業務開發者還需要時刻對線程安全保持警惕,哪些數據可能會被並發修改,怎么保護?這不僅降低了開發效率,也會帶來額外的性能損耗。
  • 因此Netty采用了串行化設計理念,從消息的讀取、編碼以及后續 ChannelHandler 的執行,始終都由 IO 線程 EventLoop 負責,這就意外着整個流程不會進行線程上下文的切換,數據也不會面臨被並發修改的風險。

說說EventLoopGroup和EventLoop?

  • EventLoopGroup 是一組 EventLoop 的抽象,一個 EventLoopGroup 當中會包含一個或多個 EventLoop,EventLoopGroup 提供 next 接口,可以從一組 EventLoop 里面按照一定規則獲取其中一個 EventLoop 來處理任務。
  • EventLoopGroup 實際上就是個線程池,一個 EventLoopGroup 包含一個或者多個 EventLoop ;一個 EventLoop 在它的生命周期內只和一個 Thread 綁定;所有 EventLoop 處理的 I/O 事件都將在它專有的 Thread 上被處理;一個 Channel 在它的生命周期內只注冊於一個 EventLoop;每一個 EventLoop 負責處理一個或多個 Channel;
  • 在 Netty 服務器端編程中我們需要 Boss EventLoopGroup 和 Worker EventLoopGroup 這兩個 EventLoopGroup 來進行工作。
  • BossEventLoopGroup 通常是一個單線程的 EventLoop,EventLoop 維護着一個注冊了 ServerSocketChannel 的 Selector 實例,EventLoop 的實現涵蓋 IO 事件的分離和分發(Dispatcher),EventLoop 的實現充當 Reactor 模式中的分發(Dispatcher)的角色;所以通常可以將 BossEventLoopGroup 的線程數參數為 1。
  • BossEventLoop 只負責處理連接,故開銷非常小,連接到來,馬上按照策略將 SocketChannel 轉發給 WorkerEventLoopGroup,WorkerEventLoopGroup 會由 next 選擇其中一個 EventLoop 來將這 個SocketChannel 注冊到其維護的 Selector 並對其后續的 IO 事件進行處理。

說說粘包、拆包、半包以及Netty如果處理?

  • 粘包和半包,指的都不是一次是正常的 ByteBuf 緩存區接收。
    • 粘包,就是接收端讀取的時候,多個發送過來的 ByteBuf “粘”在了一起。換句話說,接收端讀取一次的 ByteBuf ,讀到了多個發送端的 ByteBuf ,是為粘包。
    • 半包,就是接收端將一個發送端的ByteBuf “拆”開了,形成一個破碎的包,我們定義這種 ByteBuf 為半包。換句話說,接收端讀取一次的 ByteBuf ,讀到了發送端的一個 ByteBuf的一部分,是為半包。
    • 比如我們應用層面使用了Netty,而對於操作系統來說只認TCP協議,盡管我們的應用層是按照 ByteBuf 為 單位來發送數據,server按照Bytebuf讀取,但是到了底層操作系統仍然是按照字節流發送數據,因此,數據到了服務端,也是按照字節流的方式讀入,然后到了 Netty 應用層面,重新拼裝成 ByteBuf,而這里的 ByteBuf 與客戶端按順序發送的 ByteBuf 可能是不對等的。因此,我們需要在客戶端根據自定義協議來組裝我們應用層的數據包,然后在服務端根據我們的應用層的協議來組裝數據包,這個過程通常在服務端稱為拆包,而在客戶端稱為粘包。
    • 拆包和粘包是相對的,一端粘了包,另外一端就需要將粘過的包拆開,發送端將三個數據包粘成兩個 TCP 數據包發送到接收端,接收端就需要根據應用協議將兩個數據包重新組裝成三個數據包。
    • 在沒有 Netty 的情況下,用戶如果自己需要拆包,基本原理就是不斷從 TCP 緩沖區中讀取數據,每次讀取完都需要判斷是否是一個完整的數據包,如果當前讀取的數據不足以拼接成一個完整的業務數據包,那就保留該數據,繼續從 TCP 緩沖區中讀取,直到得到一個完整的數據包。 如果當前讀到的數據加上已經讀取的數據足夠拼接成一個數據包,那就將已經讀取的數據拼接上本次讀取的數據,構成一個完整的業務數據包傳遞到業務邏輯,多余的數據仍然保留,以便和下次讀到的數據嘗試拼接。
  • Netty中的拆包器
    • 固定長度的拆包器 FixedLengthFrameDecoder:每個應用層數據包的都拆分成都是固定長度的大小,比如 1024字節。這個顯然不大適應在 Java 聊天程序 進行實際應用。
    • 行拆包器 LineBasedFrameDecoder:每個應用層數據包,都以換行符作為分隔符,進行分割拆分。這個顯然不大適應在 Java 聊天程序 進行實際應用。
    • 分隔符拆包器 DelimiterBasedFrameDecoder:每個應用層數據包,都通過自定義的分隔符,進行分割拆分。這個版本,是LineBasedFrameDecoder 的通用版本,本質上是一樣的。這個顯然不大適應在 Java 聊天程序 進行實際應用。
    • 基於數據包長度的拆包器 LengthFieldBasedFrameDecoder:將應用層數據包的長度,作為接收端應用層數據包的拆分依據。按照應用層數據包的大小,拆包。這個拆包器,有一個要求,就是應用層協議中包含數據包的長度。

簡單說說Netty高性能的原因?

  • IO模型、協議、線程模型(事件驅動、異步非阻塞、NIO多路復用非阻塞)都與性能強相關;IO通信性能三原則:傳輸(AIO)、協議(Http)、線程(主從Reactor線程模型)。
  • 無鎖串行化的設計理念:即消息的處理盡可能在同一個線程內完成,期間不進行線程切換,這樣就避免了多線程競爭和同步鎖。表面上看,串行化設計似乎CPU利用率不高,並發程度不夠。但是,通過調整NIO線程池的線程參數,可以同時啟動多個串行化的線程並行運行,這種局部無鎖化的串行線程設計相比一個隊列-多個工作線程模型性能更優。
  • 零拷貝:jdk bytebuffer:無法動態擴容,api使用復雜,讀寫切換時要手動調用flip和rewind等方法,capacity、readerindex、writerindex,支持順序讀寫操作。
  • 內存池管理,PoolByteBuf 是Netty內存池管理,比普通的new ByteBuf性能提高了數十倍。
  • 高效並發編程:Volatile的大量、正確使用;CAS和原子類的廣泛使用;線程安全容器的使用;通過讀寫鎖提升並發性能。
  • 對高性能序列化框架的支持,如protobuf。
  • 靈活TCP參數調優。

說說對於Netty的零拷貝理解?

  • 傳統意義的拷貝

    • 是在發送數據的時候,傳統的實現方式是:
      • File.read(bytes)
      • Socket.send(bytes)
    • 這種方式需要四次數據拷貝和四次上下文切換:
      • 數據從磁盤讀取到內核的read buffer。
      • 數據從內核緩沖區拷貝到用戶緩沖區。
      • 數據從用戶緩沖區拷貝到內核的socket buffer。
      • 數據從內核的socket buffer拷貝到網卡接口(硬件)的緩沖區。
  • 零拷貝概念

    • 明顯上面的第二步和第三步是沒有必要的,通過java的FileChannel.transferTo方法,可以避免上面兩次多余的拷貝(當然這需要底層操作系統支持),下面的兩次操作都不需要CPU參與,所以就達到了零拷貝。
      • 調用transferTo,數據從文件由DMA引擎拷貝到內核read buffer。
      • 接着DMA從內核read buffer將數據拷貝到網卡接口buffer。
  • Netty中的零拷貝主要體現在三個方面:

    • bytebuffer:Netty發送和接收消息主要使用bytebuffer,bytebuffer使用堆外內存(DirectMemory)直接進行Socket讀寫。原因:如果使用傳統的堆內存進行Socket讀寫,JVM會將堆內存buffer拷貝一份到直接內存中然后再寫入socket,多了一次緩沖區的內存拷貝。DirectMemory中可以直接通過DMA發送到網卡接口。堆外內存也即是DirectMemory,直接內存申請較慢但訪問較快,一般操作堆內內存-》直接內存-》系統調用-》硬盤/網卡,而非直接內存需要從堆內-》直接內存的二次拷貝。

    img

    • Composite Buffers:傳統的ByteBuffer,如果需要將兩個ByteBuffer中的數據組合到一起,我們需要首先創建一個size=size1+size2大小的新的數組,然后將兩個數組中的數據拷貝到新的數組中。但是使用Netty提供的組合ByteBuf,就可以避免這樣的操作,因為CompositeByteBuf並沒有真正將多個Buffer組合起來,而是保存了它們的引用,從而避免了數據的拷貝,實現了零拷貝。
    • 對於FileChannel.transferTo的使用:Netty中使用了FileChannel的transferTo方法,該方法依賴於操作系統實現零拷貝。
    • 不改變原來buf只是做邏輯拆分、合並、包裝,減少大量內存復制,並由此提升性能,使用 Netty 提供的 CompositeByteBuf 類, 可以將多個ByteBuf 合並為一個邏輯上的 ByteBuf, 避免了各個 ByteBuf 之間的拷貝。ByteBuf 支持 slice 操作, 因此可以將 ByteBuf 分解為多個共享同一個存儲區域的 ByteBuf, 避免了內存的拷貝。通過 FileRegion 包裝的FileChannel.tranferTo 實現文件傳輸, 可以直接將文件緩沖區的數據發送到目標 Channel, 避免了傳統通過循環 write 方式導致的內存拷貝問題。簡單示例用法:
    package com.itxs.main;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.CompositeByteBuf;
    import io.netty.buffer.Unpooled;
    
    public class AppMain {
        public static void main(String[] args) {
            byte[] arr = {1,2,3,4,5};
            //wrappedBuffer方法:將byte數組包裝ByteBuf對象
            ByteBuf byteBuf = Unpooled.wrappedBuffer(arr);
            System.out.println(byteBuf.getByte(3));
            arr[3] = 6;
            System.out.println(byteBuf.getByte(3));
    
            ByteBuf byteBuf1 = Unpooled.wrappedBuffer("hello-netty".getBytes());
            //slice方法:將一個ByteBuf對象切分為多個ByteBuf對象
            ByteBuf sliceByteBuf = byteBuf1.slice(1, 2);
            sliceByteBuf.unwrap();
            System.out.println(sliceByteBuf.toString());
    
            ByteBuf buffer1 = Unpooled.buffer(3);
            buffer1.writeByte(1);
            ByteBuf buffer2 = Unpooled.buffer(3);
            buffer2.writeByte(4);
            CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
            //CompositeByteBuf:將多個ByteBuf合並為一個邏輯上ByteBuf,避免各個ByteBuf之前的拷貝
            CompositeByteBuf compositeByteBufNew = compositeByteBuf.addComponents(true, buffer1, buffer2);
            System.out.println(compositeByteBufNew);
        }
    }
    

    Netty內部執行大體流程?

  • 服務端流程

    • 1、創建ServerBootStrap實例。
    • 2、設置並綁定Reactor線程池:EventLoopGroup,EventLoop就是處理所有注冊到本線程的Selector上面的Channel。
    • 3、設置並綁定服務端的channel。
    • 4、5、創建處理網絡事件的ChannelPipeline和handler,網絡時間以流的形式在其中流轉,handler完成多數的功能定制:比如編解碼 SSl安全認證.
    • 6、綁定並啟動監聽端口。
    • 7、當輪訓到准備就緒的channel后,由Reactor線程:NioEventLoop執行pipline中的方法,最終調度並執行channelHandler客戶端

image-20211125210021289

  • 客戶端流程

image-20211125210208526

  • 一個網絡請求步驟:
  1. 准備消息。
  2. 編碼,比如有一個字符串的消息要發送出去,那么發送出去之前要把這個字符串消息轉為字節(Byte),這是因為網絡上傳輸的不能是原本的字符串;又比如要給發送出去的消息加一個消息標識,用來以后另一端收消息的程序可以用來解決粘包拆包問題。
  3. 將消息發送到網絡通道,write方法。
  4. 網絡傳輸。
  5. 程序另一端讀取數據,read方法。
  6. 解碼,和編碼相對應,比如發送過來的消息,是字符串轉為字節,那么解碼要做的事就是把字節轉為字符串;又或者數據還加了標識,就要根據這個標識去讀取數據,解決粘包拆包問題。
  7. 處理業務邏輯,比如是做群聊場景,那么當程序收到消息之后,要將這條轉發給對應群聊中的每一個人。
  8. 准備數據響應給消息發送者。

說說Netty的核心組件?

  • Channel

    • 一種連接到網絡套接字或能進行讀、寫、連接和綁定等I/O操作的組件。
    • Channel 接口是 Netty 對網絡操作抽象類,它除了包括基本的 I/O 操作,如 bind()、connect()、read()、write() 等。
    • 比較常用的Channel接口實現類是NioServerSocketChannel(服務端)和NioSocketChannel(客戶端),這兩個 Channel 可以和 BIO 編程模型中的ServerSocket以及Socket兩個概念對應上。Netty 的 Channel 接口所提供的 API,大大地降低了直接使用 Socket 類的復雜性。
    • 此外OioSocketChannel為同步阻塞的客戶端 TCP Socket 連接;OioServerSocketChannel為同步阻塞的服務器端 TCP Socket 連接。
  • EventLoop

    • EventLoop(事件循環)接口可以說是 Netty 中最核心的概念。EventLoop 定義了 Netty 的核心抽象,用於處理連接的生命周期中所發生的事件。
    • EventLoop 的主要作用實際就是負責監聽網絡事件並調用事件處理器進行相關 I/O 操作的處理。
  • Channel和EventLoop關系

    • Channel 為 Netty 網絡操作(讀寫等操作)抽象類,EventLoop 負責處理注冊到其上的Channel 處理 I/O 操作,兩者配合參與 I/O 操作。
  • ChannelFuture

    • ChannelFuture提供操作完成時一種異步通知的方式。一般在Socket編程中,等待響應結果都是同步阻塞的,而Netty則不會造成阻塞,因為ChannelFuture是采取類似觀察者模式的形式進行獲取結果
    • Netty 是異步非阻塞的,所有的 I/O 操作都為異步的;不能立刻得到操作是否執行成功,但是可以通過 ChannelFuture 接口的 addListener() 方法注冊一個 ChannelFutureListener,當操作執行成功或者失敗時,監聽就會自動觸發返回結果。並且還可以通過ChannelFuture 的 channel() 方法獲取關聯的Channel。還可以通過 ChannelFuture 接口的 sync()方法讓異步的操作變成同步的。
  • ChannelHandler和ChannelPipeline

    • 指定了序列化編解碼器以及自定義的 ChannelHandler 處理消息。
    • ChannelHandler 是消息的具體處理器。他負責處理讀寫操作、客戶端連接等事情。
    • ChannelPipeline 為 ChannelHandler 的鏈,提供了一個容器並定義了用於沿着鏈傳播入站和出站事件流的 API 。當 Channel 被創建時,它會被自動地分配到它專屬的 ChannelPipeline。
    • 可以在 ChannelPipeline 上通過 addLast() 方法添加一個或者多個ChannelHandler ,因為一個數據或者事件可能會被多個 Handler 處理。當一個 ChannelHandler 處理完之后就將數據交給下一個 ChannelHandler 。
    • Netty設計ChannelHandlerContext上下文對象,就可以拿到channel、pipeline等對象,就可以進行讀寫等操作。ChannelHandlerContext在pipeline中是一個鏈表的形式。
  • EventloopGroup與EventLoop是什么關系

    • EventLoopGroup 包含多個 EventLoop(每一個 EventLoop 通常內部包含一個線程), EventLoop 的主要作用實際就是負責監聽網絡事件並調用事件處理器進行相關 I/O 操作的處理 ;且 EventLoop 處理的 I/O 事件都將在它專有的 Thread 上被處理,即 Thread 和 EventLoop 屬於 1 : 1 的關系,從而保證線程安全。

    • Boss EventloopGroup 用於接收連接,Worker EventloopGroup 用於具體的處理(消息的讀寫以及其他邏輯處理) ; 當客戶端通過 connect 方法連接服務端時,BossGroup 處理客戶端連接請求。當客戶端處理完成后,會將這個連接提交給 WorkerGroup 來處理,然后 workerGroup 負責處理其 IO 相關操作。

    image-20211128103026212

    image-20211127180311744

  • **Bootstrap 和 ServerBootstrap **怎么理解?

    • Bootstrap 是客戶端的啟動引導類/輔助類。ServerBootstrap 客戶端的啟動引導類/輔助類。Bootstrap和ServerBootStrap是Netty提供的一個創建客戶端和服務端啟動器的工廠類,使用這個工廠類非常便利地創建啟動類,根據上面的一些例子,其實也看得出來能大大地減少了開發的難度。都是繼承於AbstractBootStrap抽象類,所以大致上的配置方法都相同。

    image-20211127175809129

    • Bootstrap 通常使用 connet() 方法連接到遠程的主機和端口,作為一個 Netty TCP 協議通信中的客戶端。另外,Bootstrap 也可以通過 bind() 方法綁定本地的一個端口,作為 UDP 協議通信中的一端;ServerBootstrap通常使用 bind() 方法綁定本地的端口上,然后等待客戶端的連接。
    • Bootstrap 只需要配置一個線程組— EventLoopGroup ,而 ServerBootstrap需要配置兩個線程組— EventLoopGroup ,一個用於接收連接,一個用於具體的處理。
  • Selector

    • Netty中的Selector也和NIO的Selector是一樣的,就是用於監聽事件,管理注冊到Selector中的channel,實現多路復用器。

    Netty內部結構Selector位置如下圖所示

    image-20211125210709129

  • 自定義序列化編解碼器

    • 在 Java 中自帶的有實現 Serializable 接口來實現序列化,但由於它性能、安全性等原因一般情況下是不會被使用到的。
    • 通常情況下,我們使用 Protostuff、Hessian2、json 序列方式比較多,另外還有一些序列化性能非常好的序列化方式也是很好的選擇:
    • 專門針對 Java 語言的:Kryo,FST 等等跨語言的:Protostuff(基於 protobuf 發展而來),ProtoBuf,Thrift,Avro,MsgPack 等等

客戶端啟動類Bootstrap的connect過程?

客戶端啟動類Bootstrap的connect過程有一些需要先配置比如核心的有netty的工作線程池EventLoopGroup和Netty Channel實現類、還有Channel內部Pipeline依賴處理器單元,配置完客戶啟動后先將Netty Channel的實現類進行反射創建出來一個Netty的Channel對象,初始化Channel內部Pipeline,將處理器單元也即是handler裝載到內部的Channel管道里,后續IO操作可以用到,還需要一個線程池NioEventLoopGroup才能工作,將Channel注冊EventLoop。使用Bootstrap創建啟動器的步驟可分為以下幾步:

image-20211127175840961

Channel注冊到EventLoop中,這一步做了哪些?

NioEventLoop不單是一個線程,里面線程處理IO事件,當然也可以處理普通任務,因此有任務隊列和NIO 核心類Selector,它調用操作系統函數完成一對多的監聽,NioEventLoop類內部持有這個Selector對象來監聽socket的,Channel注冊到EventLoop其實底層就是將Netty Channel對象內部維護注冊到的Jdk Nio Channel Selector對象。

NioEventLoop的工作過程?

  • 創建后就會去執行自身的run方法,這里面是個死循環。EventLoop中維護一個Selector實例
  • 需要計算出一個IO選擇策略,比如是使用Selector.select(阻塞到有socket就緒) select now(非阻塞),選擇哪種策略主要看NioEventLoop任務隊列內是否有本地任務執行,如果有調阻塞就不太優雅,因為這樣會延遲非IO任務的執行。
  • 接下來就要處理selectkeys集合他表示本次Selector刷選出來就緒狀態的Channel,迭代key集合,從每個key拿到關聯channel,再看是讀就緒還是寫就緒,比如讀就緒,將socket緩沖區load到一個Bytebuf中,然后調用當前Channel的處理管道Pipeline傳播讀事件接口方法,也就是Pipeline的fireChannelRead方法,就這樣從socket收到數據放到這個Channel的Pipeline中,在Pipeline中依次調用Inbound接口的Handler處理器進行讀就緒事件處理,本次處理完IO事件后,NioEventLoop接下來會處理內部任務。

Pipeline設計模式理解?

  • 管道意思,可以安插任意處理單元,依次傳遞,in事件處理、out事件處理,加密、轉換為json字符串、編碼器,先寫channel,最后會刷到channel關聯的socket寫緩存區,提供一個可插拔靈活的處理框架。
  • netty高可擴展性也正是來源pipeline責任鏈的設計模式。協議解碼器:將二進制數據轉為java對象;協議編碼器:將java對象轉為二進制數據;業務邏輯處理程序-執行實際的業務邏輯。
  • 處理器Handler主要分為兩種:ChannelInboundHandlerAdapter(入站處理器)、ChannelOutboundHandler(出站處理器),入站指的是數據從底層java NIO Channel到Netty的Channel。出站指的是通過Netty的Channel來操作底層的java NIO Channel。
  • 在channel中裝配ChannelHandler流水線處理器,一個channel可能需要多個channelHandler處理器和順序的。pipeline相當於處理器的容器。初始化channel時,把channelHandler按順序裝在pipeline中,就可以實現按序執行channelHandler了。在一個Channel中,只有一個ChannelPipeline。該pipeline在Channel被創建的時候創建。ChannelPipeline包含了一個ChannelHander形成的列表,且所有ChannelHandler都會注冊到ChannelPipeline中。
  • ChannelInboundHandlerAdapter處理器常用的事件有
    1. 注冊事件 fireChannelRegistered。
    2. 連接建立事件 fireChannelActive。
    3. 讀事件和讀完成事件 fireChannelRead、fireChannelReadComplete。
    4. 異常通知事件 fireExceptionCaught。
    5. 用戶自定義事件 fireUserEventTriggered。
    6. Channel 可寫狀態變化事件 fireChannelWritabilityChanged。
    7. 連接關閉事件 fireChannelInactive。
  • ChannelOutboundHandler處理器常用的事件有
    1. 端口綁定 bind。
    2. 連接服務端 connect。
    3. 寫事件 write。
    4. 刷新時間 flush。
    5. 讀事件 read。
    6. 主動斷開連接 disconnect。
    7. 關閉 channel 事件 close。

image-20211127171420697

Netty Pipeline 當前處理單元處理完畢后有做一個小優化是什么?

每個處理單元的Handler對象都有一個int類型的mask字段,表示當前handler對上層Inbound或Outbound指定這個接口方法有沒有復寫,有為1無為0,主要可以避免一些空調用,可以找到后面第一個實現事件Handler,可以跳過沒有實現響應事件的Handler。

Netty是一個全異步的框架,怎么做到?

主要是Promise接口,他是Future接口的一個增強,submit返回future句柄, get獲取任務結果阻塞線程,原因內部沒有線程資源,Netty Promise實現類ChannelPromise內部有一個Channel對象,注冊到NioEventLoop,同事也就持有NioEventLoop,內部有線程資源,Channel Promise有線程資源了,所以Channel Promise可以注冊一些Listener對象,等任務完成后可以處理后續事情。

Netty 線程池理解?

  • NioEventLoopGroup 默認的構造函數實際會起的線程數為 CPU核心數*2

    image-20211128112212513

  • NioEventLoopGroup,NioEventLoop池和單個線程的關系,group線程池有next方法返回EventLoop線程對象,內部線程Thread是延遲創建的,雖然先創建出來,但是NioEventLoop內部線程對象並不會創建,等接到第一個任務才創建,NioEventLoop不像是單線程,一個線程的線程池(single exector方法),EventLoop內部有隊列,可以做Selector工作也做普通任務,NioEventLoop內部隊列也做了優化,原來采用LinkedBlockingQueue+Condition條件隊列,Netty沒有使用JDK JUC的Queue而是使用JcTools的Queue,單多生產者但多消費者場景,背后也應該是CAS實現。

  • 每個NioEventLoopGroup對象內部都會分配一組NioEventLoop,其大小是 nThreads, 這樣就構成了一個線程池, 一個NIOEventLoop 和一個線程相對應,這和我們上面說的 EventloopGroup 和 EventLoop關系這部分內容相對應。

為什么需要心跳機制?Netty 中心跳機制了解么?

  • 在 TCP 保持長連接的過程中,可能會出現斷網等網絡異常出現,異常發生的時候, client 與 server 之間如果沒有交互的話,它們是無法發現對方已經掉線的。為了解決這個問題, 我們就需要引入心跳機制。
  • 心跳機制的工作原理是: 在 client 與 server 之間在一定時間內沒有數據交互時, 即處於 idle 狀態時, 客戶端或服務器就會發送一個特殊的數據包給對方, 當接收方收到這個數據報文后, 也立即發送一個特殊的數據報文, 回應發送方, 此即一個 PING-PONG 交互。所以, 當某一端收到心跳消息后, 就知道了對方仍然在線, 這就確保 TCP 連接的有效性.
  • TCP 實際上自帶的就有長連接選項,本身是也有心跳包機制,也就是 TCP 的選項:SO_KEEPALIVE。但是,TCP 協議層面的長連接靈活性不夠。所以,一般情況下我們都是在應用層協議上實現自定義心跳機制的,也就是在 Netty 層面通過編碼實現。通過 Netty 實現心跳機制的話,核心類是 IdleStateHandler 。

說說taskQueue和scheduleTaskQueue?

如果Handler處理器有一些長時間的業務處理,可以交給taskQueue異步處理,代碼使用示例如下:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //獲取客戶端發送過來的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客戶端" + ctx.channel().remoteAddress() + "發送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
        ctx.channel().eventLoop().execute(() -> {
            try {
                Thread.sleep(2000);
                System.out.println("耗時業務處理");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

通過debug調試就可以看到taskQueue有一個剛剛添加進去的任務

image-20211126205853698

延時任務隊列和上面任務隊列非常相似,只是多了一個可延遲一定時間再執行的設置,代碼使用示例如下:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        //獲取客戶端發送過來的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客戶端" + ctx.channel().remoteAddress() + "發送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));

        ctx.channel().eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    //長時間操作,不至於長時間的業務操作導致Handler阻塞
                    Thread.sleep(1000);
                    System.out.println("長時間的業務處理");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },10, TimeUnit.SECONDS);//10秒后執行
    }

同樣在debug進行調試可以查看到有一個scheduleTaskQueue任務待執行中

image-20211128104231997


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM