首先,我們要明確幾件事。
聊天室需要具有什么功能?
1、存在n個未與Server建立連接的Client。當第一個Client與Server建立連接,緊接着第二個Client與Server建立連接后,Server會在控制台上
打印“xxx已上線”。當n個Client與Server建立連接之后,第n+1個Client建立連接后,Server會通知其他Client:“xxx已上線”。 ------聊天室的廣播機制
2、建立IO流,Client之間互為輸出流,Server作為消息轉發的載體,需要同步。 ------聊天室的消息回調機制
當連接一旦建立好,相應的處理器是HandlerAdded(ChannelHandlerContext ctx),所以要先獲取channel對象。但要想廣播出去,Server需要保存好所有建立連
接的channel對象。如何保存?用netty提供的channelGroup,定義這個實例,用DefaultChannelGroup對象實現。生成channelGroup對象后那channel對象add到
channelGroup中。在add之前,遍歷channelGroup的每個對象,就實現了廣播機制。
在使用channel.remoteAddress()方法拿到連接后,重寫SimpChannelHandler的子類方法channelRead0()即可。在channelRead0中,用forEach()遍歷channelGroup。
如果是別的Client通過readLine()發送的消息,則打印channel.remoteAddress()和msg,如果是自己發送的消息就只打印msg,這就實現了消息回調機制。
Server \
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * @description: 【聊天---服務器處理】 * @author: KlayHu * @create: 2019/10/8 17:23 **/
public class MyChatServerHandler extends SimpleChannelInboundHandler<String>{ //定義保存建立連接的Channel對象的實例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); channelGroup.forEach(ch->{ if(channel!=ch){ ch.writeAndFlush("【" + channel.remoteAddress() +"】" + "發送的消息:" + msg + "\n"); }else{ ch.writeAndFlush("【我:】" + msg + "\n"); } }); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("【服務器:】-" + channel.remoteAddress() + "=======已加入!=======\n"); //遍歷每一個channel對象,新連接的channel的遠程地址告訴別的client它加入了。
channelGroup.add(channel); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //獲取channel對象
channelGroup.writeAndFlush("【服務器:】-" + channel.remoteAddress() + "=======已離開!=======\n"); System.out.println(channelGroup.size()); //當有客戶端斷開連接的時候,沒有必要調用Remove,驗證一下。 //channelGroup.remove(channel); netty會自動調用
} @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println("【" + channel.remoteAddress() + "】" + "====上線了===="); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); System.out.println("【" + channel.remoteAddress() + "】" + "====下線了===="); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { } }
Client \
import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.io.BufferedReader; import java.io.InputStreamReader; /** * @description: 【聊天---客戶端】 * @author: KlayHu * @create: 2019/10/8 18:40 **/
public class MyChatClient { public static void main(String[] args) throws Exception{ EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); try{ Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyChatClientInitializer()); Channel channel = bootstrap.connect("localhost",8888).sync().channel(); BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); for(;;){ channel.writeAndFlush(br.readLine() + "\r\n"); } }finally { eventLoopGroup.shutdownGracefully(); } } }
關於Server和Client的事件循環組和啟動配置類的建立在上一篇中與本篇寫法大致相同。netty的獨特性,就在於自定義的Handler。
【服務器】
【客戶端1】
【客戶端2】
【客戶端3】