netty通信


學習netty之前,要先了解操作系統中的IO零拷貝(已經附上鏈接了)

一、netty的簡單介紹

  • Netty 是由 JBOSS 提供的一個 Java 開源框架,現為 Github 上的獨立項目。
  • Netty 是一個異步的、基於事件驅動的網絡應用框架,用以快速開發高性能、高可靠性的網絡 IO 程序。
  • Netty 主要針對在 TCP 協議下,面向 Client 端的高並發應用,或者 **Peer-to-Peer **場景下的大量數據持續傳輸的應用。
  • Netty 本質是一個 NIO 框架,適用於服務器通訊相關的多種應用場景。
  • Dubbo 協議默認使用 Netty 作為基礎通信組件,用於實現各進程節點之間的內部通信

為什么有了netty框架

原生的NIO存在問題:

  • NIO的類庫和API繁雜:需要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer
  • 要熟悉Java多線程編程,因為NIO編程涉及到Reactor模式
  • Selector可能出現空輪詢,占用CPU資源

netty是對NIO進行封裝,解決了上述問題:

  • 設計優雅:
    適用於各種傳輸類型的統一API,阻塞和非阻塞Socket;
    基於靈活且可擴展的事件模型,可以清晰地分離關注點;
    高度可定制的線程模型
  • 高性能、吞吐量更高:
    延遲更低;
    減少資源消耗;
    最小化不必要的內存復制。
  • 安全:完整的 SSL/TLS 和 StartTLS 支持。
  • 社區活躍、不斷更新

二、線程模式

下面講解的是netty線程模式的由來

現有的線程模式:

  • BIO的傳統阻塞IO服務模型
  • NIO的Reactor模型
    單 Reactor 單線程;
    單 Reactor多線程;
    主從 Reactor多線程

2.1 BIO的傳統阻塞IO服務模型

1、每個連接都需要獨立的線程完成數據的輸入,業務處理,數據返回。當並發數很大,就會創建大量的線程,占用很大系統資源。
2、連接創建后,如果當前線程暫時沒有數據可讀,該線程會阻塞在 Handler對象中的read 操作,導致上面的處理線程資源浪費。
image

2.2 NIO的Reactor模型

基於I/O多路復用模型:多個客戶端進行連接,先把連接請求給Reactor,多個連接共用一個阻塞對象Reactor,由Reactor負責監聽和分發,當客戶端連接沒有數據時不會阻塞線程。
基於線程池復用線程資源:不必再為每個連接創建線程,將連接完成后的業務處理任務分配給線程進行處理,一個線程可以處理多個連接的業務。(解決了當並發數很大時,會創建大量線程,占用很大系統資源)

Reactor模式中核心組成:

  • Reactor:在一個單獨的線程中運行,負責監聽和分發事件,分發給適當的處理線程來對IO事件做出反應。
  • Handlers:處理線程執行IO事件。

單Reactor單線程

特點:

  • 該模型簡單沒有多線程的競爭,由一個線程完成所有的操作(監聽、分發、執行),沒有充分利用多核CPU
  • 因為Reactor是單線程運行,因此在處理某個handler的IO事件時,其他的handler需要進行等待,等待時間長因為該線程還要處理業務;
  • 當Reactor出現問題,就會造成業務模塊不可用

image
上圖解析:

  • Reactor對象通過select監控客戶端請求事件,收到事件后通過dispatch 進行分發
  • 如果是建立連接請求事件,則由Acceptor通過accept處理連接請求,然后創建一個 Handler 對象處理連接完成后的后續業務處理
  • 如果不是建立連接事件,則Reactor會分發處理線程來處理Handler的IO事件(完成 Read → 業務處理 → send 的完整業務流程)

單Reactor多線程

特點:

  • 充分利用多核CPU,可能出現多線程競爭
  • 因為Reactor是單線程運行,因此在處理某個handler的IO事件時,其他的handler需要進行等待,等待時間短因為處理業務交給worker線程池執行;
  • Reactor承擔所有的事件的監聽和響應,它是單線程運行,在高並發場景容易出現性能瓶頸
  • 當Reactor出現問題,就會造成業務模塊不可用

image

上圖解析:

  • Reactor對象通過select監控客戶端請求事件,收到事件后,通過Dispatch 進行分發
  • 如果是建立連接請求事件,則由Acceptor通過accept處理連接請求,然后創建一個Handler對象處理完成連接后的各種事件
  • 如果不是連接請求事件,則Reactor會分發處理線程來處理Handler的IO事件,handler只負責響應事件(read、send),不做具體的業務處理交給worker線程池執行(這樣不會使handler阻塞太久)
  • worker線程池會分配獨立的worker線程完成真正的業務,並將結果返回給handler,handler收到響應后,通過send將結果返回給client

主從Reactor多線程

特點:

  • 主Reactor可已有多個,從Reactor也可以有多個
  • 主Reactor負責處理建立連接請求事件,從Reactor負責處理業務請求事件
  • 當從Reactor出現問題,可以調用其他的從Reactor提供服務

image

上圖解析:

  • Reactor主線程 MainReactor對象通過select監聽連接事件,收到事件后,通過Acceptor的accept處理連接事件
  • 當Acceptor處理連接事件后,MainReactor將連接分配給SubReactor,subreactor將連接加入到連接隊列進行監聽,並創建handler進行各種事件處理
  • 當有新事件發生時,subreactor就會分發處理線程來處理handler,handler通過read讀取數據,分發給后面的worker線程池處理。
  • worker線程池分配獨立的worker線程進行業務處理,並返回結果。handler 收到響應的結果后,再通過send將結果返回給client
  • Reactor主線程可以對應多個Reactor子線程,即MainRecator可以關聯多個 SubReactor

生活中的體現:
單 Reactor 單線程:前台接待員和服務員是同一個人,全程為顧客服務
單 Reactor 多線程:1 個前台接待員,多個服務員,接待員只負責接待
主從 Reactor 多線程:多個前台接待員,多個服務生

2.3 netty線程模型

netty線程模型主要是依據主從Reactor多線程
image

  • BossGroup中的NioEventLoop就像MainReactor可以有多個,WorkerGroup中的NioEventLoop就像SubReactor一樣可以有多個。

  • Netty抽象出兩組線程池,BossGroup專門負責接收客戶端的連接,WorkerGroup專門負責網絡的讀寫

  • BossGroup和WorkerGroup類型都是NioEventLoopGroup,NioEventLoopGroup相當於一個事件循環組,這個組中含有多個事件循環,每一個事件循環是NioEventLoop(NioEventLoopGroup可以有多個線程,即可以含有多個NioEventLoop)

  • NioEventLoop表示一個不斷循環的執行處理任務的線程,每個 NioEventLoop都有一個Selector,用於監聽綁定在其上的socket的網絡通訊

  • 每個BossGroup下面的NioEventLoop循環執行的步驟有3步:
    1、輪詢 accept 事件
    2、處理 accept 事件,與 client 建立連接,生成NioSocketChannel,並將其注冊到某個WorkerGroup的NioEventLoop上的Selector
    3、繼續處理任務隊列的任務,即runAllTasks

  • 每個WorkerGroup下面的NioEventLoop循環執行的步驟有3步:
    1、輪詢 read,write 事件
    2、處理 I/O 事件,即 read,write 事件,在對應NioSocketChannel處理
    3、處理任務隊列的任務,即runAllTasks

  • 每個WorkerGroup的NioEventLoop處理業務時,會使用pipeline(管道),pipeline中包含了channel(通道),即通過pipeline可以獲取到對應通道,每個通道中都有一個channelPipeline維護了很多的處理器channelhandler。

netty案例-TCP服務

服務端:
NettyServer

NettyServer代碼
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyServer {
    public static void main(String[] args) throws Exception {


        //創建BossGroup 和 WorkerGroup
        //說明
        //1. 創建兩個線程組 bossGroup 和 workerGroup
        //2. bossGroup 只是處理連接請求 , 真正的和客戶端業務處理,會交給 workerGroup完成
        //3. 兩個都是無限循環
        //4. bossGroup 和 workerGroup 含有的子線程(NioEventLoop)的個數
        //   默認實際 cpu核數 * 2
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8



        try {
            //創建服務器端的啟動對象,配置參數
            ServerBootstrap bootstrap = new ServerBootstrap();

            //使用鏈式編程來進行設置
            bootstrap.group(bossGroup, workerGroup) //設置兩個線程組
                    .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作為服務器的通道實現
                    .option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列等待連接個數
                    .childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動連接狀態
//                    .handler(null) // 該 handler對應 bossGroup , childHandler 對應 workerGroup
                    .childHandler(new ChannelInitializer<SocketChannel>() {//創建一個通道初始化對象(匿名對象)
                        //給pipeline 設置處理器
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            System.out.println("客戶socketchannel hashcode=" + ch.hashCode()); //可以使用一個集合管理 SocketChannel, 再推送消息時,可以將業務加入到各個channel 對應的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    }); // 給我們的workerGroup 的 EventLoop 對應的管道設置處理器

            System.out.println(".....服務器 is ready...");

            //綁定一個端口並且同步生成了一個 ChannelFuture 對象(也就是立馬返回這樣一個對象)
            //啟動服務器(並綁定端口)
            ChannelFuture cf = bootstrap.bind(6668).sync();

            //給cf 注冊監聽器,監控我們關心的事件

            cf.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (cf.isSuccess()) {
                        System.out.println("監聽端口 6668 成功");
                    } else {
                        System.out.println("監聽端口 6668 失敗");
                    }
                }
            });


            //對關閉通道事件  進行監聽
            cf.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

}


NettyServerHandler

NettyServerHandler代碼
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;

import java.util.concurrent.TimeUnit;

/*
說明
1. 我們自定義一個Handler 需要繼承netty 規定好的某個HandlerAdapter(規范)
2. 這時我們自定義一個Handler , 才能稱為一個handler
 */
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    //讀取數據事件(這里我們可以讀取客戶端發送的消息)
    /*
    1. ChannelHandlerContext ctx:上下文對象, 含有 管道pipeline , 通道channel, 地址
    2. Object msg: 就是客戶端發送的數據 默認Object
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("服務器讀取線程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
        System.out.println("server ctx =" + ctx);
        System.out.println("看看channel 和 pipeline的關系");
        Channel channel = ctx.channel();
        ChannelPipeline pipeline = ctx.pipeline(); //本質是一個雙向鏈表


        //將 msg 轉成一個 ByteBuf
        //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("客戶端發送消息是:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("客戶端地址:" + channel.remoteAddress());
    }

    //數據讀取完畢
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

        //writeAndFlush 是 write + flush
        //將數據寫入到緩存,並刷新
        //一般講,我們對這個發送的數據進行編碼
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵1", CharsetUtil.UTF_8));
    }

    //發生異常后, 一般是需要關閉通道

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客戶端:
NettyClient

NettyClient代碼
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyClient {
    public static void main(String[] args) throws Exception {

        //客戶端需要一個事件循環組
        EventLoopGroup group = new NioEventLoopGroup();


        try {
            //創建客戶端啟動對象
            //注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();

            //設置相關參數
            bootstrap.group(group) //設置線程組
                    .channel(NioSocketChannel.class) // 設置客戶端通道的實現類(反射)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器
                        }
                    });

            System.out.println("客戶端 ok..");

            //啟動客戶端去連接服務器端
            //關於 ChannelFuture 要分析,涉及到netty的異步模型
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
            //對關閉通道事件  進行監聽
            channelFuture.channel().closeFuture().sync();
        }finally {

            group.shutdownGracefully();

        }
    }
}

NettyClientHandler

NettyClientHandler代碼
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    //當通道就緒就會觸發該方法
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client " + ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
    }

    //當通道有讀取事件時,會觸發
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服務器回復的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服務器的地址: "+ ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


netty中的參數組件

1、Bootstrap、ServerBootstrap

Bootstrap 意思是引導,一個 Netty 應用通常由一個 Bootstrap 開始,主要作用是配置整個 Netty 程序,串聯各個組件,Netty 中 Bootstrap 類是客戶端程序的啟動引導類,ServerBootstrap 是服務端啟動引導類。

2、Future、ChannelFuture

Netty 中所有的 IO 操作都是異步的,不能立刻得知消息是否被正確處理。但是可以立即返回一個ChannelFuture,它可以注冊一個監聽,當操作執行成功或失敗時監聽會自動觸發注冊的監聽事件

  • Channel channel(),返回當前正在進行 IO 操作的通道
  • ChannelFuture sync(),等待異步操作執行完畢,同步執行注冊的監聽事件

Future-Listener 機制
當Future對象剛剛創建時,處於非完成狀態,調用者可以通過返回的 ChannelFuture來獲取操作執行的狀態,注冊監聽函數來執行完成后的操作。

  • 通過 isDone 方法來判斷當前操作是否完成;
  • 通過 isSuccess 方法來判斷已完成的當前操作是否成功;
  • 通過 getCause 方法來獲取已完成的當前操作失敗的原因;
  • 通過 isCancelled 方法來判斷已完成的當前操作是否被取消;
  • 通過 addListener 方法來注冊監聽器,當操作已完成(isDone方法返回完成),將會通知指定的監聽器;

3、Channel

  • Netty 網絡通信的組件,能夠用於執行網絡 I/O 操作。
  • Netty 網絡通信的組件,能夠用於執行網絡 I/O 操作。
  • Channel 提供異步的網絡 I/O 操作(如建立連接,讀寫,綁定端口),異步調用意味着任何 I/O 調用都將立即返回一個ChannelFuture,並且不保證在調用結束時所請求的 I/O 操作已完(后期通過ChannelFuture的方法查看異步執行結果)
  • 不同協議、不同的阻塞類型的連接都有不同的 Channel 類型與之對應
    NioSocketChannel,異步的客戶端 TCP Socket 連接。
    NioServerSocketChannel,異步的服務器端 TCP Socket 連接。
    NioDatagramChannel,異步的 UDP 連接。

4、Selector

  • Netty 基於 Selector 對象實現 I/O 多路復用,通過 Selector 一個線程可以監聽多個連接的 Channel 事件。
  • 當向一個 Selector 中注冊 Channel 后,Selector 內部的機制就可以自動不斷地查詢(Select)這些注冊的 Channel 是否有已就緒的 I/O 事件(例如可讀,可寫,網絡連接完成等),這樣程序就可以很簡單地使用一個線程高效地管理多個 Channel

5、ChannelHandler 及其實現類

  • ChannelHandler 是一個接口,處理 I/O 事件或攔截 I/O 操作,並將其轉發到其 ChannelPipeline(業務處理鏈)中的下一個處理程序。
  • ChannelHandler 本身並沒有提供很多方法,因為這個接口有許多的方法需要實現,方便使用期間,可以繼承它的子類
  • 當自定義一個handler類處理器時,需要繼承ChannelhandlerAdapter
    image

6、Pipeline 和 ChannelPipeline

  • Pipeline中包含了多個Channel(通道)
  • 一個Channel包含了一個ChannelPipeline,而ChannePipeline中又維護了一個由ChannelHandlerContext組成的雙向鏈表,並且每個channeHandlerContext中又關聯着一個channelHandler
  • ChannelPipeline是保存ChannelHandler的List,用於處理或攔截Channel 的入站事件和出站事件操作
  • 入站事件和出站事件在一個雙向鏈表中,入站事件會從鏈表head往后傳遞到最后一個入站的 handler,出站事件會從鏈表tail往前傳遞到最前t個出站的handler, c兩種類型的handler互不干擾
  • ChannelPipeline實現了一種高級形式的攔截過濾器模式,使用戶可以完全控制事件的處理方式
    image

ChannelPipeline addFirst(ChannelHandler... handlers),把一個業務處理類(handler)添加到鏈中的第一個位置
ChannelPipeline addLast(ChannelHandler... handlers),把一個業務處理類(handler)添加到鏈中的最后一個位置

7、ChannelHandlerContext

  • 保存 Channel 相關的所有上下文信息,同時關聯一個 ChannelHandler 對象
  • 即ChannelHandlerContext中包含一個具體的事件處理器ChannelHandler,同時ChannelHandlerContext中也綁定了對應的pipeline 和 Channel 的信息,方便對ChannelHandler進行調用。

ChannelHandlerContext的常用方法:
1、ChannelFuture close(),關閉通道
2、ChannelOutboundInvoker flush(),刷新
3、ChannelFuture writeAndFlush(Object msg),將數據寫到ChannelPipeline 中當前 ChannelHandler 的下一個ChannelHandler 開始處理(出站)

8、ChannelOption

Netty在創建Channel實例后,一般都需要設置 ChannelOption 參數
image

9、EventLoopGroup 和其實現類 NioEventLoopGroup

  • EventLoopGroup 是一組 EventLoop 的抽象,Netty 為了更好的利用多核 CPU 資源,一般會有多個 EventLoop 同時工作,每個 EventLoop 維護着一個 Selector 實例。
  • EventLoopGroup 提供 next 接口,可以從組里面按照一定規則獲取其中一個 EventLoop 來處理任務。在 Netty 服務器端編程中,我們一般都需要提供兩個 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
  • BossEventLoop負責接收客戶端的連接並將SocketChannel注冊到 WorkerEventLoopGroup的其中一個workerEventLoop的selector上並進行后續的IO事件處理

10、Unpooled類

  • Netty提供一個專門用來操作緩沖區(即 Netty 的數據容器)的工具類

ByteBuf buffer = Unpooled.buffer(10);
ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8"));

netty的案例-群聊系統

GroupChatServer

GroupChatServer代碼
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class GroupChatServer {

    private int port; //監聽端口


    public GroupChatServer(int port) {
        this.port = port;
    }

    //編寫run方法,處理客戶端的請求
    public void run() throws  Exception{

        //創建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop

        try {
            ServerBootstrap b = new ServerBootstrap();

            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {

                            //獲取到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //向pipeline加入解碼器
                            pipeline.addLast("decoder", new StringDecoder());
                            //向pipeline加入編碼器
                            pipeline.addLast("encoder", new StringEncoder());
                            //加入自己的業務處理handler
                            pipeline.addLast(new GroupChatServerHandler());

                        }
                    });

            System.out.println("netty 服務器啟動");
            ChannelFuture channelFuture = b.bind(port).sync();

            //監聽關閉
            channelFuture.channel().closeFuture().sync();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {

        new GroupChatServer(7000).run();
    }
}

GroupChatServerHandler

GroupChatServerHandler代碼
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    //這樣寫還要自己遍歷Channel
    //public static List<Channel> channels = new ArrayList<Channel>();

    //使用一個hashmap 管理私聊(私聊本案例並未實現,只是提供個思路)
    //public static Map<String, Channel> channels = new HashMap<String,Channel>();

    //定義一個channle 組,管理所有的channel
    //GlobalEventExecutor.INSTANCE) 是全局的事件執行器,是一個單例
    private static ChannelGroup  channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    //handlerAdded 表示連接建立,一旦連接,第一個被執行
    //將當前channel 加入到  channelGroup
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //將該客戶加入聊天的信息推送給其它在線的客戶端

        //該方法會將 channelGroup 中所有的channel 遍歷,並發送消息,我們不需要自己遍歷

        channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
        channelGroup.add(channel);

		//私聊如何實現
//         channels.put("userid100",channel);
		

    }

    //斷開連接, 將xx客戶離開信息推送給當前在線的客戶
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

        Channel channel = ctx.channel();
        channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 離開了\n");
        System.out.println("channelGroup size" + channelGroup.size());

    }

    //表示channel 處於活動狀態, 提示 xx上線
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //這個是給服務端看的,客戶端上面已經提示xxx加入群聊了
        System.out.println(ctx.channel().remoteAddress() + " 上線了~");
    }

    //表示channel 處於不活動狀態, 提示 xx離線了
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        System.out.println(ctx.channel().remoteAddress() + " 離線了~");
    }

    //讀取數據,轉發給在線的每一個客戶端
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {

        //獲取到當前channel
        Channel channel = ctx.channel();
        //這時我們遍歷channelGroup, 根據不同的情況,回送不同的消息

        channelGroup.forEach(ch -> {
            if(channel != ch) { //不是當前的channel,轉發消息
                ch.writeAndFlush("[客戶]" + channel.remoteAddress() + " 發送了消息" + msg + "\n");
            }else {//回顯自己發送的消息給自己
                ch.writeAndFlush("[自己]發送了消息" + msg + "\n");
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //關閉通道
        ctx.close();
    }
}

GroupChatClient

GroupChatClient代碼
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.Scanner;


public class GroupChatClient {

    //屬性
    private final String host;
    private final int port;

    public GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();

        try {


        Bootstrap bootstrap = new Bootstrap()
                .group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {

                        //得到pipeline
                        ChannelPipeline pipeline = ch.pipeline();
                        //加入相關handler
                        pipeline.addLast("decoder", new StringDecoder());
                        pipeline.addLast("encoder", new StringEncoder());
                        //加入自定義的handler
                        pipeline.addLast(new GroupChatClientHandler());
                    }
                });

        ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
        //得到channel
            Channel channel = channelFuture.channel();
            System.out.println("-------" + channel.localAddress()+ "--------");
            //客戶端需要輸入信息,創建一個掃描器
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                //通過channel 發送到服務器端
                channel.writeAndFlush(msg + "\r\n");
            }
        }finally {
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}

GroupChatClientHandler

GroupChatClientHandler代碼
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    //從服務器拿到的數據
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg.trim());
    }
}

netty的心跳檢測機制

IdleStateHandler是netty 提供的處理空閑狀態的處理器(放在ChannelPipeline維護的ChannelHandler的雙向鏈表上):

  • long readerIdleTime : 表示多長時間沒有讀, 就會發送一個心跳檢測包檢測是否連接
  • long writerIdleTime : 表示多長時間沒有寫, 就會發送一個心跳檢測包檢測是否連接
  • long allIdleTime : 表示多長時間沒有讀寫, 就會發送一個心跳檢測包檢測是否連接

netty的編解碼器

編解碼器放在ChannelPipeline維護的ChannelHandler的雙向鏈表上
服務端發數據給客戶端:服務端—>出站(編碼)—>Socket通道—>入站(解碼)—>客戶端
客戶端發數據給服務端:客戶端—>出站(編碼)—>Socket通道—>入站(解碼)—>服務端
image

編解碼器(自定義編解碼器時需要繼承下面的其中一個類):

  • ByteToMessageDecoder
  • LineBasedFrameDecoder:這個類在 Netty 內部也有使用,它使用行尾控制字符(\n或者\r\n)作為分隔符來解析數據。
  • DelimiterBasedFrameDecoder:使用自定義的特殊字符作為消息的分隔符。
  • HttpObjectDecoder:一個 HTTP 數據的解碼器
  • LengthFieldBasedFrameDecoder:通過指定長度來標識整包消息,這樣就可以自動的處理黏包和半包消息。

TCP 粘包和拆包及解決方案

TCP粘包和拆包
使用優化方法(Nagle 算法),將多次間隔較小且數據量小的數據,合並成一個大的數據塊,然后進行封包。這樣做雖然提高了效率,但是接收端就難於分辨出完整的數據包了,出現了粘包和拆包的問題,因為面向流的通信是無消息保護邊界的
image

  • 服務端分兩次讀取到了兩個獨立的數據包,分別是 D1 和 D2,沒有粘包和拆包
  • 服務端一次接受到了兩個數據包,D1 和 D2 粘合在一起,稱之為 TCP 粘包
  • 服務端分兩次讀取到了數據包,第一次讀取到了完整的 D1 包和 D2 包的部分內容,第二次讀取到了 D2 包的剩余內容,這稱之為 TCP 拆包
  • 服務端分兩次讀取到了數據包,第一次讀取到了 D1 包的部分內容 D1_1,第二次讀取到了 D1 包的剩余部分內容 D1_2 和完整的 D2 包。

解決方案
使用自定義協議+編解碼器來解決,關鍵就是要解決服務器端每次讀取數據長度的問題,這個問題解決,就不會出現服務器多讀或少讀數據的問題,從而避免的 TCP 粘包、拆包。

RPC(基於netty)

1、RPC(Remote Procedure Call)遠程過程調用,是一個計算機通信協議。該協議允許運行於一台計算機的程序調用另一台計算機的子程序,而程序員無需額外地為這個交互作用編程
2、兩個或多個應用程序都分布在不同的服務器上,它們之間的調用都像是本地方法調用一樣

RPC的調用流程圖
image

  • 服務消費方(client)以本地調用方式調用服務
  • client stub 接收到調用后負責將方法、參數等封裝成能夠進行網絡傳輸的消息體
  • client stub 將消息進行編碼並發送到服務端
  • server stub 收到消息后進行解碼
  • server stub 根據解碼結果調用本地的服務
  • 本地服務執行並將結果返回給 server stub
  • server stub 將返回導入結果進行編碼並發送至消費方
  • client stub 接收到消息並進行解碼
  • 服務消費方(client)得到結果

RPC 的目標就是將 2 - 8 這些步驟都封裝起來,用戶無需關心這些細節,可以像調用本地方法一樣即可完成遠程服務調用


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM