前提:假設你的客戶端和服務端已經寫好了的情況下
1.短線重連是客戶端的事情——————》客戶端一般這樣寫###
客戶端:
private Channel channel;
private Bootstrap bootstrap;
public void client() throws InterruptedException {
String ip = "localhost";
Integer port = 9898;
bootstrap = new Bootstrap();
System.out.println(this.getClass());
NioEventLoopGroup loopGroup = new NioEventLoopGroup();
NettyTest t = this;
bootstrap.group(loopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1000)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(2048))
.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
// 自己的處理器
channel.pipeline().addLast(new TestClientHandler(t));
}
});
try {
ChannelFuture channelFuture = bootstrap.connect(ip,port).sync();
clientChannelFuture.channel().closeFuture().sync()
} finally {
/**優雅退出,釋放NIO線程組*/
loopGroup.shutdownGracefully();
}
處理器:
public class TestClientHandler extends ChannelInboundHandlerAdapter {
// NettyTest nettyTest;
//
// public TestClientHandler(NettyTest nettyTest) {
// this.nettyTest = nettyTest;
// }
NettyTest nettyTest;
public TestClientHandler(netty.test.NettyTest client) {
// super("client");
this.nettyTest = client;
}
public TestClientHandler(){}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("連接成功");
String reqMsg = "我是客戶端 110 " + Thread.currentThread().getName();
byte[] reqMsgByte = reqMsg.getBytes("UTF-8");
ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length);
/**
* writeBytes:將指定的源數組的數據傳輸到緩沖區
* 調用 ChannelHandlerContext 的 writeAndFlush 方法將消息發送給服務器
*/
reqByteBuf.writeBytes(reqMsgByte);
ctx.writeAndFlush(reqByteBuf);
}
/**
* 當服務端返回應答消息時,channelRead 方法被調用,從 Netty 的 ByteBuf 中讀取並打印應答消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(Thread.currentThread().getName() + ",Server return Message:" + body);
// ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("服務端斷開:" + ctx.channel().remoteAddress());
//TODO 斷開后的重連接
}
/**
* 當發生異常時,打印異常 日志,釋放客戶端資源
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// /**釋放資源*/
// ctx.close();
}
斷線重連改寫
客戶端:
private Channel channel;
private Bootstrap bootstrap;
ClientChannelFuture clientChannelFuture;
public void client() throws InterruptedException {
String ip = "localhost";
Integer port = 9898;
bootstrap = new Bootstrap();
System.out.println(this.getClass());
NioEventLoopGroup loopGroup = new NioEventLoopGroup();
NettyTest t = this;
bootstrap.group(loopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1000)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(2048))
.handler(new ChannelInitializer() {
@Override
protected void initChannel(Channel channel) throws Exception {
// channel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
// String delimiter = "\r\n";
// int length = 40960;
// ByteBuf byteBuf = Unpooled.copiedBuffer(delimiter, CharsetUtil.UTF_8);
// channel.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(length, byteBuf));
channel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
channel.pipeline().addLast(new TestClientHandler(t));
}
});
try {
this.doConnect();
clientChannelFuture.channel().closeFuture().sync();
} catch (Exception e) {
log.info("啟動出錯 e", e);
} finally { //優雅關閉功能
log.info("SecureDeviceServer run finnally shutdown.");
// loopGroup.shutdownGracefully();
}
}
protected void doConnect() {
if (channel != null && channel.isActive()) {
return;
}
clientChannelFuture = bootstrap.connect("127.0.0.1", 9898);
clientChannelFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture futureListener) throws Exception {
if (futureListener.isSuccess()) {
channel = futureListener.channel();
System.out.println("Connect to server successfully!");
} else {
System.out.println("Failed to connect to server, try connect after 10s");
futureListener.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
doConnect();
}
}, 10, TimeUnit.SECONDS);
}
}
});
}
處理器:
NettyTest nettyTest;
//
// public TestClientHandler(NettyTest nettyTest) {
// this.nettyTest = nettyTest;
// }
NettyTest nettyTest;
public TestClientHandler(netty.test.NettyTest client) {
// super("client");
this.nettyTest = client;
}
public TestClientHandler(){}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("連接成功");
String reqMsg = "我是客戶端 110 " + Thread.currentThread().getName();
byte[] reqMsgByte = reqMsg.getBytes("UTF-8");
ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length);
/**
* writeBytes:將指定的源數組的數據傳輸到緩沖區
* 調用 ChannelHandlerContext 的 writeAndFlush 方法將消息發送給服務器
*/
reqByteBuf.writeBytes(reqMsgByte);
ctx.writeAndFlush(reqByteBuf);
}
/**
* 當服務端返回應答消息時,channelRead 方法被調用,從 Netty 的 ByteBuf 中讀取並打印應答消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(Thread.currentThread().getName() + ",Server return Message:" + body);
// ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("服務端斷開:" + ctx.channel().remoteAddress());
//TODO 斷開后的重連接
// 加入短線重連
nettyTest.doConnect();
}
/**
* 當發生異常時,打印異常 日志,釋放客戶端資源
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// /**釋放資源*/
// ctx.close();
}