前文需求回顧
完成對紅酒窖的室內溫度采集及監控功能。由本地應用程序+溫度傳感器定時采集室內溫度上報至服務器,如果溫度 >20 °C 則由服務器下發重啟空調指令,如果本地應用長時間不上傳溫度給服務器,則給戶主手機發送一條預警短信。
Netty入門篇-從雙向通信開始「上文」
上篇算是完成簡單的雙向通信了,我們接着看看 “如果本地應用長時間不上傳溫度給服務器...”,很明顯客戶端有可能掛了嘛,所以怎么實現客戶端與服務端的長連接就是本文要實現的了。
什么是心跳機制
百度百科:心跳機制是定時發送一個自定義的結構體(心跳包),讓對方知道自己還活着,以確保連接的有效性的機制。
簡單說,這個心跳機制是由客戶端主動發起的消息,每隔一段時間就向服務端發送消息,告訴服務端自己還沒死,可不要給戶主發送預警短信啊。
如何實現心跳機制
1、客戶端代碼修改
我們需要改造一下上節中客戶端的代碼,首先是在責任鏈中增加一個心跳邏輯處理類HeartbeatHandler
public class NettyClient {
private static String host = "127.0.0.1";
public static void main(String[] args) {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap
// 1.指定線程模型
.group(workerGroup)
// 2.指定 IO 類型為 NIO
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
// 3.IO 處理邏輯
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(0, 10, 0))
.addLast(new StringDecoder())
.addLast(new StringEncoder())
.addLast(new HeartbeatHandler())
.addLast(new NettyClientHandler());
}
});
// 4.建立連接
bootstrap.connect(host, 8070).addListener(future -> {
if (future.isSuccess()) {
System.out.println("連接成功!");
} else {
System.err.println("連接失敗!");
}
});
}
}
沒什么變化,主要是增加了HeartbeatHandler,我們來看看這個類:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.nio.charset.Charset;
import java.time.LocalTime;
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
System.out.println("10秒了,需要發送消息給服務端了" + LocalTime.now());
//向服務端送心跳包
ByteBuf buffer = getByteBuf(ctx);
//發送心跳消息,並在發送失敗時關閉該連接
ctx.writeAndFlush(buffer).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("捕獲的異常:" + cause.getMessage());
ctx.channel().close();
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 1. 獲取二進制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
String time = "heartbeat:客戶端心跳數據:" + LocalTime.now();
// 2. 准備數據,指定字符串的字符集為 utf-8
byte[] bytes = time.getBytes(Charset.forName("utf-8"));
// 3. 填充數據到 ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
}
還是繼承自ChannelInboundHandlerAdapter,不過這次重寫的是userEventTriggered()方法,這個方法在客戶端的所有ChannelHandler中,如果10s內沒有發生write事件時觸發,所以我們在該方法中給服務端發送心跳消息。
業務邏輯處理類NettyClientHandler沒有改動,代碼如下:
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Random;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客戶端寫出數據");
// 1. 獲取數據
ByteBuf buffer = getByteBuf(ctx);
// 2. 寫數據
ctx.channel().writeAndFlush(buffer);
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
// 1. 獲取二進制抽象 ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
Random random = new Random();
double value = random.nextDouble() * 14 + 8;
String temp = "獲取室內溫度:" + value;
// 2. 准備數據,指定字符串的字符集為 utf-8
byte[] bytes = temp.getBytes(Charset.forName("utf-8"));
// 3. 填充數據到 ByteBuf
buffer.writeBytes(bytes);
return buffer;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println(new Date() + ": 客戶端讀到數據 -> " + msg.toString());
}
}
對如上代碼不了解的可以回看上一節:Netty入門篇-從雙向通信開始
2、服務端代碼修改
服務端代碼主要是開啟TCP底層心跳機制支持,.childOption(ChannelOption.SO_KEEPALIVE, true) ,其他的代碼並沒有改動:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyServer {
public static void main(String[] args) {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
// 指定Channel
.channel(NioServerSocketChannel.class)
//服務端可連接隊列數,對應TCP/IP協議listen函數中backlog參數
.option(ChannelOption.SO_BACKLOG, 1024)
//設置TCP長連接,一般如果兩個小時內沒有數據的通信時,TCP會自動發送一個活動探測數據報文
.childOption(ChannelOption.SO_KEEPALIVE, true)
//將小的數據包包裝成更大的幀進行傳送,提高網絡的負載
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new NettyServerHandler());
}
});
serverBootstrap.bind(8070);
}
}
我們再來看看服務端的業務處理類 NettyServerHandler
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.nio.charset.Charset;
import java.util.Date;
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
String message = byteBuf.toString(Charset.forName("utf-8"));
System.out.println(new Date() + ": 服務端讀到數據 -> " + message);
/** 心跳數據是不發送數據 **/
if(!message.contains("heartbeat")){
ByteBuf out = getByteBuf(ctx);
ctx.channel().writeAndFlush(out);
}
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "我是發送給客戶端的數據:請重啟冰箱!".getBytes(Charset.forName("utf-8"));
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
}
對channelRead() 方法增加了一個 if 判斷,判斷如果包含heartbeat字符串就認為這是客戶端發過來的心跳,這種判斷是非常low的,因為到目前為止我們一直是用簡單字符串來傳遞數據的,上邊傳遞的數據就直接操作字符串;那么問題來了,如果我們想傳遞對象怎么搞呢?下節寫。我們先來看一下如上代碼客戶端與服務端運行截圖:
服務端

客戶端

至此,整個心跳機制就完成了,這樣每隔10秒客戶端就會給服務端發送一個心跳消息,下節我們通過了解通協議以完善心跳機制的代碼。
我創建了一個java相關的公眾號,用來記錄自己的學習之路,感興趣的小伙伴可以關注一下:小偉后端筆記

