在一般的設計中,當需要一個線程時,就創建一個,但是當線程過多時可能會影響系統的整體效率,這個性能的下降主要體現在:當線程過多時在線程間來回切換需要花費時間,而頻繁的創建和銷毀線程也需要花費額外的機器指令,同時在某些時候極少數線程可能就可以處理大量,比如http服務器可能只需要幾個線程就可以處理用戶發出的http請求,畢竟相對於用戶需要長時間來閱讀網頁來說,CPU只是找到對應位置的頁面返回即可。在這種情況下為每個用戶連接創建一個線程長時間等待再次處理用戶請求肯定是不划算的。為了解決這種問題,提出了線程池的概念,線程池中保存一定數量的 線程,當需要時,由線程池中的某一個線程來調用對應的處理函數。通過控制線程數量從而減少了CPU的線程切換,而且用完的線程還到線程池而不是銷毀,下一次再用時直接從池中取,在某種程度上減少了線程創建與銷毀的消耗,從而提高效率
在Windows上,使用線程池十分簡單,它將線程池做為一個整體,當需要使用池中的線程時,只需要定義對應的回調函數,然后調用API將回調函數進行提交,系統自帶的線程池就會自動執行對應的回調函數。從而實現任務的執行,這種方式相對於傳統的VC線程來說,程序員不再需要關注線程的創建與銷毀,以及線程的調度問題,這些統一由系統完成,只需要將精力集中到邏輯處理的回調函數中來,這樣將程序員從繁雜的線程控制中解放出來。同時Windows中線程池一般具有動態調整線程數量的自主行為,它會根據線程中執行任務的工作量來自動調整線程數,即不讓大量線程處於閑置狀態,也不會因為線程過少而有大量任務處於等待狀態。
在windows上主要有四種線程池
1. 普通線程池
2. 同步對象等待線程池
3. 定時器回調線程池
4. 完成端口回調線程池
這些線程池最大的特點是需要提供一個由線程池中線程調用的回調函數,當條件滿足時回調函數就會被線程池中的對應線程進行調用。從設計的角度來說,這樣的設計大大簡化了應用程序考慮多線程設計時的難度,此時只需要考慮回調函數中的處理邏輯和被調用的條件即可,而不必考慮線程的創建銷毀等等問題(一些設計還可以繞開繁瑣的同步處理)。需要注意的就是一般不要在這些回調函數中設計處理類似UI消息循環那樣的循環,即不要長久占用線程池中的線程。
下面來依次說明各種線程池的使用:
普通線程池
普通線程池在使用時主要是調用QueueUserWorkItem函數將回調函數加入線程池隊列,線程池中一旦有空閑的線程就會調用這個回調,函數原型如下:
BOOL WINAPI QueueUserWorkItem(
__in LPTHREAD_START_ROUTINE Function,
__in_opt PVOID Context,
__in ULONG Flags
);
第一個參數是一個回調函數地址,函數原型與線程函數原型相同,所以在設計時可以考慮使用宏開關來指定這個回調函數作為線程函數還是作為線程池的回調函數
第二個參數是傳給回調函數的參數指針
第三個參數是一個標志值,它的主要值及其含義如下:
標志 | 含義 |
---|---|
WT_EXECUTEDEFAULT | 線程池的默認標志 |
WT_EXECUTEINIOTHREAD | 以IO可警告狀態運行線程回調函數 |
WT_EXECUTEINPERSISTENTTHREAD | 該線程將一直運行而不會終止 |
WT_EXECUTELONGFUNCTION | 執行一個運行時間較長的任務(這會使系統考慮是否在線程池中創建新的線程) |
WT_TRANSFER_IMPERSONATION | 以當前的訪問字串運行線程並調用回調函數 |
下面是一個具體的例子:
void CALLBACK ThreadProc(LPVOID lpParam);
int _tmain(int argc, _TCHAR* argv[])
{
int nWaitTime;
while (TRUE)
{
printf("請輸入線程等待事件:");
scanf_s("%d", &nWaitTime);
printf("\n");
if (0 == nWaitTime)
{
break;
}
//將任務放入到隊列中進行排隊
QueueUserWorkItem((LPTHREAD_START_ROUTINE)ThreadProc, &nWaitTime, WT_EXECUTELONGFUNCTION);
}
//結束主線程
printf("主線程[%04x]\n", GetCurrentThreadId());
return 0;
}
void CALLBACK ThreadProc(LPVOID lpParam)
{
int nWaitTime = *(int*)lpParam;
printf("線程[%04x]將等待%ds\n", GetCurrentThreadId(), nWaitTime);
Sleep(nWaitTime * 1000);
printf("線程[%04x]執行完畢\n", GetCurrentThreadId());
}
這段代碼上我們加入了WT_EXECUTELONGFUNCTION標識,其實在計算機中,只要達到毫秒級的,這個時候已經達到了系統進行線程切換的時間粒度,這個時候它就是一個需要長時間執行的任務
定時器回調線程池
定時器回調主要經過下面幾步:
1. 調用CreateTimerQueue:創建定時器回調的隊列
2. 調用CreateTimerQueueTimer創建一個指定時間周期的計時器對象,並指定對應的回調函數及參數
之后當指定的時間片到達,就會將對應的回調歷程放入到隊列中,一旦線程池中有空閑的線程就執行它
另外可以調用對應的函數對其進行相關的操作:
1. 可以調用ChangeTimerQueueTimer修改一個已有的計時器對象的計時周期
2. 調用DeleteTimerQueueTimer刪除一個計時器對象
3. 調用DeleteTimerQueue刪除這樣一個線程池對象,在刪除這個線程池的時候它上面綁定的回調也會被刪除,所以在編碼時可以直接刪除線程池對象而不用調用DeleteTimerQueueTimer刪除每一個綁定的計時器對象。但是為了編碼的完整性,最好加上刪除計時器對象的操作
下面是一個使用的具體例子
VOID CALLBACK TimerCallback(PVOID lpParameter, BOOLEAN TimerOrWaitFired);
int _tmain(int argc, _TCHAR* argv[])
{
HANDLE hTimeQueue = CreateTimerQueue();
HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
HANDLE hTimer;
CreateTimerQueueTimer(&hTimer, hTimeQueue, (WAITORTIMERCALLBACK)TimerCallback, &hEvent, 10000, 0, WT_EXECUTEDEFAULT);
//等待定時器歷程被調用
WaitForSingleObject(hEvent, INFINITE);
//關閉事件對象
CloseHandle(hEvent);
//刪除定時器與定時器線程池的綁定
DeleteTimerQueueTimer(hTimeQueue, hTimer, NULL);
//刪除定時器線程池
DeleteTimerQueue(hTimeQueue);
return 0;
}
VOID CALLBACK TimerCallback(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
{
HANDLE hEvent = *(HANDLE*)lpParameter;
if (TimerOrWaitFired)
{
printf("定時器回調歷程[%04x]被執行\n", GetCurrentThreadId());
}
SetEvent(hEvent);
}
上述的代碼中我們定義了一個同步事件對象,這個事件對象將在定時器歷程中設置為有信號,這樣方便我們在主線程中等待計時器歷程執行完成
同步對象等待線程池
使用同步對象等待線程池只需要調用函數RegisterWaitForSingalObject,將一個同步對象綁定,當這個同步對象變為有信號或者等待的時間到達時,會調用對應的回調歷程。該函數原型如下:
BOOL WINAPI RegisterWaitForSingleObject(
__out PHANDLE phNewWaitObject,
__in HANDLE hObject,
__in WAITORTIMERCALLBACK Callback,
__in_opt PVOID Context,
__in ULONG dwMilliseconds,
__in ULONG dwFlags
);
第一個參數是一個輸出參數,返回一個等待對象的句柄,我們可以將其看做這個線程池的句柄
第二個參數是一個同步對象
第三個參數是對應的回調函數
第四個參數是傳入到回調函數中的參數指針
第五個參數是等待的時間
第六個參數是一個標志與函數QueueUserWorkItem中的標識含義相同
對應回調函數的原型如下:
VOID CALLBACK WaitOrTimerCallback(
__in PVOID lpParameter,
__in BOOLEAN TimerOrWaitFired
);
當同步對象變為有信號或者等待的時間到達時都會調用這個回調,它的第二個參數就表示它所等待的對象是否為有信號。
下面是一個使用的例子
void WaitEventCallBackProc(PVOID lpParameter, BOOLEAN TimerOrWaitFired);
int _tmain(int argc, _TCHAR* argv[])
{
HANDLE hWait;
HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
//注冊等待同步對象的線程池
RegisterWaitForSingleObject(&hWait, hEvent, (WAITORTIMERCALLBACK)WaitEventCallBackProc, NULL, 5000, WT_EXECUTELONGFUNCTION);
for(int i = 0; i < 5; i++)
{
SetEvent(hEvent);
Sleep(5000);
}
UnregisterWaitEx(hWait, hEvent);
CloseHandle(hEvent);
CloseHandle(hWait);
return 0;
}
void WaitEventCallBackProc(PVOID lpParameter, BOOLEAN TimerOrWaitFired)
{
if (TimerOrWaitFired)
{
printf("線程[%04x]等到事件對象\n");
}else
{
printf("線程[%04x]等待事件對象超時\n");
}
}
完成端口線程池
在前面講述文件操作的博文中,講解了在文件中完成端口的使用,其實完成端口本質上就是一個線程池,或者說,windows上自帶的線程池是使用完成端口的基礎之上編寫的。所以在這,完成端口線程池的使用將比IO完成端口來的簡單
通過調用BindIoCompletionCallback函數來將一個IO對象句柄與對應的完成歷程綁定,這樣在對應的IO操作完成后,對應的歷程將會被丟到線程池中准備執行
相比於前面的文件中的完成端口,這個完成端口線程池要簡單許多,文件的完成端口需要自己創建完成多個線程,創建完成端口,並且將線程與完成端口綁定。另外還需要在線程中調用相應的等待函數等待IO操作完成,而線程池則不需要這些操作,我只需要准備一個完成歷程,然后調用BindIoCompletionCallback,這樣一旦歷程被調用,就可以肯定IO操作一定完成了。這樣我們只需要將主要精力集中在完成歷程的編寫中
函數BindIoCompletionCallback的原型如下:
BOOL WINAPI BindIoCompletionCallback(
__in HANDLE FileHandle,
__in LPOVERLAPPED_COMPLETION_ROUTINE Function,
__in ULONG Flags
);
第一個參數是一個對應IO操作的句柄
第二個參數是對應的完成歷程函數指針
第三個參數是一個標志,與之前的標識相同
完成歷程的函數原型如下:
VOID CALLBACK FileIOCompletionRoutine(
__in DWORD dwErrorCode,
__in DWORD dwNumberOfBytesTransfered,
__in LPOVERLAPPED lpOverlapped
);
第一個參數是一個錯誤碼,當IO操作發生錯誤時可以通過這個參數獲取當前錯誤原因
第二個參數是當前IO操作操作的字節數
第三個參數是一個OVERLAPPED結構
這函數的使用與之前文件完成端口中完成歷程一樣
下面我們將之前文件完成端口的例子進行改寫,如下:
typedef struct tagIOCP_OVERLAPPED
{
OVERLAPPED Overlapped;
HANDLE hFile; //操作的文件句柄
DWORD dwDataLen; //當前操作數據的長度
LPVOID pData; //操作數據的指針
DWORD dwWrittenLen; //寫入文件中的數據長度
}IOCP_OVERLAPPED, *LPIOCP_OVERLAPPED;
#define MAX_WRITE_THREAD 20 //寫線程總數
#define EVERY_THREAD_WRITTEN 100 //每個線程寫入信息數
LARGE_INTEGER g_FilePointer; //全局的文件指針
void GetAppPath(LPTSTR lpAppPath)
{
TCHAR szExePath[MAX_PATH] = _T("");
GetModuleFileName(NULL, szExePath, MAX_PATH);
size_t nPathLen = 0;
StringCchLength(szExePath, MAX_PATH, &nPathLen);
for (int i = nPathLen; i > 0; i--)
{
if (szExePath[i] == _T('\\'))
{
szExePath[i + 1] = _T('\0');
break;
}
}
StringCchCopy(lpAppPath, MAX_PATH, szExePath);
}
VOID CALLBACK WriteThread(LPVOID lpParam);
VOID CALLBACK FileIOCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped);
int _tmain(int argc, _TCHAR* argv[])
{
TCHAR szAppPath[MAX_PATH] = _T("");
GetAppPath(szAppPath);
StringCchCat(szAppPath, MAX_PATH, _T("IocpLog.txt"));
HANDLE hFile = CreateFile(szAppPath, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_FLAG_OVERLAPPED | FILE_ATTRIBUTE_NORMAL, NULL);
if (hFile == INVALID_HANDLE_VALUE)
{
return 0;
}
//綁定IO完成端口
BindIoCompletionCallback(hFile, (LPOVERLAPPED_COMPLETION_ROUTINE)FileIOCompletionRoutine, 0);
//往日志文件中寫入Unicode前綴
LPIOCP_OVERLAPPED pIocpOverlapped = (LPIOCP_OVERLAPPED)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(IOCP_OVERLAPPED));
pIocpOverlapped->dwDataLen = sizeof(WORD);
pIocpOverlapped->hFile = hFile;
WORD dwUnicode = MAKEWORD(0xff, 0xfe); //構造Unicode前綴
pIocpOverlapped->pData = HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(WORD));
CopyMemory(pIocpOverlapped->pData, &dwUnicode, sizeof(WORD));
//偏移文件指針
pIocpOverlapped->Overlapped.Offset = g_FilePointer.LowPart;
pIocpOverlapped->Overlapped.OffsetHigh = g_FilePointer.HighPart;
g_FilePointer.QuadPart += pIocpOverlapped->dwDataLen;
//寫文件
WriteFile(hFile, pIocpOverlapped->pData, pIocpOverlapped->dwDataLen, &pIocpOverlapped->dwWrittenLen, &pIocpOverlapped->Overlapped);
//創建線程進行寫日志操作
HANDLE hWrittenThreads[MAX_WRITE_THREAD];
for (int i = 0; i < MAX_WRITE_THREAD; i++)
{
hWrittenThreads[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)WriteThread, &hFile, 0, NULL);
}
//等待所有寫線程執行完成
WaitForMultipleObjects(MAX_WRITE_THREAD, hWrittenThreads, TRUE, INFINITE);
for (int i = 0; i < MAX_WRITE_THREAD; i++)
{
CloseHandle(hWrittenThreads[i]);
}
CloseHandle(hFile);
return 0;
}
VOID CALLBACK FileIOCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
{
LPIOCP_OVERLAPPED pIOCPOverlapped = (LPIOCP_OVERLAPPED)lpOverlapped;
//釋放對應的內存空間
printf("線程[%04x]得到IO完成通知,寫入長度%d\n", GetCurrentThreadId(), pIOCPOverlapped->dwDataLen);
if (pIOCPOverlapped->pData != NULL)
{
HeapFree(GetProcessHeap(), 0, pIOCPOverlapped->pData);
}
if (NULL != pIOCPOverlapped)
{
HeapFree(GetProcessHeap(), 0, pIOCPOverlapped);
pIOCPOverlapped = NULL;
}
}
VOID CALLBACK WriteThread(LPVOID lpParam)
{
TCHAR szBuf[255] = _T("線程[%04x]模擬寫入一條日志記錄\r\n");
TCHAR szWrittenBuf[255] = _T("");
StringCchPrintf(szWrittenBuf, 255, szBuf, GetCurrentThreadId());
for (int i = 0; i < EVERY_THREAD_WRITTEN; i++)
{
LPIOCP_OVERLAPPED lpIocpOverlapped = (LPIOCP_OVERLAPPED)HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, sizeof(IOCP_OVERLAPPED));
size_t dwBufLen = 0;
StringCchLength(szWrittenBuf, 255, &dwBufLen);
lpIocpOverlapped->dwDataLen = dwBufLen * sizeof(TCHAR);
lpIocpOverlapped->pData = HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, (dwBufLen + 1) * sizeof(TCHAR));
CopyMemory(lpIocpOverlapped->pData, szWrittenBuf, dwBufLen * sizeof(TCHAR));
lpIocpOverlapped->hFile = *(HANDLE*)lpParam;
//同步文件指針
*((LONGLONG*)&(lpIocpOverlapped->Overlapped.Pointer)) = InterlockedCompareExchange64(&g_FilePointer.QuadPart, g_FilePointer.QuadPart + lpIocpOverlapped->dwDataLen, g_FilePointer.QuadPart);
//寫文件
WriteFile(lpIocpOverlapped->hFile, lpIocpOverlapped->pData, lpIocpOverlapped->dwDataLen, &lpIocpOverlapped->dwWrittenLen, &lpIocpOverlapped->Overlapped);
}
}