Netty搭建服務端的簡單應用


Netty簡介

Netty是由JBOSS提供的一個java開源框架,現為 Github上的獨立項目。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
也就是說,Netty 是一個基於NIO的客戶、服務器端編程框架,使用Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶、服務端應用。Netty相當於簡化和流線化了網絡應用的編程開發過程,例如:基於TCP和UDP的socket服務開發。
“快速”和“簡單”並不用產生維護性或性能上的問題。Netty 是一個吸收了多種協議(包括FTP、SMTP、HTTP等各種二進制文本協議)的實現經驗,並經過相當精心設計的項目。最終,Netty 成功的找到了一種方式,在保證易於開發的同時還保證了其應用的性能,穩定性和伸縮性。

說明:本文只介紹netty框架最基本的應用,而且是每次客戶端請求完畢會關閉連接,后續會寫一篇客戶端先與服務端建立連接,然后一條條發送數據,發送完畢主動關閉連接的博客。

Netty搭建WebSocket服務端

Netty服務端

1.引入依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version> <!-- 我這里用的1.5.9 -->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.blaze</groupId>
    <artifactId>netty-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>netty-demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>

        <!--fastjson-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.50</version>
        </dependency>

        <!--netty依賴-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.43.Final</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <mainClass>com.blaze.nettydemo.server.NettyServer</mainClass>
                </configuration>
            </plugin>
        </plugins>
        <finalName>netty-demo</finalName>
    </build>

</project>

2.Netty服務端

NettyServer

package com.blaze.nettydemo.server;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.springframework.stereotype.Component;

/**
 * create by zy 2019/11/15 9:14
 * TODO
 */
@Component
public class NettyServer {
    public static void main(String[] args) {
        int port = 9898;
        new NettyServer().bind(port);
    }

    public void bind(int port) {
        /**
         * interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService
         * 配置服務端的 NIO 線程池,用於網絡事件處理,實質上他們就是 Reactor 線程組
         * bossGroup 用於服務端接受客戶端連接,workerGroup 用於進行 SocketChannel 網絡讀寫
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            /**
             * ServerBootstrap 是 Netty 用於啟動 NIO 服務端的輔助啟動類,用於降低開發難度
             */
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            /**服務器啟動輔助類配置完成后,調用 bind 方法綁定監聽端口,調用 sync 方法同步等待綁定操作完成*/
            ChannelFuture f = b.bind(port).sync();

            System.out.println(Thread.currentThread().getName() + ",服務器開始監聽端口,等待客戶端連接.........");

            /**下面會進行阻塞,等待服務器連接關閉之后 main 方法退出,程序結束*/
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            /**優雅退出,釋放線程池資源*/
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 初始化連接
     */
    @Component
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        public void initChannel(SocketChannel socketChannel) throws Exception {
            /**
             * 設置 netty 服務端的 handler
             */
            socketChannel.pipeline().addLast(new NettyServerHandler());

            /**
             * 如果使用 netty 搭建 http 服務端,則用下面三個設置代替上面一個設置
             */
            //socketChannel.pipeline().addLast(new HttpServerCodec());// http 編解碼
            //socketChannel.pipeline().addLast("httpAggregator", new HttpObjectAggregator(512 * 1024)); // http 消息聚合器
            //socketChannel.pipeline().addLast(new HttpServerHandler());
        }
    }
}

NettyServerHandler

package com.blaze.nettydemo.server;

import com.alibaba.fastjson.JSON;
import com.blaze.nettydemo.model.RequestModel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.stereotype.Component;

/**
 * create by zy 2019/11/15 10:06
 * TODO
 */
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 收到客戶端消息,自動觸發
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        /**
         * 將 msg 轉為 Netty 的 ByteBuf 對象,類似 JDK 中的 java.nio.ByteBuffer,不過 ButeBuf 功能更強,更靈活
         */
        ByteBuf buf = (ByteBuf) msg;
        /**
         * readableBytes:獲取緩沖區可讀字節數,然后創建字節數組
         * 從而避免了像 java.nio.ByteBuffer 時,只能盲目的創建特定大小的字節數組,比如 1024
         */
        byte[] reg = new byte[buf.readableBytes()];
        /**
         * readBytes:將緩沖區字節數組復制到新建的 byte 數組中
         * 然后將字節數組轉為字符串
         */
        buf.readBytes(reg);
        String body = new String(reg, "UTF-8");
        System.out.println(Thread.currentThread().getName() + ",The server receive  order : " + body);

        String respMsg = "I am Server, success!";

        /**
         * 業務處理代碼 此處省略
         * ......
         */


        /**
         * 回復消息
         * copiedBuffer:創建一個新的緩沖區,內容為里面的參數
         * 通過 ChannelHandlerContext 的 write 方法將消息異步發送給客戶端
         */
        ByteBuf respByteBuf = Unpooled.copiedBuffer(respMsg.getBytes());
        ctx.write(respByteBuf);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        /**
         * flush:將消息發送隊列中的消息寫入到 SocketChannel 中發送給對方,為了頻繁的喚醒 Selector 進行消息發送
         * Netty 的 write 方法並不直接將消息寫如 SocketChannel 中,調用 write 只是把待發送的消息放到發送緩存數組中,再通過調用 flush
         * 方法,將發送緩沖區的消息全部寫入到 SocketChannel 中
         */
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**當發生異常時,關閉 ChannelHandlerContext,釋放和它相關聯的句柄等資源 */
        ctx.close();

    }
}

3.Netty客戶端

NettyClient

package com.blaze.nettydemo.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.springframework.stereotype.Component;

/**
 * create by zy 2019/11/15 10:08
 * TODO
 */
@Component
public class NettyClient {
    /**
     * 使用 3 個線程模擬三個客戶端
     */
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new Thread(new MyThread()).start();
        }
    }

    static class MyThread implements Runnable {
        /**服務端 ip 及端口*/
        @Override
        public void run() {
            connect("193.168.19.25", 9898);
        }

        public void connect(String host, int port) {
            /**配置客戶端 NIO 線程組/池*/
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                /**
                 * Bootstrap 與 ServerBootstrap 都繼承(extends)於 AbstractBootstrap
                 * 創建客戶端輔助啟動類,並對其配置,與服務器稍微不同,這里的 Channel 設置為 NioSocketChannel
                 * 然后為其添加 Handler,這里直接使用匿名內部類,實現 initChannel 方法
                 * 作用是當創建 NioSocketChannel 成功后,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡I/O事件
                 */
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyClientHandler());
                            }
                        });

                /**connect:發起異步連接操作,調用同步方法 sync 等待連接成功*/
                ChannelFuture channelFuture = b.connect(host, port).sync();
                System.out.println(Thread.currentThread().getName() + ",客戶端發起異步連接..........");

                /**等待客戶端鏈路關閉*/
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                /**優雅退出,釋放NIO線程組*/
                group.shutdownGracefully();
            }
        }

    }
}

NettyClientHandler

package com.blaze.nettydemo.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.springframework.stereotype.Component;

import java.util.logging.Logger;


/**
 * create by zy 2019/11/15 10:09
 * TODO
 */
@Component
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private static final Logger logger = Logger.getLogger(NettyClientHandler.class.getName());
    /**
     * 當客戶端和服務端 TCP 鏈路建立成功之后,Netty 的 NIO 線程會調用 channelActive 方法
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String reqMsg = "客戶端請求服務端發送的數據";

        byte[] reqMsgByte = reqMsg.getBytes("UTF-8");
        ByteBuf reqByteBuf = Unpooled.buffer(reqMsgByte.length);
        /**
         * writeBytes:將指定的源數組的數據傳輸到緩沖區
         * 調用 ChannelHandlerContext 的 writeAndFlush 方法將消息發送給服務器
         */
        reqByteBuf.writeBytes(reqMsgByte);
        ctx.writeAndFlush(reqByteBuf);
    }

    /**
     * 當服務端返回應答消息時,channelRead 方法被調用,從 Netty 的 ByteBuf 中讀取並打印應答消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println(Thread.currentThread().getName() + ",Server return Message:" + body);
        ctx.close();
    }

    /**
     * 當發生異常時,打印異常 日志,釋放客戶端資源
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        /**釋放資源*/
        logger.warning("Unexpected exception from downstream : " + cause.getMessage());
        ctx.close();

    }
}

 

4.Netty Http服務端

NettyServer

package com.blaze.nettydemo.server;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import org.springframework.stereotype.Component;

/**
 * create by zy 2019/11/15 9:14
 * TODO
 */
@Component
public class NettyServer {
    public static void main(String[] args) {
        int port = 9898;
        new NettyServer().bind(port);
    }

    public void bind(int port) {
        /**
         * interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService
         * 配置服務端的 NIO 線程池,用於網絡事件處理,實質上他們就是 Reactor 線程組
         * bossGroup 用於服務端接受客戶端連接,workerGroup 用於進行 SocketChannel 網絡讀寫
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            /**
             * ServerBootstrap 是 Netty 用於啟動 NIO 服務端的輔助啟動類,用於降低開發難度
             */
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

            /**服務器啟動輔助類配置完成后,調用 bind 方法綁定監聽端口,調用 sync 方法同步等待綁定操作完成*/
            ChannelFuture f = b.bind(port).sync();

            System.out.println(Thread.currentThread().getName() + ",服務器開始監聽端口,等待客戶端連接.........");

            /**下面會進行阻塞,等待服務器連接關閉之后 main 方法退出,程序結束*/
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            /**優雅退出,釋放線程池資源*/
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 初始化連接
     */
    @Component
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
        @Override
        public void initChannel(SocketChannel socketChannel) throws Exception {
            /**
             * 設置 netty 服務端的 handler
             */
            //socketChannel.pipeline().addLast(new NettyServerHandler());

            /**
             * 如果使用 netty 搭建 http 服務端,則用下面三個設置代替上面一個設置
             */
            socketChannel.pipeline().addLast(new HttpServerCodec());// http 編解碼
            socketChannel.pipeline().addLast("httpAggregator", new HttpObjectAggregator(512 * 1024)); // http 消息聚合器
            socketChannel.pipeline().addLast(new HttpServerHandler());
        }
    }
}

HttpServerHandler

package com.blaze.nettydemo.server;

import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;
import org.springframework.stereotype.Component;


/**
 * create by zy 2019/11/19 9:21
 * TODO
 */
@Component
public class HttpServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if (msg instanceof FullHttpRequest) {
            FullHttpRequest req = (FullHttpRequest) msg;

            try {
                // 1.獲取URI
                String uri = req.uri();
                System.out.println("uri:" + uri);
                // 2.獲取請求體
                ByteBuf buf = req.content();
                String content = buf.toString(CharsetUtil.UTF_8);

                // 3.根據請求的方法uri不同處理不同的邏輯
                Object rc = new Object();
                switch (uri) {
                    case "/test1":
                        // ......
                        break;
                    case "/ltest2":
                        // ......
                        break;
                    default:
                        break;
                }
                // 4.返回結果
                response(ctx, rc);
            } finally {
                req.release();
            }
        }
    }

    private void response(ChannelHandlerContext ctx, Object c) {

        // 1.設置響應
        FullHttpResponse resp = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                HttpResponseStatus.OK,
                Unpooled.copiedBuffer(JSONObject.toJSONString(c), CharsetUtil.UTF_8));
        resp.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
        // 2.發送
        // 注意必須在使用完之后,close channel
        ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
    }
}

Http客戶端,使用postman請求服務端進行測試即可。

 

本文參考:https://blog.csdn.net/wangmx1993328/article/details/83036285

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM