Netty筆記(4) - 對Http和WebSocket的支持、心跳檢測機制


對HTTP的支持

服務端代碼:

向 PipeLine中 注冊 HttpServerCodec Http協議的編碼解碼一體的Handler 處理Http請求 封裝Http響應

public class TestServer {
    public static void main(String[] args) throws Exception {

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>{
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {

                        //向管道加入處理器
                        //得到管道
                        ChannelPipeline pipeline = ch.pipeline();

                        //加入一個netty 提供的httpServerCodec codec =>[coder - decoder]
                        //HttpServerCodec 說明
                        //1. HttpServerCodec 是netty 提供的處理http的 編-解碼器
                        pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
                        //2. 增加一個自定義的handler
                        pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
                        System.out.println("ok~~~~");

                    }
                });

            ChannelFuture channelFuture = serverBootstrap.bind(6668).sync();
            
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

自定義Handler:

過濾 瀏覽器請求 favicon.ico 的請求 並回送信息

public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {


    //channelRead0 讀取客戶端數據
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        System.out.println("對應的channel=" + ctx.channel() + " pipeline=" + ctx
        .pipeline() + " 通過pipeline獲取channel" + ctx.pipeline().channel());

        System.out.println("當前ctx的handler=" + ctx.handler());

        //判斷 msg 是不是 httprequest請求
        if(msg instanceof HttpRequest) {

            System.out.println("msg 類型=" + msg.getClass());
            System.out.println("客戶端地址" + ctx.channel().remoteAddress());

            //獲取到
            HttpRequest httpRequest = (HttpRequest) msg;
            //獲取uri, 過濾指定的資源
            URI uri = new URI(httpRequest.uri());
            if("/favicon.ico".equals(uri.getPath())) {
                System.out.println("請求了 favicon.ico, 不做響應");
                return;
            }
            //回復信息給瀏覽器 [http協議]

            ByteBuf content = Unpooled.copiedBuffer("hello, 我是服務器", CharsetUtil.UTF_8);

            //構造一個http的相應,即 httpresponse
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);

            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
            response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());

            //將構建好 response返回
            ctx.writeAndFlush(response);

        }
    }
}

瀏覽器 地址欄輸入 http://127.0.0.1:6668 連接服務端 並收到服務端信息

對WebSocket 的支持

服務端代碼:

添加 將Http協議升級為 webSocket協議的攔截器 WebSocketServerProtocolHandler 並指定路徑

public class MyServer {
    public static void main(String[] args) throws Exception{


        //創建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();

                    //因為基於http協議,使用http的編碼和解碼器
                    pipeline.addLast(new HttpServerCodec());
                    //是以塊方式寫,添加ChunkedWriteHandler處理器
                    pipeline.addLast(new ChunkedWriteHandler());

                    /*
                    說明
                    1. http數據在傳輸過程中是分段, HttpObjectAggregator ,就是可以將多個段聚合
                    2. 這就就是為什么,當瀏覽器發送大量數據時,就會發出多次http請求
                     */
                    pipeline.addLast(new HttpObjectAggregator(8192));
                    /*
                    說明
                    1. 對應websocket ,它的數據是以 幀(frame) 形式傳遞
                    2. 可以看到WebSocketFrame 下面有六個子類
                    3. 瀏覽器請求時 ws://localhost:7000/hello 表示請求的uri
                    4. WebSocketServerProtocolHandler 核心功能是將 http協議升級為 ws協議 , 保持長連接
                     */
                    pipeline.addLast(new WebSocketServerProtocolHandler("/hello"));

                    //自定義的handler ,處理業務邏輯
                    pipeline.addLast(new MyTextWebSocketFrameHandler());
                }
            });

            //啟動服務器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服務端Handler:

websocket 協議中傳輸數據為數據幀 (TextWebSocketFrame)

//這里 TextWebSocketFrame 類型,表示一個文本幀(frame)
public class MyTextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

        System.out.println("服務器收到消息 " + msg.text());
        //回復消息
        ctx.channel().writeAndFlush(new TextWebSocketFrame("服務器時間" + LocalDateTime.now() + " " + msg.text()));
    }

    //當web客戶端連接后, 觸發方法
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一
        System.out.println("handlerAdded 被調用" + ctx.channel().id().asLongText());
        System.out.println("handlerAdded 被調用" + ctx.channel().id().asShortText());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        System.out.println("handlerRemoved 被調用" + ctx.channel().id().asLongText());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("異常發生 " + cause.getMessage());
        ctx.close(); //關閉連接
    }
}

前端html:

可以給客戶端發送信息 可以接受客戶端信息

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<script>
    var socket;
    //判斷當前瀏覽器是否支持websocket
    if(window.WebSocket) {
        socket = new WebSocket("ws://localhost:7000/hello");
        //相當於channelReado, ev 收到服務器端回送的消息
        socket.onmessage = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + ev.data;
        }

        //相當於連接開啟(感知到連接開啟)
        socket.onopen = function (ev) {
            var rt = document.getElementById("responseText");
            rt.value = "連接開啟了.."
        }

        //相當於連接關閉(感知到連接關閉)
        socket.onclose = function (ev) {

            var rt = document.getElementById("responseText");
            rt.value = rt.value + "\n" + "連接關閉了.."
        }
    } else {
        alert("當前瀏覽器不支持websocket")
    }
    //發送消息到服務器
    function send(message) {
        if(!window.socket) { //先判斷socket是否創建好
            return;
        }
        if(socket.readyState == WebSocket.OPEN) {
            //通過socket 發送消息
            socket.send(message)
        } else {
            alert("連接沒有開啟");
        }
    }
</script>
    <form onsubmit="return false">
        <textarea name="message" style="height: 300px; width: 300px"></textarea>
        <input type="button" value="發生消息" onclick="send(this.form.message.value)">
        <textarea id="responseText" style="height: 300px; width: 300px"></textarea>
        <input type="button" value="清空內容" onclick="document.getElementById('responseText').value=''">
    </form>
</body>
</html>

Netty 的心跳檢測機制

向pipeLine中加入心跳檢測的Handler ,監聽讀空閑 寫空閑 讀寫空閑,並設置時間.,如果在設定時間內沒有發生 讀寫事件, 則會產生一個相關事件,並傳遞到下一個 Handler 中 (自定義處理Handler)

服務端代碼:

心跳檢測Handler 在監聽到相應的事件后 會交由注冊的下一個Handler的userEventTriggered方法處理 ,這里注冊一個自定義Handler

public class MyServer {
    public static void main(String[] args) throws Exception{


        //創建兩個線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
        try {

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    //加入一個netty 提供 IdleStateHandler
                    /*
                    說明
                    1. IdleStateHandler 是netty 提供的處理空閑狀態的處理器
                    2. long readerIdleTime : 表示多長時間沒有讀, 就會發送一個心跳檢測包檢測是否連接
                    3. long writerIdleTime : 表示多長時間沒有寫, 就會發送一個心跳檢測包檢測是否連接
                    4. long allIdleTime : 表示多長時間沒有讀寫, 就會發送一個心跳檢測包檢測是否連接

                   
 *                  5. 當 IdleStateEvent 觸發后 , 就會傳遞給管道 的下一個handler去處理
 *                  通過調用(觸發)下一個handler 的 userEventTiggered , 在該方法中去處理 IdleStateEvent(讀空閑,寫空閑,讀寫空閑)
                     */
                    pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS));
                    //加入一個對空閑檢測進一步處理的handler(自定義)
                    pipeline.addLast(new MyServerHandler());
                }
            });

            //啟動服務器
            ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
            channelFuture.channel().closeFuture().sync();

        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

處理事件的Handler (userEventTriggered方法中處理) :

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     *
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        if(evt instanceof IdleStateEvent) {
            //將  evt 向下轉型 IdleStateEvent
            IdleStateEvent event = (IdleStateEvent) evt;
            String eventType = null;
            switch (event.state()) {
                case READER_IDLE:
                  eventType = "讀空閑";
                  break;
                case WRITER_IDLE:
                    eventType = "寫空閑";
                    break;
                case ALL_IDLE:
                    eventType = "讀寫空閑";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "--超時時間--" + eventType);
            System.out.println("服務器做相應處理..");
            //如果發生空閑,我們關閉通道
           // ctx.channel().close();
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM