說到netty通訊,回憶了下,還是18年的時候,學了了netty,至今就學習的時候寫過一個項目。最近閑生,被要求做一個netty通訊的項目,順手,總結一下,之前寫的項目。
當時是寫了一款訪微信聊天的軟件,所以用到了netty通訊,廢話不過說,我們來直接上代碼吧。
import org.springframework.stereotype.Component; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; @Slf4j @Component public class WSServer { private static class SingletionWSServer { static final WSServer instance = new WSServer(); } public static WSServer getInstance() { return SingletionWSServer.instance; } private EventLoopGroup mainGroup; private EventLoopGroup subGroup; private ServerBootstrap server; private ChannelFuture future; public WSServer() { // 主線程組 mainGroup = new NioEventLoopGroup(); // 子線程組 subGroup = new NioEventLoopGroup(); // netty服務器的創建,ServerBootstrap是一個啟動類 server = new ServerBootstrap(); server.group(mainGroup, subGroup)// 設置主從線程組 .channel(NioServerSocketChannel.class)// 設置nio雙向通道 .childHandler(new WSServerInitializer());// 子處理器,用於處理subGroup } /** * 啟動 */ public void start() { this.future = server.bind(9700); System.err.println("netty websocket server 啟動完畢..."); log.info("netty websocket server 啟動完畢..."); } }
這個類用於創建netty的服務端鏈接,下面我們來配置啟動的一些配置。
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class WSServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { System.out.println("初始化 SocketChannel"); log.info("初始化 SocketChannel"); ChannelPipeline pipeline = ch.pipeline(); //websocket 基於http協議,所以要http編解碼器 pipeline.addLast(new HttpServerCodec()); //對寫大數據流的支持 pipeline.addLast(new ChunkedWriteHandler()); //對httpMessage進行聚合,聚合成FullHttpRequest或FullHttpResponse //幾乎在netty中的編程,都會使用到此hanler pipeline.addLast(new HttpObjectAggregator(1024*64)); //========================以上是用於支持http協議======================== //========================增加心跳支持 start ======================== //針對客戶端,如果在1分鍾時沒有想服務端發送寫心跳(ALL),則主動斷開 //如果是讀空閑或者寫空閑,不處理 pipeline.addLast(new IdleStateHandler(8, 10, 12)); //自定義的空閑檢測 pipeline.addLast(new HeartBeatHandler()); //========================增加心跳支持 end ======================== /** * websocket服務器處理的協議,用於指定給客戶端連接訪問的路由: /ws * 本Handler會幫助你處理一些繁重的復雜的事 * 會幫你處理握手動作: handshaking(close, ping, pong) ping + pong = 心跳 * 對於websocket來講,都是以frames進行傳輸的,不同的數據類型對應的frames也不同 */ pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); //自定義hanler pipeline.addLast(new ChatHandler()); } }
自定義空閑檢測
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** * 用於檢測channel的心跳的handler 繼承 ChannelInboundHandlerAdapter 從而不需要實現 channelRead0方法 * * @author wb0024 * */ public class HeartBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 判斷evt是否是IdleStateEvent(用於觸發用戶事件,包含 讀空閑/寫空閑/讀寫空閑) if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; // 強制類型轉換 if (event.state() == IdleState.READER_IDLE) { System.out.println("進入讀空閑..."); } else if (event.state() == IdleState.WRITER_IDLE) { System.out.println("進入寫空閑..."); } else if (event.state() == IdleState.ALL_IDLE) { System.out.println("channel關閉前users數量為:"+ChatHandler.users.size()); System.out.println("進入讀寫空閑..."); Channel channel = ctx.channel(); //關閉無用的channel,以防資源浪費 channel.close(); System.out.println("channel關閉后users數量為:"+ChatHandler.users.size()); } } } }
自定義hanler
import com.imooc.enums.MsgActionEnum; import com.imooc.utils.JsonUtils; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; /** * 處理消息的handler * * @author wb0024 TextWebSocketFrame:在netty中,是用於為websocket專門處理文本的對象,frame是消息的載體 */ public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { // 用於記錄和管理所有客戶端的channel public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 獲取客戶端傳輸過來的消息 String content = msg.text(); System.out.println("處理消息的handler:" + content); Channel currentChannel = ctx.channel(); // 1. 獲取客戶端發送的消息 DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class); Integer action = dataContent.getAction(); System.out.println("action:" + action); // 2. 判斷消息類型,根據不同的類型來處理不同的業務 if (action == MsgActionEnum.CONNECT.type) { // 2.1 當websocket 第一次open的時候,初始化channel,把用戶的channel和userId關聯起來 String senderId = dataContent.getChatMsg().getSenderId(); UserChannelRel.put(senderId, currentChannel); // 測試 for (Channel channel : users) { System.out.println(channel.id().asLongText()); } UserChannelRel.output(); } else if (action == MsgActionEnum.CHAT.type) { // 2.2 聊天類型的消息,把聊天記錄保存到數據庫,同時標記消息的簽收狀態[未簽收] ChatMsg chatMsg = dataContent.getChatMsg(); String receiverId = chatMsg.getReceiverId(); DataContent dataContentMsg = new DataContent(); dataContentMsg.setChatMsg(chatMsg); // 發送消息 // 從全局用戶channel關系中獲取接受方的channel Channel receiverChannel = UserChannelRel.get(receiverId); if (receiverChannel != null) { // 當receiverChannel不為空的時候,從 ChannelGroup 去查找對應的channel是否存在 Channel findChannel = users.find(receiverChannel.id()); if (findChannel != null) { // 用戶在線 receiverChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.objectToJson(dataContentMsg))); } } } else if (action == MsgActionEnum.KEEPALIVE.type) { // 2.4 心跳類型的消息 System.out.println("收到來自channel為[" + currentChannel + "]的心跳包"); } } /** * 當客戶連接服務端之后(打開鏈接) 獲取客戶端的channel,並且放到ChannelGroup中去進行管理 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { users.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { String channelId = ctx.channel().id().asLongText(); System.out.println("客戶端被移除,channelId為:" + channelId); // 當觸發handlerRemoved,ChannelGroup會自動移除對應的客戶端channel users.remove(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 發生異常之后關鍵channel。隨后從ChannelGroup 中移除 ctx.channel().close(); users.remove(ctx.channel()); } }
定義channel管理
import java.util.HashMap; import io.netty.channel.Channel; /** * 用戶id和channel的關聯關系處理 * * @author wb0024 * */ public class UserChannelRel { private static HashMap<String, Channel> manager = new HashMap<>(); public static void put(String senderId, Channel channel) { manager.put(senderId, channel); } public static Channel get(String senderId) { return manager.get(senderId); } public static void output() { for (HashMap.Entry<String, Channel> entry : manager.entrySet()) { System.out.println("UserId:" + entry.getKey() + ",ChannelId:" + entry.getValue().id().asLongText()); } } }
其他類
import java.io.Serializable; import lombok.Data; @Data public class ChatMsg implements Serializable { /** * */ private static final long serialVersionUID = 1L; private String senderId; // 發送者的用戶id private String receiverId; // 接受者的用戶id private String msg; // 聊天內容 private String msgId; // 用於消息的簽收 }
import java.io.Serializable; import lombok.Data; @Data public class DataContent implements Serializable { /** * */ private static final long serialVersionUID = 1L; private Integer action; // 動作類型 private ChatMsg chatMsg; // 用戶的聊天內容對象 private String extand; // 擴展字段 }
好了,到了這里一個簡單的netty通訊就做好了