上一篇中,我們編寫了客戶端功能。
這一篇將講解ISocketHandler的實現。
再來回顧一下ISocketHandler接口。
public interface ISocketHandler { /// <summary> /// 開始接收 /// </summary> /// <param name="stream">Socket網絡流</param> /// <param name="callback">回調函數</param> /// <param name="state">自定義狀態</param> /// <returns>異步結果</returns> IAsyncResult BeginReceive(Stream stream, AsyncCallback callback, object state); /// <summary> /// 結束接收 /// </summary> /// <param name="asyncResult">異步結果</param> /// <returns>接收到的數據</returns> byte[] EndReceive(IAsyncResult asyncResult); /// <summary> /// 開始發送 /// </summary> /// <param name="data">要發送的數據</param> /// <param name="offset">數據偏移</param> /// <param name="count">發送長度</param> /// <param name="stream">Socket網絡流</param> /// <param name="callback">回調函數</param> /// <param name="state">自定義狀態</param> /// <returns>異步結果</returns> IAsyncResult BeginSend(byte[] data, int offset, int count, Stream stream, AsyncCallback callback, object state); /// <summary> /// 結束發送 /// </summary> /// <param name="asyncResult">異步結果</param> /// <returns>發送是否成功</returns> bool EndSend(IAsyncResult asyncResult); }
做一個類SocketHandler繼承ISocketHandler接口
/// <summary> /// Socket處理程序 /// </summary> public class SocketHandler : ISocketHandler { /// <summary> /// 開始接收 /// </summary> /// <param name="stream">Socket網絡流</param> /// <param name="callback">回調函數</param> /// <param name="state">自定義狀態</param> /// <returns>異步結果</returns> public IAsyncResult BeginReceive(Stream stream, AsyncCallback callback, object state) { } /// <summary> /// 結束接收 /// </summary> /// <param name="asyncResult">異步結果</param> /// <returns>接收到的數據</returns> public byte[] EndReceive(IAsyncResult asyncResult) { } /// <summary> /// 開始發送 /// </summary> /// <param name="data">要發送的數據</param> /// <param name="offset">數據偏移</param> /// <param name="count">發送長度</param> /// <param name="stream">Socket網絡流</param> /// <param name="callback">回調函數</param> /// <param name="state">自定義狀態</param> /// <returns>異步結果</returns> public IAsyncResult BeginSend(byte[] data, int offset, int count, Stream stream, AsyncCallback callback, object state) { } /// <summary> /// 結束發送 /// </summary> /// <param name="asyncResult">異步結果</param> /// <returns>發送是否成功</returns> public bool EndSend(IAsyncResult asyncResult) { } }
增加兩個屬性與構造函數。
//異步處理關系集合 private Dictionary<IAsyncResult, SocketHandlerState> StateSet; //發送隊列 private List<SocketHandlerState> SendQueue; /// <summary> /// 實例化Socket處理程序 /// </summary> public SocketHandler() { StateSet = new Dictionary<IAsyncResult, SocketHandlerState>(); SendQueue = new List<SocketHandlerState>(); }
StateSet可以保存我們的異步調用結果等數據
SendQueue用來做一個發送隊列
接下來我們從發送數據開始。
由於需要用到Stream的異步方法,我們需要定義一個State類。
internal class SocketHandlerState { /// <summary> /// 數據 /// </summary> public byte[] Data { get; set; } /// <summary> /// 異步結果 /// </summary> public IAsyncResult AsyncResult { get; set; } /// <summary> /// Socket網絡流 /// </summary> public Stream Stream { get; set; } /// <summary> /// 異步回調函數 /// </summary> public AsyncCallback AsyncCallBack { get; set; } /// <summary> /// 是否完成 /// </summary> public bool Completed { get; set; } /// <summary> /// 數據長度 /// </summary> public int DataLength { get; set; } }
因為我們需要返回IAsyncResult,所以我們繼承該接口做一個SocketAsyncResult類。
/// <summary> /// Socket異步操作狀態 /// </summary> public class SocketAsyncResult : IAsyncResult { /// <summary> /// 實例化Socket異步操作狀態 /// </summary> /// <param name="state"></param> public SocketAsyncResult(object state) { AsyncState = state; AsyncWaitHandle = new AutoResetEvent(false); } /// <summary> /// 獲取用戶定義的對象,它限定或包含關於異步操作的信息。 /// </summary> public object AsyncState { get; private set; } /// <summary> /// 獲取用於等待異步操作完成的 System.Threading.WaitHandle。 /// </summary> public WaitHandle AsyncWaitHandle { get; private set; } /// <summary> /// 獲取一個值,該值指示異步操作是否同步完成。 /// </summary> public bool CompletedSynchronously { get { return false; } } /// <summary> /// 獲取一個值,該值指示異步操作是否已完成。 /// </summary> public bool IsCompleted { get; internal set; } }
然后開始編寫發送數據相關函數。
這里我將發送數據的大小限制為最大65535。
只需發送長度為2的頭信息即可把數據長度發送到對方。
/// <summary> /// 開始發送 /// </summary> /// <param name="data">要發送的數據</param> /// <param name="offset">數據偏移</param> /// <param name="count">發送長度</param> /// <param name="stream">Socket網絡流</param> /// <param name="callback">回調函數</param> /// <param name="state">自定義狀態</param> /// <returns>異步結果</returns> public IAsyncResult BeginSend(byte[] data, int offset, int count, Stream stream, AsyncCallback callback, object state) { //data不能為null if (data == null) throw new ArgumentNullException("data"); //offset不能小於0和超過data長度 if (offset > data.Length || offset < 0) throw new ArgumentOutOfRangeException("offset"); //count不能大於65535 if (count <= 0 || count > data.Length - offset || count > ushort.MaxValue) throw new ArgumentOutOfRangeException("count"); //stream不能為null if (stream == null) throw new ArgumentNullException("stream"); //回調函數不能為null if (callback == null) throw new ArgumentNullException("callback"); //stream異常 if (!stream.CanWrite) throw new ArgumentException("stream不支持寫入。"); SocketAsyncResult result = new SocketAsyncResult(state); //初始化SocketHandlerState SocketHandlerState shs = new SocketHandlerState(); shs.Data = data; shs.AsyncResult = result; shs.Stream = stream; shs.AsyncCallBack = callback; shs.DataLength = 0; //鎖定SendQueue //避免多線程同時發送數據 lock (SendQueue) { //添加狀態 SendQueue.Add(shs); //如果SendQueue數量大於1,則表示有數據尚未發送完成 if (SendQueue.Count > 1) return result; } //獲取數據長度 //ushort的最大值為65535 //轉換為byte[]長度為2 var dataLength = BitConverter.GetBytes((ushort)data.Length); //向對方發送長度為2的頭信息,表示接下來要發送的數據長度 stream.Write(dataLength, 0, dataLength.Length); //開始異步發送數據 stream.BeginWrite(shs.Data, 0, shs.Data.Length, EndWrite, shs).AsyncWaitHandle.WaitOne(); return result; } //stream異步結束寫入 private void EndWrite(IAsyncResult ar) { SocketHandlerState state = (SocketHandlerState)ar.AsyncState; //鎖定StateSet lock (StateSet) StateSet.Add(state.AsyncResult, state); try { state.Stream.EndWrite(ar); } catch { //出現Socket異常,發送失敗 state.Completed = false; //允許等待線程繼續 ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set(); //執行異步回調函數 state.AsyncCallBack(state.AsyncResult); return; } //發送成功 state.Completed = true; //允許等待線程繼續 ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set(); //執行異步回調函數 state.AsyncCallBack(state.AsyncResult); //鎖定SendQueue lock (SendQueue) { SocketHandlerState prepare = null; //移除當前發送完成的數據 SendQueue.Remove(state); //如果SendQueue還有數據存在,則繼續發送 if (SendQueue.Count > 0) { prepare = SendQueue[0]; } if (prepare != null) { //獲取數據長度 //ushort的最大值為65535 //轉換為byte[]長度為2 var dataLength = BitConverter.GetBytes((ushort)prepare.Data.Length); //向對方發送長度為2的頭信息,表示接下來要發送的數據長度 prepare.Stream.Write(dataLength, 0, dataLength.Length); //開始異步發送數據 prepare.Stream.BeginWrite(prepare.Data, 0, prepare.Data.Length, EndWrite, prepare).AsyncWaitHandle.WaitOne(); } } } /// <summary> /// 結束發送 /// </summary> /// <param name="asyncResult">異步結果</param> /// <returns>發送是否成功</returns> public bool EndSend(IAsyncResult asyncResult) { //判斷異步操作狀態是否屬於當前處理程序 if (!StateSet.ContainsKey(asyncResult)) throw new ArgumentException("無法識別的asyncResult。"); SocketHandlerState state = StateSet[asyncResult]; lock (StateSet) StateSet.Remove(asyncResult); return state.Completed; }
接下來是接收數據的相關方法。
/// <summary> /// 開始接收 /// </summary> /// <param name="stream">Socket網絡流</param> /// <param name="callback">回調函數</param> /// <param name="state">自定義狀態</param> /// <returns>異步結果</returns> public IAsyncResult BeginReceive(Stream stream, AsyncCallback callback, object state) { //stream不能為null if (stream == null) throw new ArgumentNullException("stream"); //回調函數不能為null if (callback == null) throw new ArgumentNullException("callback"); //stream異常 if (!stream.CanRead) throw new ArgumentException("stream不支持讀取。"); SocketAsyncResult result = new SocketAsyncResult(state); //初始化SocketHandlerState SocketHandlerState shs = new SocketHandlerState(); shs.Data = new byte[2]; shs.AsyncResult = result; shs.Stream = stream; shs.AsyncCallBack = callback; shs.Completed = true; //開始異步接收長度為2的頭信息 //該頭信息包含要接收的主要數據長度 stream.BeginRead(shs.Data, 0, 2, EndRead, shs); return result; } //stream異步結束讀取 private void EndRead(IAsyncResult ar) { SocketHandlerState state = (SocketHandlerState)ar.AsyncState; int dataLength; try { dataLength = state.Stream.EndRead(ar); } catch { dataLength = 0; } //dataLength為0則表示Socket斷開連接 if (dataLength == 0) { lock (StateSet) StateSet.Add(state.AsyncResult, state); //設定接收到的數據位空byte數組 state.Data = new byte[0]; //允許等待線程繼續 ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set(); //執行異步回調函數 state.AsyncCallBack(state.AsyncResult); return; } //如果是已完成狀態,則表示state.Data的數據是頭信息 if (state.Completed) { //設定狀態為未完成 state.Completed = false; //已接收得數據長度為0 state.DataLength = 0; //獲取主要數據長度 var length = BitConverter.ToUInt16(state.Data, 0); //初始化數據的byte數組 state.Data = new byte[length]; try { //開始異步接收主要數據 state.Stream.BeginRead(state.Data, 0, length, EndRead, state); } catch { //出現Socket異常 lock (StateSet) StateSet.Add(state.AsyncResult, state); state.Data = new byte[0]; ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set(); state.AsyncCallBack(state.AsyncResult); } return; } //接收到主要數據 else { //判斷是否接收了完整的數據 if (dataLength + state.DataLength != state.Data.Length) { //增加已接收數據長度 state.DataLength += dataLength; try { //繼續接收數據 state.Stream.BeginRead(state.Data, state.DataLength, state.Data.Length - state.DataLength, EndRead, state); } catch { //出現Socket異常 lock (StateSet) StateSet.Add(state.AsyncResult, state); state.Data = new byte[0]; ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set(); state.AsyncCallBack(state.AsyncResult); return; } return; } //接收完成 state.Completed = true; lock (StateSet) StateSet.Add(state.AsyncResult, state); ((AutoResetEvent)state.AsyncResult.AsyncWaitHandle).Set(); state.AsyncCallBack(state.AsyncResult); } } /// <summary> /// 結束接收 /// </summary> /// <param name="asyncResult">異步結果</param> /// <returns>接收到的數據</returns> public byte[] EndReceive(IAsyncResult asyncResult) { //判斷異步操作狀態是否屬於當前處理程序 if (!StateSet.ContainsKey(asyncResult)) throw new ArgumentException("無法識別的asyncResult。"); SocketHandlerState state = StateSet[asyncResult]; lock (StateSet) StateSet.Remove(asyncResult); return state.Data; }
至此,SocketHandler的功能已經實現。
下一篇將為大家講解服務器端的實現。
原文地址:http://www.cnblogs.com/Kation/archive/2013/03/06/2947145.html