netty客戶端:
public class ClientConnector implements Runnable{ private Channel channel; private static String ip; private static int port; static { ip = BasicAppsConfig.getNettyServerIpPort().split(":")[0]; port = Integer.parseInt(BasicAppsConfig.getNettyServerIpPort().split(":")[1]); } @Override public void run() { connect(ip,port); } public Channel connect(String host, int port) { doConnect(host, port); return this.channel; } private void doConnect(String host, int port) { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<NioSocketChannel>() { @Override public void initChannel(NioSocketChannel ch) throws Exception { /*// 實體類傳輸數據,protobuf序列化 ch.pipeline().addLast("decoder", new ProtobufDecoder(MessageProto.Message.getDefaultInstance())); ch.pipeline().addLast("encoder", new ProtobufEncoder());*/ //發送消息頻率。單位秒。此設置 //第一個參數 60 表示讀操作空閑時間 //第二個參數 20 表示寫操作空閑時間 //第三個參數 6100 表示讀寫操作空閑時間 //第四個參數 單位/秒 ch.pipeline().addLast(new IdleStateHandler(70, 60, 100, TimeUnit.SECONDS)); ch.pipeline().addLast(new CliensideMessageHandler()); } }); ChannelFuture f = b.connect(host, port); f.addListener(new ConnectionListener()); channel = f.channel();//此處不需f.channel().closeFuture();否則無法實現nettyServer端斷開重連 } catch(Exception e) { e.printStackTrace(); } } public static String getIp() { return ip; } public static void setIp(String ip) { ClientConnector.ip = ip; } public static int getPort() { return port; } public static void setPort(int port) { ClientConnector.port = port; } }
Handler:
public class CliensideMessageHandler extends ChannelInboundHandlerAdapter { private ClientConnector clientConnector = new ClientConnector();
//nettyClient啟動時會執行此類 @Override public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception { String pmk = ""; try { pmk = channelHandlerContext.channel().localAddress().toString().split(":")[0]; } catch (Exception e) { pmk="/"+ SysInfoCatcher.getIP(); } ByteBuf buffer = StrConverter.getByteBuf(pmk, channelHandlerContext); System.out.println(new Date() + ": 信息收集代理啟動,連接服務器ing --->" + StrConverter.getStr(buffer)); logChannelHandlerContext = channelHandlerContext;
//此處信息的寫入,將在nettyServer端讀取寫出 channelHandlerContext.channel().writeAndFlush(buffer); }
//在nettyServer端使用管道寫入消息時,將在此處讀取寫出 @Override public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception { ByteBuf byteBuf = (ByteBuf) message; String content = StrConverter.getStr(byteBuf); System.out.println("\n\n" + this.getClass().getName() + " : from server's data [ " + content + " ]\n"); if (content.startsWith("response")) { System.out.println("\n\n" + new Date() + " from server's heartbeat data---> " + content); } else { JSONObject jsonObject=JSON.parseObject(content); System.out.println("\n\n" + this.getClass().getName() + " : from server's content data < " + content + " >\n"); if(jsonObject.getString("commandTag").equals("start")){ System.out.println(jsonObject.getString("commandContent")); }else if (jsonObject.getString("commandTag").equals("ReturnResult")) {
System.out.println(jsonObject.getString("commandContent"));
} else { System.out.println(jsonObject.getString("commandContent")); } } byteBuf.release(); } /** * 用來接收心跳檢測結果,event.state()的狀態分別對應IdleStateHandler(70, 60, 100, TimeUnit.SECONDS)中的 * 三個參數的時間設置,當滿足某個時間的條件時會觸發事件 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); ClientConnector clientConnector = new ClientConnector(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)) { System.out.println("長期沒收到服務器推送數據"); //重新連接 clientConnector.connect(ClientConnector.getIp(), ClientConnector.getPort()); } else if (event.state().equals(IdleState.WRITER_IDLE)) { System.out.println("長期未向服務器發送數據"); //發送心跳包 ByteBuf byteBufOut = StrConverter.getByteBuf("PING/" + ctx.channel().localAddress().toString(), ctx); ctx.writeAndFlush(byteBufOut); } else if (event.state().equals(IdleState.ALL_IDLE)) { System.out.println("ALL"); } } } /** * channelInactive 被觸發一定是和服務器斷開了。分兩種情況。一種是服務端close,一種是客戶端close。 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String pmk = ""; try { pmk = ctx.channel().localAddress().toString().split(":")[0]; } catch (Exception e) { pmk="/"+ SysInfoCatcher.getIP(); } //斷開連接時,將ChannelMap中存儲的管道信息刪除 if (ChannelMap.getAllChannels().get(pmk) != null) ChannelMap.removeTimeServerChannel(pmk); System.err.println(new Date()+" : "+this.getClass().getName() + "#channelInactive()\n" + "Heartbeat data sender disconnect from server!!"); // 定時線程 斷線重連 final EventLoop eventLoop = ctx.channel().eventLoop(); //設置斷開連接后重連時間,此設置是斷開連接一分鍾(10秒)后重連 eventLoop.schedule(() -> clientConnector.connect(ClientConnector.getIp(), ClientConnector.getPort()), 10, TimeUnit.SECONDS); super.channelInactive(ctx); } /** * 在服務器端不使用心跳檢測的情況下,如果客戶端突然拔掉網線斷網(注意這里不是客戶度程序關閉,而僅是異常斷網) */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { String pmk = ""; try { pmk = ctx.channel().localAddress().toString().split(":")[0]; } catch (Exception e) { pmk="/"+ SysInfoCatcher.getIP(); } //斷開連接時,將ChannelMap中存儲的管道信息刪除 if (ChannelMap.getAllChannels().get(pmk) != null) ChannelMap.removeTimeServerChannel(pmk); System.err.println("server " + ctx.channel().remoteAddress() + " 關閉連接, due to network exception !!"); cause.printStackTrace(); ctx.close(); } }
檢測netty連接狀態的監聽類:
public class ConnectionListener implements ChannelFutureListener { private ClientConnector clientConnector = new ClientConnector(); @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()) { final EventLoop loop = channelFuture.channel().eventLoop(); loop.schedule(new Runnable() { @Override public void run() { System.err.println("服務端連接不上,開始重連操作..."); clientConnector.connect(ClientConnector.getIp(), ClientConnector.getPort()); } }, 1L, TimeUnit.SECONDS); } else { System.err.println("連接到服務端成功..."); } } }
nettyServer端的handler:
public class ServersideMessageHandler extends ChannelInboundHandlerAdapter {
//在netty客戶端發送的消息,將在此方法中進行處理 @Override public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception { String msg = StrConverter.getStr((ByteBuf) message);
//msg為netty客戶端傳送過來的信息,其內容是ip信息,用來當作ChannelMap的唯一key
//判斷ChannelMap中是否存在key為msg的管道信息,存在且其對象地址與此處的channelHandlerContext一致,則不操作
//否則將其添加進ChannelMap中 if (ChannelMap.getAllChannels().get(msg) != null && ChannelMap.getAllChannels().get(msg)==(channelHandlerContext)) { //接收到服務端發來的數據進行業務處理 }elseif (msg.startsWith("PING")){
//心跳信息的反饋 System.out.println(new Date() + " : 服務端響應心跳數據:response: " +channelHandlerContext.channel().remoteAddress()); ByteBuf byteBufOut = StrConverter.getByteBuf("response: "+new Date(),channelHandlerContext); channelHandlerContext.channel().writeAndFlush(byteBufOut); }else { //接收到服務端發來的數據進行業務處理 //如果map中沒有此channelHandlerContext 將連接存入map中 System.out.println("\n\n"+new Date() + ": 服務端獲取到channelID -> " + msg+"\n\n"); ChannelMap.removeTimeServerChannel(msg); ChannelMap.addTimeServerChannel(msg,channelHandlerContext); System.out.println("map size : "+ ChannelMap.getAllChannels().size()); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } /** * channelInactive 被觸發一定是和服務器斷開了。分兩種情況。一種是服務端close,一種是客戶端close。 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String pmk = ""; try { pmk = ctx.channel().remoteAddress().toString().split(":")[0]; } catch (Exception e) { // pmk="/"+ SysInfoCatcher.getIP(); } if(ChannelMap.getAllChannels().get(pmk) != null) ChannelMap.removeTimeServerChannel(pmk); super.channelInactive(ctx); System.err.println(new Date()+" : 客戶端與服務端斷開連接,將"+pmk+"從ChannelMap中移除!"); System.out.println("map size : "+ChannelMap.getAllChannels().size()); } }
NettyServer類
public class NettyServer { private static int port=8899; public static void activateServer() { System.out.println("Start Netty Server ..."); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override public void initChannel(NioSocketChannel ch) throws Exception { // 注冊handler ch.pipeline().addLast(new ServersideMessageHandler()); } }).option(ChannelOption.SO_BACKLOG, 128)/*.childOption(ChannelOption.SO_KEEPALIVE, true)*/; ChannelFuture f = b.bind(Integer.parseInt(BasicAppsConfig.getNettyServerIpPort().split(":")[1])).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
管道實體類:
public class ChannelMap { private static Map<String, Map> parentChannelMap = new ConcurrentHashMap<>(); private static Map<String, ChannelHandlerContext> channelMap = new ConcurrentHashMap<>(); public static void addTimeServerChannel(String id, ChannelHandlerContext sc){ String key=id.split(":")[0]; channelMap.put(id, sc); parentChannelMap.put(key,channelMap); System.out.println("size = "+channelMap.size()); } public static Map<String, ChannelHandlerContext> getAllChannels(){ return channelMap; } public static ChannelHandlerContext getTimeServerChannel(String id){ return channelMap.get(id); } public static void removeTimeServerChannel(String id){ channelMap.remove(id); } }
工具類:
public class StrConverter { public static ByteBuf getByteBuf(String cotent, ChannelHandlerContext channelHandlerContext) { byte[] bs = cotent.getBytes(Charset.forName("utf-8")); ByteBuf byteBuf = channelHandlerContext.alloc().buffer(); byteBuf.writeBytes(bs); return byteBuf; } public static String getStr(ByteBuf cotent) { return cotent.toString(Charset.forName("utf-8")); } }
netty客戶端與服務器端通訊時的數據類型要一致,否則其中一端將無法發送或解析信息!
使用ChannelMap存儲的管道信息channelHandlerContext可以向管道中寫入數據,以達到server端向對應的client端傳輸數據的目的