SuperSocket源碼解析之消息處理


一 簡述

  Tcp消息的處理本身是與Tcp消息傳輸過程獨立的,是消息的兩個不同階段,從前面的會話生命周期我們已經知道消息的傳輸主要有SocketSession實現,而真正處理則交由AppSession實現,SuperSocket的層次划分也是非常清晰明了。

  SuperSocket消息處理主要流程:接收=》原始過濾=》協議解析=》命令路由並執行=》找不到命令則直接一分不動發給客戶端

二 消息接收

1 開始接收

代碼位置:AsyncSocketSession=》StartReceive

 1  private void StartReceive(SocketAsyncEventArgs e)
 2         {
 3             StartReceive(e, 0);
 4         }
 5 
 6         private void StartReceive(SocketAsyncEventArgs e, int offsetDelta)
 7         {
 8             bool willRaiseEvent = false;
 9 
10             try
11             {
12                 if (offsetDelta < 0 || offsetDelta >= Config.ReceiveBufferSize)
13                     throw new ArgumentException(string.Format("Illigal offsetDelta: {0}", offsetDelta), "offsetDelta");
14 
15                 var predictOffset = SocketAsyncProxy.OrigOffset + offsetDelta;
16 
17                 if (e.Offset != predictOffset)
18                 {
19                     e.SetBuffer(predictOffset, Config.ReceiveBufferSize - offsetDelta);
20                 }
21 
22                 if (IsInClosingOrClosed)
23                     return;
24 
25                 OnReceiveStarted();
26                 willRaiseEvent = Client.ReceiveAsync(e);
27             }
28             catch (Exception exc)
29             {
30                 LogError(exc);
31 
32                 OnReceiveError(CloseReason.SocketError);
33                 return;
34             }
35 
36             if (!willRaiseEvent)
37             {
38                 ProcessReceive(e);
39             }
40         }
View Code

在接收數據前后觸發ReceiveStarted,ReceiveEnd事件

2 接收有消息處理並轉入處理

 1 public void ProcessReceive(SocketAsyncEventArgs e)
 2         {
 3             if (!ProcessCompleted(e))
 4             {
 5                 OnReceiveError(CloseReason.ClientClosing);
 6                 return;
 7             }
 8 
 9             OnReceiveEnded();
10 
11             int offsetDelta;
12 
13             try
14             {
15                 //交給app會話處理接收到的數據,而appsession又交給接收請求處理器處理
16                 offsetDelta = this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);
17             }
18             catch (Exception exc)
19             {
20                 LogError("Protocol error", exc);
21                 this.Close(CloseReason.ProtocolError);
22                 return;
23             }
24 
25             //read the next block of data sent from the client
26             StartReceive(e, offsetDelta);
27         }
View Code

三 消息處理

1 入口:按照協議解析,每次只處理一個數據包,因此便有了如下的入口代碼

 1 int IAppSession.ProcessRequest(byte[] readBuffer, int offset, int length, bool toBeCopied)
 2         {
 3             int rest, offsetDelta;
 4 
 5             while (true)
 6             {
 7                 var requestInfo = FilterRequest(readBuffer, offset, length, toBeCopied, out rest, out offsetDelta);
 8 
 9                 if (requestInfo != null)
10                 {
11                     try
12                     {
13                         AppServer.ExecuteCommand(this, requestInfo);
14                     }
15                     catch (Exception e)
16                     {
17                         HandleException(e);
18                     }
19                 }
20 
21                 if (rest <= 0)
22                 {
23                     return offsetDelta;
24                 }
25 
26                 //Still have data has not been processed
27                 offset = offset + length - rest;
28                 length = rest;
29             }
30         }
View Code

2 原始過濾

 此處以原始數據接收事件方式,預留給AppServer子類處理該原始數據包,意味着可以對原始數據包執行一次攔截處理,如果經過一些邏輯處理后不能滿足,則將終止該數據包繼續傳播

3 協議解析

協議解析由AppSession的m_ReceiveFilter成員完成,該成員的實例化有2種實現方式

默認實例化 :默認命令行協議,該協議舉例

下面是2行命令,行使用\r\n 作為結束標識,也就是回車換行,命令行內部使用空格分隔,第一個控制之前 如echo 為消息頭也就是key,其后的字符串也使用空格分隔,作為參數,其整體=body

echo cc xxd    
cdc ds mmm

 

解析后為2個StringRequestInfo對象

 

 默認的協議解析工廠

 

 實例化默認協議解析對象

 

 通過AppServer構造函數傳遞解析工廠

 

 

 4 命令路由

上面我們已經解析到客戶端發來的2條命令分別為echo 參數為cc xxd;cdc ds mmm;

其中key分別為echo和cdc,對於命令模式來說命令本身使用name字段進行標識,如果我們的key與name匹配那么我們即可路由到一個已有的命令,進而執行該命令,來看代碼

 對於能夠路由到的命令我們執行命令

 

 對於路由失敗來說SuperSocket又是怎么做的呢?

 

找不到命令來處理該消息,將該命令名字發送會客戶端,意思說明服務器沒有實現該命令,那么命令從何而來?

 

5  命令

這還的從CommandLoader說起,CommandLoader又追溯到AppServer的構建過程

 

在沒有顯示配置CommandLoader的情況下默認為ReflectCommandLoader

 

 ReflectCommandLoader創建命令

 

 

  ReflectCommandLoader將掃描應用程序根目錄下所有程序,並將實現了命令接口的實例通過反射創建出來

 1 public override bool TryLoadCommands(out IEnumerable<TCommand> commands)
 2         {
 3             commands = null;
 4 
 5             var commandAssemblies = new List<Assembly>();
 6 
 7             if (m_AppServer.GetType().Assembly != this.GetType().Assembly)
 8                 commandAssemblies.Add(m_AppServer.GetType().Assembly);
 9 
10             string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly");
11 
12             if (!string.IsNullOrEmpty(commandAssembly))
13             {
14                 OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!");
15                 return false;
16             }
17 
18 
19             if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any())
20             {
21                 try
22                 {
23                     var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray());
24 
25                     if (definedAssemblies.Any())
26                         commandAssemblies.AddRange(definedAssemblies);
27                 }
28                 catch (Exception e)
29                 {
30                     OnError(new Exception("Failed to load defined command assemblies!", e));
31                     return false;
32                 }
33             }
34 
35             if (!commandAssemblies.Any())
36             {
37                 commandAssemblies.Add(Assembly.GetEntryAssembly());
38             }
39 
40             var outputCommands = new List<TCommand>();
41 
42             foreach (var assembly in commandAssemblies)
43             {
44                 try
45                 {
46                     outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>());
47                 }
48                 catch (Exception exc)
49                 {
50                     OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc));
51                     return false;
52                 }
53             }
54 
55             commands = outputCommands;
56 
57             return true;
58         }
View Code

因此默認的我們只需要定義一些實現命令接口ICommand<TAppSession, TRequestInfo>的命令出來,

6 自定義命令

 直接繼承CommandBase抽象類即可

 

 到此SuperSocket對消息的處理流程差不多就是這樣了,SuperSocket的框架的使用 我們只需要自定義自己的AppServer,以及AppServer配套的AppSession,ReciverFilter,以及命令等即可,這些在官方提供的例子中已經很清晰


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM