基於netty的心跳機制實現


前言:在實現過程查找過許多資料,各種波折,最后綜合多篇文章最終實現並上線使用。為了減少大家踩坑的時間,所以寫了本文,希望有用。對於實現過程中有用的參考資料直接放上鏈接,可能有些內容相對冗余,不過時間允許多看看也並不無益。

 

入門文章:

http://www.tuicool.com/articles/mEJvYb

 

netty官網:
(官網的user guide相對一般,javadoc倒是要看的)
 
需求場景:
實現用戶的在線離線狀態實時展現(我們的客戶端是android)。
 
技術選型:
在線好辦,關鍵是要監測到什么時候離線,於是我們選擇了心跳模型,當心跳失效時即為離線。如果用http發送心跳包雖然簡單但是極度不科學,耗電量太大,所以直接否決。我們選擇基於TCP實現長連接,而借助一些第三方插件可以更好更快地實現長連接,於是在mina和netty之間我們選擇了netty。(理由僅僅是在百度知道里邊看到別人說netty使用的更廣泛,沒有深入對比過)
 
相關版本:
netty5.0
jdk1.7
tomcat6.0
 
基礎流程圖如下:
服務端代碼:
HeartBeatServer.java
public class HeartBeatServer {

	// 端口
	private int port ;
	public HeartBeatServer(int port) {
		this.port = port;
	}
	
	ChannelFuture f ;
	
	ServerBootstrap b ;
	
	// 檢測chanel是否接受過心跳數據時間間隔(單位秒)
	private static final int READ_WAIT_SECONDS = 10;

	
	public void startServer() {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			b = new ServerBootstrap();
			b.group(bossGroup, workerGroup);
			b.channel(NioServerSocketChannel.class);
			b.childHandler(new HeartBeatServerInitializer());
			// 服務器綁定端口監聽
			f = b.bind(port).sync();
			// 監聽服務器關閉監聽,此方法會阻塞
			f.channel().closeFuture().sync();
			// 可以簡寫為
			/* b.bind(portNumber).sync().channel().closeFuture().sync(); */
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}
	/**
	 * 消息處理器
	 * @author cullen edward
	 */
	private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {
		@Override
		protected void initChannel(SocketChannel ch) throws Exception {
			ChannelPipeline pipeline = ch.pipeline();
			
			/*
			 * 使用ObjectDecoder和ObjectEncoder
			 * 因為雙向都有寫數據和讀數據,所以這里需要兩個都設置
			 * 如果只讀,那么只需要ObjectDecoder即可
			 */
			pipeline.addLast("decoder", new StringDecoder());
	        pipeline.addLast("encoder", new StringEncoder());
			
			/*
			 * 這里只監聽讀操作
			 * 可以根據需求,監聽寫操作和總得操作
			 */
			pipeline.addLast("pong", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0,TimeUnit.SECONDS));
			
			//pipeline.addLast("handler", new Heartbeat());
			pipeline.addLast("handler", new HeartbeatHandler());
		}
	}
	
	public void stopServer(){
		if(f!=null){
			f.channel().close();
		}
	}
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		HeartbeatServer heartbeatServer = new HeartbeatServer(9597);
		heartbeatServer.startServer();
	}
}

  

HeartbeatHandler.java
public class HeartbeatHandler extends SimpleChannelInboundHandler<String> { 
	// 失敗計數器:未收到client端發送的ping請求
	private int unRecPingTimes = 0 ;
	private String userid;
	
	// 定義服務端沒有收到心跳消息的最大次數
	private static final int MAX_UN_REC_PING_TIMES = 3;
	
	@Override
	protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
		System.out.println("----->msg=" + msg);    //msg格式約定為"operation,userid"
        String[] args = msg.split(",");
        String msg_operation = args[0];
        String msg_userid = args[1];
        if("LOGIN".equals(msg_operation)){
        	if(!Utils.isBlank(msg_userid)){
        		userid = msg_userid;
	        }
        	setUserOnlineStatus(userid, true);
        }else if("HEARTBEAT".equals(msg_operation)){
			ctx.channel().writeAndFlush("success");
			// 失敗計數器清零
			unRecPingTimes = 0;
		}
	}
	
	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                /*讀超時*/
                System.out.println("===服務端===(READER_IDLE 讀超時)");
                // 失敗計數器次數大於等於3次的時候,關閉鏈接,等待client重連
                if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){
                	System.out.println("===服務端===(讀超時,關閉chanel)");
                	// 連續超過N次未收到client的ping消息,那么關閉該通道,等待client重連
                	ctx.channel().close();
                }else{
                	// 失敗計數器加1
                	unRecPingTimes++;
                }
            } else if (event.state() == IdleState.WRITER_IDLE) {
                /*寫超時*/   
                System.out.println("===服務端===(WRITER_IDLE 寫超時)");
            } else if (event.state() == IdleState.ALL_IDLE) {
                /*總超時*/
                System.out.println("===服務端===(ALL_IDLE 總超時)");
            }
        }
    }
	
    @Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    	System.out.println("錯誤原因:"+cause.getMessage());
    	ctx.channel().close();
	    setUserOnlineStatus(userid, false);
	}
	@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client active ");
        super.channelActive(ctx);
    }
	
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		// 關閉,等待重連
	    ctx.close();
	    System.out.println("===服務端===(客戶端失效)");
	    setUserOnlineStatus(userid, false);
	}
	
	//設置用戶在線離線狀態
	private void setUserOnlineStatus(String userid, boolean isOnline){
		if(!Utils.isBlank(userid)){
    		//更新用戶信息為在線狀態(此處代碼省略)
    	}
	}
}

  

簡易的測試客戶端代碼:

SimpleClient.java
 
public class SimpleClient {
	public static void main(String[] args) throws Exception {
		new SimpleClient("127.0.0.1", 9597).run();
	}
	private final String host;
	private final int port;
	public SimpleClient(String host, int port) {
		this.host = host;
		this.port = port;
	}
	public void run() throws Exception {
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bootstrap = new Bootstrap().group(group).channel(
					NioSocketChannel.class).handler(
					new SimpleClientInitializer());
			Channel channel = bootstrap.connect(host, port).sync().channel();
			BufferedReader in = new BufferedReader(new InputStreamReader(
					System.in));
			while (true) {
				channel.writeAndFlush(in.readLine());
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			group.shutdownGracefully();
		}
	}
}

  

SimpleClientInitializer.java

public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
    }
}

 

備注:代碼大部分是從其他網站復制修改調整的,寫得相對簡易一點,其中還有很多安全性、合理性有待優化。

 

代碼參考文章:

 
更多相關文章:
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM