原創性申明
本文作者:小竹zz 博客地址:http://blog.csdn.net/zhujunxxxxx/article/details/43573879轉載請注明出處
引言
我一直在探尋一個高性能的Socket客戶端代碼。以前,我使用Socket類寫了一些基於傳統異步編程模型的代碼(BeginSend、BeginReceive,等等)也看過很多博客的知識,在linux中有poll和epoll來實現,在windows下面
微軟MSDN中也提供了SocketAsyncEventArgs這個類來實現IOCP 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx
NET Framework中的APM也稱為Begin/End模式。這是因為會調用Begin方法來啟動異步操作,然后返回一個IAsyncResult 對象。可以選擇將一個代理作為參數提供給Begin方法,異步操作完成時會調用該方法。或者,一個線程可以等待 IAsyncResult.AsyncWaitHandle。當回調被調用或發出等待信號時,就會調用End方法來獲取異步操作的結果。這種模式很靈活,使用相對簡單,在 .NET Framework 中非常常見。
但是,您必須注意,如果進行大量異步套接字操作,是要付出代價的。針對每次操作,都必須創建一個IAsyncResult對象,而且該對象不能被重復使用。由於大量使用對象分配和垃圾收集,這會影響性能。為了解決這個問題,新版本提供了另一個使用套接字上執行異步I/O的方法模式。這種新模式並不要求為每個套接字操作分配操作上下文對象。
代碼下載:http://download.csdn.net/detail/zhujunxxxxx/8431289 這里的代碼優化了的
目標
在上面微軟提供的例子我覺得不是很完整,沒有具體一個流程,只是受到客戶端消息后發送相同內容給客戶端,初學者不容易看懂流程,因為我花了一天的時間來實現一個功能齊全的IOCP服務器,
效果如下
代碼
首先是ICOPServer.cs 這個類是IOCP服務器的核心類,目前這個類是網絡上比較全的代碼,MSDN上面的例子都沒有我的全
[csharp] view plain copy print?
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using System.Net.Sockets;
- using System.Net;
- using System.Threading;
- namespace ServerTest
- {
- /// <summary>
- /// IOCP SOCKET服務器
- /// </summary>
- public class IOCPServer : IDisposable
- {
- const int opsToPreAlloc = 2;
- #region Fields
- /// <summary>
- /// 服務器程序允許的最大客戶端連接數
- /// </summary>
- private int _maxClient;
- /// <summary>
- /// 監聽Socket,用於接受客戶端的連接請求
- /// </summary>
- private Socket _serverSock;
- /// <summary>
- /// 當前的連接的客戶端數
- /// </summary>
- private int _clientCount;
- /// <summary>
- /// 用於每個I/O Socket操作的緩沖區大小
- /// </summary>
- private int _bufferSize = 1024;
- /// <summary>
- /// 信號量
- /// </summary>
- Semaphore _maxAcceptedClients;
- /// <summary>
- /// 緩沖區管理
- /// </summary>
- BufferManager _bufferManager;
- /// <summary>
- /// 對象池
- /// </summary>
- SocketAsyncEventArgsPool _objectPool;
- private bool disposed = false;
- #endregion
- #region Properties
- /// <summary>
- /// 服務器是否正在運行
- /// </summary>
- public bool IsRunning { get; private set; }
- /// <summary>
- /// 監聽的IP地址
- /// </summary>
- public IPAddress Address { get; private set; }
- /// <summary>
- /// 監聽的端口
- /// </summary>
- public int Port { get; private set; }
- /// <summary>
- /// 通信使用的編碼
- /// </summary>
- public Encoding Encoding { get; set; }
- #endregion
- #region Ctors
- /// <summary>
- /// 異步IOCP SOCKET服務器
- /// </summary>
- /// <param name="listenPort">監聽的端口</param>
- /// <param name="maxClient">最大的客戶端數量</param>
- public IOCPServer(int listenPort,int maxClient)
- : this(IPAddress.Any, listenPort, maxClient)
- {
- }
- /// <summary>
- /// 異步Socket TCP服務器
- /// </summary>
- /// <param name="localEP">監聽的終結點</param>
- /// <param name="maxClient">最大客戶端數量</param>
- public IOCPServer(IPEndPoint localEP, int maxClient)
- : this(localEP.Address, localEP.Port,maxClient)
- {
- }
- /// <summary>
- /// 異步Socket TCP服務器
- /// </summary>
- /// <param name="localIPAddress">監聽的IP地址</param>
- /// <param name="listenPort">監聽的端口</param>
- /// <param name="maxClient">最大客戶端數量</param>
- public IOCPServer(IPAddress localIPAddress, int listenPort, int maxClient)
- {
- this.Address = localIPAddress;
- this.Port = listenPort;
- this.Encoding = Encoding.Default;
- _maxClient = maxClient;
- _serverSock = new Socket(localIPAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
- _bufferManager = new BufferManager(_bufferSize * _maxClient * opsToPreAlloc,_bufferSize);
- _objectPool = new SocketAsyncEventArgsPool(_maxClient);
- _maxAcceptedClients = new Semaphore(_maxClient, _maxClient);
- }
- #endregion
- #region 初始化
- /// <summary>
- /// 初始化函數
- /// </summary>
- public void Init()
- {
- // Allocates one large byte buffer which all I/O operations use a piece of. This gaurds
- // against memory fragmentation
- _bufferManager.InitBuffer();
- // preallocate pool of SocketAsyncEventArgs objects
- SocketAsyncEventArgs readWriteEventArg;
- for (int i = 0; i < _maxClient; i++)
- {
- //Pre-allocate a set of reusable SocketAsyncEventArgs
- readWriteEventArg = new SocketAsyncEventArgs();
- readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);
- readWriteEventArg.UserToken = null;
- // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object
- _bufferManager.SetBuffer(readWriteEventArg);
- // add SocketAsyncEventArg to the pool
- _objectPool.Push(readWriteEventArg);
- }
- }
- #endregion
- #region Start
- /// <summary>
- /// 啟動
- /// </summary>
- public void Start()
- {
- if (!IsRunning)
- {
- Init();
- IsRunning = true;
- IPEndPoint localEndPoint = new IPEndPoint(Address, Port);
- // 創建監聽socket
- _serverSock = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
- //_serverSock.ReceiveBufferSize = _bufferSize;
- //_serverSock.SendBufferSize = _bufferSize;
- if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)
- {
- // 配置監聽socket為 dual-mode (IPv4 & IPv6)
- // 27 is equivalent to IPV6_V6ONLY socket option in the winsock snippet below,
- _serverSock.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27, false);
- _serverSock.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));
- }
- else
- {
- _serverSock.Bind(localEndPoint);
- }
- // 開始監聽
- _serverSock.Listen(this._maxClient);
- // 在監聽Socket上投遞一個接受請求。
- StartAccept(null);
- }
- }
- #endregion
- #region Stop
- /// <summary>
- /// 停止服務
- /// </summary>
- public void Stop()
- {
- if (IsRunning)
- {
- IsRunning = false;
- _serverSock.Close();
- //TODO 關閉對所有客戶端的連接
- }
- }
- #endregion
- #region Accept
- /// <summary>
- /// 從客戶端開始接受一個連接操作
- /// </summary>
- private void StartAccept(SocketAsyncEventArgs asyniar)
- {
- if (asyniar == null)
- {
- asyniar = new SocketAsyncEventArgs();
- asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted);
- }
- else
- {
- //socket must be cleared since the context object is being reused
- asyniar.AcceptSocket = null;
- }
- _maxAcceptedClients.WaitOne();
- if (!_serverSock.AcceptAsync(asyniar))
- {
- ProcessAccept(asyniar);
- //如果I/O掛起等待異步則觸發AcceptAsyn_Asyn_Completed事件
- //此時I/O操作同步完成,不會觸發Asyn_Completed事件,所以指定BeginAccept()方法
- }
- }
- /// <summary>
- /// accept 操作完成時回調函數
- /// </summary>
- /// <param name="sender">Object who raised the event.</param>
- /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param>
- private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e)
- {
- ProcessAccept(e);
- }
- /// <summary>
- /// 監聽Socket接受處理
- /// </summary>
- /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param>
- private void ProcessAccept(SocketAsyncEventArgs e)
- {
- if (e.SocketError == SocketError.Success)
- {
- Socket s = e.AcceptSocket;//和客戶端關聯的socket
- if (s.Connected)
- {
- try
- {
- Interlocked.Increment(ref _clientCount);//原子操作加1
- SocketAsyncEventArgs asyniar = _objectPool.Pop();
- asyniar.UserToken = s;
- Log4Debug(String.Format("客戶 {0} 連入, 共有 {1} 個連接。", s.RemoteEndPoint.ToString(), _clientCount));
- if (!s.ReceiveAsync(asyniar))//投遞接收請求
- {
- ProcessReceive(asyniar);
- }
- }
- catch (SocketException ex)
- {
- Log4Debug(String.Format("接收客戶 {0} 數據出錯, 異常信息: {1} 。", s.RemoteEndPoint, ex.ToString()));
- //TODO 異常處理
- }
- //投遞下一個接受請求
- StartAccept(e);
- }
- }
- }
- #endregion
- #region 發送數據
- /// <summary>
- /// 異步的發送數據
- /// </summary>
- /// <param name="e"></param>
- /// <param name="data"></param>
- public void Send(SocketAsyncEventArgs e, byte[] data)
- {
- if (e.SocketError == SocketError.Success)
- {
- Socket s = e.AcceptSocket;//和客戶端關聯的socket
- if (s.Connected)
- {
- Array.Copy(data, 0, e.Buffer, 0, data.Length);//設置發送數據
- //e.SetBuffer(data, 0, data.Length); //設置發送數據
- if (!s.SendAsync(e))//投遞發送請求,這個函數有可能同步發送出去,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件
- {
- // 同步發送時處理發送完成事件
- ProcessSend(e);
- }
- else
- {
- CloseClientSocket(e);
- }
- }
- }
- }
- /// <summary>
- /// 同步的使用socket發送數據
- /// </summary>
- /// <param name="socket"></param>
- /// <param name="buffer"></param>
- /// <param name="offset"></param>
- /// <param name="size"></param>
- /// <param name="timeout"></param>
- public void Send(Socket socket, byte[] buffer, int offset, int size, int timeout)
- {
- socket.SendTimeout = 0;
- int startTickCount = Environment.TickCount;
- int sent = 0; // how many bytes is already sent
- do
- {
- if (Environment.TickCount > startTickCount + timeout)
- {
- //throw new Exception("Timeout.");
- }
- try
- {
- sent += socket.Send(buffer, offset + sent, size - sent, SocketFlags.None);
- }
- catch (SocketException ex)
- {
- if (ex.SocketErrorCode == SocketError.WouldBlock ||
- ex.SocketErrorCode == SocketError.IOPending ||
- ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable)
- {
- // socket buffer is probably full, wait and try again
- Thread.Sleep(30);
- }
- else
- {
- throw ex; // any serious error occurr
- }
- }
- } while (sent < size);
- }
- /// <summary>
- /// 發送完成時處理函數
- /// </summary>
- /// <param name="e">與發送完成操作相關聯的SocketAsyncEventArg對象</param>
- private void ProcessSend(SocketAsyncEventArgs e)
- {
- if (e.SocketError == SocketError.Success)
- {
- Socket s = (Socket)e.UserToken;
- //TODO
- }
- else
- {
- CloseClientSocket(e);
- }
- }
- #endregion
- #region 接收數據
- /// <summary>
- ///接收完成時處理函數
- /// </summary>
- /// <param name="e">與接收完成操作相關聯的SocketAsyncEventArg對象</param>
- private void ProcessReceive(SocketAsyncEventArgs e)
- {
- if (e.SocketError == SocketError.Success)//if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
- {
- // 檢查遠程主機是否關閉連接
- if (e.BytesTransferred > 0)
- {
- Socket s = (Socket)e.UserToken;
- //判斷所有需接收的數據是否已經完成
- if (s.Available == 0)
- {
- //從偵聽者獲取接收到的消息。
- //String received = Encoding.ASCII.GetString(e.Buffer, e.Offset, e.BytesTransferred);
- //echo the data received back to the client
- //e.SetBuffer(e.Offset, e.BytesTransferred);
- byte[] data = new byte[e.BytesTransferred];
- Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//從e.Buffer塊中復制數據出來,保證它可重用
- string info=Encoding.Default.GetString(data);
- Log4Debug(String.Format("收到 {0} 數據為 {1}",s.RemoteEndPoint.ToString(),info));
- //TODO 處理數據
- //增加服務器接收的總字節數。
- }
- if (!s.ReceiveAsync(e))//為接收下一段數據,投遞接收請求,這個函數有可能同步完成,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件
- {
- //同步接收時處理接收完成事件
- ProcessReceive(e);
- }
- }
- }
- else
- {
- CloseClientSocket(e);
- }
- }
- #endregion
- #region 回調函數
- /// <summary>
- /// 當Socket上的發送或接收請求被完成時,調用此函數
- /// </summary>
- /// <param name="sender">激發事件的對象</param>
- /// <param name="e">與發送或接收完成操作相關聯的SocketAsyncEventArg對象</param>
- private void OnIOCompleted(object sender, SocketAsyncEventArgs e)
- {
- // Determine which type of operation just completed and call the associated handler.
- switch (e.LastOperation)
- {
- case SocketAsyncOperation.Accept:
- ProcessAccept(e);
- break;
- case SocketAsyncOperation.Receive:
- ProcessReceive(e);
- break;
- default:
- throw new ArgumentException("The last operation completed on the socket was not a receive or send");
- }
- }
- #endregion
- #region Close
- /// <summary>
- /// 關閉socket連接
- /// </summary>
- /// <param name="e">SocketAsyncEventArg associated with the completed send/receive operation.</param>
- private void CloseClientSocket(SocketAsyncEventArgs e)
- {
- Log4Debug(String.Format("客戶 {0} 斷開連接!",((Socket)e.UserToken).RemoteEndPoint.ToString()));
- Socket s = e.UserToken as Socket;
- CloseClientSocket(s, e);
- }
- /// <summary>
- /// 關閉socket連接
- /// </summary>
- /// <param name="s"></param>
- /// <param name="e"></param>
- private void CloseClientSocket(Socket s, SocketAsyncEventArgs e)
- {
- try
- {
- s.Shutdown(SocketShutdown.Send);
- }
- catch (Exception)
- {
- // Throw if client has closed, so it is not necessary to catch.
- }
- finally
- {
- s.Close();
- }
- Interlocked.Decrement(ref _clientCount);
- _maxAcceptedClients.Release();
- _objectPool.Push(e);//SocketAsyncEventArg 對象被釋放,壓入可重用隊列。
- }
- #endregion
- #region Dispose
- /// <summary>
- /// Performs application-defined tasks associated with freeing,
- /// releasing, or resetting unmanaged resources.
- /// </summary>
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
- /// <summary>
- /// Releases unmanaged and - optionally - managed resources
- /// </summary>
- /// <param name="disposing"><c>true</c> to release
- /// both managed and unmanaged resources; <c>false</c>
- /// to release only unmanaged resources.</param>
- protected virtual void Dispose(bool disposing)
- {
- if (!this.disposed)
- {
- if (disposing)
- {
- try
- {
- Stop();
- if (_serverSock != null)
- {
- _serverSock = null;
- }
- }
- catch (SocketException ex)
- {
- //TODO 事件
- }
- }
- disposed = true;
- }
- }
- #endregion
- public void Log4Debug(string msg)
- {
- Console.WriteLine("notice:"+msg);
- }
- }
- }
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Net.Sockets; using System.Net; using System.Threading; namespace ServerTest { /// <summary> /// IOCP SOCKET服務器 /// </summary> public class IOCPServer : IDisposable { const int opsToPreAlloc = 2; #region Fields /// <summary> /// 服務器程序允許的最大客戶端連接數 /// </summary> private int _maxClient; /// <summary> /// 監聽Socket,用於接受客戶端的連接請求 /// </summary> private Socket _serverSock; /// <summary> /// 當前的連接的客戶端數 /// </summary> private int _clientCount; /// <summary> /// 用於每個I/O Socket操作的緩沖區大小 /// </summary> private int _bufferSize = 1024; /// <summary> /// 信號量 /// </summary> Semaphore _maxAcceptedClients; /// <summary> /// 緩沖區管理 /// </summary> BufferManager _bufferManager; /// <summary> /// 對象池 /// </summary> SocketAsyncEventArgsPool _objectPool; private bool disposed = false; #endregion #region Properties /// <summary> /// 服務器是否正在運行 /// </summary> public bool IsRunning { get; private set; } /// <summary> /// 監聽的IP地址 /// </summary> public IPAddress Address { get; private set; } /// <summary> /// 監聽的端口 /// </summary> public int Port { get; private set; } /// <summary> /// 通信使用的編碼 /// </summary> public Encoding Encoding { get; set; } #endregion #region Ctors /// <summary> /// 異步IOCP SOCKET服務器 /// </summary> /// <param name="listenPort">監聽的端口</param> /// <param name="maxClient">最大的客戶端數量</param> public IOCPServer(int listenPort,int maxClient) : this(IPAddress.Any, listenPort, maxClient) { } /// <summary> /// 異步Socket TCP服務器 /// </summary> /// <param name="localEP">監聽的終結點</param> /// <param name="maxClient">最大客戶端數量</param> public IOCPServer(IPEndPoint localEP, int maxClient) : this(localEP.Address, localEP.Port,maxClient) { } /// <summary> /// 異步Socket TCP服務器 /// </summary> /// <param name="localIPAddress">監聽的IP地址</param> /// <param name="listenPort">監聽的端口</param> /// <param name="maxClient">最大客戶端數量</param> public IOCPServer(IPAddress localIPAddress, int listenPort, int maxClient) { this.Address = localIPAddress; this.Port = listenPort; this.Encoding = Encoding.Default; _maxClient = maxClient; _serverSock = new Socket(localIPAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp); _bufferManager = new BufferManager(_bufferSize * _maxClient * opsToPreAlloc,_bufferSize); _objectPool = new SocketAsyncEventArgsPool(_maxClient); _maxAcceptedClients = new Semaphore(_maxClient, _maxClient); } #endregion #region 初始化 /// <summary> /// 初始化函數 /// </summary> public void Init() { // Allocates one large byte buffer which all I/O operations use a piece of. This gaurds // against memory fragmentation _bufferManager.InitBuffer(); // preallocate pool of SocketAsyncEventArgs objects SocketAsyncEventArgs readWriteEventArg; for (int i = 0; i < _maxClient; i++) { //Pre-allocate a set of reusable SocketAsyncEventArgs readWriteEventArg = new SocketAsyncEventArgs(); readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted); readWriteEventArg.UserToken = null; // assign a byte buffer from the buffer pool to the SocketAsyncEventArg object _bufferManager.SetBuffer(readWriteEventArg); // add SocketAsyncEventArg to the pool _objectPool.Push(readWriteEventArg); } } #endregion #region Start /// <summary> /// 啟動 /// </summary> public void Start() { if (!IsRunning) { Init(); IsRunning = true; IPEndPoint localEndPoint = new IPEndPoint(Address, Port); // 創建監聽socket _serverSock = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); //_serverSock.ReceiveBufferSize = _bufferSize; //_serverSock.SendBufferSize = _bufferSize; if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6) { // 配置監聽socket為 dual-mode (IPv4 & IPv6) // 27 is equivalent to IPV6_V6ONLY socket option in the winsock snippet below, _serverSock.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27, false); _serverSock.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port)); } else { _serverSock.Bind(localEndPoint); } // 開始監聽 _serverSock.Listen(this._maxClient); // 在監聽Socket上投遞一個接受請求。 StartAccept(null); } } #endregion #region Stop /// <summary> /// 停止服務 /// </summary> public void Stop() { if (IsRunning) { IsRunning = false; _serverSock.Close(); //TODO 關閉對所有客戶端的連接 } } #endregion #region Accept /// <summary> /// 從客戶端開始接受一個連接操作 /// </summary> private void StartAccept(SocketAsyncEventArgs asyniar) { if (asyniar == null) { asyniar = new SocketAsyncEventArgs(); asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnAcceptCompleted); } else { //socket must be cleared since the context object is being reused asyniar.AcceptSocket = null; } _maxAcceptedClients.WaitOne(); if (!_serverSock.AcceptAsync(asyniar)) { ProcessAccept(asyniar); //如果I/O掛起等待異步則觸發AcceptAsyn_Asyn_Completed事件 //此時I/O操作同步完成,不會觸發Asyn_Completed事件,所以指定BeginAccept()方法 } } /// <summary> /// accept 操作完成時回調函數 /// </summary> /// <param name="sender">Object who raised the event.</param> /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param> private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e) { ProcessAccept(e); } /// <summary> /// 監聽Socket接受處理 /// </summary> /// <param name="e">SocketAsyncEventArg associated with the completed accept operation.</param> private void ProcessAccept(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success) { Socket s = e.AcceptSocket;//和客戶端關聯的socket if (s.Connected) { try { Interlocked.Increment(ref _clientCount);//原子操作加1 SocketAsyncEventArgs asyniar = _objectPool.Pop(); asyniar.UserToken = s; Log4Debug(String.Format("客戶 {0} 連入, 共有 {1} 個連接。", s.RemoteEndPoint.ToString(), _clientCount)); if (!s.ReceiveAsync(asyniar))//投遞接收請求 { ProcessReceive(asyniar); } } catch (SocketException ex) { Log4Debug(String.Format("接收客戶 {0} 數據出錯, 異常信息: {1} 。", s.RemoteEndPoint, ex.ToString())); //TODO 異常處理 } //投遞下一個接受請求 StartAccept(e); } } } #endregion #region 發送數據 /// <summary> /// 異步的發送數據 /// </summary> /// <param name="e"></param> /// <param name="data"></param> public void Send(SocketAsyncEventArgs e, byte[] data) { if (e.SocketError == SocketError.Success) { Socket s = e.AcceptSocket;//和客戶端關聯的socket if (s.Connected) { Array.Copy(data, 0, e.Buffer, 0, data.Length);//設置發送數據 //e.SetBuffer(data, 0, data.Length); //設置發送數據 if (!s.SendAsync(e))//投遞發送請求,這個函數有可能同步發送出去,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件 { // 同步發送時處理發送完成事件 ProcessSend(e); } else { CloseClientSocket(e); } } } } /// <summary> /// 同步的使用socket發送數據 /// </summary> /// <param name="socket"></param> /// <param name="buffer"></param> /// <param name="offset"></param> /// <param name="size"></param> /// <param name="timeout"></param> public void Send(Socket socket, byte[] buffer, int offset, int size, int timeout) { socket.SendTimeout = 0; int startTickCount = Environment.TickCount; int sent = 0; // how many bytes is already sent do { if (Environment.TickCount > startTickCount + timeout) { //throw new Exception("Timeout."); } try { sent += socket.Send(buffer, offset + sent, size - sent, SocketFlags.None); } catch (SocketException ex) { if (ex.SocketErrorCode == SocketError.WouldBlock || ex.SocketErrorCode == SocketError.IOPending || ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable) { // socket buffer is probably full, wait and try again Thread.Sleep(30); } else { throw ex; // any serious error occurr } } } while (sent < size); } /// <summary> /// 發送完成時處理函數 /// </summary> /// <param name="e">與發送完成操作相關聯的SocketAsyncEventArg對象</param> private void ProcessSend(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success) { Socket s = (Socket)e.UserToken; //TODO } else { CloseClientSocket(e); } } #endregion #region 接收數據 /// <summary> ///接收完成時處理函數 /// </summary> /// <param name="e">與接收完成操作相關聯的SocketAsyncEventArg對象</param> private void ProcessReceive(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success)//if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) { // 檢查遠程主機是否關閉連接 if (e.BytesTransferred > 0) { Socket s = (Socket)e.UserToken; //判斷所有需接收的數據是否已經完成 if (s.Available == 0) { //從偵聽者獲取接收到的消息。 //String received = Encoding.ASCII.GetString(e.Buffer, e.Offset, e.BytesTransferred); //echo the data received back to the client //e.SetBuffer(e.Offset, e.BytesTransferred); byte[] data = new byte[e.BytesTransferred]; Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//從e.Buffer塊中復制數據出來,保證它可重用 string info=Encoding.Default.GetString(data); Log4Debug(String.Format("收到 {0} 數據為 {1}",s.RemoteEndPoint.ToString(),info)); //TODO 處理數據 //增加服務器接收的總字節數。 } if (!s.ReceiveAsync(e))//為接收下一段數據,投遞接收請求,這個函數有可能同步完成,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件 { //同步接收時處理接收完成事件 ProcessReceive(e); } } } else { CloseClientSocket(e); } } #endregion #region 回調函數 /// <summary> /// 當Socket上的發送或接收請求被完成時,調用此函數 /// </summary> /// <param name="sender">激發事件的對象</param> /// <param name="e">與發送或接收完成操作相關聯的SocketAsyncEventArg對象</param> private void OnIOCompleted(object sender, SocketAsyncEventArgs e) { // Determine which type of operation just completed and call the associated handler. switch (e.LastOperation) { case SocketAsyncOperation.Accept: ProcessAccept(e); break; case SocketAsyncOperation.Receive: ProcessReceive(e); break; default: throw new ArgumentException("The last operation completed on the socket was not a receive or send"); } } #endregion #region Close /// <summary> /// 關閉socket連接 /// </summary> /// <param name="e">SocketAsyncEventArg associated with the completed send/receive operation.</param> private void CloseClientSocket(SocketAsyncEventArgs e) { Log4Debug(String.Format("客戶 {0} 斷開連接!",((Socket)e.UserToken).RemoteEndPoint.ToString())); Socket s = e.UserToken as Socket; CloseClientSocket(s, e); } /// <summary> /// 關閉socket連接 /// </summary> /// <param name="s"></param> /// <param name="e"></param> private void CloseClientSocket(Socket s, SocketAsyncEventArgs e) { try { s.Shutdown(SocketShutdown.Send); } catch (Exception) { // Throw if client has closed, so it is not necessary to catch. } finally { s.Close(); } Interlocked.Decrement(ref _clientCount); _maxAcceptedClients.Release(); _objectPool.Push(e);//SocketAsyncEventArg 對象被釋放,壓入可重用隊列。 } #endregion #region Dispose /// <summary> /// Performs application-defined tasks associated with freeing, /// releasing, or resetting unmanaged resources. /// </summary> public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// <summary> /// Releases unmanaged and - optionally - managed resources /// </summary> /// <param name="disposing"><c>true</c> to release /// both managed and unmanaged resources; <c>false</c> /// to release only unmanaged resources.</param> protected virtual void Dispose(bool disposing) { if (!this.disposed) { if (disposing) { try { Stop(); if (_serverSock != null) { _serverSock = null; } } catch (SocketException ex) { //TODO 事件 } } disposed = true; } } #endregion public void Log4Debug(string msg) { Console.WriteLine("notice:"+msg); } } }
BufferManager.cs 這個類是緩存管理類,是采用MSDN上面的例子一樣的 地址: https://msdn.microsoft.com/zh-cn/library/bb517542.aspx
SocketAsyncEventArgsPool.cs 這個類也是來自MSDN的 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx
需要的話自己到MSDN網站上去取,我就不貼出來了
服務器端
[csharp] view plain copy print?
- static void Main(string[] args)
- {
- IOCPServer server = new IOCPServer(8088, 1024);
- server.Start();
- Console.WriteLine("服務器已啟動....");
- System.Console.ReadLine();
- }
static void Main(string[] args) { IOCPServer server = new IOCPServer(8088, 1024); server.Start(); Console.WriteLine("服務器已啟動...."); System.Console.ReadLine(); }
客戶端
客戶端代碼也是很簡單
[csharp] view plain copy print?
- static void Main(string[] args)
- {
- IPAddress remote=IPAddress.Parse("192.168.3.4");
- client c = new client(8088,remote);
- c.connect();
- Console.WriteLine("服務器連接成功!");
- while (true)
- {
- Console.Write("send>");
- string msg=Console.ReadLine();
- if (msg == "exit")
- break;
- c.send(msg);
- }
- c.disconnect();
- Console.ReadLine();
- }
static void Main(string[] args) { IPAddress remote=IPAddress.Parse("192.168.3.4"); client c = new client(8088,remote); c.connect(); Console.WriteLine("服務器連接成功!"); while (true) { Console.Write("send>"); string msg=Console.ReadLine(); if (msg == "exit") break; c.send(msg); } c.disconnect(); Console.ReadLine(); }
client.cs
[csharp] view plain copy print?
- public class client
- {
- public TcpClient _client;
- public int port;
- public IPAddress remote;
- public client(int port,IPAddress remote)
- {
- this.port = port;
- this.remote = remote;
- }
- public void connect()
- {
- this._client=new TcpClient();
- _client.Connect(remote, port);
- }
- public void disconnect()
- {
- _client.Close();
- }
- public void send(string msg)
- {
- byte[] data=Encoding.Default.GetBytes(msg);
- _client.GetStream().Write(data, 0, data.Length);
- }
- }
public class client { public TcpClient _client; public int port; public IPAddress remote; public client(int port,IPAddress remote) { this.port = port; this.remote = remote; } public void connect() { this._client=new TcpClient(); _client.Connect(remote, port); } public void disconnect() { _client.Close(); } public void send(string msg) { byte[] data=Encoding.Default.GetBytes(msg); _client.GetStream().Write(data, 0, data.Length); } }
IOCPClient類,使用SocketAsyncEventArgs類建立一個Socket客戶端。雖然MSDN說這個類特別設計給網絡服務器應用,但也沒有限制在客戶端代碼中使用APM。下面給出了IOCPClient類的樣例代碼:
[csharp] view plain copy print?
- public class IOCPClient
- {
- /// <summary>
- /// 連接服務器的socket
- /// </summary>
- private Socket _clientSock;
- /// <summary>
- /// 用於服務器執行的互斥同步對象
- /// </summary>
- private static Mutex mutex = new Mutex();
- /// <summary>
- /// Socket連接標志
- /// </summary>
- private Boolean _connected = false;
- private const int ReceiveOperation = 1, SendOperation = 0;
- private static AutoResetEvent[]
- autoSendReceiveEvents = new AutoResetEvent[]
- {
- new AutoResetEvent(false),
- new AutoResetEvent(false)
- };
- /// <summary>
- /// 服務器監聽端點
- /// </summary>
- private IPEndPoint _remoteEndPoint;
- public IOCPClient(IPEndPoint local,IPEndPoint remote)
- {
- _clientSock = new Socket(local.AddressFamily,SocketType.Stream, ProtocolType.Tcp);
- _remoteEndPoint = remote;
- }
- #region 連接服務器
- /// <summary>
- /// 連接遠程服務器
- /// </summary>
- public void Connect()
- {
- SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs();
- connectArgs.UserToken = _clientSock;
- connectArgs.RemoteEndPoint = _remoteEndPoint;
- connectArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnConnected);
- mutex.WaitOne();
- if (!_clientSock.ConnectAsync(connectArgs))//異步連接
- {
- ProcessConnected(connectArgs);
- }
- }
- /// <summary>
- /// 連接上的事件
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- void OnConnected(object sender, SocketAsyncEventArgs e)
- {
- mutex.ReleaseMutex();
- //設置Socket已連接標志。
- _connected = (e.SocketError == SocketError.Success);
- }
- /// <summary>
- /// 處理連接服務器
- /// </summary>
- /// <param name="e"></param>
- private void ProcessConnected(SocketAsyncEventArgs e)
- {
- //TODO
- }
- #endregion
- #region 發送消息
- /// <summary>
- /// 向服務器發送消息
- /// </summary>
- /// <param name="data"></param>
- public void Send(byte[] data)
- {
- SocketAsyncEventArgs asyniar = new SocketAsyncEventArgs();
- asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendComplete);
- asyniar.SetBuffer(data, 0, data.Length);
- asyniar.UserToken = _clientSock;
- asyniar.RemoteEndPoint = _remoteEndPoint;
- autoSendReceiveEvents[SendOperation].WaitOne();
- if (!_clientSock.SendAsync(asyniar))//投遞發送請求,這個函數有可能同步發送出去,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件
- {
- // 同步發送時處理發送完成事件
- ProcessSend(asyniar);
- }
- }
- /// <summary>
- /// 發送操作的回調方法
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- private void OnSendComplete(object sender, SocketAsyncEventArgs e)
- {
- //發出發送完成信號。
- autoSendReceiveEvents[SendOperation].Set();
- ProcessSend(e);
- }
- /// <summary>
- /// 發送完成時處理函數
- /// </summary>
- /// <param name="e">與發送完成操作相關聯的SocketAsyncEventArg對象</param>
- private void ProcessSend(SocketAsyncEventArgs e)
- {
- //TODO
- }
- #endregion
- #region 接收消息
- /// <summary>
- /// 開始監聽服務端數據
- /// </summary>
- /// <param name="e"></param>
- public void StartRecive(SocketAsyncEventArgs e)
- {
- //准備接收。
- Socket s = e.UserToken as Socket;
- byte[] receiveBuffer = new byte[255];
- e.SetBuffer(receiveBuffer, 0, receiveBuffer.Length);
- e.Completed += new EventHandler<SocketAsyncEventArgs>(OnReceiveComplete);
- autoSendReceiveEvents[ReceiveOperation].WaitOne();
- if (!s.ReceiveAsync(e))
- {
- ProcessReceive(e);
- }
- }
- /// <summary>
- /// 接收操作的回調方法
- /// </summary>
- /// <param name="sender"></param>
- /// <param name="e"></param>
- private void OnReceiveComplete(object sender, SocketAsyncEventArgs e)
- {
- //發出接收完成信號。
- autoSendReceiveEvents[ReceiveOperation].Set();
- ProcessReceive(e);
- }
- /// <summary>
- ///接收完成時處理函數
- /// </summary>
- /// <param name="e">與接收完成操作相關聯的SocketAsyncEventArg對象</param>
- private void ProcessReceive(SocketAsyncEventArgs e)
- {
- if (e.SocketError == SocketError.Success)
- {
- // 檢查遠程主機是否關閉連接
- if (e.BytesTransferred > 0)
- {
- Socket s = (Socket)e.UserToken;
- //判斷所有需接收的數據是否已經完成
- if (s.Available == 0)
- {
- byte[] data = new byte[e.BytesTransferred];
- Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//從e.Buffer塊中復制數據出來,保證它可重用
- //TODO 處理數據
- }
- if (!s.ReceiveAsync(e))//為接收下一段數據,投遞接收請求,這個函數有可能同步完成,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件
- {
- //同步接收時處理接收完成事件
- ProcessReceive(e);
- }
- }
- }
- }
- #endregion
- public void Close()
- {
- _clientSock.Disconnect(false);
- }
- /// <summary>
- /// 失敗時關閉Socket,根據SocketError拋出異常。
- /// </summary>
- /// <param name="e"></param>
- private void ProcessError(SocketAsyncEventArgs e)
- {
- Socket s = e.UserToken as Socket;
- if (s.Connected)
- {
- //關閉與客戶端關聯的Socket
- try
- {
- s.Shutdown(SocketShutdown.Both);
- }
- catch (Exception)
- {
- //如果客戶端處理已經關閉,拋出異常
- }
- finally
- {
- if (s.Connected)
- {
- s.Close();
- }
- }
- }
- //拋出SocketException
- throw new SocketException((Int32)e.SocketError);
- }
- /// <summary>
- /// 釋放SocketClient實例
- /// </summary>
- public void Dispose()
- {
- mutex.Close();
- autoSendReceiveEvents[SendOperation].Close();
- autoSendReceiveEvents[ReceiveOperation].Close();
- if (_clientSock.Connected)
- {
- _clientSock.Close();
- }
- }
- }
public class IOCPClient { /// <summary> /// 連接服務器的socket /// </summary> private Socket _clientSock; /// <summary> /// 用於服務器執行的互斥同步對象 /// </summary> private static Mutex mutex = new Mutex(); /// <summary> /// Socket連接標志 /// </summary> private Boolean _connected = false; private const int ReceiveOperation = 1, SendOperation = 0; private static AutoResetEvent[] autoSendReceiveEvents = new AutoResetEvent[] { new AutoResetEvent(false), new AutoResetEvent(false) }; /// <summary> /// 服務器監聽端點 /// </summary> private IPEndPoint _remoteEndPoint; public IOCPClient(IPEndPoint local,IPEndPoint remote) { _clientSock = new Socket(local.AddressFamily,SocketType.Stream, ProtocolType.Tcp); _remoteEndPoint = remote; } #region 連接服務器 /// <summary> /// 連接遠程服務器 /// </summary> public void Connect() { SocketAsyncEventArgs connectArgs = new SocketAsyncEventArgs(); connectArgs.UserToken = _clientSock; connectArgs.RemoteEndPoint = _remoteEndPoint; connectArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnConnected); mutex.WaitOne(); if (!_clientSock.ConnectAsync(connectArgs))//異步連接 { ProcessConnected(connectArgs); } } /// <summary> /// 連接上的事件 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> void OnConnected(object sender, SocketAsyncEventArgs e) { mutex.ReleaseMutex(); //設置Socket已連接標志。 _connected = (e.SocketError == SocketError.Success); } /// <summary> /// 處理連接服務器 /// </summary> /// <param name="e"></param> private void ProcessConnected(SocketAsyncEventArgs e) { //TODO } #endregion #region 發送消息 /// <summary> /// 向服務器發送消息 /// </summary> /// <param name="data"></param> public void Send(byte[] data) { SocketAsyncEventArgs asyniar = new SocketAsyncEventArgs(); asyniar.Completed += new EventHandler<SocketAsyncEventArgs>(OnSendComplete); asyniar.SetBuffer(data, 0, data.Length); asyniar.UserToken = _clientSock; asyniar.RemoteEndPoint = _remoteEndPoint; autoSendReceiveEvents[SendOperation].WaitOne(); if (!_clientSock.SendAsync(asyniar))//投遞發送請求,這個函數有可能同步發送出去,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件 { // 同步發送時處理發送完成事件 ProcessSend(asyniar); } } /// <summary> /// 發送操作的回調方法 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void OnSendComplete(object sender, SocketAsyncEventArgs e) { //發出發送完成信號。 autoSendReceiveEvents[SendOperation].Set(); ProcessSend(e); } /// <summary> /// 發送完成時處理函數 /// </summary> /// <param name="e">與發送完成操作相關聯的SocketAsyncEventArg對象</param> private void ProcessSend(SocketAsyncEventArgs e) { //TODO } #endregion #region 接收消息 /// <summary> /// 開始監聽服務端數據 /// </summary> /// <param name="e"></param> public void StartRecive(SocketAsyncEventArgs e) { //准備接收。 Socket s = e.UserToken as Socket; byte[] receiveBuffer = new byte[255]; e.SetBuffer(receiveBuffer, 0, receiveBuffer.Length); e.Completed += new EventHandler<SocketAsyncEventArgs>(OnReceiveComplete); autoSendReceiveEvents[ReceiveOperation].WaitOne(); if (!s.ReceiveAsync(e)) { ProcessReceive(e); } } /// <summary> /// 接收操作的回調方法 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void OnReceiveComplete(object sender, SocketAsyncEventArgs e) { //發出接收完成信號。 autoSendReceiveEvents[ReceiveOperation].Set(); ProcessReceive(e); } /// <summary> ///接收完成時處理函數 /// </summary> /// <param name="e">與接收完成操作相關聯的SocketAsyncEventArg對象</param> private void ProcessReceive(SocketAsyncEventArgs e) { if (e.SocketError == SocketError.Success) { // 檢查遠程主機是否關閉連接 if (e.BytesTransferred > 0) { Socket s = (Socket)e.UserToken; //判斷所有需接收的數據是否已經完成 if (s.Available == 0) { byte[] data = new byte[e.BytesTransferred]; Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//從e.Buffer塊中復制數據出來,保證它可重用 //TODO 處理數據 } if (!s.ReceiveAsync(e))//為接收下一段數據,投遞接收請求,這個函數有可能同步完成,這時返回false,並且不會引發SocketAsyncEventArgs.Completed事件 { //同步接收時處理接收完成事件 ProcessReceive(e); } } } } #endregion public void Close() { _clientSock.Disconnect(false); } /// <summary> /// 失敗時關閉Socket,根據SocketError拋出異常。 /// </summary> /// <param name="e"></param> private void ProcessError(SocketAsyncEventArgs e) { Socket s = e.UserToken as Socket; if (s.Connected) { //關閉與客戶端關聯的Socket try { s.Shutdown(SocketShutdown.Both); } catch (Exception) { //如果客戶端處理已經關閉,拋出異常 } finally { if (s.Connected) { s.Close(); } } } //拋出SocketException throw new SocketException((Int32)e.SocketError); } /// <summary> /// 釋放SocketClient實例 /// </summary> public void Dispose() { mutex.Close(); autoSendReceiveEvents[SendOperation].Close(); autoSendReceiveEvents[ReceiveOperation].Close(); if (_clientSock.Connected) { _clientSock.Close(); } } }
這個類我沒有測試,但是理論上是沒問題的。
本文地址:http://blog.csdn.net/zhujunxxxxx/article/details/43573879