Netty入門篇-從雙向通信開始


百度百科描述

Netty是由JBOSS提供的一個java開源框架,現為 Github上的獨立項目。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。

也就是說,Netty 是一個基於NIO的客戶、服務器端的編程框架,使用Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty 相當於簡化和流線化了網絡應用的編程開發過程,例如:基於 TCP 和 UDP 的 socket 服務開發。

如上摘錄自百度百科的描述。


Netty 算是目前最為主流的 NIO 框架了,目前我們也在用 NIO。在 Netty 之前還有另外一個 NIO 框架—Mina,Mina 算是早起的作品,Netty 的基礎架構跟Mina非常相似,使用時的思想也差不多,兩者還有一些微妙的關系,類似於log4j 跟 logback,Netty 和 Mina 均出自 Trustin Lee 之手。

關於Mina跟Netty的區別不是本文重點,我們繼續回到Netty上。

需求場景描述

完成對紅酒窖的室內溫度采集及監控功能。由本地應用程序+溫度傳感器定時采集室內溫度上報至服務器,如果溫度 >20 °C 則由服務器下發重啟空調指令,如果本地應用長時間不上傳溫度給服務器,則給戶主手機發送一條預警短信。

需求是瞎編的,但分析還是要分析的,在沒有接觸socker網絡編程之前我們可能會這么做:你本地寫一個定時器,然后將采集到的溫度數據調一下服務器上的某個接口,服務器拿到數據判斷一下,如果過高則返回一個帶有重啟空調的字段,至於本地斷線的情況,在數據庫維護一個時間字段,如果長時間沒有被更新則調用短信發送接口。

這樣分析起來,感覺也沒啥問題,就是覺得怪怪的,也不能說不行,就是本地設備多了對服務器負載是個問題。

咱先不采用這種方式,上邊描述的場景主要是想模擬雙方通信,實現雙全工操作「客戶端發送數據給服務端,服務端下發指令給客戶端」,一提到雙方通信,我們首先先到的就是Socket吧,來吧,簡單回顧一下Socker通信。

服務端

/**
 * 服務端
 */
public class Server {
    public static void main(String[] args) {
        InputStreamReader isr;
        BufferedReader br;
        OutputStreamWriter osw;
        BufferedWriter bw;
        String str;
        Scanner in = new Scanner(System.in);
        try {
            /* 在本機的 8899 端口開放Server */
            ServerSocket server = new ServerSocket(8899);
            /* 只要產生連接,socket便可以代表所連接的那個物體,同時這個server.accept()只有產生了連接才會進行下一步操作。*/
            Socket socket = server.accept();
            /* 輸出連接者的IP。*/
            System.out.println(socket.getInetAddress());
            System.out.println("建立了一個連接!");
            while (true) {
                isr = new InputStreamReader(socket.getInputStream());
                br = new BufferedReader(isr);
                System.out.println("客戶端回復:" + br.readLine());
                osw = new OutputStreamWriter(socket.getOutputStream());
                bw = new BufferedWriter(osw);
                System.out.print("服務端回復:");
                str = in.nextLine();
                bw.write(str + "\n");
                bw.flush();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

簡單看一下 Server 端流程,首先創建了一個ServerSocket來監聽 8899 端口,然后調用阻塞方法 accept();獲取新的連接,當獲取到新的連接之后,然后進入了while循環體,從該連接中讀取數據,讀取數據是以字節流的方式。

客戶端

public class Client {
    public static void main(String[] args) {
        InputStreamReader isr;
        BufferedReader br;
        OutputStreamWriter osw;
        BufferedWriter bw;
        String str;
        Scanner in = new Scanner(System.in);
        try {
            Socket socket = new Socket("127.0.0.1", 8899);
            System.out.println("成功連接服務器");
            while (true) {
                osw = new OutputStreamWriter(socket.getOutputStream());
                bw = new BufferedWriter(osw);
                System.out.print("客戶端發送:");
                str = in.nextLine();
                bw.write(str + "\n");
                bw.flush();
                isr = new InputStreamReader(socket.getInputStream());
                br = new BufferedReader(isr);
                System.out.println("服務端回復:" + br.readLine());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客戶端連接上服務端 8899 端口之后,進入 while 循環體,從連接中讀取數據,如下是效果圖:

上方代碼為了省事直接在主線程操作了,但即便是將代碼移植到子線程中處理,還是存在大量問題,尤其是 while 死循環。

如果將代碼放在子線程完成,那么一個連接需要一個線程來維護,一個線程包含一個死循環,一萬個線程就包含一萬個死循環.... 再就是這些 while 循環並不是每一個都能讀出數據來的,所以就會造成資源浪費,消耗性能。

總之,Socket 就是典型的傳統 IO 模型操作,IO 讀寫是面向流的,一次性只能從流中讀取一個或者多個字節,並且讀完之后流無法再讀取,你需要自己緩存數據。 而 NIO 的讀寫是面向 Buffer 的,你可以隨意讀取里面任何一個字節數據,不需要你自己緩存數據,這一切只需要移動讀寫指針即可,關於 IO 與 NIO 的區別請移步:NIO與IO的區別

Netty實現雙向通信

既然是寫netty,自然是要拿netty來實現上方紅酒窖的案例了,注:簡單的demo,這次咱先實現雙向通信,后續再根據這個系列不斷完善,今天就當入個門了。

開發環境

  • IDEA
  • maven
  • netty版本:4.1.6

首先導入如下 Maven 依賴:

<dependency>
 <groupId>io.netty</groupId>
 <artifactId>netty-all</artifactId>
 <version>4.1.6.Final</version>
</dependency>

服務端:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyServer {

    private static NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    private static NioEventLoopGroup workerGroup = new NioEventLoopGroup();

    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap
                .group(bossGroup, workerGroup)

                // 指定Channel
                .channel(NioServerSocketChannel.class)

                //服務端可連接隊列數,對應TCP/IP協議listen函數中backlog參數
                .option(ChannelOption.SO_BACKLOG, 1024)

                //設置TCP長連接,一般如果兩個小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文
                .childOption(ChannelOption.SO_KEEPALIVE, true)

                //將小的數據包包裝成更大的幀進行傳送,提高網絡的負載
                .childOption(ChannelOption.TCP_NODELAY, true)

                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) {
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                });

        serverBootstrap.bind(8070);
    }

    @PreDestroy
    public void destory() throws InterruptedException {
        bossGroup.shutdownGracefully().sync();
        workerGroup.shutdownGracefully().sync();
    }

}

簡單說一下,首先創建了兩個NioEventLoopGroup對象,我們可以把它看做傳統IO模型中的兩大線程組,bossGroup主要用來負責創建新連接「監聽端口,接收新連接的線程組」,workerGroup主要用於讀取數據以及業務邏輯處理「處理每一條連接的數據讀寫的線程組」,再生動一點就是:一個是對外的銷售員,一個是負責單子落地的工人。

然后我們創建了ServerBootstrap,這個類是用來引導我們進行服務端的啟動工作,接收兩個 NioEventLoopGroup 對象,把干活的兩個安排的明明白白。

通過.channel(NioServerSocketChannel.class)來指定 IO 模型,NioServerSocketChannel.class 表示指定的是 NIO,可供的 IO模型 選擇無非就 NIO,BIO,BIO肯定是不能選擇的了。

通過.childOption()可以給每條連接設置一些TCP底層相關的屬性,比如上面,我們設置了兩種TCP屬性,其中

  • ChannelOption.SO_KEEPALIVE表示是否開啟TCP底層心跳機制,true為開啟
  • ChannelOption.TCP_NODELAY表示是否開啟Nagle算法,true表示關閉,false表示開啟,通俗地說,如果要求高實時性,有數據發送時就馬上發送,就關閉,如果需要減少發送次數減少網絡交互,就開啟。

接着,我們調用childHandler()方法,給這個引導類創建一個ChannelInitializer,這里主要就是定義后續每條連接的數據讀寫,業務處理邏輯,不理解沒關系,在后面我們會詳細分析。ChannelInitializer這個類中,我們注意到有一個泛型參數NioSocketChannel,這個類呢,就是 Netty 對 NIO 類型的連接的抽象,而我們前面NioServerSocketChannel也是對 NIO 類型的連接的抽象,NioServerSocketChannelNioSocketChannel的概念可以和 BIO 編程模型中的ServerSocket以及Socket兩個概念對應上。還沒完,我們需要在ChannelInitializer 中的initChannel() 方法里面給客戶端添加一個邏輯處理器,這個處理器的作用就是負責向服務端寫數據,也就是代碼中的如下部分:

@Override
protected void initChannel(NioSocketChannel ch) {
 ch.pipeline().addLast(new NettyServerHandler());
}

我們簡單看一下這段代碼,其中ch.pipeline() 返回的是和這條連接相關的邏輯處理鏈,采用了責任鏈模式,類似於之前文章中提到的Spring Security過濾器鏈一樣,這里不理解沒關系,后面再細說。

然后再調用 addLast() 方法 添加一個邏輯處理器,這個邏輯處理器為的就是在客戶端建立連接成功之后,向服務端寫數據,下面是這個邏輯處理器相關的代碼:

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
      // 1. 獲取數據
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(new Date() + ": 服務端讀到數據 -> " + byteBuf.toString(Charset.forName("utf-8")));
        System.out.println(new Date() + ": 服務端寫出數據");
        // 2. 寫數據
        ByteBuf out = getByteBuf(ctx);
        ctx.channel().writeAndFlush(out);
    }

    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        byte[] bytes = "我是發送給客戶端的數據:請重啟冰箱!".getBytes(Charset.forName("utf-8"));
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(bytes);
        return buffer;
    }

}

繼續如上這段代碼,這個邏輯處理器繼承自 ChannelInboundHandlerAdapter,然后覆蓋了 channelRead()方法,這個方法在接收到客戶端發來的數據之后被回調。

這里的 msg 參數指的就是 Netty 里面數據讀寫的載體,然后需要我們強轉一下為ByteBuf類型,然后調用 byteBuf.toString() 就能夠拿到我們客戶端發過來的字符串數據。

ok,至此服務端創建完了,我們再看客戶端。

客戶端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class NettyClient {

    private static String host = "127.0.0.1";
    private static int MAX_RETRY = 5;

    public static void main(String[] args) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap
                // 1.指定線程模型
                .group(workerGroup)
                // 2.指定 IO 類型為 NIO
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                // 3.IO 處理邏輯
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) {
                        ch.pipeline()
                            .addLast(new IdleStateHandler(0, 10, 0))
                            .addLast(new StringDecoder())
                            .addLast(new StringEncoder())
                            .addLast(new NettyClientHandler());
                    }
                });
        // 4.建立連接
        bootstrap.connect(host, 8070).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("連接成功!");
            } else {
                System.err.println("連接失敗!");
                connect(bootstrap, host, 80, MAX_RETRY);
            }

        });
    }

  /**
  * 用於失敗重連
  */
    private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
        bootstrap.connect(host, port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println("連接成功!");
            } else if (retry == 0) {
                System.err.println("重試次數已用完,放棄連接!");
            } else {
                // 第幾次重連
                int order = (MAX_RETRY - retry) + 1;
                // 本次重連的間隔
                int delay = 1 << order;
                System.err.println(new Date() + ": 連接失敗,第" + order + "次重連……");
                bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
                        .SECONDS);
            }
        });
    }

}

我們可以看到客戶端的引導類不再是ServerBootstrap了,而是換成了Bootstrap,這個類負責客戶端以及連接服務端,跟服務端屬性大差不差,channel、option、handle等,然后同樣指定了IO模型,同時還增加了連接監聽 bootstrap.connect(host, 8070).addListener,其中future.isSuccess()屬性值表示了連接結果,如果連接失敗則跳入 connect 進行重連,重連嘗試5次之后不再進行嘗試,這塊我們后面文章再細講「其中包含客戶端、服務端的長連接,斷線重試等」,我們先來看看客戶端指定的業務處理類:NettyClientHandler

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Random;

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println(new Date() + ": 客戶端寫出數據");

        // 1. 獲取數據
        ByteBuf buffer = getByteBuf(ctx);

        // 2. 寫數據
        ctx.channel().writeAndFlush(buffer);
    }

  /**
  * 數據解析
  */
    private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
        // 1. 獲取二進制抽象 ByteBuf
        ByteBuf buffer = ctx.alloc().buffer();
        Random random = new Random();
        double value = random.nextDouble() * 14 + 8;
        String temp = "獲取室內溫度:" + value;

        // 2. 准備數據,指定字符串的字符集為 utf-8
        byte[] bytes = temp.getBytes(Charset.forName("utf-8"));

        // 3. 填充數據到 ByteBuf
        buffer.writeBytes(bytes);

        return buffer;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(new Date() + ": 客戶端讀到數據 -> " + msg.toString());
    }

}

這個邏輯處理器同樣繼承自 ChannelInboundHandlerAdapter,不同於服務端,客戶端邏輯處理器這次使用到了 channelActive()方法,這個方法會在客戶端連接建立成功之后被調用,所以我們在這個方法里完成寫數據的操作「讀取室內溫度」。

我們簡單看一下這個channelActive()方法,首先獲取傳遞的 ByteBuf 對象,這個對象怎么來的呢,我們進入 getByteBuf() 方法,我們可以看到通過調用ctx.alloc() 獲取到一個 ByteBuf ,然后我們把字符串的二進制數據填充進了 ByteBuf,這樣我們就獲取到了 Netty 需要的一個數據格式,最后我們調用 ctx.channel().writeAndFlush() 把數據寫到服務端,至此整個客戶端的寫操作就完成了。

接下來就是讀數據量,channelRead() 方法,這個方法我們在服務端代碼中已經了解過了就不再闡述了。


測試服務端客戶端代碼

首先運行服務端 main() 方法,然后再運行客戶端 main() 方法,執行效果如下:

客戶端

服務端

至此,通過這個小demo,客戶端與服務端可以完成雙向通信了。還不急着技術文章,但畢竟是局域網嗎,如果服務端的代碼部署在外網服務端效果會怎樣呢?

測試服務端在外網服務器

我們把服務端代碼部署在外網環境中試一下看看效果會怎樣。

首先我們修改一下客戶端的host地址為外網ip地址,然后本地起一下客戶端試試:

我們可以看到返回結果沒問題,那說明服務端也是沒問題的:

至此,外網服務端與局域網客戶端的雙向通信時沒問題了,測試具體細節就不展示了,后面章節我會一步一步將代碼遷移到SpringBoot Web項目中的,但是眼下這代碼還是有點問題,我們先在本地繼續完善一下。

最新紅酒窖進度請查看:https://www.cnblogs.com/niceyoo/category/1802223.html

我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下:niceyoo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM