實例要求:
1) 編寫一個 Netty 群聊系統, 實現服務器端和客戶端之間的數據簡單通訊(非阻塞)
2) 實現多人群聊
3) 服務器端: 可以監測用戶上線, 離線, 並實現消息轉發功能
4) 客戶端: 通過 channel 可以無阻塞發送消息給其它所有用戶, 同時可以接受其它用戶發送的消息(有服務器轉發得到)
5) 目的: 進一步理解 Netty
代碼:
GroupChatServer

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class GroupChatServer { private int port; //監聽端口 public GroupChatServer(int port) { this.port = port; } //編寫run方法,處理客戶端的請求 public void run() throws Exception{ //創建兩個線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //獲取到pipeline ChannelPipeline pipeline = ch.pipeline(); //向pipeline加入解碼器 pipeline.addLast("decoder", new StringDecoder()); //向pipeline加入編碼器 pipeline.addLast("encoder", new StringEncoder()); //加入自己的業務處理handler pipeline.addLast(new GroupChatServerHandler()); } }); System.out.println("netty 服務器啟動"); ChannelFuture channelFuture = b.bind(port).sync(); //監聽關閉 channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new GroupChatServer(7000).run(); } }
GroupChatServerHandler

import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> { //public static List<Channel> channels = new ArrayList<Channel>(); //使用一個hashmap 管理 //public static Map<String, Channel> channels = new HashMap<String,Channel>(); //定義一個channle 組,管理所有的channel //GlobalEventExecutor.INSTANCE) 是全局的事件執行器,是一個單例 private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //handlerAdded 表示連接建立,一旦連接,第一個被執行 //將當前channel 加入到 channelGroup @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //將該客戶加入聊天的信息推送給其它在線的客戶端 /* 該方法會將 channelGroup 中所有的channel 遍歷,並發送 消息, 我們不需要自己遍歷 */ channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n"); channelGroup.add(channel); } //斷開連接, 將xx客戶離開信息推送給當前在線的客戶 @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 離開了\n"); System.out.println("channelGroup size" + channelGroup.size()); } //表示channel 處於活動狀態, 提示 xx上線 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + " 上線了~"); } //表示channel 處於不活動狀態, 提示 xx離線了 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println(ctx.channel().remoteAddress() + " 離線了~"); } //讀取數據 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //獲取到當前channel Channel channel = ctx.channel(); //這時我們遍歷channelGroup, 根據不同的情況,回送不同的消息 channelGroup.forEach(ch -> { if(channel != ch) { //不是當前的channel,轉發消息 ch.writeAndFlush("[客戶]" + channel.remoteAddress() + " 發送了消息" + msg + "\n"); }else {//回顯自己發送的消息給自己 ch.writeAndFlush("[自己]發送了消息" + msg + "\n"); } }); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //關閉通道 ctx.close(); } }
GroupChatClient

import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; public class GroupChatClient { //屬性 private final String host; private final int port; public GroupChatClient(String host, int port) { this.host = host; this.port = port; } public void run() throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //得到pipeline ChannelPipeline pipeline = ch.pipeline(); //加入相關handler pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); //加入自定義的handler pipeline.addLast(new GroupChatClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); //得到channel Channel channel = channelFuture.channel(); System.out.println("-------" + channel.localAddress()+ "--------"); //客戶端需要輸入信息,創建一個掃描器 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); //通過channel 發送到服務器端 channel.writeAndFlush(msg + "\r\n"); } }finally { group.shutdownGracefully(); } } public static void main(String[] args) throws Exception { new GroupChatClient("127.0.0.1", 7000).run(); } }
GroupChatClientHandler

import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg.trim()); } }
User

public class User { private int id; private String pwd; }
Netty 心跳檢測機制案例
實例要求:
1) 編寫一個 Netty 心跳檢測機制案例, 當服務器超過 3 秒沒有讀時, 就提示讀空閑
2) 當服務器超過 5 秒沒有寫操作時, 就提示寫空閑
3) 實現當服務器超過 7 秒沒有讀或者寫操作時, 就提示讀寫空閑
4) 代碼如下: MyServer

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; public class MyServer { public static void main(String[] args) throws Exception{ //創建兩個線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //加入一個netty 提供 IdleStateHandler /* 說明 1. IdleStateHandler 是netty 提供的處理空閑狀態的處理器 2. long readerIdleTime : 表示多長時間沒有讀, 就會發送一個心跳檢測包檢測是否連接 3. long writerIdleTime : 表示多長時間沒有寫, 就會發送一個心跳檢測包檢測是否連接 4. long allIdleTime : 表示多長時間沒有讀寫, 就會發送一個心跳檢測包檢測是否連接 5. 文檔說明 triggers an {@link IdleStateEvent} when a {@link Channel} has not performed * read, write, or both operation for a while. * 6. 當 IdleStateEvent 觸發后 , 就會傳遞給管道 的下一個handler去處理 * 通過調用(觸發)下一個handler 的 userEventTiggered , 在該方法中去處理 IdleStateEvent(讀空閑,寫空閑,讀寫空閑) */ pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS)); //加入一個對空閑檢測進一步處理的handler(自定義) pipeline.addLast(new MyServerHandler()); } }); //啟動服務器 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyServerHandler

import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; public class MyServerHandler extends ChannelInboundHandlerAdapter { /** * * @param ctx 上下文 * @param evt 事件 * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent) { //將 evt 向下轉型 IdleStateEvent IdleStateEvent event = (IdleStateEvent) evt; String eventType = null; switch (event.state()) { case READER_IDLE: eventType = "讀空閑"; break; case WRITER_IDLE: eventType = "寫空閑"; break; case ALL_IDLE: eventType = "讀寫空閑"; break; } System.out.println(ctx.channel().remoteAddress() + "--超時時間--" + eventType); System.out.println("服務器做相應處理.."); //如果發生空閑,我們關閉通道 // ctx.channel().close(); } } }
Netty 通過 WebSocket 編程實現服務器和客戶端長連接
實例要求:
1) Http 協議是無狀態的, 瀏覽器和服務器間的請求響應一次, 下一次會重新創建連接.
2) 要求: 實現基於 webSocket 的長連接的全雙工的交互
3) 改變 Http 協議多次請求的約束, 實現長連接了, 服務器可以發送消息給瀏覽器
4) 客戶端瀏覽器和服務器端會相互感知, 比如服務器關閉了, 瀏覽器會感知, 同樣瀏覽器關閉了, 服務器會感知
5) 運行界面
MyServer:

import com.atguigu.netty.heartbeat.MyServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; public class MyServer { public static void main(String[] args) throws Exception{ //創建兩個線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //因為基於http協議,使用http的編碼和解碼器 pipeline.addLast(new HttpServerCodec()); //是以塊方式寫,添加ChunkedWriteHandler處理器 pipeline.addLast(new ChunkedWriteHandler()); /* 說明 1. http數據在傳輸過程中是分段, HttpObjectAggregator ,就是可以將多個段聚合 2. 這就就是為什么,當瀏覽器發送大量數據時,就會發出多次http請求 */ pipeline.addLast(new HttpObjectAggregator(8192)); /* 說明 1. 對應websocket ,它的數據是以 幀(frame) 形式傳遞 2. 可以看到WebSocketFrame 下面有六個子類 3. 瀏覽器請求時 ws://localhost:7000/hello 表示請求的uri 4. WebSocketServerProtocolHandler 核心功能是將 http協議升級為 ws協議 , 保持長連接 5. 是通過一個 狀態碼 101 */ pipeline.addLast(new WebSocketServerProtocolHandler("/hello2")); //自定義的handler ,處理業務邏輯 pipeline.addLast(new MyTextWebSocketFrameHandler()); } }); //啟動服務器 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
MyTextWebSocketFrameHandler

import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.time.LocalDateTime; //這里 TextWebSocketFrame 類型,表示一個文本幀(frame) public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { System.out.println("服務器收到消息 " + msg.text()); //回復消息 ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器時間" + LocalDateTime.now() + " " + msg.text())); } //當web客戶端連接后, 觸發方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一 System.out.println("handlerAdded 被調用" + ctx.channel().id().asLongText()); System.out.println("handlerAdded 被調用" + ctx.channel().id().asShortText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 被調用" + ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("異常發生 " + cause.getMessage()); ctx.close(); //關閉連接 } }
hello.html

<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <script> var socket; //判斷當前瀏覽器是否支持websocket if(window.WebSocket) { //go on socket = new WebSocket("ws://localhost:7000/hello2"); //相當於channelReado, ev 收到服務器端回送的消息 socket.onmessage = function (ev) { var rt = document.getElementById("responseText"); rt.value = rt.value + "\n" + ev.data; } //相當於連接開啟(感知到連接開啟) socket.onopen = function (ev) { var rt = document.getElementById("responseText"); rt.value = "連接開啟了.." } //相當於連接關閉(感知到連接關閉) socket.onclose = function (ev) { var rt = document.getElementById("responseText"); rt.value = rt.value + "\n" + "連接關閉了.." } } else { alert("當前瀏覽器不支持websocket") } //發送消息到服務器 function send(message) { if(!window.socket) { //先判斷socket是否創建好 return; } if(socket.readyState == WebSocket.OPEN) { //通過socket 發送消息 socket.send(message) } else { alert("連接沒有開啟"); } } </script> <form onsubmit="return false"> <textarea name="message" style="height: 300px; width: 300px"></textarea> <input type="button" value="發生消息" onclick="send(this.form.message.value)"> <textarea id="responseText" style="height: 300px; width: 300px"></textarea> <input type="button" value="清空內容" onclick="document.getElementById('responseText').value=''"> </form> </body> </html>