.NET中的Socket類提供了網絡通信常用的方法,分別提供了同步和異步兩個版本,其中異步的實現是基於APM異步模式實現,即BeginXXX/EndXXX的方式。異步方法由於其非阻塞的特性,在需考慮程序性能和伸縮性的情況下,一般會選擇使用異步方法。但使用過Socket提供的異步方法的同學,應該都會注意到了Socket的異步方法是無法設置Timeout的。以Receive操作為例,Socket提供了一個ReceiveTimeout屬性,但該屬性設置的是同步版本的Socket.Receive()方法的Timeout值,該設置對異步的Socket.BeginReceive()無效:如果對方沒有返回任何消息,則BeginReceive操作將無法完成,其中提供的回調函數也將不會調用。如下示例代碼所示:
private static void TestSocketBeginReceive()
{
Socket socket = new Socket(AddressFamily.InterNetwork,
SocketType.Dgram, ProtocolType.Udp);
byte[] content = Encoding.ASCII.GetBytes("Hello world");
IPAddress ip = Dns.Resolve("www.google.com").AddressList[0];
IPEndPoint receiver = new IPEndPoint(ip, 80);
socket.BeginSendTo(content, 0, content.Length, SocketFlags.None,
receiver, SendToCb, socket);
Console.WriteLine("Sent bytes: " + content.Length);
}
private static void SendToCb(IAsyncResult ar)
{
var socket = ar.AsyncState as Socket;
socket.EndSendTo(ar);
byte[] buffer = new byte[1024];
IAsyncResult receiveAr = socket.BeginReceive(buffer, 0, buffer.Length,
SocketFlags.None, null, null);
int received = socket.EndReceive(receiveAr);
Console.WriteLine("Received bytes: " + received);
}
由於接收方不會返回任何消息,Socket.BeginReceive將永遠不會完成,SentToCb方法中的socket.EndReceive()調用將永遠阻塞,應用程序也無法得知操作的狀態。
支持Timeout
在個別的應用場景下,我們希望既能使用Socket的異步通信方法,保證程序的性能,同時又希望能指定Timeout值,當操作沒有在指定的時間內完成時,應用程序能得到通知,以進行下一步的操作,如retry等。以下介紹的就是一種支持Timeout的Socket異步Receive操作的實現,方式如下:
1.基於APM異步模式封裝Socket.BeginReceive/EndReceive方法。
2.使用ThreadPool提供的RegisterWaitForSingleObject()方法注冊一個WaitOrTimerCallback,如果指定時間內操作未完成,則結束操作,並設置狀態為Timeout。
3.將上述封裝實現為Socket的擴展方法方便調用。
以下代碼簡化了所有的參數檢查和異常處理,實際使用中需添加相關邏輯。
AsyncResultWithTimeout
首先看一下IAsyncResult接口的實現:
public class AsyncResultWithTimeout : IAsyncResult
{
private ManualResetEvent m_waitHandle = new ManualResetEvent(false);
public AsyncResultWithTimeout(AsyncCallback cb, object state)
{
this.AsyncState = state;
this.Callback = cb;
}
#region IAsyncResult
public object AsyncState { get; private set; }
public WaitHandle AsyncWaitHandle { get { return m_waitHandle; } }
public bool CompletedSynchronously { get { return false; } }
public bool IsCompleted { get; private set; }
#endregion
public AsyncCallback Callback { get; private set; }
public int ReceivedCount { get; private set; }
public bool TimedOut { get; private set; }
public void SetResult(int count)
{
this.IsCompleted = true;
this.ReceivedCount = count;
this.m_waitHandle.Set();
if (Callback != null) Callback(this);
}
public void SetTimeout()
{
this.TimedOut = true;
this.IsCompleted = true;
this.m_waitHandle.Set();
}
}
AsyncResultWithTimeOut類中包含了IAsyncResult接口中4個屬性的實現、用戶傳入的AsyncCallback委托、接收到的字節數ReceivedCount以及兩個額外的方法:
1.SetResult(): 用於正常接收到消息時設置結果,標記操作完成以及執行回調。
2.SetTimeout():當超時時,標記操作完成以及設置超時狀態。
StateInfo
StateInfo類用於保存相關的狀態信息,該對象會作為Socket.BeginReceive()的最后一個參數傳入。當接收到消息時,接收到的字節數會保存到AsyncResult屬性中,並設置操作完成。當超時時,WatchTimeOut方法會將AsyncResult設置為TimeOut狀態,並通過RegisteredWaitHandle屬性取消注冊的WaitOrTimerCallback.
public class StateInfo
{
public StateInfo(AsyncResultWithTimeout result, Socket socket)
{
this.AsycResult = result;
this.Socket = socket;
}
public Socket Socket { get; private set; }
public AsyncResultWithTimeout AsycResult { get; private set; }
public RegisteredWaitHandle RegisteredWaitHandle { get; set; }
}
封裝Socket.BeginReceive
與Socket.BeginReceive方法相比,BeginReceive2添加了一個參數timeout,可以設置該操作的超時時間,單位為毫秒。BeginReceive2中調用Socket.BeginReceive()方法,其中指定的ReceiveCb回調將在正常接收到消息后將結果保存在stateInfo對象的AsyncResult屬性中,該屬性中的值就是BeginReceive2()方法返回的IAsyncResult。BeginReceive2調用Socket.BeginReceive后,在ThreadPool中注冊了一個WaitOrTimerCallback委托。ThreadPool將在Receive操作完成或者Timeout時調用該委托。
public static class SocketExtension
{
public static int EndReceive2(IAsyncResult ar)
{
var result = ar as AsyncResultWithTimeout;
result.AsyncWaitHandle.WaitOne();
return result.ReceivedCount;
}
public static AsyncResultWithTimeout BeginReceive2
(
this Socket socket,
int timeout,
byte[] buffer,
int offset,
int size,
SocketFlags flags,
AsyncCallback callback,
object state
)
{
var result = new AsyncResultWithTimeout(callback, state);
var stateInfo = new StateInfo(result, socket);
socket.BeginReceive(buffer, offset, size, flags, ReceiveCb, state);
var registeredWaitHandle =
ThreadPool.RegisterWaitForSingleObject(
result.AsyncWaitHandle,
WatchTimeOut,
stateInfo, // 作為state傳遞給WatchTimeOut
timeout,
true);
// stateInfo中保存RegisteredWaitHandle,以方便在úWatchTimeOut
// 中unregister.
stateInfo.RegisteredWaitHandle = registeredWaitHandle;
return result;
}
private static void WatchTimeOut(object state, bool timeout)
{
var stateInfo = state as StateInfo;
// 設置的timeout前,操作未完成,則設置為操作Timeout
if (timeout)
{
stateInfo.AsycResult.SetTimeout();
}
// 取消之前注冊的WaitOrTimerCallback
stateInfo.RegisteredWaitHandle.Unregister(
stateInfo.AsycResult.AsyncWaitHandle);
}
private static void ReceiveCb(IAsyncResult result)
{
var state = result.AsyncState as StateInfo;
var asyncResultWithTimeOut = state.AsycResult;
var count = state.Socket.EndReceive(result);
state.AsycResult.SetResult(count);
}
}
試一下
以下代碼演示了如何使用BeginReceive2:
private static void TestSocketBeginReceive2()
{
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
byte[] content = Encoding.ASCII.GetBytes("Hello world");
IPAddress ip = Dns.Resolve("www.google.com").AddressList[0];
IPEndPoint receiver = new IPEndPoint(ip, 80);
socket.BeginSendTo(content, 0, content.Length, SocketFlags.None, receiver, SendToCb2, socket);
Console.WriteLine("Sent bytes: " + content.Length);
}
private static void SendToCb2(IAsyncResult ar)
{
var socket = ar.AsyncState as Socket;
socket.EndSendTo(ar);
byte[] buffer = new byte[1024];
AsyncResultWithTimeout receiveAr = socket.BeginReceive2(2000, buffer, 0, buffer.Length, SocketFlags.None, null, null);
receiveAr.AsyncWaitHandle.WaitOne();
if (receiveAr.TimedOut)
{
Console.WriteLine("Operation timed out.");
}
else
{
int received = socket.EndReceive(ar);
Console.WriteLine("Received bytes: " + received);
}
}
輸出結果如下:
上述實現是針對BeginReceive的封裝,還可以以相同的方式將Send/Receive封裝以支持Timeout, 或者更進一步支持retry操作。
附示例代碼:files.cnblogs.com/dytes/SocketAsyncOpWithTimeOut.zip
本文轉自:http://www.csharpwin.com/csharpspace/13263r8436.shtml