SpringBoot整合Netty


前言

本篇文章主要介紹的是SpringBoot整合Netty以及使用Protobuf進行數據傳輸的相關內容。Protobuf會簡單的介紹下用法,至於Netty在之前的文章中已經簡單的介紹過了,這里就不再過多細說了。

Protobuf

介紹

protocolbuffer(以下簡稱PB)是google 的一種數據交換的格式,它獨立於語言,獨立於平台。google 提供了多種語言的實現:java、c#、c++、go 和python,每一種實現都包含了相應語言的編譯器以及庫文件。

由於它是一種二進制的格式,比使用 xml進行數據交換快許多。可以把它用於分布式應用之間的數據通信或者異構環境下的數據交換。作為一種效率和兼容性都很優秀的二進制數據傳輸格式,可以用於諸如網絡傳輸、配置文件、數據存儲等諸多領域。

官方地址:

https://github.com/google/protobuf

使用

這里的使用就只介紹Java相關的使用。首先我們需要建立一個proto文件,在該文件定義我們需要傳輸的文件。

例如我們需要定義一個用戶的信息,包含的字段主要有編號、名稱、年齡。

那么該protobuf文件的格式如下:

注:這里使用的是proto3,相關的注釋我已寫了,這里便不再過多講述了。需要注意一點的是proto文件和生成的Java文件名稱不能一致!

syntax = "proto3";
// 生成的包名
option java_package="com.pancm.protobuf";
//生成的java名
option java_outer_classname = "UserInfo";

message UserMsg {  

     // ID  
     int32 id = 1;  

    // 姓名  
     string name = 2;  

    // 年齡  
      int32 age = 3;  

     // 狀態 
     int32 state = 4;  
} 

創建好該文件之后,我們把該文件和protoc.exe(生成Java文件的軟件)放到E盤目錄下的protobuf文件夾下,然后再到該目錄的dos界面下輸入:protoc.exe --java_out=文件絕對路徑名稱。

例如:

protoc.exe --java_out=E:\protobuf User.proto

輸入完之后,回車即可在同級目錄看到已經生成好的Java文件,然后將該文件放到項目中該文件指定的路徑下即可。

注:生成protobuf的文件軟件和測試的protobuf文件我也整合到該項目中了,可以直接獲取的。

Java文件生成好之后,我們再來看怎么使用。

這里我就直接貼代碼了,並且將注釋寫在代碼中,應該更容易理解些。

代碼示例:

// 按照定義的數據結構,創建一個對象  
        UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder();  
        userInfo.setId(1);
        userInfo.setName("xuwujing");
        userInfo.setAge(18);
        UserInfo.UserMsg userMsg = userInfo.build();  
        // 將數據寫到輸出流 
        ByteArrayOutputStream output = new ByteArrayOutputStream();  
        userMsg.writeTo(output);  
        // 將數據序列化后發送 
        byte[] byteArray = output.toByteArray();  
        // 接收到流並讀取
        ByteArrayInputStream input = new ByteArrayInputStream(byteArray);  
        // 反序列化  
        UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input);  
        System.out.println("id:" + userInfo2.getId());  
        System.out.println("name:" + userInfo2.getName());  
        System.out.println("age:" + userInfo2.getAge());  

注:這里說明一點,因為protobuf是通過二進制進行傳輸,所以需要注意下相應的編碼。還有使用protobuf也需要注意一下一次傳輸的最大字節長度。

輸出結果:

id:1
name:xuwujing
age:18
SpringBoot整合Netty

說明:如果想直接獲取工程那么可以直接跳到底部,通過鏈接下載工程代碼。

  • 開發准備

  • 環境要求

  • JDK:1.8

  • Netty: 4.0或以上(不包括5)

  • Protobuf:3.0或以上

如果對Netty不熟的話,可以看看這些文章。大神請無視~。~

https://blog.csdn.net/column/details/17640.html

首先還是Maven的相關依賴:

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <netty.version>4.1.22.Final</netty.version>
        <protobuf.version>3.5.1</protobuf.version>
        <springboot>1.5.9.RELEASE</springboot>
        <fastjson>1.2.41</fastjson>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>


    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>${springboot}</version>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>${springboot}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <version>${springboot}</version>
            <optional>true</optional>
        </dependency>


        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>${protobuf.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson}</version>
        </dependency>


    <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency> 
</dependencies>

添加了相應的maven依賴之后,配置文件這塊暫時沒有什么可以添加的,因為暫時就一個監聽的端口而已。

代碼編寫

代碼模塊主要分為服務端和客戶端。

主要實現的業務邏輯:

服務端啟動成功之后,客戶端也啟動成功,這時服務端會發送一條protobuf格式的信息給客戶端,然后客戶端給予相應的應答。客戶端與服務端連接成功之后,客戶端每個一段時間會發送心跳指令給服務端,告訴服務端該客戶端還存過中,如果客戶端沒有在指定的時間發送信息,服務端會關閉與該客戶端的連接。當客戶端無法連接到服務端之后,會每隔一段時間去嘗試重連,只到重連成功!

服務端

首先是編寫服務端的啟動類,相應的注釋在代碼中寫得很詳細了,這里也不再過多講述了。不過需要注意的是,在之前的我寫的Netty文章中,是通過main方法直接啟動服務端,因此是直接new一個對象的。而在和SpringBoot整合之后,我們需要將Netty交給springBoot去管理,所以這里就用了相應的注解。

代碼如下:

@Service("nettyServer")
public class NettyServer {
    private static final int port = 9876; // 設置服務端端口
    private static EventLoopGroup boss = new NioEventLoopGroup(); // 通過nio方式來接收連接和處理連接
    private static EventLoopGroup work = new NioEventLoopGroup(); // 通過nio方式來接收連接和處理連接
    private static ServerBootstrap b = new ServerBootstrap();

    @Autowired
    private NettyServerFilter nettyServerFilter;


    public void run() {
        try {
            b.group(boss, work);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(nettyServerFilter); // 設置過濾器
            // 服務器綁定端口監聽
            ChannelFuture f = b.bind(port).sync();
            System.out.println("服務端啟動成功,端口是:" + port);
            // 監聽服務器關閉監聽
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 關閉EventLoopGroup,釋放掉所有資源包括創建的線程
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

服務端主類編寫完畢之后,我們再來設置下相應的過濾條件。

這里需要繼承Netty中ChannelInitializer類,然后重寫initChannel該方法,進行添加相應的設置,如心跳超時設置,傳輸協議設置,以及相應的業務實現類。

代碼如下:

  @Component
     public class NettyServerFilter extends ChannelInitializer<SocketChannel> {

    @Autowired
    private NettyServerHandler nettyServerHandler;

     @Override
     protected void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline ph = ch.pipeline();

         //入參說明: 讀超時時間、寫超時時間、所有類型的超時時間、時間格式
         ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
         // 解碼和編碼,應和客戶端一致
         //傳輸的協議 Protobuf
         ph.addLast(new ProtobufVarint32FrameDecoder());
         ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance()));
         ph.addLast(new ProtobufVarint32LengthFieldPrepender());
         ph.addLast(new ProtobufEncoder());

         //業務邏輯實現類
         ph.addLast("nettyServerHandler", nettyServerHandler);
       }
     }

服務相關的設置的代碼寫完之后,我們再來編寫主要的業務代碼。

使用Netty編寫業務層的代碼,我們需要繼承ChannelInboundHandlerAdapter 或SimpleChannelInboundHandler類,在這里順便說下它們兩的區別吧。

繼承SimpleChannelInboundHandler類之后,會在接收到數據后會自動release掉數據占用的Bytebuffer資源。並且繼承該類需要指定數據格式。

而繼承ChannelInboundHandlerAdapter則不會自動釋放,需要手動調用ReferenceCountUtil.release()等方法進行釋放。繼承該類不需要指定數據格式。
所以在這里,個人推薦服務端繼承ChannelInboundHandlerAdapter,手動進行釋放,防止數據未處理完就自動釋放了。而且服務端可能有多個客戶端進行連接,並且每一個客戶端請求的數據格式都不一致,這時便可以進行相應的處理。

客戶端根據情況可以繼承SimpleChannelInboundHandler類。好處是直接指定好傳輸的數據格式,就不需要再進行格式的轉換了。

代碼如下:

@Service("nettyServerHandler")
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /** 空閑次數 */
    private int idle_count = 1;
    /** 發送次數 */
    private int count = 1;


    /**
     * 建立連接時,發送一條消息
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("連接的客戶端地址:" + ctx.channel().remoteAddress());
        UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0)
                .build();
        ctx.writeAndFlush(userMsg);
        super.channelActive(ctx);
    }

    /**
     * 超時處理 如果5秒沒有接受客戶端的心跳,就觸發; 如果超過兩次,則直接關閉;
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.READER_IDLE.equals(event.state())) { // 如果讀通道處於空閑狀態,說明沒有接收到心跳命令
                System.out.println("已經5秒沒有接收到客戶端的信息了");
                if (idle_count > 1) {
                    System.out.println("關閉這個不活躍的channel");
                    ctx.channel().close();
                }
                idle_count++;
            }
        } else {
            super.userEventTriggered(ctx, obj);
        }
    }

    /**
     * 業務邏輯處理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("第" + count + "次" + ",服務端接受的消息:" + msg);
        try {
            // 如果是protobuf類型的數據
          if (msg instanceof UserMsg) {
                UserInfo.UserMsg userState = (UserInfo.UserMsg) msg;
                if (userState.getState() == 1) {
                    System.out.println("客戶端業務處理成功!");
                } else if(userState.getState() == 2){
                    System.out.println("接受到客戶端發送的心跳!");
                }else{
                    System.out.println("未知命令!");
                }
            } else {
                System.out.println("未知數據!" + msg);
                return;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
        count++;
    }

    /**
     * 異常處理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

還有個服務端的啟動類,之前是通過main方法直接啟動, 不過這里改成了通過springBoot進行啟動,差別不大。

代碼如下:

@SpringBootApplication
public class NettyServerApp {

    public static void main(String[] args) {
        // 啟動嵌入式的 Tomcat 並初始化 Spring 環境及其各 Spring 組件
        ApplicationContext context = SpringApplication.run(NettyServerApp.class, args);
        NettyServer nettyServer = context.getBean(NettyServer.class);
        nettyServer.run();
    }

}

到這里服務端相應的代碼就編寫完畢了。

客戶端

客戶端這邊的代碼和服務端的很多地方都類似,我就不再過多細說了,主要將一些不同的代碼拿出來簡單的講述下。

首先是客戶端的主類,基本和服務端的差不多,也就是多了監聽的端口和一個監聽器(用來監聽是否和服務端斷開連接,用於重連)。

主要實現的代碼邏輯如下:

    public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
        ChannelFuture f = null;
        try {
            if (bootstrap != null) {
                bootstrap.group(eventLoopGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
                bootstrap.handler(nettyClientFilter);
                bootstrap.remoteAddress(host, port);
                f = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
                    final EventLoop eventLoop = futureListener.channel().eventLoop();
                    if (!futureListener.isSuccess()) {
                        System.out.println("與服務端斷開連接!在10s之后准備嘗試重連!");
                        eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
                    }
                });
                if(initFalg){
                    System.out.println("Netty客戶端啟動成功!");
                    initFalg=false;
                }
                // 阻塞
                f.channel().closeFuture().sync();
            }
        } catch (Exception e) {
            System.out.println("客戶端連接失敗!"+e.getMessage());
        }
    }

注:監聽器這塊的實現用的是JDK1.8的寫法。

客戶端過濾其這塊基本和服務端一直。不過需要注意的是,傳輸協議、編碼和解碼應該一致,還有心跳的讀寫時間應該小於服務端所設置的時間。

改動的代碼如下:

   ChannelPipeline ph = ch.pipeline();
        /*
         * 解碼和編碼,應和服務端一致
         * */
        //入參說明: 讀超時時間、寫超時時間、所有類型的超時時間、時間格式
        ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS)); 

客戶端的業務代碼邏輯。

主要實現的幾點邏輯是心跳按時發送以及解析服務發送的protobuf格式的數據。

這里比服務端多個個注解, 該注解Sharable主要是為了多個handler可以被多個channel安全地共享,也就是保證線程安全。

廢話就不多說了,代碼如下:

 @Service("nettyClientHandler")
    @ChannelHandler.Sharable
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private NettyClient nettyClient;

    /** 循環次數 */
    private int fcount = 1;

    /**
     * 建立連接時
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("建立連接時:" + new Date());
        ctx.fireChannelActive();
    }

    /**
     * 關閉連接時
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("關閉連接時:" + new Date());
        final EventLoop eventLoop = ctx.channel().eventLoop();
        nettyClient.doConnect(new Bootstrap(), eventLoop);
        super.channelInactive(ctx);
    }

    /**
     * 心跳請求處理 每4秒發送一次心跳請求;
     * 
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {
        System.out.println("循環請求的時間:" + new Date() + ",次數" + fcount);
        if (obj instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) obj;
            if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果寫通道處於空閑狀態,就發送心跳命令
                UserMsg.Builder userState = UserMsg.newBuilder().setState(2);
                ctx.channel().writeAndFlush(userState);
                fcount++;
            }
        }
    }

    /**
     * 業務邏輯處理
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 如果不是protobuf類型的數據
        if (!(msg instanceof UserMsg)) {
            System.out.println("未知數據!" + msg);
            return;
        }
        try {

            // 得到protobuf的數據
            UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg;
            // 進行相應的業務處理。。。
            // 這里就從簡了,只是打印而已
            System.out.println(
                    "客戶端接受到的用戶信息。編號:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年齡:" + userMsg.getAge());

            // 這里返回一個已經接受到數據的狀態
            UserMsg.Builder userState = UserMsg.newBuilder().setState(1);
            ctx.writeAndFlush(userState);
            System.out.println("成功發送給服務端!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ReferenceCountUtil.release(msg);
        }
     }
    }

那么到這里客戶端的代碼也編寫完畢了。

功能測試

首先啟動服務端,然后再啟動客戶端。

我們來看看結果是否如上述所說。

服務端輸出結果:

服務端啟動成功,端口是:9876
連接的客戶端地址:/127.0.0.1:53319
第1次,服務端接受的消息:state: 1

客戶端業務處理成功!
第2次,服務端接受的消息:state: 2

接受到客戶端發送的心跳!
第3次,服務端接受的消息:state: 2

接受到客戶端發送的心跳!
第4次,服務端接受的消息:state: 2

接受到客戶端發送的心跳!

客戶端輸入結果:

Netty客戶端啟動成功!
建立連接時:Mon Jul 16 23:31:58 CST 2018
客戶端接受到的用戶信息。編號:1,姓名:xuwujing,年齡:18
成功發送給服務端!
循環請求的時間:Mon Jul 16 23:32:02 CST 2018,次數1
循環請求的時間:Mon Jul 16 23:32:06 CST 2018,次數2
循環請求的時間:Mon Jul 16 23:32:10 CST 2018,次數3
循環請求的時間:Mon Jul 16 23:32:14 CST 2018,次數4

通過打印信息可以看出如上述所說。

接下來我們再來看看客戶端是否能夠實現重連。

先啟動客戶端,再啟動服務端。

客戶端輸入結果:

Netty客戶端啟動成功!
與服務端斷開連接!在10s之后准備嘗試重連!
客戶端連接失敗!AbstractChannel$CloseFuture@1fbaa3ac(incomplete)
建立連接時:Mon Jul 16 23:41:33 CST 2018
客戶端接受到的用戶信息。編號:1,姓名:xuwujing,年齡:18
成功發送給服務端!
循環請求的時間:Mon Jul 16 23:41:38 CST 2018,次數1
循環請求的時間:Mon Jul 16 23:41:42 CST 2018,次數2
循環請求的時間:Mon Jul 16 23:41:46 CST 2018,次數3

服務端輸出結果:

服務端啟動成功,端口是:9876
連接的客戶端地址:/127.0.0.1:53492
第1次,服務端接受的消息:state: 1

客戶端業務處理成功!
第2次,服務端接受的消息:state: 2

接受到客戶端發送的心跳!
第3次,服務端接受的消息:state: 2

接受到客戶端發送的心跳!
第4次,服務端接受的消息:state: 2

結果也如上述所說!

其它

關於SpringBoot整合Netty使用Protobuf進行數據傳輸到這里就結束了。

SpringBoot整合Netty使用Protobuf進行數據傳輸的項目工程地址:

https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf

對了,也有不使用springBoot整合的Netty項目工程地址:

https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM