一 簡述
Tcp消息的處理本身是與Tcp消息傳輸過程獨立的,是消息的兩個不同階段,從前面的會話生命周期我們已經知道消息的傳輸主要有SocketSession實現,而真正處理則交由AppSession實現,SuperSocket的層次划分也是非常清晰明了。
SuperSocket消息處理主要流程:接收=》原始過濾=》協議解析=》命令路由並執行=》找不到命令則直接一分不動發給客戶端
二 消息接收
1 開始接收
代碼位置:AsyncSocketSession=》StartReceive

1 private void StartReceive(SocketAsyncEventArgs e) 2 { 3 StartReceive(e, 0); 4 } 5 6 private void StartReceive(SocketAsyncEventArgs e, int offsetDelta) 7 { 8 bool willRaiseEvent = false; 9 10 try 11 { 12 if (offsetDelta < 0 || offsetDelta >= Config.ReceiveBufferSize) 13 throw new ArgumentException(string.Format("Illigal offsetDelta: {0}", offsetDelta), "offsetDelta"); 14 15 var predictOffset = SocketAsyncProxy.OrigOffset + offsetDelta; 16 17 if (e.Offset != predictOffset) 18 { 19 e.SetBuffer(predictOffset, Config.ReceiveBufferSize - offsetDelta); 20 } 21 22 if (IsInClosingOrClosed) 23 return; 24 25 OnReceiveStarted(); 26 willRaiseEvent = Client.ReceiveAsync(e); 27 } 28 catch (Exception exc) 29 { 30 LogError(exc); 31 32 OnReceiveError(CloseReason.SocketError); 33 return; 34 } 35 36 if (!willRaiseEvent) 37 { 38 ProcessReceive(e); 39 } 40 }
在接收數據前后觸發ReceiveStarted,ReceiveEnd事件
2 接收有消息處理並轉入處理

1 public void ProcessReceive(SocketAsyncEventArgs e) 2 { 3 if (!ProcessCompleted(e)) 4 { 5 OnReceiveError(CloseReason.ClientClosing); 6 return; 7 } 8 9 OnReceiveEnded(); 10 11 int offsetDelta; 12 13 try 14 { 15 //交給app會話處理接收到的數據,而appsession又交給接收請求處理器處理 16 offsetDelta = this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true); 17 } 18 catch (Exception exc) 19 { 20 LogError("Protocol error", exc); 21 this.Close(CloseReason.ProtocolError); 22 return; 23 } 24 25 //read the next block of data sent from the client 26 StartReceive(e, offsetDelta); 27 }
三 消息處理
1 入口:按照協議解析,每次只處理一個數據包,因此便有了如下的入口代碼

1 int IAppSession.ProcessRequest(byte[] readBuffer, int offset, int length, bool toBeCopied) 2 { 3 int rest, offsetDelta; 4 5 while (true) 6 { 7 var requestInfo = FilterRequest(readBuffer, offset, length, toBeCopied, out rest, out offsetDelta); 8 9 if (requestInfo != null) 10 { 11 try 12 { 13 AppServer.ExecuteCommand(this, requestInfo); 14 } 15 catch (Exception e) 16 { 17 HandleException(e); 18 } 19 } 20 21 if (rest <= 0) 22 { 23 return offsetDelta; 24 } 25 26 //Still have data has not been processed 27 offset = offset + length - rest; 28 length = rest; 29 } 30 }
2 原始過濾
此處以原始數據接收事件方式,預留給AppServer子類處理該原始數據包,意味着可以對原始數據包執行一次攔截處理,如果經過一些邏輯處理后不能滿足,則將終止該數據包繼續傳播
3 協議解析
協議解析由AppSession的m_ReceiveFilter成員完成,該成員的實例化有2種實現方式
默認實例化 :默認命令行協議,該協議舉例
下面是2行命令,行使用\r\n 作為結束標識,也就是回車換行,命令行內部使用空格分隔,第一個控制之前 如echo 為消息頭也就是key,其后的字符串也使用空格分隔,作為參數,其整體=body
echo cc xxd
cdc ds mmm
解析后為2個StringRequestInfo對象
默認的協議解析工廠
實例化默認協議解析對象
通過AppServer構造函數傳遞解析工廠
4 命令路由
上面我們已經解析到客戶端發來的2條命令分別為echo 參數為cc xxd;cdc ds mmm;
其中key分別為echo和cdc,對於命令模式來說命令本身使用name字段進行標識,如果我們的key與name匹配那么我們即可路由到一個已有的命令,進而執行該命令,來看代碼
對於能夠路由到的命令我們執行命令
對於路由失敗來說SuperSocket又是怎么做的呢?
找不到命令來處理該消息,將該命令名字發送會客戶端,意思說明服務器沒有實現該命令,那么命令從何而來?
5 命令
這還的從CommandLoader說起,CommandLoader又追溯到AppServer的構建過程
在沒有顯示配置CommandLoader的情況下默認為ReflectCommandLoader
ReflectCommandLoader創建命令
ReflectCommandLoader將掃描應用程序根目錄下所有程序,並將實現了命令接口的實例通過反射創建出來

1 public override bool TryLoadCommands(out IEnumerable<TCommand> commands) 2 { 3 commands = null; 4 5 var commandAssemblies = new List<Assembly>(); 6 7 if (m_AppServer.GetType().Assembly != this.GetType().Assembly) 8 commandAssemblies.Add(m_AppServer.GetType().Assembly); 9 10 string commandAssembly = m_AppServer.Config.Options.GetValue("commandAssembly"); 11 12 if (!string.IsNullOrEmpty(commandAssembly)) 13 { 14 OnError("The configuration attribute 'commandAssembly' is not in used, please try to use the child node 'commandAssemblies' instead!"); 15 return false; 16 } 17 18 19 if (m_AppServer.Config.CommandAssemblies != null && m_AppServer.Config.CommandAssemblies.Any()) 20 { 21 try 22 { 23 var definedAssemblies = AssemblyUtil.GetAssembliesFromStrings(m_AppServer.Config.CommandAssemblies.Select(a => a.Assembly).ToArray()); 24 25 if (definedAssemblies.Any()) 26 commandAssemblies.AddRange(definedAssemblies); 27 } 28 catch (Exception e) 29 { 30 OnError(new Exception("Failed to load defined command assemblies!", e)); 31 return false; 32 } 33 } 34 35 if (!commandAssemblies.Any()) 36 { 37 commandAssemblies.Add(Assembly.GetEntryAssembly()); 38 } 39 40 var outputCommands = new List<TCommand>(); 41 42 foreach (var assembly in commandAssemblies) 43 { 44 try 45 { 46 outputCommands.AddRange(assembly.GetImplementedObjectsByInterface<TCommand>()); 47 } 48 catch (Exception exc) 49 { 50 OnError(new Exception(string.Format("Failed to get commands from the assembly {0}!", assembly.FullName), exc)); 51 return false; 52 } 53 } 54 55 commands = outputCommands; 56 57 return true; 58 }
因此默認的我們只需要定義一些實現命令接口ICommand<TAppSession, TRequestInfo>的命令出來,
6 自定義命令
直接繼承CommandBase抽象類即可
到此SuperSocket對消息的處理流程差不多就是這樣了,SuperSocket的框架的使用 我們只需要自定義自己的AppServer,以及AppServer配套的AppSession,ReciverFilter,以及命令等即可,這些在官方提供的例子中已經很清晰