場景
Netty的Socket編程詳解-搭建服務端與客戶端並進行數據傳輸:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/108615023
在此基礎上要實現多個客戶端之間通信,實現類似群聊或者聊天室的功能。
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
關注公眾號
霸道的程序猿
獲取編程相關電子書、教程推送與免費下載。
實現
在上面實現的服務端與客戶端通信的基礎上,在src下新建com.badao.Char包,包下新建ChatServer類作為聊天室的服務端。
package com.badao.Chat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class ChatServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChatServerInitializer()); //綁定端口 ChannelFuture channelFuture = serverBootstrap.bind(70).sync(); channelFuture.channel().closeFuture().sync(); }finally { //關閉事件組 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
在上面中綁定70端口並添加了一個服務端的初始化器ChatServerInitializer
所以新建類ChatServerInitializer
package com.badao.Chat; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; public class ChatServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new ChatServerHandler()); } }
使其繼承ChannelInitializer,並重寫InitChannel方法,在方法中使用Netty自帶的處理器進行編碼的處理並最后添加一個自定義的處理器ChatServerHandler
新建處理器類ChatServerHandler
package com.badao.Chat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class ChatServerHandler extends SimpleChannelInboundHandler<String> { private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); channelGroup.forEach(ch->{ if(channel!=ch) { ch.writeAndFlush(channel.remoteAddress()+"發送的消息:"+msg+"\n"); } else { ch.writeAndFlush("[自己]:"+msg+"\n"); } }); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服務器]:"+channel.remoteAddress()+"加入\n"); channelGroup.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服務器]:"+channel.remoteAddress()+"離開\n"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress()+"上線了\n"); System.out.println("當前在線人數:"+channelGroup.size()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress()+"下線了\n"); System.out.println("當前在線人數:"+channelGroup.size()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
使處理器繼承SimpleChannelinboundHandler並重寫channelRead0方法。
在最上面聲明了一個通道組的通過 DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
獲取其單例,只要是建立連接的客戶端都會自動添加進此通道組中。
然后只要是客戶端與服務端發送消息后就會執行該方法。
在此方法中直接遍歷通道組,判斷通道組里面的每一個客戶端是不是當前發消息的客戶端。
如果是就顯示自己發送消息,如果不是則獲取遠程地址並顯示發送消息。
然后就是實現客戶端的上線功能以及在線人數統計的功能。
在上面的處理器中重寫channelActive方法,此方法會在通道激活即建立連接后調用
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress()+"上線了\n"); System.out.println("當前在線人數:"+channelGroup.size()); }
同理重寫channelInactive方法,此方法會在斷掉連接后調用
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println(channel.remoteAddress()+"下線了\n"); System.out.println("當前在線人數:"+channelGroup.size()); }
然后就是實現向所有的客戶端廣播新建客戶端加入聊天室的功能
重寫handlerAdded方法,此方法會在將通道添加到通道組中調用,所以在此方法中獲取加入到通道組的遠程地址
並使用channelGroup的writeAndFlush方法就能實現向所有建立連接的客戶端發送消息,新的客戶端剛上線時不用向自己
發送上線消息,所以在廣播完上線消息后再講此channel添加到channelGroup中。
@Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服務器]:"+channel.remoteAddress()+"加入\n"); channelGroup.add(channel); }
同理實現下線提醒需要重寫handlerRemoved方法
@Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[服務器]:"+channel.remoteAddress()+"離開\n"); }
但是此方法中不用手動從channelGroup中手動去掉channel,因為Netty會自動將其移除掉。
服務端搭建完成之后再搭建客戶端,新建ChatClient類並編寫main方法,在main方法中
package com.badao.Chat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.io.BufferedReader; import java.io.InputStreamReader; public class ChatClient { public static void main(String[] args) throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class) .handler(new ChatClientInitializer()); //綁定端口 Channel channel = bootstrap.connect("localhost", 70).channel(); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); for(;;) { channel.writeAndFlush(br.readLine()+"\r\n"); } } finally { //關閉事件組 eventLoopGroup.shutdownGracefully(); } } }
在客戶端中讀取輸入的內容並在一個無限循環中將輸入的內容發送至服務端。
在Client中建立對服務端的連接同理也要設置一個初始化器ChatClientInitializer
新建初始化器的類ChatClientInitializer
package com.badao.Chat; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; public class ChatClientInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter())); pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast(new ChatClientHandler()); } }
使用Netty自帶的處理器對編碼進行處理並添加一個自定義的處理器ChatClientHandler
新建類ChatClientHandler
package com.badao.Chat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class ChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); } }
在重寫的channelRead0方法中只需要將收到的消息進行輸出即可。
現在運行服務端的main方法
為了能運行多個客戶端在IDEA中客戶端編輯
然后將下面的勾選上
然后首先運行一個客戶端
那么在服務端中就會輸出上線的客戶端以及在線人數
再次運行客戶端的main方法,此時服務端會輸出兩個客戶端上線
同時在第二個客戶端上線時第一個客戶端會收到加入的提示
此時停掉第二個客戶端即將第二個客戶端下線
服務端會提示下線並更新在線人數
同時在第一個客戶端會收到服務端的推送
再運行第二個客戶端,並在控制台輸入消息,回車發送
此時第一個客戶端就會收到第二個客戶端發送的消息。
然后第一個客戶端再輸入一個消息並回車
那么第二個客戶端也能收到消息
示例代碼下載:
https://download.csdn.net/download/BADAO_LIUMANG_QIZHI/12850228