前言:
公司的加密機調度系統一直使用的是http請求調度的方式去調度,但是會出現網絡故障導致某個客戶端或者服務端斷線的情況,導致很多請求信息以及回執信息丟失的情況,接着我們拋棄了http的方式,改為Tcp的方式去建立客戶端和服務器之間的連接,並且要去實現斷線重連的功能,經過討論后決定使用java中成熟的nio框架 – netty去解決這一系列的問題。
1. netty簡單介紹:
在百度中對netty的解釋是:
Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
Netty框架並不只是封裝了多路復用的IO模型,也包括提供了傳統的阻塞式/非阻塞式 同步IO的模型封,Netty 是一個利用 Java 的高級網絡的能力,隱藏其背后的復雜性而提供一個易於使用的 API 的客戶端/服務器框架。其並發高、傳輸快、封裝好的特性受到了許多大公司的青睞,在這里我們就不過多的分析netty的原理和特性了,之后我會寫一篇文章專門寫一下從io到nio,再到netty的整個過程。重點講一下netty的魅力所在,今天我們已代碼實現為主,講解一下在springboot架構中,用netty實現服務端和客戶端之間的通信以及斷線重連等機制。
2. 服務端代碼:
首先,引入netty的pom依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
然后我們在配置文件中寫入服務端的ip和端口號,用於連接

在springboot的application啟動類中寫入服務端的啟動start方法,用於在啟動項目時自動啟動服務端
1 @SpringBootApplication 2 public class Application implements CommandLineRunner { 3 4 @Value("${netty.server.port}") 5 private int port; 6 7 @Value("${netty.server.host}") 8 private String host; 9 10 @Autowired 11 NettyServer server; 12 13 public static void main(String[] args) { 14 SpringApplication.run(Application.class, args); 15 } 16 17 18 @Override 19 public void run(String... strings) throws Exception { 20 this.startServer(); 21 22 } 23 24 //啟動service 25 public void startServer(){
//這個類實現一個IP套接字地址(IP地址+端口號) 26 InetSocketAddress address = new InetSocketAddress(host, port); 27 ChannelFuture future = server.start(address); 28 29 Runtime.getRuntime().addShutdownHook(new Thread(){ 30 @Override 31 public void run() { 32 server.destroy(); 33 } 34 }); 35 36 future.channel().closeFuture().syncUninterruptibly(); 37 } 38 } 39 40 41 }
ChannelFuture:
Future最早出現於JDK的java.util.concurrent.Future,它用於表示異步操作的結果.由於Netty的Future都是與異步I/O操作相關的,因此命名為ChannelFuture,代表它與Channel操作相關.由於Netty中的所有I / O操作都是異步的,因此Netty為了解決調用者如何獲取異步操作結果的問題而專門設計了ChannelFuture接口.
因此,Channel與ChannelFuture可以說形影不離的.
然后我們要去重點看server.start()
public class NettyServer { private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class); private final ChannelGroup channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); private final EventLoopGroup bossGroup = new NioEventLoopGroup(); private final EventLoopGroup workGroup = new NioEventLoopGroup(); private Channel channel; /** * 開啟及服務線程 */ public ChannelFuture start(InetSocketAddress address) { //服務端引導類 ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workGroup)//通過ServerBootstrap的group方法,設置(1)中初始化的主從"線程池" .channel(NioServerSocketChannel.class)//指定通道channel的類型,由於是服務端,故而是NioServerSocketChannel .childHandler(new NettyServerInitializer())//設置ServerSocketChannel的處理器 .option(ChannelOption.SO_BACKLOG, 100)// 設置tcp協議的請求等待隊列 .childOption(ChannelOption.SO_KEEPALIVE, true);//配置子通道也就是SocketChannel的選項 ChannelFuture future = bootstrap.bind(address).syncUninterruptibly(); logger.info("准備接收——————"); channel = future.channel(); return future; } public void destroy() { if(channel != null) { channel.close(); } channelGroup.close(); workGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } }
在這里的設置中,.childHandler(new NettyServerInitializer()) 用於設置了服務器管道 NioServerSocketChannel 的處理器handler,
這個handler是我們自定義封裝的一些對channel的public class NettyServerInitializer extends ChannelInitializer<Channel>{
@Component
public class TcpMsgHandler extends ChannelInboundHandlerAdapter {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//處理日志
//pipeline.addLast(new LoggingHandler(LogLevel.INFO));
//處理心跳
pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
//消息編碼
pipeline.addLast(new MessageEncoder());
//粘包長度控制
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4));
//消息解碼
pipeline.addLast(new MessageDecoder());
//自定義hander
pipeline.addLast(new TcpMsgHandler());
}
}
ChannelPipeline :
Netty 的 Channel 過濾器實現原理與 Servlet Filter 機制一致,它將 Channel 的數據管道抽象為 ChannelPipeline,消息在 ChannelPipeline 中流動和傳遞。ChannelPipeline 持有 I/O 事件攔截器 ChannelHandler 的鏈表,由 ChannelHandler 來對 I/O 事件進行具體的攔截和處理,可以方便地通過新增和刪除 ChannelHandler 來實現不同業務邏輯的定制,能夠實現對修改封閉和對擴展到支持。
我們看到我們添加了idleStateHandler用來處理心跳,那么心跳究竟是什么呢,我們先來介紹一下心跳
心跳機制
- 心跳是在TCP長連接中,客戶端和服務端定時向對方發送數據包通知對方自己還在線,保證連接的有效性的一種機制
- 在服務器和客戶端之間一定時間內沒有數據交互時, 即處於 idle 狀態時, 客戶端或服務器會發送一個特殊的數據包給對方, 當接收方收到這個數據報文后, 也立即發送一個特殊的數據報文, 回應發送方, 此即一個 PING-PONG 交互. 自然地, 當某一端收到心跳消息后, 就知道了對方仍然在線, 這就確保 TCP 連接的有效性
在我們的服務端中,不會主動發心跳給客戶端,只會對對應的心跳消息,進行回應,告訴那些給我發心跳的客戶端說:我還活着!
-
服務端添加IdleStateHandler心跳檢測處理器,並添加自定義處理Handler類實現userEventTriggered()方法作為超時事件的邏輯處理;
-
設定IdleStateHandler心跳檢測每五秒進行一次讀檢測,如果五秒內ChannelRead()方法未被調用則觸發一次userEventTrigger()方法
TcpMsgHandler.java
@Component
public class TcpMsgHandler extends ChannelInboundHandlerAdapter {
private final static Logger logger = LoggerFactory.getLogger("");
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) { }
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
ctx.close();
}
} else {
super.userEventTriggered(ctx, evt);
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {
TcpMsg msg = (TcpMsg) obj;
try {
//處理心跳
...
ctx.writeAndFlush(msg);
}
}catch(Exception ex){
logger.info(ex.getMessage());
}
}
}
在這里,我們的channelRead比較簡單,只是將客戶端發來的心跳直接發回去了,實現了響應客戶端心跳請求的目的,除了心跳,我們還可以去定義不同的消息類別,比如說是加密請求,還是處理數據的請求,入庫的請求等等,
我們可以自己從channel中獲取到客戶端發過來的信息做處理,記得要即使響應,比如,心跳中,我們將msg又返回給了channel:
ctx.writeAndFlush(msg);
在handler中,decoder用於解碼的作用,將客戶端發來的ByteBuf流的形式,轉為我們需要的格式,可以轉為我們要的對象,或者是一個string字符串
MessageDecoder.java
public class MessageDecoder extends ByteToMessageDecoder {
private Logger logger = LoggerFactory.getLogger("");
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int len = in.readableBytes();
byte[] bytes = new byte[len];
//將ByteBuf轉為byte數組
in.readBytes(bytes);
try {
TcpMsg msg = TcpMsg.ByteToObj(bytes);
out.add(msg);
} catch (Exception ex) {
logger.error("MessageDecoder",ex);
}
}
}
encoder負責在我們發送數據的時候,將我們的對象、或者是字符串轉為byte數組,然后輸出
public class MessageEncoder extends MessageToByteEncoder<TcpMsg>{
private Logger logger = LoggerFactory.getLogger("");
@Override
protected void encode(ChannelHandlerContext ctx, TcpMsg msg, ByteBuf out) throws Exception {
try{
if (msg.getType() != 0){
//logger.info("send: " + msg.getType() + ":" + msg.getGuid() + ":" + msg.getBody());
}
byte[] src = msg.ToBytes();
out.writeBytes(src);
}catch (Exception e){
logger.error("MessageEncoder",e);
}
}
}
3. 客戶端代碼:
在application配置文件中加入服務端的主機名和端口號
netty.server.host = 127.0.0.1 netty.server.port = 9090
啟動類Application:
@SpringBootApplication
public class Application{
@Autowired
private NettyClient client;
@Value("${netty.server.port}")
private int port;
@Value("${netty.server.host}")
private String host;
public static void main(String[] args) throws Exception {
SpringApplication.run(NettyClientApplication.class, args);
}
@Bean
public NettyClient nettyClient() {
return new NettyClient();
}
@Override
public void run(String... arg0) throws Exception {
client.start(host, port);
}
}
NettyClient.java: 客戶端啟動類
@Component
public class NettyClient {
//日志輸出
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
//主要連接地址
private static String nettyHost = "";
//備用連接地址
private static String nettyHostRe = "";
private static Integer nettyPort = 0;
public boolean start(String host1,String host2,int port){
//主要連接地址
nettyHost = host1;
//備用連接地址
nettyHostRe = host2;
nettyPort = port;
//EventLoopGroup可以理解為是一個線程池,這個線程池用來處理連接、接受數據、發送數據
EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
//NioEventLoop
//客戶端引導類
Bootstrap bootstrap = new Bootstrap();
//多線程處理
bootstrap.group(nioEventLoopGroup);
//指定通道類型為NioServerSocketChannel,一種異步模式
bootstrap.channel(NioSocketChannel.class);
//指定請求地址
bootstrap.remoteAddress(new InetSocketAddress(nettyHost,port));
bootstrap.option(ChannelOption.TCP_NODELAY,true);
final ConnectionWatchdog watchDog = new ConnectionWatchdog(bootstrap, new HashedWheelTimer(), nettyHost,nettyHostRe, port) {
@Override
public ChannelHandler[] handlers() {
return new ChannelHandler[]{
new MessageEncoder(),
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4),
new MessageDecoder(),
this,
// 每隔5s的時間觸發一次userEventTriggered的方法,並且指定IdleState的狀態位是WRITER_IDLE
new IdleStateHandler(0, 1, 0, TimeUnit.SECONDS),
// 實現userEventTriggered方法,並在state是WRITER_IDLE的時候發送一個心跳包到sever端,告訴server端我還活着
new ClientHeartBeatHandler(),
};
}
};
final ChannelFuture future;
try {
synchronized (bootstrap) {
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(watchDog.handlers());
}
});
future = bootstrap.connect().sync();// 鏈接服務器.調用sync()方法會同步阻塞
//服務端連接ip:
logger.info("目前服務端連接ip為" + nettyHost);
}
if (!future.isSuccess()) {
logger.info("---- 連接服務器失敗,2秒后重試 ---------port=" + port);
future.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
start(nettyHost,nettyHostRe,nettyPort);
}
}, 2L, TimeUnit.SECONDS);
}
} catch (Exception e) {
logger.info("exception happends e {}", e);
return false;
}
return true;
}
}
ConnectionWatchdog.java :重連檢測狗,當發現當前的鏈路不穩定關閉之后,進行重連
@ChannelHandler.Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask,ChannelHandlerHolder{
//日志輸出
private static final Logger logger = LoggerFactory.getLogger(ConnectionWatchdog.class);
//客戶端引導類
private Bootstrap bootstrap;
private Timer timer;
private final String host;
//備用服務端ip
private final String host2;
//使用ip
private String useHost;
private final int port;
private volatile boolean reconnect = true;
private int attempts;
//刷新時間
private volatile long refreshTime = 0L;
//心跳連接標識
private volatile boolean heartBeatCheck = false;
//通道
private volatile Channel channel;
//失敗次數
private static int failCount;
public ConnectionWatchdog(Bootstrap boot, Timer timer, String host,String host2, int port) {
this.bootstrap = boot;
this.timer = timer;
this.host = host;
this.host2 = host2;
this.port = port;
}
public boolean isReconnect() {
return reconnect;
}
public void setReconnect(boolean reconnect) {
this.reconnect = reconnect;
}
//連接成功時調用的方法
@Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
channel = ctx.channel();
attempts = 0;
reconnect =false;
refreshTime = new Date().getTime();
if (!heartBeatCheck) {
heartBeatCheck = true;
channel.eventLoop().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
long time = new Date().getTime() - refreshTime;
logger.info(String.valueOf(time));
if (time > 5 * 1000L) {
channel.close();
logger.info("心跳檢查失敗");
} else {
logger.info("心跳檢查Successs");
}
}
}, 5L, 5L, TimeUnit.SECONDS);
}
logger.info("Connects with {}.", channel);
ctx.fireChannelActive();
}
/**
* 因為鏈路斷掉之后,會觸發channelInActive方法,進行重連 2秒重連一次
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
reconnect = true;
logger.warn("Disconnects with {}, doReconnect = {},attempts == {}", ctx.channel(), reconnect, attempts);
if (reconnect) {
/*if (attempts < 12) {
attempts++;
} else {
reconnect = false;
}*/
long timeout = 2;
logger.info("再過 {} 秒客戶端將進行重連",timeout);
timer.newTimeout(this, timeout, TimeUnit.SECONDS);
}
}
/*
* run啟動方法
* */
public void run(Timeout timeout) throws Exception {
//Future表示異步操作的結果
final ChannelFuture future;
if(failCount > 2){
//使用備用ip
if(host.equals(useHost)){
useHost = host2;
}else{
useHost = host;
}
}else {
if(StrUtil.IsNullOrEmpty(useHost)) {
//首次重連
useHost = host;
}
}
synchronized (bootstrap) {
future = bootstrap.connect(useHost, port);
}
//使用future監聽結果,執行異步操作結束后的回調.
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(final ChannelFuture f) throws Exception {
boolean succeed = f.isSuccess();
logger.warn("連接通過 {}, {}.", useHost + ":" + port, succeed ? "成功" : "失敗");
if (!succeed) {
logger.info("重連失敗");
failCount ++;
f.channel().pipeline().fireChannelInactive();
}else{
failCount = 0;
logger.info("重連成功");
}
}
});
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof TcpMsg) {
TcpMsg heartMsg = (TcpMsg) msg;
if (heartMsg.getType()>=0) {
refreshTime = new Date().getTime();
}
logger.warn("得到服務器響應,響應內容為"+ ((TcpMsg) msg).getBody());
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
Channel channel = ctx.channel();
logger.info("客戶端:"+channel.remoteAddress()+"網絡異常");
cause.printStackTrace();
if(channel.isActive())ctx.close();
}
}
這里我們定義了一個變量: refreshTime,當我們從channel中read到了服務端發來的心跳響應消息的話,就刷新refreshTime為當前時間
當連接成功時,會觸發channelActive 方法,在這里我們開啟了一個定時任務去判斷refreshTime和當前時間的時間差,超過5秒說明斷線了,要進行重連,我這里由於配置了兩個服務器,所有在我的邏輯中,嘗試連接2次以上連不上就去連另一個服務器去了
下面的handler用於發送心跳消息,實現userEventTriggered方法,並在state是WRITER_IDLE的時候發送一個心跳包到sever端,告訴server端我還活着
@Component
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(ClientHeartBeatHandler.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
clientname = ReadFileUtil.readFile("C:/CrawlProgram/wrapper_nettyClient/name.txt");
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
//用於心跳的客戶端類型為0
int type = 0;
//客戶端機器名
String body = clientname;
TcpMsg msg = new TcpMsg(type,body);
try {
ctx.writeAndFlush(msg).sync();
logger.info("發送消息成功,消息類型為:"+type+",請求id為" + msg.getGuid() + ",客戶端機器號為:" + msg.getBody());
} catch (Exception ex) {
ex.printStackTrace();
logger.info("發送失敗");
}
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
然后就是和服務端一樣的decoder、encoder過程,不同的是,我們在decoder的時候使用了線程池去將任務放入隊列中去,防止請求慢的時候丟失任務請求
MessageDecoder.java
public class MessageDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(MessageDecoder.class);
@Autowired
private VisiableThreadPoolTaskExecutor visiableThreadPoolTaskExecutor;
//線程池常量
public static VisiableThreadPoolTaskExecutor executor;
private TcpMsg tcpMsg;
List<Object> out;
// 用@PostConstruct方法引導綁定
@PostConstruct
public void init() {
executor = visiableThreadPoolTaskExecutor;
encryptService = encrypt;
orderService = order;
}
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
this.context = ctx;
this.out = out;
int len = in.readableBytes();
if (len > 0) {
logger.info("得到返回數據,長度為" + len);
byte[] bytes = new byte[len];
in.readBytes(bytes);
TcpMsg msg = TcpMsg.ByteToObj(bytes);
this.tcpMsg = msg;
logger.info("start asyncServiceExecutor");
executor.execute(new Runnable() {
@Override
public void run() {
executeTask();
}
});
logger.info("end executeAsync");
}
}
}
這里,我們使用了netty來實現了服務端、客戶端通信、心跳檢測的功能。體會到了netty的傳輸效率高、封裝好的特性,用起來簡單、實用。我們不僅可以做斷線重連、還可以做很多業務請求,可以配置多台客戶端去做不同的事情,來達到服務器調度的目的。
歸根結底,netty還是一個框架的東西,我們還是沒有過多的去看透nio的本質、我們要做的不僅僅是會用netty,而且還要了解nio、了解netty的實現原理,它的底層是如何封裝的,希望大家多去研究,我們一起去搞懂它
Netty 的 Channel 過濾器實現原理與 Servlet Filter 機制一致,它將 Channel 的數據管道抽象為 ChannelPipeline,消息在 ChannelPipeline 中流動和傳遞。ChannelPipeline 持有 I/O 事件攔截器 ChannelHandler 的鏈表,由 ChannelHandler 來對 I/O 事件進行具體的攔截和處理,可以方便地通過新增和刪除 ChannelHandler 來實現不同業務邏輯的定制,能夠實現對修改封閉和對擴展到支持。
