1.導入pom依賴
<dependencies> ...... <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency> </dependencies>
2.服務端
a.創建服務端啟動類
public class StartServer { //端口號 private int port; public StartServer(int port) { this.port = port; } //啟動方法 public void start() throws Exception { //負責接收客戶端的連接的線程。線程數設置為1即可,netty處理鏈接事件默認為單線程,過度設置反而浪費cpu資源 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //負責處理數據傳輸的工作線程。線程數默認為CPU核心數乘以2 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); //在ServerChannelInitializer中初始化ChannelPipeline責任鏈,並添加到serverBootstrap中 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) { //添加編解碼 channel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); channel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); channel.pipeline().addLast("socketHandler", new SocketHandler()); } }); //標識當服務器請求處理線程全滿時,用於臨時存放已完成三次握手的請求的隊列的最大長度 bootstrap.option(ChannelOption.SO_BACKLOG, 1024); //是否啟用心跳保活機制 bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); //綁定端口后,開啟監聽 ChannelFuture future = bootstrap.bind(port).sync(); //等待服務監聽端口關閉 future.channel().closeFuture().sync(); } finally { //釋放資源 workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } //測試代碼 public static void main(String[] args) { try { int port = 8080; new StartServer(port).start(); } catch (Exception e) { e.printStackTrace(); } } }
b.創建Socket處理器
public class SocketHandler extends SimpleChannelInboundHandler<String> { /** * 客戶端發消息會觸發 */ @Override public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("[" + this.getIP(ctx) + "]收到消息:" + msg); ClientManager.getInstance().handleMsg(this.getIP(ctx), "This is response"); } /** * 客戶端連接會觸發 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //添加channel信息 ClientManager.getInstance().putChannel(this.getIP(ctx), ctx.channel()); System.out.println("[" + this.getIP(ctx) + "]已連接。。。"); } /** * 客戶端斷開連接會觸發 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //刪除失效的channel ClientManager.getInstance().removeChannel(getIP(ctx)); ctx.close(); System.out.println("[" + this.getIP(ctx) + "]已斷開。。。"); } /** * 發生異常觸發 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { System.out.println("[" + this.getIP(ctx) + "]發生異常:" + t); ctx.close(); } /** * 獲取IP地址 */ private String getIP(ChannelHandlerContext ctx) { String socketString = ctx.channel().remoteAddress().toString(); int index = socketString.indexOf(":"); String ip = socketString.substring(1, index); return ip; } }
c.創建客戶端管理類
public class ClientManager { private static ClientManager instance = new ClientManager(); private ClientManager(){} public static ClientManager getInstance(){ return instance; } //IP與信道的對應關系 private Map<String, Channel> channelMap = new ConcurrentHashMap<>(); //添加信道 public void putChannel(String ip, Channel channel){ this.channelMap.put(ip, channel); } //刪除信道 public void removeChannel(String ip){ this.channelMap.remove(ip); } //發送消息 public void sendMsg(String ip, String msg){ Channel channel = this.channelMap.get(ip); if(channel != null){ channel.writeAndFlush(msg); } } //處理消息 public void handleMsg(String ip, String msg){ this.sendMsg(ip, msg); } }
3.客戶端
a.創建客戶端啟動類
public class StartClient { //主機名/IP private String host; //端口號 private int port; public StartClient(String host, int port) { this.host = host; this.port = port; } //啟動方法 public void start() throws Exception { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast("decoder", new StringDecoder()); channel.pipeline().addLast("encoder", new StringEncoder()); channel.pipeline().addLast(new SimpleChannelInboundHandler<String>(){ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("收到響應:" + msg); } }); } }); //建立連接 ChannelFuture future = bootstrap.connect(host, port).sync(); //發送消息 future.channel().writeAndFlush("This is Request"); //等待服務監聽端口關閉 future.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } //測試代碼 public static void main(String[] args) { try { String host = "127.0.0.1"; int port = 8080; new StartClient(host, port).start(); } catch (Exception e) { e.printStackTrace(); } } }
4.參考文章
https://www.cnblogs.com/happy2010/p/10895209.html
https://www.jianshu.com/p/b60180a0a0e6