依賴
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.42.Final</version> </dependency>
還用到了
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency>
ChatDto.java
import lombok.Data; import lombok.experimental.Accessors; /** * 傳輸實體類 */ @Data @Accessors(chain = true) public class ChatDto { /** * 客戶端ID 唯一 */ private String clientId; /** * 發送的消息 */ private String msg; }
NettyChannelMap.java
import io.netty.channel.Channel; import io.netty.channel.socket.SocketChannel; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 存放連接的channel對象 */ public class NettyChannelMap { private static Map<String, SocketChannel> map = new ConcurrentHashMap<String, SocketChannel>(); public static void add(String clientId, SocketChannel socketChannel) { map.put(clientId, socketChannel); } public static Channel get(String clientId) { return map.get(clientId); } public static void remove(SocketChannel socketChannel) { for (Map.Entry entry : map.entrySet()) { if (entry.getValue() == socketChannel) { map.remove(entry.getKey()); } } } }
NettyTcpServerBootstrap.java
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * 服務端啟動類 */ public class NettyTcpServerBootstrap { private int port; private SocketChannel socketChannel; public NettyTcpServerBootstrap(int port) throws InterruptedException { this.port = port; } public void start() throws InterruptedException { /** * 創建兩個線程組 bossGroup 和 workerGroup * bossGroup 只是處理連接請求,真正的和客戶端業務處理,會交給 workerGroup 完成 * 兩個都是無線循環 */ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //創建服務器端的啟動對象,配置參數 ServerBootstrap bootstrap = new ServerBootstrap(); //設置兩個線程組 bootstrap.group(bossGroup, workerGroup) //使用NioServerSocketChannel 作為服務器的通道實現 .channel(NioServerSocketChannel.class) //設置線程隊列得到連接個數 .option(ChannelOption.SO_BACKLOG, 128) //設置保持活動連接狀態 .childOption(ChannelOption.SO_KEEPALIVE, true) //通過NoDelay禁用Nagle,使消息立即發出去,不用等待到一定的數據量才發出去 .childOption(ChannelOption.TCP_NODELAY, true) //可以給 bossGroup 加個日志處理器 .handler(new LoggingHandler(LogLevel.INFO)) //給workerGroup 的 EventLoop 對應的管道設置處理器 .childHandler(new ChannelInitializer<SocketChannel>() { //給pipeline 設置處理器 @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline p = socketChannel.pipeline(); p.addLast(new ObjectEncoder()); p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))); p.addLast(new NettyServerHandler()); } }); //啟動服務器並綁定一個端口並且同步生成一個 ChannelFuture 對象 ChannelFuture cf = bootstrap.bind(port).sync(); if (cf.isSuccess()) { System.out.println("socket server start---------------"); } //對關閉通道進行監聽 cf.channel().closeFuture().sync(); } finally { //發送異常關閉 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
NettyServerHandler.java
import com.alibaba.fastjson.JSON; import com.example.netty.dto.ChatDto; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.socket.SocketChannel; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; @Slf4j public class NettyServerHandler extends SimpleChannelInboundHandler<Object> { /** * 定義一個channel組管理所有channel * GlobalEventExecutor.INSTANCE 是一個全局事件執行器 是一個單例 */ private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 標識 channel處於活動狀態 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } /** * 表示連接建立 第一個被執行 * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { channelGroup.add(ctx.channel()); /** * 該方法會將 channelGroup 中所有的channel 遍歷一遍然后發送消息 不用我們自己遍歷 * 這里只是做個說明 不用 */ // channelGroup.writeAndFlush("發送所有給所有channel"); } /** * 斷開連接 * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } /** * 標識channel處於非活動狀態 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { NettyChannelMap.remove((SocketChannel) ctx.channel()); } /** * 服務端 接收到 客戶端 發的數據 * @param context * @param obj * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext context, Object obj) throws Exception { log.info(">>>>>>>>>>>服務端接收到客戶端的消息:{}",obj); SocketChannel socketChannel = (SocketChannel) context.channel(); ChatDto dto = JSON.parseObject(obj.toString(), ChatDto.class); /** * 客戶端ID */ String clientId = dto.getClientId(); if (clientId == null) { /** * 心跳包處理 */ ChatDto pingDto=new ChatDto(); pingDto.setMsg("服務端收到心跳包,返回響應"); socketChannel.writeAndFlush(JSON.toJSONString(pingDto)); return; } Channel channel = NettyChannelMap.get(clientId); if (channel==null){ /** * 存放所有連接客戶端 */ NettyChannelMap.add(clientId, socketChannel); channel=socketChannel; } /** * 服務器返回客戶端消息 */ ChatDto returnDto=new ChatDto(); returnDto.setClientId(clientId).setMsg("我是服務端,收到你的消息了"); channel.writeAndFlush(JSON.toJSONString(returnDto)); /** * 在這里可以設置異步執行 提交任務到該channel的taskQueue 中 */ context.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(10*1000); log.info(">>>>>>>>>休眠十秒"); } catch (InterruptedException e) { e.printStackTrace(); } } }); /** * 可以設置多個異步任務 * 但是這個會在上面異步任務執行完之后才執行 */ context.channel().eventLoop().execute(new Runnable() { @Override public void run() { try { Thread.sleep(10*1000); log.info(">>>>>>>>>休眠二十秒"); } catch (InterruptedException e) { e.printStackTrace(); } } }); ReferenceCountUtil.release(obj); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); ctx.close(); } }
客戶端
NettyClientBootstrap.java
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.DefaultEventExecutorGroup; import io.netty.util.concurrent.EventExecutorGroup; /** * 客戶端啟動類 */ public class NettyClientBootstrap { private int port; private String host; public SocketChannel socketChannel; private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20); public NettyClientBootstrap(int port, String host) throws InterruptedException { this.port = port; this.host = host; start(); } private void start() throws InterruptedException { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .remoteAddress(host, port) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { /** * 0 表示禁用 * readerIdleTime讀空閑超時時間設定,如果channelRead()方法超過readerIdleTime時間未被調用則會觸發超時事件調用userEventTrigger()方法; * * writerIdleTime寫空閑超時時間設定,如果write()方法超過writerIdleTime時間未被調用則會觸發超時事件調用userEventTrigger()方法; * * allIdleTime所有類型的空閑超時時間設定,包括讀空閑和寫空閑; */ socketChannel.pipeline().addLast(new IdleStateHandler(20, 10, 0)); socketChannel.pipeline().addLast(new ObjectEncoder()); socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null))); socketChannel.pipeline().addLast(new NettyClientHandler()); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); if (future.isSuccess()) { socketChannel = (SocketChannel) future.channel(); System.out.println("connect server 成功---------"); } //給關閉通道進行監聽 future.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully(); } } }
NettyClientHandler.java
import com.alibaba.fastjson.JSON; import com.example.netty.dto.ChatDto; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; @Slf4j public class NettyClientHandler extends SimpleChannelInboundHandler<Object> { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>連接"); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>退出"); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; switch (e.state()) { case WRITER_IDLE: /** * 利用寫空閑發送心跳檢測消息 */ ChatDto pingDto=new ChatDto(); pingDto.setMsg("我是心跳包"); ctx.writeAndFlush(JSON.toJSONString(pingDto)); log.info("send ping to server----------"); break; default: break; } } } /** * 客戶端接收到服務端發的數據 * @param channelHandlerContext * @param obj * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object obj) { log.info(">>>>>>>>>>>>>客戶端接收到消息:{}", obj); ReferenceCountUtil.release(obj); } /** * socket通道處於活動狀態 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>>>socket建立了"); super.channelActive(ctx); } /** * socket通道不活動了 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>>>socket關閉了"); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); ctx.close(); } }
SprigBoot啟動類添加服務端啟動代碼
@SpringBootApplication public class NettyApplication { public static void main(String[] args) { SpringApplication.run(NettyApplication.class, args); try { NettyTcpServerBootstrap bootstrap = new NettyTcpServerBootstrap(9999); bootstrap.start(); } catch (Exception e) { e.printStackTrace(); System.out.println("server socket 啟動失敗"); } } }
ChatController.java
import com.alibaba.fastjson.JSON; import com.example.netty.dto.ChatDto; import com.example.netty.socket.NettyClientBootstrap; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; /** * 客戶端消息發送控制器 */ @RestController @Slf4j public class ChatController { private static String clientId=UUID.randomUUID().toString(); public static NettyClientBootstrap bootstrap; /** * 發送消息demo * @param msg */ @PostMapping(value = "/send") public void send(String msg) { if (bootstrap == null) { try { /** * 連接 輸入服務器的端口和ip */ bootstrap = new NettyClientBootstrap(9999, "localhost"); } catch (InterruptedException e) { e.printStackTrace(); log.error(">>>>>>>>> server socket 連接失敗"); } } /** * 發送消息 */ ChatDto dto=new ChatDto(); dto.setClientId(clientId).setMsg(msg); /** * json字符串發送 */ bootstrap.socketChannel.writeAndFlush(JSON.toJSONString(dto)); } }
訪問