百度百科描述
Netty是由JBOSS提供的一個java開源框架,現為 Github上的獨立項目。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
也就是說,Netty 是一個基於NIO的客戶、服務器端的編程框架,使用Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty 相當於簡化和流線化了網絡應用的編程開發過程,例如:基於 TCP 和 UDP 的 socket 服務開發。
如上摘錄自百度百科的描述。
Netty 算是目前最為主流的 NIO 框架了,目前我們也在用 NIO。在 Netty 之前還有另外一個 NIO 框架—Mina,Mina 算是早起的作品,Netty 的基礎架構跟Mina非常相似,使用時的思想也差不多,兩者還有一些微妙的關系,類似於log4j 跟 logback,Netty 和 Mina 均出自 Trustin Lee 之手。
關於Mina跟Netty的區別不是本文重點,我們繼續回到Netty上。
需求場景描述
完成對紅酒窖的室內溫度采集及監控功能。由本地應用程序+溫度傳感器定時采集室內溫度上報至服務器,如果溫度 >20 °C 則由服務器下發重啟空調指令,如果本地應用長時間不上傳溫度給服務器,則給戶主手機發送一條預警短信。
需求是瞎編的,但分析還是要分析的,在沒有接觸socker網絡編程之前我們可能會這么做:你本地寫一個定時器,然后將采集到的溫度數據調一下服務器上的某個接口,服務器拿到數據判斷一下,如果過高則返回一個帶有重啟空調的字段,至於本地斷線的情況,在數據庫維護一個時間字段,如果長時間沒有被更新則調用短信發送接口。
這樣分析起來,感覺也沒啥問題,就是覺得怪怪的,也不能說不行,就是本地設備多了對服務器負載是個問題。
咱先不采用這種方式,上邊描述的場景主要是想模擬雙方通信,實現雙全工操作「客戶端發送數據給服務端,服務端下發指令給客戶端」,一提到雙方通信,我們首先先到的就是Socket吧,來吧,簡單回顧一下Socker通信。
服務端
/**
* 服務端
*/
public class Server {
public static void main(String[] args) {
InputStreamReader isr;
BufferedReader br;
OutputStreamWriter osw;
BufferedWriter bw;
String str;
Scanner in = new Scanner(System.in);
try {
/* 在本機的 8899 端口開放Server */
ServerSocket server = new ServerSocket(8899);
/* 只要產生連接,socket便可以代表所連接的那個物體,同時這個server.accept()只有產生了連接才會進行下一步操作。*/
Socket socket = server.accept();
/* 輸出連接者的IP。*/
System.out.println(socket.getInetAddress());
System.out.println("建立了一個連接!");
while (true) {
isr = new InputStreamReader(socket.getInputStream());
br = new BufferedReader(isr);
System.out.println("客戶端回復:" + br.readLine());
osw = new OutputStreamWriter(socket.getOutputStream());
bw = new BufferedWriter(osw);
System.out.print("服務端回復:");
str = in.nextLine();
bw.write(str + "\n");
bw.flush();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
簡單看一下 Server 端流程,首先創建了一個ServerSocket
來監聽 8899 端口,然后調用阻塞方法 accept();
獲取新的連接,當獲取到新的連接之后,然后進入了while循環體,從該連接中讀取數據,讀取數據是以字節流的方式。
客戶端
public class Client {
public static void main(String[] args) {
InputStreamReader isr;
BufferedReader br;
OutputStreamWriter osw;
BufferedWriter bw;
String str;
Scanner in = new Scanner(System.in);
try {
Socket socket = new Socket("127.0.0.1", 8899);
System.out.println("成功連接服務器");
while (true) {
osw = new OutputStreamWriter(socket.getOutputStream());
bw = new BufferedWriter(osw);
System.out.print("客戶端發送:");
str = in.nextLine();
bw.write(str + "\n");
bw.flush();
isr = new InputStreamReader(socket.getInputStream());
br = new BufferedReader(isr);
System.out.println("服務端回復:" + br.readLine());
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客戶端連接上服務端 8899 端口之后,進入 while 循環體,從連接中讀取數據,如下是效果圖:
上方代碼為了省事直接在主線程操作了,但即便是將代碼移植到子線程中處理,還是存在大量問題,尤其是 while 死循環。
如果將代碼放在子線程完成,那么一個連接需要一個線程來維護,一個線程包含一個死循環,一萬個線程就包含一萬個死循環.... 再就是這些 while 循環並不是每一個都能讀出數據來的,所以就會造成資源浪費,消耗性能。
總之,Socket 就是典型的傳統 IO 模型操作,IO 讀寫是面向流的,一次性只能從流中讀取一個或者多個字節,並且讀完之后流無法再讀取,你需要自己緩存數據。 而 NIO 的讀寫是面向 Buffer 的,你可以隨意讀取里面任何一個字節數據,不需要你自己緩存數據,這一切只需要移動讀寫指針即可,關於 IO 與 NIO 的區別請移步:NIO與IO的區別
Netty實現雙向通信
既然是寫netty,自然是要拿netty來實現上方紅酒窖的案例了,注:簡單的demo,這次咱先實現雙向通信,后續再根據這個系列不斷完善,今天就當入個門了。
開發環境
- IDEA
- maven
- netty版本:4.1.6
首先導入如下 Maven 依賴:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
服務端:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyServer {
private static NioEventLoopGroup bossGroup = new NioEventLoopGroup();
private static NioEventLoopGroup workerGroup = new NioEventLoopGroup();
public static void main(String[] args) {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
// 指定Channel
.channel(NioServerSocketChannel.class)
//服務端可連接隊列數,對應TCP/IP協議listen函數中backlog參數
.option(ChannelOption.SO_BACKLOG, 1024)
//設置TCP長連接,一般如果兩個小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//將小的數據包包裝成更大的幀進行傳送,提高網絡的負載
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new NettyServerHandler());
}
});
serverBootstrap.bind(8070);
}
@PreDestroy
public void destory() throws InterruptedException {
bossGroup.shutdownGracefully().sync();
workerGroup.shutdownGracefully().sync();
}
}
簡單說一下,首先創建了兩個NioEventLoopGroup
對象,我們可以把它看做傳統IO模型中的兩大線程組,bossGroup
主要用來負責創建新連接「監聽端口,接收新連接的線程組」,workerGroup
主要用於讀取數據以及業務邏輯處理「處理每一條連接的數據讀寫的線程組」,再生動一點就是:一個是對外的銷售員,一個是負責單子落地的工人。
然后我們創建了ServerBootstrap
,這個類是用來引導我們進行服務端的啟動工作,接收兩個 NioEventLoopGroup 對象,把干活的兩個安排的明明白白。
通過.channel(NioServerSocketChannel.class)
來指定 IO 模型,NioServerSocketChannel.class
表示指定的是 NIO,可供的 IO模型 選擇無非就 NIO,BIO,BIO肯定是不能選擇的了。
通過.childOption()
可以給每條連接設置一些TCP底層相關的屬性,比如上面,我們設置了兩種TCP屬性,其中
ChannelOption.SO_KEEPALIVE
表示是否開啟TCP底層心跳機制,true為開啟ChannelOption.TCP_NODELAY
表示是否開啟Nagle算法,true表示關閉,false表示開啟,通俗地說,如果要求高實時性,有數據發送時就馬上發送,就關閉,如果需要減少發送次數減少網絡交互,就開啟。
接着,我們調用childHandler()
方法,給這個引導類創建一個ChannelInitializer
,這里主要就是定義后續每條連接的數據讀寫,業務處理邏輯,不理解沒關系,在后面我們會詳細分析。ChannelInitializer
這個類中,我們注意到有一個泛型參數NioSocketChannel
,這個類呢,就是 Netty 對 NIO 類型的連接的抽象,而我們前面NioServerSocketChannel
也是對 NIO 類型的連接的抽象,NioServerSocketChannel
和NioSocketChannel
的概念可以和 BIO 編程模型中的ServerSocket
以及Socket
兩個概念對應上。還沒完,我們需要在ChannelInitializer
中的initChannel()
方法里面給客戶端添加一個邏輯處理器,這個處理器的作用就是負責向服務端寫數據,也就是代碼中的如下部分:
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new NettyServerHandler());
}
我們簡單看一下這段代碼,其中ch.pipeline()
返回的是和這條連接相關的邏輯處理鏈,采用了責任鏈模式,類似於之前文章中提到的Spring Security過濾器鏈一樣,這里不理解沒關系,后面再細說。
然后再調用 addLast()
方法 添加一個邏輯處理器,這個邏輯處理器為的就是在客戶端建立連接成功之后,向服務端寫數據,下面是這個邏輯處理器相關的代碼:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 1. 獲取數據
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(new Date() + ": 服務端讀到數據 -> " + byteBuf.toString(Charset.forName("utf-8")));
System.out.println(new Date() + ": 服務端寫出數據");
// 2. 寫數據
ByteBuf out = getByteBuf(ctx);
ctx.channel().writeAndFlush(out);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "我是發送給客戶端的數據:請重啟冰箱!".getBytes(Charset.forName("utf-8"));
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
}
繼續如上這段代碼,這個邏輯處理器繼承自 ChannelInboundHandlerAdapter
,然后覆蓋了 channelRead()
方法,這個方法在接收到客戶端發來的數據之后被回調。
這里的 msg
參數指的就是 Netty 里面數據讀寫的載體,然后需要我們強轉一下為ByteBuf
類型,然后調用 byteBuf.toString()
就能夠拿到我們客戶端發過來的字符串數據。
ok,至此服務端創建完了,我們再看客戶端。
客戶端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.Date;
import java.util.concurrent.TimeUnit;
public class NettyClient {
private static String host = "127.0.0.1";
private static int MAX_RETRY = 5;
public static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
// 1.指定線程模型
.group(workerGroup)
// 2.指定 IO 類型為 NIO
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
// 3.IO 處理邏輯
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(0, 10, 0))
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new NettyClientHandler());
}
});
// 4.建立連接
bootstrap.connect(host, 8070).addListener(future -> {
if (future.isSuccess()) {
System.out.println("連接成功!");
} else {
System.err.println("連接失敗!");
connect(bootstrap, host, 80, MAX_RETRY);
}
});
}
/**
* 用於失敗重連
*/
private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
bootstrap.connect(host, port).addListener(future -> {
if (future.isSuccess()) {
System.out.println("連接成功!");
} else if (retry == 0) {
System.err.println("重試次數已用完,放棄連接!");
} else {
// 第幾次重連
int order = (MAX_RETRY - retry) + 1;
// 本次重連的間隔
int delay = 1 << order;
System.err.println(new Date() + ": 連接失敗,第" + order + "次重連……");
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
.SECONDS);
}
});
}
}
我們可以看到客戶端的引導類不再是ServerBootstrap
了,而是換成了Bootstrap
,這個類負責客戶端以及連接服務端,跟服務端屬性大差不差,channel、option、handle等,然后同樣指定了IO模型,同時還增加了連接監聽 bootstrap.connect(host, 8070).addListener
,其中future.isSuccess()
屬性值表示了連接結果,如果連接失敗則跳入 connect
進行重連,重連嘗試5次之后不再進行嘗試,這塊我們后面文章再細講「其中包含客戶端、服務端的長連接,斷線重試等」,我們先來看看客戶端指定的業務處理類:NettyClientHandler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Random;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客戶端寫出數據");
// 1. 獲取數據
ByteBuf buffer = getByteBuf(ctx);
// 2. 寫數據
ctx.channel().writeAndFlush(buffer);
}
/**
* 數據解析
*/
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 1. 獲取二進制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
Random random = new Random();
double value = random.nextDouble() * 14 + 8;
String temp = "獲取室內溫度:" + value;
// 2. 准備數據,指定字符串的字符集為 utf-8
byte[] bytes = temp.getBytes(Charset.forName("utf-8"));
// 3. 填充數據到 ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(new Date() + ": 客戶端讀到數據 -> " + msg.toString());
}
}
這個邏輯處理器同樣繼承自 ChannelInboundHandlerAdapter
,不同於服務端,客戶端邏輯處理器這次使用到了 channelActive()
方法,這個方法會在客戶端連接建立成功之后被調用,所以我們在這個方法里完成寫數據的操作「讀取室內溫度」。
我們簡單看一下這個channelActive()
方法,首先獲取傳遞的 ByteBuf
對象,這個對象怎么來的呢,我們進入 getByteBuf()
方法,我們可以看到通過調用ctx.alloc()
獲取到一個 ByteBuf
,然后我們把字符串的二進制數據填充進了 ByteBuf
,這樣我們就獲取到了 Netty 需要的一個數據格式,最后我們調用 ctx.channel().writeAndFlush()
把數據寫到服務端,至此整個客戶端的寫操作就完成了。
接下來就是讀數據量,channelRead()
方法,這個方法我們在服務端代碼中已經了解過了就不再闡述了。
測試服務端客戶端代碼
首先運行服務端 main()
方法,然后再運行客戶端 main()
方法,執行效果如下:
客戶端
服務端
至此,通過這個小demo,客戶端與服務端可以完成雙向通信了。還不急着技術文章,但畢竟是局域網嗎,如果服務端的代碼部署在外網服務端效果會怎樣呢?
測試服務端在外網服務器
我們把服務端代碼部署在外網環境中試一下看看效果會怎樣。
首先我們修改一下客戶端的host地址為外網ip地址,然后本地起一下客戶端試試:
我們可以看到返回結果沒問題,那說明服務端也是沒問題的:
至此,外網服務端與局域網客戶端的雙向通信時沒問題了,測試具體細節就不展示了,后面章節我會一步一步將代碼遷移到SpringBoot Web項目中的,但是眼下這代碼還是有點問題,我們先在本地繼續完善一下。
最新紅酒窖進度請查看:https://www.cnblogs.com/niceyoo/category/1802223.html
我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下:niceyoo