netty+Protobuf (整合一)


 瘋狂創客圈,一個Java 高並發研習社群 【博客園 總入口 】
 瘋狂創客圈,傾力推出:  《Netty Zookeeper Redis 高並發實戰》  面試必備 + 面試必備 + 面試必備 的基礎原理+實戰書籍

書籍 


netty+Protobuf 整合實戰

瘋狂創客圈 死磕Netty 億級流量架構系列之12 【博客園 總入口

本文說明

本篇是 netty+Protobuf 整合實戰的 第一篇,完成一個 基於Netty + Protobuf 實戰案例。

要實現高並發、大流量,首先需要高傳輸效率的協議,Protobuf 是迄今為止最高性能之一的傳輸格式,我們首先將 Protobuf 和Netty整合起來。

本案例源代碼

源代碼下載鏈接: netty+protobuf (整合源代碼)

What is Protobuf ?

 
        

protocolbuffer(以下簡稱PB)是google 的一種數據交換的格式,它獨立於語言,獨立於平台。google 提供了多種語言的實現:java、c#、c++、go 和python,每一種實現都包含了相應語言的編譯器以及庫文件。由於它是一種二進制的格式,比使用 xml進行數據交換快許多。可以把它用於分布式應用之間的數據通信或者異構環境下的數據交換。作為一種效率和兼容性都很優秀的二進制數據傳輸格式,可以用於諸如網絡傳輸、配置文件、數據存儲等諸多領域。

Why Protobuf ?

Protobuf是由谷歌開源而來,在谷歌內部久經考驗。它將數據結構以.proto文件進行描述,通過代碼生成工具可以生成對應數據結構的POJO對象和Protobuf相關的方法和屬性。

特點如下:

  • 結構化數據存儲格式(XML,JSON等)

  • 高效的編解碼性能

  • 語言無關、平台無關、擴展性好

數據交互xml、json、protobuf格式比較
  1. json: 一般的web項目中,最流行的主要還是json。因為瀏覽器對於json數據支持非常好,有很多內建的函數支持。

  2. xml: 在webservice中應用最為廣泛,但是相比於json,它的數據更加冗余,因為需要成對的閉合標簽。json使用了鍵值對的方式,不僅壓縮了一定的數據空間,同時也具有可讀性。

  3. protobuf:是后起之秀,是谷歌開源的一種數據格式,適合高性能,對響應速度有要求的數據傳輸場景。因為profobuf是二進制數據格式,需要編碼和解碼。數據本身不具有可讀性。因此只能反序列化之后得到真正可讀的數據。

相對於其它protobuf更具有優勢
  1. 序列化后體積相比Json和XML很小,適合網絡傳輸

  2. 支持跨平台多語言

  3. 消息格式升級和兼容性還不錯

  4. 序列化反序列化速度很快,快於Json的處理速速

結論: 在一個需要大量的數據傳輸的場景中,如果數據量很大,那么選擇protobuf可以明顯的減少數據量,減少網絡IO,從而減少網絡傳輸所消耗的時間。

因而,對於打造一款高性能的通訊服務器來說,protobuf 傳輸格式,是最佳的解決方案。

windows 下安裝 protoc

1,去這里 https://github.com/google/protobuf/releases

下載對應的protoc,本實例使用的 zip文件是老版本: protoc-2.6.1-win32.zip (本人對老版本比較屬性,大家可以換成最新版本) 此工具在源代碼包中已經有,可以直接解壓縮源碼包,直接使用

2,下好之后解壓就行,然后把bin里面的 protoc.exe 加入到環境變量

3、或者,把protoc.exe拷貝到C:\Windows\System32

實戰第1步:proto文件的建立

前面講了那么多,都是一些知識鋪墊,和前期的准備。

整合protobuf 的第一步,是准備一個消息的協議文件。 協議文件的后綴名稱為 .proto , 該文件的定義我們需要傳輸的協議。實例如下:

//定義protobuf的包名稱空間
option java_package = "com.crazymakercircle.chat.common.bean.msg";
// 消息體名稱
option java_outer_classname = "ProtoMsg";
//.....
​
/*聊天消息*/
message MessageRequest{
    uint64 msg_id = 1;       //消息id
    string from = 2;     //發送方uId
    string to = 3;          //接收方uId
    uint64 time = 4;     //時間戳(單位:毫秒)
    required uint32 msg_type = 5; //消息類型  1:純文本  2:音頻 3:視頻 4:地理位置 5:其他
   string content = 6; //消息內容
    string url = 7;      //多媒體地址
    string property = 8;  //附加屬性
    string from_nick = 9; //發送者昵稱
    optional string json = 10;       //附加的json串
}

說明:

  • 協議文件中,主要定義了最終生成的Java 代碼 對應的包的名稱、類的名稱。分別使用 java_package、 java_outer_classname 來指定。

  • 協議文件中,每個具體的協議message對應於一個最終的Java類,協議的字段對應到類的屬性。

    實際上生成的Java代碼,遠遠不止這些。具體請參見源碼包。

關於的.proto文件的格式,請大家參考 史上最簡明的proto語法教程

關於的.proto消息的規則,請大家參考 史上最簡明的proto消息規則

實戰第2步:生成 proto 消息 Java代碼

創建好.proto文件之后,就需要按照好了對應版本的 protoc.exe工具protoc.exe工具是生成Java文件的工具軟件。 安裝的方法,前面已經講了。

這里需要提示一下版本。Java 的maven 配置文件中 proto 包的版本,和 .proto文件的版本, 以及生成java 代碼的protoc.exe的版本,三者需要一致。

下面開始生成 消息的 Java代碼。 需要用到下面的指令:

 
        

protoc.exe --java_out=輸出的Java文件路徑名稱   .proto文件路徑名稱

例如:

protoc.exe --java_out=./src/main/java/ ./proto/ProtoMsg.proto

輸入完之后,回車即可在目標目錄看到已經生成好的Java文件,然后將該文件放到項目中該文件指定的路徑下即可。

本案例的工程中,以及給大家准備好了.bat windows 的命令文件,在 .bat 目錄 下執行.bat 文件即可。 .bat 文件如下:

d:
cd D:\\crazymakercircleJava\nettydemo\chatcommon
protoc.exe --java_out=./src/main/java/ ./proto/ProtoMsg.proto

使用的時候,注意調整為實際的目錄。

加上對protobuf 的maven依賴

修改maven 的pom.xml文件,加上對protobuf 的依賴,代碼如下:

<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>${protobuf.version}</version>
</dependency>

實戰第3步:構建 ProtoMsg.Message 消息

生成代碼后,如果需要構建對應的消息,需要取得Java消息類型的 Builder 實例,在設置了Builder 實例的字段屬性值,然后執行 Builder 實例的build() 方法。

嵌套的消息,可以通過頂層消息的 buildPartial() 取得基礎部分的 Builder實例 ,然后再設置內嵌消息屬性,最后執行build() 方法。

比如: mb.buildPartial().toBuilder().setMessageRequest(cb).build();

具體如下面的例子所示:

/**
 * 基礎 Builder
 */
private static class BaseBuilder
{
    private  User user;
    protected ProtoMsg.HeadType type;
    private long seqId;
​
    public BaseBuilder(ProtoMsg.HeadType type,User user)
    {
        this.type = type;
        this.user=user;
    }
   /**
     * 構建消息 基礎部分
     */
    public ProtoMsg.Message buildPartial()
    {
        seqId = genSeqId();
​
        ProtoMsg.Message.Builder mb = ProtoMsg.Message.newBuilder()
                .setType(type)
                .setSequence(seqId)
                .setSessionId(user.getSessionId());
        return mb.buildPartial();
    }
​
}
​
​
/**
 * 聊天消息Builder
 */
private static class ChatMsgBuilder extends BaseBuilder
{
   //...
    public ProtoMsg.Message build()
    {
        //基礎部分
        ProtoMsg.Message message = buildPartial();
        //內嵌部分
        ProtoMsg.MessageRequest.Builder cb
                = ProtoMsg.MessageRequest.newBuilder();
        //組合起來,然后構建
     return message.toBuilder().setMessageRequest(cb).build();
    }
}
​

實戰第4步:編碼器

在發出ProtoMsg.Message 消息前,還需要對二進制消息進一步封裝。

使用2字節消息長度+Message(二進制數據)+(2字節CRC校驗(可選))

其中2字節的內容,只包含Message的長度,不包含自身和CRC的長度。如果需要也可以包含,當要記得通信雙方必須一致。

編碼器如下:

​
public class ProtobufEncoder extends MessageToByteEncoder<ProtoMsg.Message> {
​
   @Override
   protected void encode(ChannelHandlerContext ctx, ProtoMsg.Message msg, ByteBuf out)
         throws Exception {
      byte[] bytes = msg.toByteArray();// 將對象轉換為byte
      int length = bytes.length;// 讀取消息的長度
​
      ByteBuf buf = Unpooled.buffer(2 + length);
      buf.writeShort(length);// 先將消息長度寫入,也就是消息頭
      buf.writeBytes(bytes);// 消息體中包含我們要發送的數據
      out.writeBytes(buf);
​
   }

}

實戰第五步 解碼器

與編碼器的操作相反,去掉頭部的兩個字節,然后轉換成 ProtoMsg.Message 消息

/**
 * 解碼器
 *
 */
public class ProtobufDecoder extends ByteToMessageDecoder {
   @Override
   protected void decode(ChannelHandlerContext ctx, ByteBuf in,
         List<Object> out) throws Exception {
      // 標記一下當前的readIndex的位置
      in.markReaderIndex();
      // 判斷包頭長度
      if (in.readableBytes() < 2) {// 不夠包頭
         return;
      }
​
      // 讀取傳送過來的消息的長度。
      int length = in.readUnsignedShort();
​
      // 長度如果小於0
      if (length < 0) {// 非法數據,關閉連接
         ctx.close();
      }
​
      if (length > in.readableBytes()) {// 讀到的消息體長度如果小於傳送過來的消息長度
         // 重置讀取位置
         in.resetReaderIndex();
         return;
      }
​
      ByteBuf frame = Unpooled.buffer(length);
      in.readBytes(frame);
      try {
         byte[] inByte = frame.array();
​
         // 字節轉成對象
         ProtoMsg.Message msg = ProtoMsg.Message.parseFrom(inByte);

         if (msg != null) {
            // 獲取業務消息頭
            out.add(msg);
         }
      } catch (Exception e) {
         LOG.info(ctx.channel().remoteAddress() + ",decode failed.", e);
      }
​
   }
}

實戰第六步 解碼器

將編碼器和解碼器,加入pipeline中,代碼如下:

// 設置通道初始化
bootstrap.handler(
        new ChannelInitializer<SocketChannel>()
        {
            public void initChannel(SocketChannel ch) throws Exception
            {
                ch.pipeline().addLast(new ProtobufDecoder());
                ch.pipeline().addLast(new ProtobufEncoder());
                ch.pipeline().addLast(chatClientHandler);
​
            }
        }
);

這一塊,很簡單。

寫在最后

終於大功告成。

為了方便大家理解 netty 和 protobuf 整合的過程, 實例進行了裁剪,僅僅剩下了 上面這塊非常很重要的部分。

如果需要真正的理解上面的內容,建議大家一定要去跑實例。


瘋狂創客圈 實戰計划
  • Netty 億級流量 高並發 IM后台 開源項目實戰

  • Netty 源碼、原理、JAVA NIO 原理

  • Java 面試題 一網打盡

  • 瘋狂創客圈 【 博客園 總入口 】



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM