前言:在實現過程查找過許多資料,各種波折,最后綜合多篇文章最終實現並上線使用。為了減少大家踩坑的時間,所以寫了本文,希望有用。對於實現過程中有用的參考資料直接放上鏈接,可能有些內容相對冗余,不過時間允許多看看也並不無益。
入門文章:
http://www.tuicool.com/articles/mEJvYb
netty官網:
(官網的user guide相對一般,javadoc倒是要看的)
需求場景:
實現用戶的在線離線狀態實時展現(我們的客戶端是android)。
技術選型:
在線好辦,關鍵是要監測到什么時候離線,於是我們選擇了心跳模型,當心跳失效時即為離線。如果用http發送心跳包雖然簡單但是極度不科學,耗電量太大,所以直接否決。我們選擇基於TCP實現長連接,而借助一些第三方插件可以更好更快地實現長連接,於是在mina和netty之間我們選擇了netty。(理由僅僅是在百度知道里邊看到別人說netty使用的更廣泛,沒有深入對比過)
相關版本:
netty5.0
jdk1.7
tomcat6.0
基礎流程圖如下:

服務端代碼:
HeartBeatServer.java
public class HeartBeatServer { // 端口 private int port ; public HeartBeatServer(int port) { this.port = port; } ChannelFuture f ; ServerBootstrap b ; // 檢測chanel是否接受過心跳數據時間間隔(單位秒) private static final int READ_WAIT_SECONDS = 10; public void startServer() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { b = new ServerBootstrap(); b.group(bossGroup, workerGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new HeartBeatServerInitializer()); // 服務器綁定端口監聽 f = b.bind(port).sync(); // 監聽服務器關閉監聽,此方法會阻塞 f.channel().closeFuture().sync(); // 可以簡寫為 /* b.bind(portNumber).sync().channel().closeFuture().sync(); */ } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /** * 消息處理器 * @author cullen edward */ private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); /* * 使用ObjectDecoder和ObjectEncoder * 因為雙向都有寫數據和讀數據,所以這里需要兩個都設置 * 如果只讀,那么只需要ObjectDecoder即可 */ pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); /* * 這里只監聽讀操作 * 可以根據需求,監聽寫操作和總得操作 */ pipeline.addLast("pong", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0,TimeUnit.SECONDS)); //pipeline.addLast("handler", new Heartbeat()); pipeline.addLast("handler", new HeartbeatHandler()); } } public void stopServer(){ if(f!=null){ f.channel().close(); } } /** * @param args */ public static void main(String[] args) { HeartbeatServer heartbeatServer = new HeartbeatServer(9597); heartbeatServer.startServer(); } }
HeartbeatHandler.java
public class HeartbeatHandler extends SimpleChannelInboundHandler<String> { // 失敗計數器:未收到client端發送的ping請求 private int unRecPingTimes = 0 ; private String userid; // 定義服務端沒有收到心跳消息的最大次數 private static final int MAX_UN_REC_PING_TIMES = 3; @Override protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("----->msg=" + msg); //msg格式約定為"operation,userid" String[] args = msg.split(","); String msg_operation = args[0]; String msg_userid = args[1]; if("LOGIN".equals(msg_operation)){ if(!Utils.isBlank(msg_userid)){ userid = msg_userid; } setUserOnlineStatus(userid, true); }else if("HEARTBEAT".equals(msg_operation)){ ctx.channel().writeAndFlush("success"); // 失敗計數器清零 unRecPingTimes = 0; } } public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { /*讀超時*/ System.out.println("===服務端===(READER_IDLE 讀超時)"); // 失敗計數器次數大於等於3次的時候,關閉鏈接,等待client重連 if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){ System.out.println("===服務端===(讀超時,關閉chanel)"); // 連續超過N次未收到client的ping消息,那么關閉該通道,等待client重連 ctx.channel().close(); }else{ // 失敗計數器加1 unRecPingTimes++; } } else if (event.state() == IdleState.WRITER_IDLE) { /*寫超時*/ System.out.println("===服務端===(WRITER_IDLE 寫超時)"); } else if (event.state() == IdleState.ALL_IDLE) { /*總超時*/ System.out.println("===服務端===(ALL_IDLE 總超時)"); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("錯誤原因:"+cause.getMessage()); ctx.channel().close(); setUserOnlineStatus(userid, false); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("Client active "); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 關閉,等待重連 ctx.close(); System.out.println("===服務端===(客戶端失效)"); setUserOnlineStatus(userid, false); } //設置用戶在線離線狀態 private void setUserOnlineStatus(String userid, boolean isOnline){ if(!Utils.isBlank(userid)){ //更新用戶信息為在線狀態(此處代碼省略) } } }
簡易的測試客戶端代碼:
SimpleClient.java
public class SimpleClient { public static void main(String[] args) throws Exception { new SimpleClient("127.0.0.1", 9597).run(); } private final String host; private final int port; public SimpleClient(String host, int port) { this.host = host; this.port = port; } public void run() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap().group(group).channel( NioSocketChannel.class).handler( new SimpleClientInitializer()); Channel channel = bootstrap.connect(host, port).sync().channel(); BufferedReader in = new BufferedReader(new InputStreamReader( System.in)); while (true) { channel.writeAndFlush(in.readLine()); } } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
SimpleClientInitializer.java
public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); } }
備注:代碼大部分是從其他網站復制修改調整的,寫得相對簡易一點,其中還有很多安全性、合理性有待優化。
代碼參考文章:
更多相關文章: