SuperSocket與Netty之實現protobuf協議,包括服務端和客戶端


今天准備給大家介紹一個c#服務器框架(SuperSocket)和一個c#客戶端框架(SuperSocket.ClientEngine)。這兩個框架的作者是園區里面的江大漁。 首先感謝他的無私開源貢獻。之所以要寫這個文章是因為群里經常有人問這個客戶端框架要如何使用。原因在於服務端框架的文檔比較多,客戶端的文檔比較少,所以很多c#基礎比較差的人就不懂怎么玩起來。今天就這里寫一個例子希望能給部分人拋磚引玉吧。

參考資料:

SuperSocket文檔 http://docs.supersocket.net/

我以前在開源中國的一部分文章:http://my.oschina.net/caipeiyu/blog

這篇文章選擇 protobuf 來實現,選擇protobuf是因為服務器有可能用的是java的netty,客戶端想用SuperSocket.ClientEngine,而netty我看很多人經常用protobuf。


一、SuperSocket服務器

新建一個項目 ProtobufServer 然后添加 SuperSocket 和 protobuf 的依賴包。

添加protobuf依賴包 輸入的搜索詞是 Google.ProtocolBuffers
添加SuperSocket依賴包 輸入搜索詞是 SuperSocket,要添加兩個SuperSocket.Engine 和 SuperSocket

上面的工作完成后,我們就應該來實現我們的傳輸協議了。傳輸協議打算參考netty的ProtobufVarint32FrameDecoder.java

* BEFORE DECODE (302 bytes)       AFTER DECODE (300 bytes)
* +--------+---------------+      +---------------+
* | Length | Protobuf Data |----->| Protobuf Data |
* | 0xAC02 |  (300 bytes)  |      |  (300 bytes)  |
* +--------+---------------+      +---------------+

Protobuf Data是protobuf的序列化結果。Length(Base 128 Varints)是表示Protobuf Data的長度。protobuf本身的序列號協議可以參考:https://developers.google.com/protocol-buffers/docs/encoding

我們先看一下SuperSocket的內置的常用協議實現模版看看有沒有合適我們可以直接拿來用的。因為Length使用的是Base 128 Varints一種處理整數的變長二進制編碼算法,所以呢內置的協議實現模板並不能直接拿來使用,所以我們只能自己來實現接口IRequestInfo和IReceiveFilter了,參考:使用 IRequestInfo 和 IReceiveFilter 等等其他對象來實現自定義協議

這里說明一下:為什么protobuf明明序列化成Protobuf Data 了為什么還要再加一個Length來打包,因為tcp這個流發送會參數粘包、分包,如果不加個協議來解析會讀取錯誤的數據而導致無法反序列化 Protobuf Data (自行谷歌 tcp 粘包、分包)

ProtobufRequestInfo的實現

在實現ProtobufRequestInfo之前要先來考慮一個問題,那就是我們的傳輸協議是長度+protobuf數據,那么我們根本就無法知道獲取到的protobuf數據該如何反序列化。在官方網站提供了一種解決思路:https://developers.google.com/protocol-buffers/docs/techniques#union
就是我們可以弄唯一個數據包,然后這個數據包里面必須包含一個枚舉值,然后還包含了其他類型的數據包,每一個枚舉值對應一個數據包,然后傳送過來后,可以用分支判斷來獲取值。

那我們先設計一個 DefeatMessage.proto包含內容:

import "BackMessage.proto";
import "CallMessage.proto";

message DefeatMessage {
    enum Type { CallMessage = 1; BackMessage = 2; }

    required Type type = 1;

    optional CallMessage callMessage = 2;
    optional BackMessage backMessage = 3;
}

然后再把CallMessage和BackMessage補全

message BackMessage {

    optional string content = 1;

}

message CallMessage {

    optional string content = 1;

}

然后在我們的路徑packages\Google.ProtocolBuffers.2.4.1.555\tools里面有兩個工具protoc.exe 和 protogen.exe,我們可以執行下面的命令來生成我們的c#代碼

protoc --descriptor_set_out=DefeatMessage.protobin --proto_path=pack --include_imports pack\DefeatMessage.proto

protogen DefeatMessage.protobin

注意路徑要自己修改

如果有報Expected top-level statement (e.g. "message").這么一個錯誤,那就是你cmd的編碼和proto的編碼不一致,要改成一致。

相關文件:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/tree/master/ProtobufServer/Pack

生成完c#代碼后,我們就要設計ProtobufRequestInfo了。這個比較簡單,只要實現IRequestInfo接口。我們這里在實現接口帶的屬性外另加一個 DefeatMessage 和 DefeatMessage.Types.Type,其中DefeatMessage是為了存儲我們解包完數據后反序列化出來的對象,Type是為了方便區分我們應該取出DefeatMessage里面的哪個值。

using System;
using SuperSocket.SocketBase.Protocol;

namespace ProtobufServer
{
    public class ProtobufRequestInfo : IRequestInfo
    {
        	public string Key {
                get;
                private set;
            }

            public DefeatMessage.Types.Type Type
            {
                get;
                private set;
            }

            public DefeatMessage Body { get; private set; }

            public ProtobufRequestInfo (DefeatMessage.Types.Type type, DefeatMessage body)
            {
                Type = type;
                Key = type.ToString();
                Body = body;
            }
    }
}

ProtobufReceiveFilter的實現

代碼比較長,直接看在github上的代碼ProtobufReceiveFilter的實現
實現的注意點參考:使用 IRequestInfo 和 IReceiveFilter 等等其他對象來實現自定義協議。主要是對ss里面給我們緩存的數據流進行協議解析。

  1. readBuffer: 接收緩沖區, 接收到的數據存放在此數組里

  2. offset: 接收到的數據在接收緩沖區的起始位置

  3. length: 本輪接收到的數據的長度

  4. toBeCopied: 表示當你想緩存接收到的數據時,是否需要為接收到的數據重新創建一個備份而不是直接使用接收緩沖區

  5. rest: 這是一個輸出參數, 它應該被設置為當解析到一個為正的請求后,接收緩沖區還剩余多少數據未被解析

     public  ProtobufRequestInfo Filter (byte[] readBuffer, int offset, int length, bool toBeCopied, out int rest)
         {
             rest = 0;
             var readOffset = offset - m_OffsetDelta;//我們重新計算緩存區的起始位置,這里要說明的是如果前一次解析還有剩下沒有解析到的數據,那么就需要把起始位置移到之前最后要解析的那個位置
    
         CodedInputStream stream = CodedInputStream.CreateInstance (readBuffer, readOffset, length);//這個類是Google.ProtocolBuffers提供的
     	var varint32 = (int)stream.ReadRawVarint32 ();//這里是計算我們這個數據包是有多長(不包含length本身)
     	if(varint32 <= 0) return null;
    
         var headLen = (int) stream.Position - readOffset;//計算協議里面length占用幾位
         rest = length - varint32 - headLen + m_ParsedLength;//本次解析完后緩存區還剩下多少數據沒有解析
    
     	if (rest >= 0)//緩存區里面的數據足夠本次解析
     	{	
     		byte[] body = stream.ReadRawBytes(varint32);
             DefeatMessage message = DefeatMessage.ParseFrom(body);
     		var requestInfo = new ProtobufRequestInfo(message.Type,message);
     		InternalReset();
     		return requestInfo;
     	}
     	else//緩存區里面的數據不夠本次解析,(名詞為分包)
     	{
     		m_ParsedLength += length;
     		m_OffsetDelta = m_ParsedLength;
     		rest = 0;
    
     		var expectedOffset = offset + length;
     		var newOffset = m_OrigOffset + m_OffsetDelta;
    
     		if (newOffset < expectedOffset)
     		{
     			Buffer.BlockCopy(readBuffer, offset - m_ParsedLength + length, readBuffer, m_OrigOffset, m_ParsedLength);
     		}
    
     		return null;
     	}
     }
    

ProtobufAppSession 的實現

using System;
using SuperSocket.SocketBase;

namespace ProtobufServer
{
    public class ProtobufAppSession : AppSession<ProtobufAppSession,ProtobufRequestInfo>
    {
        public ProtobufAppSession ()
        {
        }
    }
}

ProtobufAppServer 的實現

using System;
using SuperSocket.SocketBase;
using SuperSocket.SocketBase.Protocol;

namespace ProtobufServer
{
    public class ProtobufAppServer : AppServer<ProtobufAppSession,ProtobufRequestInfo>
    {
        public ProtobufAppServer ()
            :base(new DefaultReceiveFilterFactory< ProtobufReceiveFilter, ProtobufRequestInfo >())
        {
        }
    }
}

服務器的實例啟動實現

參考:http://docs.supersocket.net/v1-6/zh-CN/A-Telnet-Example

代碼:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/blob/master/ProtobufServer/Program.cs

主要是接收到數據的一個方法實現,當然ss里面還帶了命令模式的實現,不過這個不在本文章里面說。這里的實現了接收到不同的數據給打印出來,然后接收到CallMessage數據的話就給客戶端回發一條信息

private static void AppServerOnNewRequestReceived(ProtobufAppSession session, ProtobufRequestInfo requestInfo)
    {
        switch (requestInfo.Type)
        {
            case DefeatMessage.Types.Type.BackMessage:
                Console.WriteLine("BackMessage:{0}", requestInfo.Body.BackMessage.Content);
                break;
            case DefeatMessage.Types.Type.CallMessage:
                Console.WriteLine("CallMessage:{0}", requestInfo.Body.CallMessage.Content);

                var backMessage = BackMessage.CreateBuilder()
				.SetContent("Hello I am form C# server by SuperSocket").Build();
                var message = DefeatMessage.CreateBuilder()
                    .SetType(DefeatMessage.Types.Type.BackMessage)
                    .SetBackMessage(backMessage).Build();

                using (var stream = new MemoryStream())
                {

                    CodedOutputStream os = CodedOutputStream.CreateInstance(stream);

                    os.WriteMessageNoTag(message);

                    os.Flush();

                    byte[] data = stream.ToArray();
                    session.Send(new ArraySegment<byte>(data));

                }


                break;

        }
    }

服務器的代碼就到這里,可以編譯運行起來看看有無錯誤。

二、SuperSocket.ClientEngine客戶端

與服務器實現相同,先通過NuGet添加 SuperSocket.ClientEngine 和 protobuf 的依賴包。
有三個實現:

ProtobufPackageInfo的實現

把前面實現服務器時候生成的通訊數據包拷貝過來,然后和實現服務器的ProtobufRequestInfo一樣,只不過這里只是實現接口IPackageInfo而已

using SuperSocket.SocketBase.Protocol;using SuperSocket.ProtoBase;

namespace ProtobufClient
{
    public class ProtobufPackageInfo : IPackageInfo
    {
        public ProtobufPackageInfo(DefeatMessage.Types.Type type, DefeatMessage body)
        {
            Type = type;
            Key = type.ToString();
            Body = body;
        }

        public string Key { get; private set; }

        public DefeatMessage Body { get; private set; }
        public DefeatMessage.Types.Type Type { get; private set; }
    }
}

ProtobufReceiveFilter的實現

代碼:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/blob/master/ProtobufClient/ProtobufReceiveFilter.cs
這里的數據解析的實現與服務器的實現有點不同,不過下一個版本可能會統一,如果統一起來的話,那么以后數據解析就可以做成和插件一樣,同時可以給服務器和客戶端使用。

  1. data:也是數據緩存區
  2. rest:緩存區還剩下多少

這個實現與服務器的不同就在BufferList本身就已經有處理分包,就不需要我們自己再做處理。

public ProtobufPackageInfo Filter(BufferList data, out int rest)
    {
        rest = 0;
        var buffStream = new BufferStream();
        buffStream.Initialize(data);

        var stream = CodedInputStream.CreateInstance(buffStream);
        var varint32 = (int) stream.ReadRawVarint32();
        if (varint32 <= 0) return default(ProtobufPackageInfo);

        var total = data.Total;
        var packageLen = varint32 + (int) stream.Position;

        if (total >= packageLen)
        {
            rest = total - packageLen;
            var body = stream.ReadRawBytes(varint32);
            var message = DefeatMessage.ParseFrom(body);
            var requestInfo = new ProtobufPackageInfo(message.Type, message);
            return requestInfo;
        }

        return default(ProtobufPackageInfo);
    }

運行主程序實現

具體實現看:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/blob/master/ProtobufClient/Program.cs

這個真的沒有什么好說了。運行效果如下:

這里的打印信息是相對比較簡單,大家可以自己下載源碼來加些打印數據,讓看起來更好看點。

三、java的Netty實現

既然前面提到了Netty,那就順便實現一個簡單的服務器來通訊看看。
使用的是Netty 4.x 參考資料:http://netty.io/wiki/user-guide-for-4.x.html

Netty的實現在網絡上有超級多的例子,這里就簡單的介紹一下就可以,首先先生成java的通訊包代碼

protoc --proto_path=pack --java_out=./ pack/DefeatMessage.proto

protoc --proto_path=pack --java_out=./ pack/BackMessage.proto

protoc --proto_path=pack --java_out=./ pack/CallMessage.proto

這幾個命令要自己靈活改變們不要死死的硬搬。

生成的代碼:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/tree/master/java/NettyProtobufServer/src/main/java

然后創建一個maven項目,添加Netty 和 protobuf 依賴:https://github.com/kotcmm/SuperSocket.ClientEngine.QuickStart/blob/master/java/NettyProtobufServer/pom.xml

<dependencies>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>2.6.1</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-microbench</artifactId>
            <version>4.1.0.Final</version>
        </dependency>
    </dependencies>

ProtobufServerHandler的實現

因為Netty里面已經有幫我們實現了protobuf的解析,所以我們不需要自己實現。我們只要繼承ChannelInboundHandlerAdapter然后通過channelRead就可以拿到解析好的對象,然后轉換成我們自己的類型,就可以直接使用。這里同樣是實現不同類型的消息打印和CallMessage消息就回復信息給客戶端。

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
* Created by caipeiyu on 16/6/4.
*/
public class ProtobufServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)

        try {
            DefeatMessageOuterClass.DefeatMessage in = (DefeatMessageOuterClass.DefeatMessage) msg;
        if(in.getType() == DefeatMessageOuterClass.DefeatMessage.Type.BackMessage){
            System.out.print("BackMessage:");
            System.out.print(in.getBackMessage());
            System.out.flush();
        }else if(in.getType() == DefeatMessageOuterClass.DefeatMessage.Type.CallMessage){
            System.out.print("CallMessage:");
            System.out.print(in.getCallMessage());
            System.out.flush();

            DefeatMessageOuterClass.DefeatMessage out =
                    DefeatMessageOuterClass.DefeatMessage.newBuilder()
                            .setType(DefeatMessageOuterClass.DefeatMessage.Type.BackMessage)
                            .setBackMessage(BackMessageOuterClass.BackMessage
                                    .newBuilder().setContent("Hello I from server by Java Netty").build())
                            .build();

            ctx.write(out);
            ctx.flush();
        }

        } finally {
            ReferenceCountUtil.release(msg); // (2)
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

ProtobufServer的實現

主要是添加已經有的編碼解碼和消息接收的類就可以了。

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
* Created by caipeiyu on 16/6/4.
*/
public class ProtobufServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            //解碼
                            p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
                            //構造函數傳遞要解碼成的類型
                            p.addLast("protobufDecoder", new ProtobufDecoder(DefeatMessageOuterClass.DefeatMessage.getDefaultInstance()));
                            //編碼
                            p.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());
                            p.addLast("protobufEncoder", new ProtobufEncoder());
                            //業務邏輯處理
                            p.addLast("handler", new ProtobufServerHandler());

                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // Bind and start to accept incoming connections.
            ChannelFuture f = b.bind(2012).sync(); // (7)

            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }

    }
}

整個代碼編寫完成后,直接運行並打開我們前面的客戶端進通訊發數據。運行結果如下:

當然這三個例子直接簡單的說明如何使用框架來解決我們的問題,實際開發過程中肯定不只是我們例子的這么點東西,需要考慮的東西還很多,這里只是寫一些可以運行起來的例子作為拋磚引玉。希望能給不懂的同學有點啟發作用。謝謝您百忙中抽出時間來觀看我的分享。


由於本人水平有限,知識有限,文章難免會有錯誤,歡迎大家指正。如果有什么問題也歡迎大家回復交流。要是你覺得本文還可以,那么點擊一下推薦。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM