轉自 http://www.spring4all.com/article/889
用Netty實現長連接服務,當發生下面的情況時,會發生斷線的情況。
- 網絡問題
- 客戶端啟動時服務端掛掉了,連接不上服務端
- 客戶端已經連接服務端,服務端突然掛掉了
- 其它問題等...
##如何解決上面的問題?
1.心跳機制檢測連接存活
長連接是指建立的連接長期保持,不管有無數據包的發送都要保持連接通暢。心跳是用來檢測一個系統是否存活或者網絡鏈路是否通暢的一種方式,一般的做法是客戶端定時向服務端發送心跳包,服務端收到心跳包后進行回復,客戶端收到回復說明服務端存活。
通過心跳檢測機制,可以檢測客戶端與服務的長連接是否保持,當客戶端發送的心跳包沒有收到服務端的響應式,可以認為服務端已經出故障了,這個時候可以重新連接或者選擇其他的可用的服務進行連接。
在Netty中提供了一個IdleStateHandler類用於心跳檢測,用法如下:
ch.pipeline().addLast("ping", new IdleStateHandler(60, 20, 60 * 10, TimeUnit.SECONDS));
- 第一個參數 60 表示讀操作空閑時間
- 第二個參數 20 表示寫操作空閑時間
- 第三個參數 60*10 表示讀寫操作空閑時間
- 第四個參數 單位/秒
在處理數據的ClientPoHandlerProto中增加userEventTriggered用來接收心跳檢測結果,event.state()的狀態分別對應上面三個參數的時間設置,當滿足某個時間的條件時會觸發事件。
public class ClientPoHandlerProto extends ChannelInboundHandlerAdapter { private ImConnection imConnection = new ImConnection(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { MessageProto.Message message = (MessageProto.Message) msg; System.out.println("client:" + message.getContent()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { super.userEventTriggered(ctx, evt); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state().equals(IdleState.READER_IDLE)) { System.out.println("長期沒收到服務器推送數據"); //可以選擇重新連接 } else if (event.state().equals(IdleState.WRITER_IDLE)) { System.out.println("長期未向服務器發送數據"); //發送心跳包 ctx.writeAndFlush(MessageProto.Message.newBuilder().setType(1)); } else if (event.state().equals(IdleState.ALL_IDLE)) { System.out.println("ALL"); } } } }
服務端收到客戶端發送的心跳消息后,回復一條信息
public class ServerPoHandlerProto extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { MessageProto.Message message = (MessageProto.Message) msg; if (ConnectionPool.getChannel(message.getId()) == null) { ConnectionPool.putChannel(message.getId(), ctx); } System.err.println("server:" + message.getId()); // ping if (message.getType() == 1) { ctx.writeAndFlush(message); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
當客戶端20秒沒往服務端發送過數據,就會觸發IdleState.WRITER_IDLE事件,這個時候我們就像服務端發送一條心跳數據,跟業務無關,只是心跳。服務端收到心跳之后就會回復一條消息,表示已經收到了心跳的消息,只要收到了服務端回復的消息,那么就不會觸發IdleState.READER_IDLE事件,如果觸發了IdleState.READER_IDLE事件就說明服務端沒有給客戶端響應,這個時候可以選擇重新連接。
2.啟動時連接重試
在Netty中實現重連的操作比較簡單,Netty已經封裝好了,我們只需要稍微擴展一下即可。
連接的操作是客戶端這邊執行的,重連的邏輯也得加在客戶端,首先我們來看啟動時要是連接不上怎么去重試
增加一個負責重試邏輯的監聽器,代碼如下:
import java.util.concurrent.TimeUnit; import com.netty.im.client.ImClientApp; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoop; /** * 負責監聽啟動時連接失敗,重新連接功能 * @author yinjihuan * */ public class ConnectionListener implements ChannelFutureListener { private ImConnection imConnection = new ImConnection(); @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()) { final EventLoop loop = channelFuture.channel().eventLoop(); loop.schedule(new Runnable() { @Override public void run() { System.err.println("服務端鏈接不上,開始重連操作..."); imConnection.connect(ImClientApp.HOST, ImClientApp.PORT); } }, 1L, TimeUnit.SECONDS); } else { System.err.println("服務端鏈接成功..."); } } }
通過channelFuture.isSuccess()可以知道在連接的時候是成功了還是失敗了,如果失敗了我們就啟動一個單獨的線程來執行重新連接的操作。
只需要在ConnectionListener添加到ChannelFuture中去即可使用
public class ImConnection { private Channel channel; public Channel connect(String host, int port) { doConnect(host, port); return this.channel; } private void doConnect(String host, int port) { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { // 實體類傳輸數據,protobuf序列化 ch.pipeline().addLast("decoder", new ProtobufDecoder(MessageProto.Message.getDefaultInstance())); ch.pipeline().addLast("encoder", new ProtobufEncoder()); ch.pipeline().addLast(new ClientPoHandlerProto()); } }); ChannelFuture f = b.connect(host, port); f.addListener(new ConnectionListener()); channel = f.channel(); } catch(Exception e) { e.printStackTrace(); } } }
可以按照如下步驟進行測試:
- 直接啟動客戶端,不啟動服務端
- 當連接失敗的時候會進入ConnectionListener中的operationComplete方法執行我們的重連邏輯
3.運行中連接斷開時重試
使用的過程中服務端突然掛了,就得用另一種方式來重連了,可以在處理數據的Handler中進行處理。
public class ClientPoHandlerProto extends ChannelInboundHandlerAdapter { private ImConnection imConnection = new ImConnection(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { MessageProto.Message message = (MessageProto.Message) msg; System.out.println("client:" + message.getContent()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.err.println("掉線了..."); //使用過程中斷線重連 final EventLoop eventLoop = ctx.channel().eventLoop(); eventLoop.schedule(new Runnable() { @Override public void run() { imConnection.connect(ImClientApp.HOST, ImClientApp.PORT); } }, 1L, TimeUnit.SECONDS); super.channelInactive(ctx); } }
在連接斷開時都會觸發 channelInactive 方法, 處理重連的邏輯跟上面的一樣。
可以按照如下步驟進行測試:
- 啟動服務端
- 啟動客戶端,連接成功
- 停掉服務端就會觸發channelInactive進行重連操作