1.類
(1)socket IO操作內存管理類 BufferManager
// This class creates a single large buffer which can be divided up
// and assigned to SocketAsyncEventArgs objects for use with each
// socket I/O operation.
// This enables bufffers to be easily reused and guards against
// fragmenting heap memory.
//
// The operations exposed on the BufferManager class are not thread safe.
public class BufferManager
{
//buffer緩沖區大小
private int m_numBytes;
//緩沖區
private byte[] m_buffer;
private Stack<int> m_freeIndexPool;
private int m_currentIndex;
private int m_bufferSize;
public BufferManager(int totalBytes, int bufferSize)
{
m_numBytes = totalBytes;
m_currentIndex = 0;
m_bufferSize = bufferSize;
m_freeIndexPool = new Stack<int>();
}
/// <summary>
/// 給buffer分配緩沖區
/// </summary>
public void InitBuffer()
{
m_buffer = new byte[m_numBytes];
}
/// <summary>
/// 將buffer添加到args的IO緩沖區中,並設置offset
/// </summary>
/// <param name="args"></param>
/// <returns></returns>
public bool SetBuffer(SocketAsyncEventArgs args)
{
if (m_freeIndexPool.Count > 0)
{
args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
}
else
{
if ((m_numBytes - m_bufferSize) < m_currentIndex)
{
return false;
}
args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
m_currentIndex += m_bufferSize;
}
return true;
}
/// <summary>
/// 將buffer從args的IO緩沖區中釋放
/// </summary>
/// <param name="args"></param>
public void FreeBuffer(SocketAsyncEventArgs args)
{
m_freeIndexPool.Push(args.Offset);
args.SetBuffer(null, 0, 0);
}
/// <summary>
/// 釋放全部buffer緩存
/// </summary>
public void FreeAllBuffer()
{
m_freeIndexPool.Clear();
m_currentIndex = 0;
m_buffer = null;
}
}
(2)SocketAsyncEventArgsPool
// Represents a collection of reusable SocketAsyncEventArgs objects.
public class SocketAsyncEventArgsPool
{
private Stack<SocketAsyncEventArgs> m_pool;
// Initializes the object pool to the specified size
//
// The "capacity" parameter is the maximum number of
// SocketAsyncEventArgs objects the pool can hold
public SocketAsyncEventArgsPool(int capacity)
{
m_pool = new Stack<SocketAsyncEventArgs>(capacity);
}
// Add a SocketAsyncEventArg instance to the pool
//
//The "item" parameter is the SocketAsyncEventArgs instance
// to add to the pool
public void Push(SocketAsyncEventArgs item)
{
if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }
lock (m_pool)
{
m_pool.Push(item);
}
}
// Removes a SocketAsyncEventArgs instance from the pool
// and returns the object removed from the pool
public SocketAsyncEventArgs Pop()
{
lock (m_pool)
{
return m_pool.Pop();
}
}
/// <summary>
/// 清空棧中元素
/// </summary>
public void Clear()
{
lock (m_pool)
{
m_pool.Clear();
}
}
// The number of SocketAsyncEventArgs instances in the pool
public int Count
{
get { return m_pool.Count; }
}
}
(3)AsyncUserToken
public class AsyncUserToken
{
private Socket socket = null;
public Socket Socket { get => socket; set => socket = value; }
}
(4)服務器端操作類TcpServiceSocketAsync
public class TcpServiceSocketAsync
{
//接收數據事件
public Action<string> recvMessageEvent = null;
//發送結果事件
public Action<int> sendResultEvent = null;
//監聽socket
private Socket listenSocket = null;
//允許連接到tcp服務器的tcp客戶端數量
private int numConnections = 0;
//用於socket發送和接受的緩存區大小
private int bufferSize = 0;
//socket緩沖區管理對象
private BufferManager bufferManager = null;
//SocketAsyncEventArgs池
private SocketAsyncEventArgsPool socketAsyncEventArgsPool = null;
//當前連接的tcp客戶端數量
private int numberAcceptedClients = 0;
//控制tcp客戶端連接數量的信號量
private Semaphore maxNumberAcceptedClients = null;
//用於socket發送數據的SocketAsyncEventArgs集合
private List<SocketAsyncEventArgs> sendAsyncEventArgs = null;
//tcp服務器ip
private string ip = "";
//tcp服務器端口
private int port = 0;
/// <summary>
/// 構造函數
/// </summary>
/// <param name="numConnections">允許連接到tcp服務器的tcp客戶端數量</param>
/// <param name="bufferSize">用於socket發送和接受的緩存區大小</param>
public TcpServiceSocketAsync(int numConnections, int bufferSize)
{
if (numConnections <= 0 || numConnections > int.MaxValue)
throw new ArgumentOutOfRangeException("_numConnections is out of range");
if (bufferSize <= 0 || bufferSize > int.MaxValue)
throw new ArgumentOutOfRangeException("_receiveBufferSize is out of range");
this.numConnections = numConnections;
this.bufferSize = bufferSize;
bufferManager = new BufferManager(numConnections * bufferSize * 2, bufferSize);
socketAsyncEventArgsPool = new SocketAsyncEventArgsPool(numConnections);
maxNumberAcceptedClients = new Semaphore(numConnections, numConnections);
sendAsyncEventArgs = new List<SocketAsyncEventArgs>();
}
/// <summary>
/// 初始化數據(bufferManager,socketAsyncEventArgsPool)
/// </summary>
public void Init()
{
numberAcceptedClients = 0;
bufferManager.InitBuffer();
SocketAsyncEventArgs readWriteEventArg;
for (int i = 0; i < numConnections * 2; i++)
{
readWriteEventArg = new SocketAsyncEventArgs();
readWriteEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
readWriteEventArg.UserToken = new AsyncUserToken();
bufferManager.SetBuffer(readWriteEventArg);
socketAsyncEventArgsPool.Push(readWriteEventArg);
}
}
/// <summary>
/// 開啟tcp服務器,等待tcp客戶端連接
/// </summary>
/// <param name="ip"></param>
/// <param name="port"></param>
public void Start(string ip, int port)
{
if (string.IsNullOrEmpty(ip))
throw new ArgumentNullException("ip cannot be null");
if (port < 1 || port > 65535)
throw new ArgumentOutOfRangeException("port is out of range");
this.ip = ip;
this.port = port;
try
{
listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPAddress address = IPAddress.Parse(ip);
IPEndPoint endpoint = new IPEndPoint(address, port);
listenSocket.Bind(endpoint);//綁定地址
listenSocket.Listen(int.MaxValue);//開始監聽
StartAccept(null);
}
catch (Exception ex)
{
throw ex;
}
}
/// <summary>
/// 關閉tcp服務器
/// </summary>
public void CloseSocket()
{
if (listenSocket == null)
return;
try
{
foreach (var e in sendAsyncEventArgs)
{
((AsyncUserToken)e.UserToken).Socket.Shutdown(SocketShutdown.Both);
}
listenSocket.Shutdown(SocketShutdown.Both);
}
catch { }
try
{
foreach (var e in sendAsyncEventArgs)
{
((AsyncUserToken)e.UserToken).Socket.Close();
}
listenSocket.Close();
}
catch { }
try
{
foreach (var e in sendAsyncEventArgs)
{
e.Dispose();
}
}
catch { }
sendAsyncEventArgs.Clear();
socketAsyncEventArgsPool.Clear();
bufferManager.FreeAllBuffer();
maxNumberAcceptedClients.Release(numberAcceptedClients);
}
/// <summary>
/// 重新啟動tcp服務器
/// </summary>
public void Restart()
{
CloseSocket();
Init();
Start(ip, port);
}
/// <summary>
/// 開始等待tcp客戶端連接
/// </summary>
/// <param name="acceptEventArg"></param>
private void StartAccept(SocketAsyncEventArgs acceptEventArg)
{
if (acceptEventArg == null)
{
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
// socket must be cleared since the context object is being reused
acceptEventArg.AcceptSocket = null;
}
maxNumberAcceptedClients.WaitOne();
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArg);
if (!willRaiseEvent)
{
ProcessAccept(acceptEventArg);
}
}
/// <summary>
/// Socket.AcceptAsync完成回調函數
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAccept(e);
}
/// <summary>
/// 接受到tcp客戶端連接,進行處理
/// </summary>
/// <param name="e"></param>
private void ProcessAccept(SocketAsyncEventArgs e)
{
Interlocked.Increment(ref numberAcceptedClients);
//設置用於接收的SocketAsyncEventArgs的socket,可以接受數據
SocketAsyncEventArgs recvEventArgs = socketAsyncEventArgsPool.Pop();
((AsyncUserToken)recvEventArgs.UserToken).Socket = e.AcceptSocket;
//設置用於發送的SocketAsyncEventArgs的socket,可以發送數據
SocketAsyncEventArgs sendEventArgs = socketAsyncEventArgsPool.Pop();
((AsyncUserToken)sendEventArgs.UserToken).Socket = e.AcceptSocket;
sendAsyncEventArgs.Add(sendEventArgs);
StartRecv(recvEventArgs);
StartAccept(e);
}
/// <summary>
/// tcp服務器開始接受tcp客戶端發送的數據
/// </summary>
private void StartRecv(SocketAsyncEventArgs e)
{
bool willRaiseEvent = ((AsyncUserToken)e.UserToken).Socket.ReceiveAsync(e);
if (!willRaiseEvent)
{
ProcessReceive(e);
}
}
/// <summary>
/// socket.sendAsync和socket.recvAsync的完成回調函數
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void IO_Completed(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
throw new ArgumentException("The last operation completed on the socket was not a receive or send");
}
}
/// <summary>
/// 處理接受到的tcp客戶端數據
/// </summary>
/// <param name="e"></param>
private void ProcessReceive(SocketAsyncEventArgs e)
{
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
if (recvMessageEvent != null)
//一定要指定GetString的長度
recvMessageEvent(Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred));
StartRecv(e);
}
else
{
CloseClientSocket(e);
}
}
/// <summary>
/// 處理tcp服務器發送的結果
/// </summary>
/// <param name="e"></param>
private void ProcessSend(SocketAsyncEventArgs e)
{
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.SocketError == SocketError.Success)
{
if (sendResultEvent != null)
sendResultEvent(e.BytesTransferred);
}
else
{
if (sendResultEvent != null)
sendResultEvent(e.BytesTransferred);
CloseClientSocket(e);
}
}
/// <summary>
/// 關閉一個與tcp客戶端連接的socket
/// </summary>
/// <param name="e"></param>
private void CloseClientSocket(SocketAsyncEventArgs e)
{
AsyncUserToken token = e.UserToken as AsyncUserToken;
try
{
//關閉socket時,單獨使用socket.close()通常會造成資源提前被釋放,應該在關閉socket之前,先使用shutdown進行接受或者發送的禁用,再使用socket進行釋放
token.Socket.Shutdown(SocketShutdown.Both);
}
catch { }
token.Socket.Close();
Interlocked.Decrement(ref numberAcceptedClients);
socketAsyncEventArgsPool.Push(e);
maxNumberAcceptedClients.Release();
if (e.LastOperation == SocketAsyncOperation.Send)
sendAsyncEventArgs.Remove(e);
}
/// <summary>
/// 給全部tcp客戶端發送數據
/// </summary>
/// <param name="message"></param>
public void SendMessageToAllClients(string message)
{
if (string.IsNullOrEmpty(message))
throw new ArgumentNullException("message cannot be null");
foreach (var e in sendAsyncEventArgs)
{
byte[] buff = Encoding.UTF8.GetBytes(message);
if (buff.Length > bufferSize)
throw new ArgumentOutOfRangeException("message is out off range");
buff.CopyTo(e.Buffer, e.Offset);
e.SetBuffer(e.Offset, buff.Length);
bool willRaiseEvent = ((AsyncUserToken)e.UserToken).Socket.SendAsync(e);
if (!willRaiseEvent)
{
ProcessSend(e);
}
}
}
}
(5)客戶端操作類TcpClientSocketAsync
public class TcpClientSocketAsync
{
//接收數據事件
public Action<string> recvMessageEvent = null;
//發送結果事件
public Action<int> sendResultEvent = null;
//接受緩存數組
private byte[] recvBuff = null;
//發送緩存數組
private byte[] sendBuff = null;
//連接socket
private Socket connectSocket = null;
//用於發送數據的SocketAsyncEventArgs
private SocketAsyncEventArgs sendEventArg = null;
//用於接收數據的SocketAsyncEventArgs
private SocketAsyncEventArgs recvEventArg = null;
//tcp服務器ip
private string ip = "";
//tcp服務器端口
private int port = 0;
/// <summary>
/// 構造函數
/// </summary>
/// <param name="bufferSize">用於socket發送和接受的緩存區大小</param>
public TcpClientSocketAsync(int bufferSize)
{
//設置用於發送數據的SocketAsyncEventArgs
sendBuff = new byte[bufferSize];
sendEventArg = new SocketAsyncEventArgs();
sendEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
sendEventArg.SetBuffer(sendBuff, 0, bufferSize);
//設置用於接受數據的SocketAsyncEventArgs
recvBuff = new byte[bufferSize];
recvEventArg = new SocketAsyncEventArgs();
recvEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
recvEventArg.SetBuffer(recvBuff, 0, bufferSize);
}
/// <summary>
/// 開啟tcp客戶端,連接tcp服務器
/// </summary>
/// <param name="ip"></param>
/// <param name="port"></param>
public void Start(string ip, int port)
{
if (string.IsNullOrEmpty(ip))
throw new ArgumentNullException("ip cannot be null");
if (port < 1 || port > 65535)
throw new ArgumentOutOfRangeException("port is out of range");
this.ip = ip;
this.port = port;
try
{
connectSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
IPAddress address = IPAddress.Parse(ip);
IPEndPoint endpoint = new IPEndPoint(address, port);
//連接tcp服務器
SocketAsyncEventArgs connectEventArg = new SocketAsyncEventArgs();
connectEventArg.Completed += ConnectEventArgs_Completed;
connectEventArg.RemoteEndPoint = endpoint;//設置要連接的tcp服務器地址
bool willRaiseEvent = connectSocket.ConnectAsync(connectEventArg);
if (!willRaiseEvent)
ProcessConnect(connectEventArg);
}
catch (Exception ex)
{
throw ex;
}
}
/// <summary>
/// 發送數據到tcp服務器
/// </summary>
/// <param name="message">要發送的數據</param>
public void SendMessage(string message)
{
if (string.IsNullOrEmpty(message))
throw new ArgumentNullException("message cannot be null");
if (connectSocket == null)
throw new Exception("socket cannot be null");
byte[] buff = Encoding.UTF8.GetBytes(message);
buff.CopyTo(sendBuff, 0);
sendEventArg.SetBuffer(0, buff.Length);
bool willRaiseEvent = connectSocket.SendAsync(sendEventArg);
if (!willRaiseEvent)
{
ProcessSend(sendEventArg);
}
}
/// <summary>
/// 關閉tcp客戶端
/// </summary>
public void CloseSocket()
{
if (connectSocket == null)
return;
try
{
//關閉socket時,單獨使用socket.close()通常會造成資源提前被釋放,應該在關閉socket之前,先使用shutdown進行接受或者發送的禁用,再使用socket進行釋放
connectSocket.Shutdown(SocketShutdown.Both);
}
catch { }
try
{
connectSocket.Close();
}
catch { }
}
/// <summary>
/// 重啟tcp客戶端,重新連接tcp服務器
/// </summary>
public void Restart()
{
CloseSocket();
Start(ip, port);
}
/// <summary>
/// Socket.ConnectAsync完成回調函數
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void ConnectEventArgs_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessConnect(e);
}
private void ProcessConnect(SocketAsyncEventArgs e)
{
StartRecv();
}
/// <summary>
/// tcp客戶端開始接受tcp服務器發送的數據
/// </summary>
private void StartRecv()
{
bool willRaiseEvent = connectSocket.ReceiveAsync(recvEventArg);
if (!willRaiseEvent)
{
ProcessReceive(recvEventArg);
}
}
/// <summary>
/// socket.sendAsync和socket.recvAsync的完成回調函數
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void IO_Completed(object sender, SocketAsyncEventArgs e)
{
switch (e.LastOperation)
{
case SocketAsyncOperation.Receive:
ProcessReceive(e);
break;
case SocketAsyncOperation.Send:
ProcessSend(e);
break;
default:
throw new ArgumentException("The last operation completed on the socket was not a receive or send");
}
}
/// <summary>
/// 處理接受到的tcp服務器數據
/// </summary>
/// <param name="e"></param>
private void ProcessReceive(SocketAsyncEventArgs e)
{
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
{
if (recvMessageEvent != null)
//一定要指定GetString的長度,否則整個bugger都要轉成string
recvMessageEvent(Encoding.UTF8.GetString(e.Buffer, 0, e.BytesTransferred));
StartRecv();
}
else
{
Restart();
}
}
/// <summary>
/// 處理tcp客戶端發送的結果
/// </summary>
/// <param name="e"></param>
private void ProcessSend(SocketAsyncEventArgs e)
{
AsyncUserToken token = (AsyncUserToken)e.UserToken;
if (e.SocketError == SocketError.Success)
{
if (sendResultEvent != null)
sendResultEvent(e.BytesTransferred);
}
else
{
if (sendResultEvent != null)
sendResultEvent(-1);
Restart();
}
}
}
2.使用
(1)服務器端
public partial class Form1 : Form
{
TcpServiceSocketAsync tcpServiceSocket = null;
private readonly string ip = "192.168.172.142";
private readonly int port = 8090;
public Form1()
{
InitializeComponent();
tcpServiceSocket = new TcpServiceSocketAsync(10, 1024);
tcpServiceSocket.recvMessageEvent += new Action<string>(Recv);
tcpServiceSocket.Init();
}
private void Recv(string message)
{
this.BeginInvoke(new Action(() =>
{
tbRecv.Text += message + "\r\n";
}));
}
private void btnStart_Click(object sender, EventArgs e)
{
tcpServiceSocket.Start(ip, port);
}
private void btnSend_Click(object sender, EventArgs e)
{
string message = tbSend.Text.Trim();
if (string.IsNullOrEmpty(message))
return;
tcpServiceSocket.SendMessageToAllClients(message);
tbSend.Text = "";
}
}
(2)客戶端
public partial class Form1 : Form
{
private TcpClientSocketAsync tcpClientSocket = null;
private readonly string ip = "192.168.172.142";
private readonly int port = 8090;
private readonly int buffSize = 1024;
public Form1()
{
InitializeComponent();
tcpClientSocket = new TcpClientSocketAsync(buffSize);
tcpClientSocket.recvMessageEvent += new Action<string>(Recv);
}
private void Recv(string message)
{
this.BeginInvoke(new Action(() =>
{
tbRecv.Text += message + "\r\n";
}));
}
private void btnStart_Click(object sender, EventArgs e)
{
tcpClientSocket.Start(ip, port);
}
private void btnSend_Click(object sender, EventArgs e)
{
string message = tbSend.Text.Trim();
if (string.IsNullOrEmpty(message))
return;
tcpClientSocket.SendMessage(message);
tbSend.Text = "";
}
}
3.演示

參考:
