netty實例之netty斷開重連,並通過server端向client端發送信息


netty客戶端:

/**
 * Netty client entry point: opens a TCP connection to the configured server.
 *
 * <p>The target address is read once from {@code BasicAppsConfig.getNettyServerIpPort()},
 * which is expected to look like {@code host:port}. Reconnection is driven from outside this
 * class: {@code ConnectionListener} retries a failed connect and
 * {@code CliensideMessageHandler} reconnects after an established connection is lost; both do
 * so by calling {@link #connect(String, int)} again.
 */
public class ClientConnector implements Runnable{
    /**
     * Event-loop group shared by every connection attempt.
     *
     * <p>Previously each call to {@code doConnect} created its own {@code NioEventLoopGroup}
     * that was never shut down, so every reconnect leaked a fresh set of I/O threads. The
     * group is intentionally kept alive for the lifetime of the JVM because reconnect tasks
     * are scheduled on its event loops.
     */
    private static final EventLoopGroup WORKER_GROUP = new NioEventLoopGroup();

    /** Channel of the most recent connect attempt; may not be connected yet. */
    private Channel channel;
    private static String ip;
    private static int port;

    static {
        // Read and split the "host:port" setting once instead of twice.
        String[] hostAndPort = BasicAppsConfig.getNettyServerIpPort().split(":");
        ip = hostAndPort[0];
        port = Integer.parseInt(hostAndPort[1]);
    }

    /** Connects to the configured server address. */
    @Override
    public void run() {
        connect(ip,port);
    }

    /**
     * Starts an asynchronous connect to {@code host:port}.
     *
     * @param host server host name or IP
     * @param port server TCP port
     * @return the channel created for this attempt; the connect may still be in progress
     *         (or may fail later) when this method returns
     */
    public Channel connect(String host, int port) {
        doConnect(host, port);
        return this.channel;
    }

    /** Builds the client bootstrap and fires the connect without blocking. */
    private void doConnect(String host, int port) {
        try {
            Bootstrap b = new Bootstrap();
            b.group(WORKER_GROUP);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                public void initChannel(NioSocketChannel ch) throws Exception {
                    /*// Transfer entity objects, serialized with protobuf
                    ch.pipeline().addLast("decoder",
                            new ProtobufDecoder(MessageProto.Message.getDefaultInstance()));
                    ch.pipeline().addLast("encoder",
                            new ProtobufEncoder());*/
                    // Idle detection that drives the heartbeat:
                    //   1st argument (70)  - reader idle time
                    //   2nd argument (60)  - writer idle time
                    //   3rd argument (100) - all (read + write) idle time
                    //   4th argument       - time unit (seconds)
                    ch.pipeline().addLast(new IdleStateHandler(70, 60, 100, TimeUnit.SECONDS));

                    ch.pipeline().addLast(new CliensideMessageHandler());
                }
            });

            ChannelFuture f = b.connect(host, port);
            // The listener retries when the connect attempt fails.
            f.addListener(new ConnectionListener());
            // Do not wait on f.channel().closeFuture() here: per the original author's note,
            // doing so makes reconnecting after a server-side disconnect impossible.
            channel = f.channel();
        } catch(Exception e) {
            e.printStackTrace();
        }
    }

    /** @return the server host read from configuration (or set via {@link #setIp(String)}) */
    public static String getIp() {
        return ip;
    }

    /** @param ip server host to use for subsequent connects that read {@link #getIp()} */
    public static void setIp(String ip) {
        ClientConnector.ip = ip;
    }

    /** @return the server port read from configuration (or set via {@link #setPort(int)}) */
    public static int getPort() {
        return port;
    }

    /** @param port server port to use for subsequent connects that read {@link #getPort()} */
    public static void setPort(int port) {
        ClientConnector.port = port;
    }
}

 

Handler:

public class CliensideMessageHandler extends ChannelInboundHandlerAdapter {
  private ClientConnector clientConnector = new ClientConnector();

  //nettyClient啟動時會執行此類 @Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception { String pmk = ""; try { pmk = channelHandlerContext.channel().localAddress().toString().split(":")[0]; } catch (Exception e) { pmk="/"+ SysInfoCatcher.getIP(); } ByteBuf buffer = StrConverter.getByteBuf(pmk, channelHandlerContext); System.out.println(new Date() + ": 信息收集代理啟動,連接服務器ing --->" + StrConverter.getStr(buffer)); logChannelHandlerContext = channelHandlerContext;
    //此處信息的寫入,將在nettyServer端讀取寫出 channelHandlerContext.channel().writeAndFlush(buffer); }
  //在nettyServer端使用管道寫入消息時,將在此處讀取寫出 @Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception { ByteBuf byteBuf = (ByteBuf) message; String content = StrConverter.getStr(byteBuf); System.out.println("\n\n" + this.getClass().getName() + " : from server's data [ " + content + " ]\n"); if (content.startsWith("response")) { System.out.println("\n\n" + new Date() + " from server's heartbeat data---> " + content); } else { JSONObject jsonObject=JSON.parseObject(content); System.out.println("\n\n" + this.getClass().getName() + " : from server's content data < " + content + " >\n"); if(jsonObject.getString("commandTag").equals("start")){ System.out.println(jsonObject.getString("commandContent")); }else if (jsonObject.getString("commandTag").equals("ReturnResult")) {

        System.out.println(jsonObject.getString("commandContent"));
} else { System.out.println(jsonObject.getString("commandContent")); } } byteBuf.release(); } /** * 用來接收心跳檢測結果,event.state()的狀態分別對應IdleStateHandler(70, 60, 100, TimeUnit.SECONDS)中的 * 三個參數的時間設置,當滿足某個時間的條件時會觸發事件 */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); ClientConnector clientConnector = new ClientConnector(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)) { System.out.println("長期沒收到服務器推送數據"); //重新連接 clientConnector.connect(ClientConnector.getIp(), ClientConnector.getPort()); } else if (event.state().equals(IdleState.WRITER_IDLE)) { System.out.println("長期未向服務器發送數據"); //發送心跳包 ByteBuf byteBufOut = StrConverter.getByteBuf("PING/" + ctx.channel().localAddress().toString(), ctx); ctx.writeAndFlush(byteBufOut); } else if (event.state().equals(IdleState.ALL_IDLE)) { System.out.println("ALL"); } } } /** * channelInactive 被觸發一定是和服務器斷開了。分兩種情況。一種是服務端close,一種是客戶端close。 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String pmk = ""; try { pmk = ctx.channel().localAddress().toString().split(":")[0]; } catch (Exception e) { pmk="/"+ SysInfoCatcher.getIP(); }     //斷開連接時,將ChannelMap中存儲的管道信息刪除 if (ChannelMap.getAllChannels().get(pmk) != null) ChannelMap.removeTimeServerChannel(pmk); System.err.println(new Date()+" : "+this.getClass().getName() + "#channelInactive()\n" + "Heartbeat data sender disconnect from server!!"); // 定時線程 斷線重連 final EventLoop eventLoop = ctx.channel().eventLoop(); //設置斷開連接后重連時間,此設置是斷開連接一分鍾(10秒)后重連 eventLoop.schedule(() -> clientConnector.connect(ClientConnector.getIp(), ClientConnector.getPort()), 10, TimeUnit.SECONDS); super.channelInactive(ctx); } /** * 在服務器端不使用心跳檢測的情況下,如果客戶端突然拔掉網線斷網(注意這里不是客戶度程序關閉,而僅是異常斷網) */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { String pmk = ""; try { pmk = 
ctx.channel().localAddress().toString().split(":")[0]; } catch (Exception e) { pmk="/"+ SysInfoCatcher.getIP(); }     //斷開連接時,將ChannelMap中存儲的管道信息刪除 if (ChannelMap.getAllChannels().get(pmk) != null) ChannelMap.removeTimeServerChannel(pmk); System.err.println("server " + ctx.channel().remoteAddress() + " 關閉連接, due to network exception !!"); cause.printStackTrace(); ctx.close(); } }

 

檢測netty連接狀態的監聽類:

/**
 * Listener attached to the client's connect future: logs a successful connect and, on
 * failure, schedules another connect attempt one second later.
 */
public class ConnectionListener implements ChannelFutureListener {

    private ClientConnector clientConnector = new ClientConnector();

    /**
     * Called once the connect attempt has finished.
     *
     * @param channelFuture the completed connect future
     */
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception {
        if (channelFuture.isSuccess()) {
            System.err.println("連接到服務端成功...");
            return;
        }

        // Connect failed: retry on the failed channel's event loop after a short delay.
        final EventLoop retryLoop = channelFuture.channel().eventLoop();
        retryLoop.schedule(() -> {
            System.err.println("服務端連接不上,開始重連操作...");
            clientConnector.connect(ClientConnector.getIp(), ClientConnector.getPort());
        }, 1L, TimeUnit.SECONDS);
    }
}

 

nettyServer端的handler:

public class ServersideMessageHandler extends ChannelInboundHandlerAdapter {

  //在netty客戶端發送的消息,將在此方法中進行處理 @Override
public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception { String msg = StrConverter.getStr((ByteBuf) message);
    //msg為netty客戶端傳送過來的信息,其內容是ip信息,用來當作ChannelMap的唯一key
    //判斷ChannelMap中是否存在key為msg的管道信息,存在且其對象地址與此處的channelHandlerContext一致,則不操作
    //否則將其添加進ChannelMap
if (ChannelMap.getAllChannels().get(msg) != null && ChannelMap.getAllChannels().get(msg)==(channelHandlerContext)) { //接收到服務端發來的數據進行業務處理 }elseif (msg.startsWith("PING")){
      //心跳信息的反饋 System.out.println(
new Date() + " : 服務端響應心跳數據:response: " +channelHandlerContext.channel().remoteAddress()); ByteBuf byteBufOut = StrConverter.getByteBuf("response: "+new Date(),channelHandlerContext); channelHandlerContext.channel().writeAndFlush(byteBufOut); }else { //接收到服務端發來的數據進行業務處理 //如果map中沒有此channelHandlerContext 將連接存入map中 System.out.println("\n\n"+new Date() + ": 服務端獲取到channelID -> " + msg+"\n\n"); ChannelMap.removeTimeServerChannel(msg); ChannelMap.addTimeServerChannel(msg,channelHandlerContext); System.out.println("map size : "+ ChannelMap.getAllChannels().size()); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } /** * channelInactive 被觸發一定是和服務器斷開了。分兩種情況。一種是服務端close,一種是客戶端close。 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { String pmk = ""; try { pmk = ctx.channel().remoteAddress().toString().split(":")[0]; } catch (Exception e) { // pmk="/"+ SysInfoCatcher.getIP(); } if(ChannelMap.getAllChannels().get(pmk) != null) ChannelMap.removeTimeServerChannel(pmk); super.channelInactive(ctx); System.err.println(new Date()+" : 客戶端與服務端斷開連接,將"+pmk+"從ChannelMap中移除!"); System.out.println("map size : "+ChannelMap.getAllChannels().size()); } }

 

NettyServer類
/**
 * Bootstraps the Netty server and blocks until its listening channel is closed.
 */
public class NettyServer {
    // NOTE(review): not referenced anywhere in this class; the listening port actually
    // comes from BasicAppsConfig.getNettyServerIpPort().
    private static int port=8899;

    /**
     * Binds the server to the port taken from {@code BasicAppsConfig.getNettyServerIpPort()}
     * (the part after {@code ':'}) and blocks the calling thread until the server channel
     * closes. Both event-loop groups are shut down before the method returns.
     */
    public static void activateServer() {
        System.out.println("Start Netty Server ...");

        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        public void initChannel(NioSocketChannel ch) throws Exception {
                            // Register the handler for every accepted connection.
                            ch.pipeline().addLast(new ServersideMessageHandler());
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128)/*.childOption(ChannelOption.SO_KEEPALIVE, true)*/;

            ChannelFuture f = b.bind(Integer.parseInt(BasicAppsConfig.getNettyServerIpPort().split(":")[1])).sync();

            // Block here until the server channel is closed.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

管道實體類:

/**
 * Static registry of channel handler contexts, keyed by the id a client registered with.
 * The stored contexts let the server write data to a specific client.
 */
public class ChannelMap {
    // Keyed by the part of the id before the first ':'; every entry refers to the one shared
    // channelMap instance. Written by addTimeServerChannel but not read within this class.
    private static final Map<String, Map<String, ChannelHandlerContext>> parentChannelMap = new ConcurrentHashMap<>();
    private static final Map<String, ChannelHandlerContext> channelMap = new ConcurrentHashMap<>();

    /**
     * Stores {@code sc} under {@code id}, replacing any previous entry for that id.
     *
     * @param id registration key
     * @param sc context of the channel to register
     */
    public static void addTimeServerChannel(String id, ChannelHandlerContext sc){
        String key=id.split(":")[0];
        channelMap.put(id, sc);
        parentChannelMap.put(key,channelMap);
        System.out.println("size = "+channelMap.size());
    }

    /**
     * @return the live backing map of all registered contexts (not a copy)
     */
    public static Map<String, ChannelHandlerContext> getAllChannels(){
        return channelMap;
    }

    /**
     * @param id registration key
     * @return the context stored under {@code id}, or {@code null} when there is none
     */
    public static ChannelHandlerContext getTimeServerChannel(String id){
        return channelMap.get(id);
    }

    /**
     * Removes the entry stored under {@code id}, if any.
     *
     * @param id registration key
     */
    public static  void removeTimeServerChannel(String id){
        channelMap.remove(id);
    }
}

 

 

工具類:

/**
 * Helpers for converting between {@code String} and Netty {@code ByteBuf} using UTF-8.
 */
public class StrConverter {
    /** Charset used for every conversion in this class. */
    private static final Charset UTF8 = Charset.forName("utf-8");

    /**
     * Encodes {@code cotent} as UTF-8 into a new buffer obtained from the context's allocator.
     *
     * @param cotent text to encode
     * @param channelHandlerContext context whose allocator provides the buffer
     * @return a buffer holding the encoded bytes
     */
    public static ByteBuf getByteBuf(String cotent, ChannelHandlerContext channelHandlerContext) {
        // Encode before allocating, so nothing is allocated if encoding fails.
        byte[] encoded = cotent.getBytes(UTF8);
        ByteBuf out = channelHandlerContext.alloc().buffer();
        out.writeBytes(encoded);
        return out;
    }

    /**
     * Decodes the readable bytes of {@code cotent} as UTF-8 text.
     *
     * @param cotent buffer to decode
     * @return the decoded text
     */
    public static String getStr(ByteBuf cotent) {
        return cotent.toString(UTF8);
    }
}

 

netty客戶端與服務器端通訊時的數據類型要一致,否則其中一端將無法發送或解析信息!

使用ChannelMap存儲的管道信息channelHandlerContext可以向管道中寫入數據,以達到server端向對應的client端傳輸數據的目的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM