1.類
(1)socket IO操作內存管理類 BufferManager
// This class creates a single large buffer which can be divided up // and assigned to SocketAsyncEventArgs objects for use with each // socket I/O operation. // This enables bufffers to be easily reused and guards against // fragmenting heap memory. // // The operations exposed on the BufferManager class are not thread safe. public class BufferManager { //buffer緩沖區大小 private int m_numBytes; //緩沖區 private byte[] m_buffer; private Stack<int> m_freeIndexPool; private int m_currentIndex; private int m_bufferSize; public BufferManager(int totalBytes, int bufferSize) { m_numBytes = totalBytes; m_currentIndex = 0; m_bufferSize = bufferSize; m_freeIndexPool = new Stack<int>(); } /// <summary> /// 給buffer分配緩沖區 /// </summary> public void InitBuffer() { m_buffer = new byte[m_numBytes]; } /// <summary> /// 將buffer添加到args的IO緩沖區中,並設置offset /// </summary> /// <param name="args"></param> /// <returns></returns> public bool SetBuffer(SocketAsyncEventArgs args) { if (m_freeIndexPool.Count > 0) { args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize); } else { if ((m_numBytes - m_bufferSize) < m_currentIndex) { return false; } args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize); m_currentIndex += m_bufferSize; } return true; } /// <summary> /// 將buffer從args的IO緩沖區中釋放 /// </summary> /// <param name="args"></param> public void FreeBuffer(SocketAsyncEventArgs args) { m_freeIndexPool.Push(args.Offset); args.SetBuffer(null, 0, 0); } /// <summary> /// 釋放全部buffer緩存 /// </summary> public void FreeAllBuffer() { m_freeIndexPool.Clear(); m_currentIndex = 0; m_buffer = null; } }
(2)SocketAsyncEventArgsPool
// Represents a collection of reusable SocketAsyncEventArgs objects. public class SocketAsyncEventArgsPool { private Stack<SocketAsyncEventArgs> m_pool; // Initializes the object pool to the specified size // // The "capacity" parameter is the maximum number of // SocketAsyncEventArgs objects the pool can hold public SocketAsyncEventArgsPool(int capacity) { m_pool = new Stack<SocketAsyncEventArgs>(capacity); } // Add a SocketAsyncEventArg instance to the pool // //The "item" parameter is the SocketAsyncEventArgs instance // to add to the pool public void Push(SocketAsyncEventArgs item) { if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); } lock (m_pool) { m_pool.Push(item); } } // Removes a SocketAsyncEventArgs instance from the pool // and returns the object removed from the pool public SocketAsyncEventArgs Pop() { lock (m_pool) { return m_pool.Pop(); } } /// <summary> /// 清空棧中元素 /// </summary> public void Clear() { lock (m_pool) { m_pool.Clear(); } } // The number of SocketAsyncEventArgs instances in the pool public int Count { get { return m_pool.Count; } } }
(3)AsyncUserToken
public class AsyncUserToken { private Socket socket = null; public Socket Socket { get => socket; set => socket = value; } }
(4)服務器端操作類TcpServiceSocketAsync
public class TcpServiceSocketAsync { //接收數據事件 public Action<string> recvMessageEvent = null; //發送結果事件 public Action<int> sendResultEvent = null; //監聽socket private Socket listenSocket = null; //允許連接到tcp服務器的tcp客戶端數量 private int numConnections = 0; //用於socket發送和接受的緩存區大小 private int bufferSize = 0; //socket緩沖區管理對象 private BufferManager bufferManager = null; //SocketAsyncEventArgs池 private SocketAsyncEventArgsPool socketAsyncEventArgsPool = null; //當前連接的tcp客戶端數量 private int numberAcceptedClients = 0; //控制tcp客戶端連接數量的信號量 private Semaphore maxNumberAcceptedClients = null; //用於socket發送數據的SocketAsyncEventArgs集合 private List<SocketAsyncEventArgs> sendAsyncEventArgs = null; //tcp服務器ip private string ip = ""; //tcp服務器端口 private int port = 0; /// <summary> /// 構造函數 /// </summary> /// <param name="numConnections">允許連接到tcp服務器的tcp客戶端數量</param> /// <param name="bufferSize">用於socket發送和接受的緩存區大小</param> public TcpServiceSocketAsync(int numConnections, int bufferSize) { if (numConnections <= 0 || numConnections > int.MaxValue) throw new ArgumentOutOfRangeException("_numConnections is out of range"); if (bufferSize <= 0 || bufferSize > int.MaxValue) throw new ArgumentOutOfRangeException("_receiveBufferSize is out of range"); this.numConnections = numConnections; this.bufferSize = bufferSize; bufferManager = new BufferManager(numConnections * bufferSize * 2, bufferSize); socketAsyncEventArgsPool = new SocketAsyncEventArgsPool(numConnections); maxNumberAcceptedClients = new Semaphore(numConnections, numConnections); sendAsyncEventArgs = new List<SocketAsyncEventArgs>(); } /// <summary> /// 初始化數據(bufferManager,socketAsyncEventArgsPool) /// </summary> public void Init() { numberAcceptedClients = 0; bufferManager.InitBuffer(); SocketAsyncEventArgs readWriteEventArg; for (int i = 0; i < numConnections * 2; i++) { readWriteEventArg = new SocketAsyncEventArgs(); readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); readWriteEventArg.UserToken = new AsyncUserToken(); bufferManager.SetBuffer(readWriteEventArg); socketAsyncEventArgsPool.Push(readWriteEventArg); } } /// <summary> /// 開啟tcp服務器,等待tcp客戶端連接 /// </summary> /// <param name="ip"></param> /// <param name="port"></param> public void Start(string ip, int port) { if (string.IsNullOrEmpty(ip)) throw new ArgumentNullException("ip cannot be null"); if (port < 1 || port > 65535) throw new ArgumentOutOfRangeException("port is out of range"); this.ip = ip; this.port = port; try { listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); IPAddress address = IPAddress.Parse(ip); IPEndPoint endpoint = new IPEndPoint(address, port); listenSocket.Bind(endpoint);//綁定地址 listenSocket.Listen(int.MaxValue);//開始監聽 StartAccept(null); } catch (Exception ex) { throw ex; } } /// <summary> /// 關閉tcp服務器 /// </summary> public void CloseSocket() { if (listenSocket == null) return; try { foreach (var e in sendAsyncEventArgs) { ((AsyncUserToken)e.UserToken).Socket.Shutdown(SocketShutdown.Both); } listenSocket.Shutdown(SocketShutdown.Both); } catch { } try { foreach (var e in sendAsyncEventArgs) { ((AsyncUserToken)e.UserToken).Socket.Close(); } listenSocket.Close(); } catch { } try { foreach (var e in sendAsyncEventArgs) { e.Dispose(); } } catch { } sendAsyncEventArgs.Clear(); socketAsyncEventArgsPool.Clear(); bufferManager.FreeAllBuffer(); maxNumberAcceptedClients.Release(numberAcceptedClients); } /// <summary> /// 重新啟動tcp服務器 /// </summary> public void Restart() { CloseSocket(); Init(); Start(ip, port); } /// <summary> /// 開始等待tcp客戶端連接 /// </summary> /// <param name="acceptEventArg"></param> private void StartAccept(SocketAsyncEventArgs acceptEventArg) { if (acceptEventArg == null) { acceptEventArg = new SocketAsyncEventArgs(); acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed); } else { // socket must be cleared since the context object is being reused acceptEventArg.AcceptSocket = null; } maxNumberAcceptedClients.WaitOne(); bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg); if (!willRaiseEvent) { ProcessAccept(acceptEventArg); } } /// <summary> /// Socket.AcceptAsync完成回調函數 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e) { ProcessAccept(e); } /// <summary> /// 接受到tcp客戶端連接,進行處理 /// </summary> /// <param name="e"></param> private void ProcessAccept(SocketAsyncEventArgs e) { Interlocked.Increment(ref numberAcceptedClients); //設置用於接收的SocketAsyncEventArgs的socket,可以接受數據 SocketAsyncEventArgs recvEventArgs = socketAsyncEventArgsPool.Pop(); ((AsyncUserToken)recvEventArgs.UserToken).Socket = e.AcceptSocket; //設置用於發送的SocketAsyncEventArgs的socket,可以發送數據 SocketAsyncEventArgs sendEventArgs = socketAsyncEventArgsPool.Pop(); ((AsyncUserToken)sendEventArgs.UserToken).Socket = e.AcceptSocket; sendAsyncEventArgs.Add(sendEventArgs); StartRecv(recvEventArgs); StartAccept(e); } /// <summary> /// tcp服務器開始接受tcp客戶端發送的數據 /// </summary> private void StartRecv(SocketAsyncEventArgs e) { bool willRaiseEvent = ((AsyncUserToken)e.UserToken).Socket.ReceiveAsync(e); if (!willRaiseEvent) { ProcessReceive(e); } } /// <summary> /// socket.sendAsync和socket.recvAsync的完成回調函數 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void IO_Completed(object sender, SocketAsyncEventArgs e) { switch (e.LastOperation) { case SocketAsyncOperation.Receive: ProcessReceive(e); break; case SocketAsyncOperation.Send: ProcessSend(e); break; default: throw new ArgumentException("The last operation completed on the socket was not a receive or send"); } } /// <summary> /// 處理接受到的tcp客戶端數據 /// </summary> /// <param name="e"></param> private void ProcessReceive(SocketAsyncEventArgs e) { AsyncUserToken token = (AsyncUserToken)e.UserToken; if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) { if (recvMessageEvent != null) //一定要指定GetString的長度 recvMessageEvent(Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred)); StartRecv(e); } else { CloseClientSocket(e); } } /// <summary> /// 處理tcp服務器發送的結果 /// </summary> /// <param name="e"></param> private void ProcessSend(SocketAsyncEventArgs e) { AsyncUserToken token = (AsyncUserToken)e.UserToken; if (e.SocketError == SocketError.Success) { if (sendResultEvent != null) sendResultEvent(e.BytesTransferred); } else { if (sendResultEvent != null) sendResultEvent(e.BytesTransferred); CloseClientSocket(e); } } /// <summary> /// 關閉一個與tcp客戶端連接的socket /// </summary> /// <param name="e"></param> private void CloseClientSocket(SocketAsyncEventArgs e) { AsyncUserToken token = e.UserToken as AsyncUserToken; try { //關閉socket時,單獨使用socket.close()通常會造成資源提前被釋放,應該在關閉socket之前,先使用shutdown進行接受或者發送的禁用,再使用socket進行釋放 token.Socket.Shutdown(SocketShutdown.Both); } catch { } token.Socket.Close(); Interlocked.Decrement(ref numberAcceptedClients); socketAsyncEventArgsPool.Push(e); maxNumberAcceptedClients.Release(); if (e.LastOperation == SocketAsyncOperation.Send) sendAsyncEventArgs.Remove(e); } /// <summary> /// 給全部tcp客戶端發送數據 /// </summary> /// <param name="message"></param> public void SendMessageToAllClients(string message) { if (string.IsNullOrEmpty(message)) throw new ArgumentNullException("message cannot be null"); foreach (var e in sendAsyncEventArgs) { byte[] buff = Encoding.UTF8.GetBytes(message); if (buff.Length > bufferSize) throw new ArgumentOutOfRangeException("message is out off range"); buff.CopyTo(e.Buffer, e.Offset); e.SetBuffer(e.Offset, buff.Length); bool willRaiseEvent = ((AsyncUserToken)e.UserToken).Socket.SendAsync(e); if (!willRaiseEvent) { ProcessSend(e); } } } }
(5)客戶端操作類TcpClientSocketAsync
public class TcpClientSocketAsync { //接收數據事件 public Action<string> recvMessageEvent = null; //發送結果事件 public Action<int> sendResultEvent = null; //接受緩存數組 private byte[] recvBuff = null; //發送緩存數組 private byte[] sendBuff = null; //連接socket private Socket connectSocket = null; //用於發送數據的SocketAsyncEventArgs private SocketAsyncEventArgs sendEventArg = null; //用於接收數據的SocketAsyncEventArgs private SocketAsyncEventArgs recvEventArg = null; //tcp服務器ip private string ip = ""; //tcp服務器端口 private int port = 0; /// <summary> /// 構造函數 /// </summary> /// <param name="bufferSize">用於socket發送和接受的緩存區大小</param> public TcpClientSocketAsync(int bufferSize) { //設置用於發送數據的SocketAsyncEventArgs sendBuff = new byte[bufferSize]; sendEventArg = new SocketAsyncEventArgs(); sendEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); sendEventArg.SetBuffer(sendBuff, 0, bufferSize); //設置用於接受數據的SocketAsyncEventArgs recvBuff = new byte[bufferSize]; recvEventArg = new SocketAsyncEventArgs(); recvEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed); recvEventArg.SetBuffer(recvBuff, 0, bufferSize); } /// <summary> /// 開啟tcp客戶端,連接tcp服務器 /// </summary> /// <param name="ip"></param> /// <param name="port"></param> public void Start(string ip, int port) { if (string.IsNullOrEmpty(ip)) throw new ArgumentNullException("ip cannot be null"); if (port < 1 || port > 65535) throw new ArgumentOutOfRangeException("port is out of range"); this.ip = ip; this.port = port; try { connectSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); IPAddress address = IPAddress.Parse(ip); IPEndPoint endpoint = new IPEndPoint(address, port); //連接tcp服務器 SocketAsyncEventArgs connectEventArg = new SocketAsyncEventArgs(); connectEventArg.Completed += ConnectEventArgs_Completed; connectEventArg.RemoteEndPoint = endpoint;//設置要連接的tcp服務器地址 bool willRaiseEvent = connectSocket.ConnectAsync(connectEventArg); if (!willRaiseEvent) ProcessConnect(connectEventArg); } catch (Exception ex) { throw ex; } } /// <summary> /// 發送數據到tcp服務器 /// </summary> /// <param name="message">要發送的數據</param> public void SendMessage(string message) { if (string.IsNullOrEmpty(message)) throw new ArgumentNullException("message cannot be null"); if (connectSocket == null) throw new Exception("socket cannot be null"); byte[] buff = Encoding.UTF8.GetBytes(message); buff.CopyTo(sendBuff, 0); sendEventArg.SetBuffer(0, buff.Length); bool willRaiseEvent = connectSocket.SendAsync(sendEventArg); if (!willRaiseEvent) { ProcessSend(sendEventArg); } } /// <summary> /// 關閉tcp客戶端 /// </summary> public void CloseSocket() { if (connectSocket == null) return; try { //關閉socket時,單獨使用socket.close()通常會造成資源提前被釋放,應該在關閉socket之前,先使用shutdown進行接受或者發送的禁用,再使用socket進行釋放 connectSocket.Shutdown(SocketShutdown.Both); } catch { } try { connectSocket.Close(); } catch { } } /// <summary> /// 重啟tcp客戶端,重新連接tcp服務器 /// </summary> public void Restart() { CloseSocket(); Start(ip, port); } /// <summary> /// Socket.ConnectAsync完成回調函數 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void ConnectEventArgs_Completed(object sender, SocketAsyncEventArgs e) { ProcessConnect(e); } private void ProcessConnect(SocketAsyncEventArgs e) { StartRecv(); } /// <summary> /// tcp客戶端開始接受tcp服務器發送的數據 /// </summary> private void StartRecv() { bool willRaiseEvent = connectSocket.ReceiveAsync(recvEventArg); if (!willRaiseEvent) { ProcessReceive(recvEventArg); } } /// <summary> /// socket.sendAsync和socket.recvAsync的完成回調函數 /// </summary> /// <param name="sender"></param> /// <param name="e"></param> private void IO_Completed(object sender, SocketAsyncEventArgs e) { switch (e.LastOperation) { case SocketAsyncOperation.Receive: ProcessReceive(e); break; case SocketAsyncOperation.Send: ProcessSend(e); break; default: throw new ArgumentException("The last operation completed on the socket was not a receive or send"); } } /// <summary> /// 處理接受到的tcp服務器數據 /// </summary> /// <param name="e"></param> private void ProcessReceive(SocketAsyncEventArgs e) { AsyncUserToken token = (AsyncUserToken)e.UserToken; if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success) { if (recvMessageEvent != null) //一定要指定GetString的長度,否則整個bugger都要轉成string recvMessageEvent(Encoding.UTF8.GetString(e.Buffer, 0, e.BytesTransferred)); StartRecv(); } else { Restart(); } } /// <summary> /// 處理tcp客戶端發送的結果 /// </summary> /// <param name="e"></param> private void ProcessSend(SocketAsyncEventArgs e) { AsyncUserToken token = (AsyncUserToken)e.UserToken; if (e.SocketError == SocketError.Success) { if (sendResultEvent != null) sendResultEvent(e.BytesTransferred); } else { if (sendResultEvent != null) sendResultEvent(-1); Restart(); } } }
2.使用
(1)服務器端
public partial class Form1 : Form { TcpServiceSocketAsync tcpServiceSocket = null; private readonly string ip = "192.168.172.142"; private readonly int port = 8090; public Form1() { InitializeComponent(); tcpServiceSocket = new TcpServiceSocketAsync(10, 1024); tcpServiceSocket.recvMessageEvent += new Action<string>(Recv); tcpServiceSocket.Init(); } private void Recv(string message) { this.BeginInvoke(new Action(() => { tbRecv.Text += message + "\r\n"; })); } private void btnStart_Click(object sender, EventArgs e) { tcpServiceSocket.Start(ip, port); } private void btnSend_Click(object sender, EventArgs e) { string message = tbSend.Text.Trim(); if (string.IsNullOrEmpty(message)) return; tcpServiceSocket.SendMessageToAllClients(message); tbSend.Text = ""; } }
(2)客戶端
public partial class Form1 : Form { private TcpClientSocketAsync tcpClientSocket = null; private readonly string ip = "192.168.172.142"; private readonly int port = 8090; private readonly int buffSize = 1024; public Form1() { InitializeComponent(); tcpClientSocket = new TcpClientSocketAsync(buffSize); tcpClientSocket.recvMessageEvent += new Action<string>(Recv); } private void Recv(string message) { this.BeginInvoke(new Action(() => { tbRecv.Text += message + "\r\n"; })); } private void btnStart_Click(object sender, EventArgs e) { tcpClientSocket.Start(ip, port); } private void btnSend_Click(object sender, EventArgs e) { string message = tbSend.Text.Trim(); if (string.IsNullOrEmpty(message)) return; tcpClientSocket.SendMessage(message); tbSend.Text = ""; } }
3.演示
參考: