【概述】
實現一個網絡群聊工具。參與聊天的客戶端消息是通過服務端進行廣播的。
主要由兩塊組成:聊天服務器端(ChatServer)和聊天客戶端(ChatClient)。
聊天服務器(ChatServer)功能概述 :
1.監聽所有客戶端的接入、斷線
2.有客戶端A接入聊天室時,將接入消息發給除了客戶端A的其他客戶端
3.當客戶端A退出聊天室時,將退出消息發給除了客戶端A的其他客戶端
4.當客戶端A發送消息到聊天室時,將消息轉發給除了客戶端A的其他客戶端
聊天客戶端(ChatClient)功能概述 :
1.發送消息至聊天服務器
2.接收聊天服務器發送過來的所有消息
【pom依賴】
<!--netty--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency> <!--lombok依賴--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.6</version> <scope>provided</scope> </dependency>
【服務端啟動器 ChatServer 】
package com.test.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * 群聊服務端 * */ public class ChatServer { private final int port; public ChatServer(int port) { this.port = port; } public void start() throws InterruptedException { EventLoopGroup parentGroup = new NioEventLoopGroup(); EventLoopGroup childGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(parentGroup, childGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { ChannelPipeline pipeline = sc.pipeline(); //添加一個基於行的解碼器 pipeline.addLast(new LineBasedFrameDecoder(2048)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new ChatServerHandler()); } }).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_KEEPALIVE, true); ChannelFuture future = serverBootstrap.bind(port).sync(); System.out.println("服務器已啟動"); future.channel().closeFuture().sync(); } finally { parentGroup.shutdownGracefully(); childGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new ChatServer(8888).start(); } }
【ChatServerHandler】
package com.test.server; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * 處理聊天服務器的各種情況 * */ public class ChatServerHandler extends ChannelInboundHandlerAdapter { // 創建一個ChannelGroup,其是一個線程安全的集合,其中存放着與當前服務器相連接的所有Active狀態的Channel // GlobalEventExecutor是一個單例、單線程的EventExecutor,是為了保證對當前group中的所有Channel的處理 // 線程是同一個線程 private static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 只要有客戶端Channel給當前的服務端發送了消息,那么就會觸發該方法的執行 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 獲取到向服務器發送消息的channel Channel channel = ctx.channel(); // 這里要實現將消息廣播給所有group中的客戶端Channel // 發送給自己的消息與發送給大家的消息是不一樣的 group.forEach(ch -> { if (ch != channel) { ch.writeAndFlush(channel.remoteAddress() + ":" + msg + "\n"); } else { channel.writeAndFlush("me:" + msg + "\n"); } }); } // 只要有客戶端Channel與服務端連接成功就會執行這個方法 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 獲取到當前與服務器連接成功的channel Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + "---上線"); group.writeAndFlush(channel.remoteAddress() + "---上線\n"); // 將當前channel添加到group中 group.add(channel); } // 只要有客戶端Channel斷開與服務端的連接就會執行這個方法 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 獲取到當前要斷開連接的Channel Channel channel = ctx.channel(); System.out.println(channel.remoteAddress() + "------下線"); group.writeAndFlush(channel.remoteAddress() + "下線,當前在線人數:" + group.size() + "\n"); // group中存放的都是Active狀態的Channel,一旦某Channel的狀態不再是Active, // group會自動將其從集合中踢出,所以,下面的語句不用寫 // remove()方法的應用場景是,將一個Active狀態的channel移出group時使用 // group.remove(channel); } /** * 當Channel中的數據在處理過程中出現異常時會觸發該方法的執行 * * @param ctx 上下文 * @param cause 發生的異常對象 * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
【客戶端啟動類ChatClient】
package com.test.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; /** * 群聊客戶端 * */ public class ChatClient { private final String host; private final int port; public ChatClient(String host, int port) { this.host = host; this.port = port; } public void start() { NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); try { bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LineBasedFrameDecoder(2048)); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new ChatClientHandler()); } }); ChannelFuture future = bootstrap.connect(host, port).sync(); // 獲取鍵盤輸入 InputStreamReader is = new InputStreamReader(System.in, "UTF-8"); BufferedReader br = new BufferedReader(is); // 將輸入的內容寫入到Channel while (true) { //br.readLine()中執行fill()方法獲取輸入數據,獲取不到時會發生阻塞,直到獲取到數據為止 future.channel().writeAndFlush(br.readLine() + "\r\n"); } } catch (InterruptedException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) { new ChatClient("127.0.0.1", 8888).start(); } }
【ChatClientHandler】
package com.test.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 客戶端處理 * */ public class ChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
【運行結果】
[1.啟動群聊服務端]
[2.啟動群聊客戶端01]
[2.啟動群聊客戶端02]
[3.發送消息測試]
[4.下線測試]