第11章 Windows線程池
11.1 傳統的Windows線程池及API
(1)線程池中的幾種底層線程
①可變數量的長任務線程:WT_EXECUTELONGFUNCTION
②Timer線程:調用CreateTimerQueueTimer時,將在Timer線程上創建以APC方式通知的可等待計時器對象,並讓該線程在可警告狀態下等待定時器的APC。由於這個線程一旦創建就貫穿進程生命期而不會被銷毀,因此WT_EXECUTEINPERSISTENTHREAD標志的線程池回調函數也由這種線程執行。
③多個Wait線程:服務於RegisterWaitForSingleObject,每個線程用WaitForMultipleObjects等待最多63(MAXIMUM_WAIT_OBJECTS減去一個用於維護對象數組的工作對象)個內核對象,對象觸發后執行回調函數。
④可變數量的IO線程:因線程在發出異步IO請求(如ReadFileEx)后,一旦線程結束,請求就會被撤消,因此在請求完成之前,發出請求的線程一定要存在。但線程池的被設計為會根據CPU繁忙情況動態地創建和刪除線程。因此線程池中有一部分線程比較特殊,他們會檢測自己在執行回調函數里發出的異步IO請求是否完成。如果沒有,就不會結束,這種會追蹤自己發出的異步IO請求的線程被稱為IO線程。
⑤可變數量的非IO線程:線程池內部實現了一個IO完成端口,服務於BindIoCompletionCallback,其中IOCP服務線程(即在GetQueueCompletionStatus上休眠)由於其數量會根據CPU情況動態調整,所以不應在這種線程上執行異步IO,故稱為非IO線程。
(2) 傳統的線程池對象及對應的API
線程池對象 |
API |
普通任務線程池 |
QueueUserWorkItem |
計時器線程池 |
CreateTimerQueue(創建線程池) |
CreateTimerQueueTimer(創建計時器) |
|
ChangeTimerQueueTimer |
|
DeleteTimerQueueTimer |
|
DeteTimerQueueEx |
|
同步對象等待線程池 |
RegisterWaitForSingleObject |
UnregisterWaitEx |
|
完成端口線程池 |
BindIoCompletionCallback |
11.1.2 普通任務線程池
(1)QueueUserWorkItem函數
參數 |
描述 |
LPTHREAD_START_ROUTINE pfnCallback |
工作項,即要排隊到線程池中的回調函數(類似於線程函數),原型聲明為 DWORD WINAPI ThreadProc(LPVOID lpParameter); |
PVOID Context |
要傳給線程函數的額外數據 |
ULONG Flags |
用於指明線程池中的線程在什么條件下調用這個回調函數 ①WT_EXECUTEDEFAULT:普通線程不可警告狀態下運行。 ②WT_EXECUTEINIOTHREAD:以IO可警告狀態運行線程回調函數。 ③WT_EXECUTEINPERSISTENTTHREAD:該線程一直運行而不會終止。 ④WT_EXECUTELONGFUNCTION:執行一個運行時間較長的任務(這會使系統考慮是否在線程池中創建新的線程)。 ⑤WT_TRANSFER_IMPERSONATION:以當前的訪問令牌運行線程並回調函數 |
備注:CreateThread函數與QueueUserWorkItem函數要求的線程函數的原型一致。因此可以方便的將一個線程函數創建為線程或線程池的線程池回調函數。 |
【QueueUserWorkItem示例程序】
#include <windows.h> #include <locale.h> #include <tchar.h> #include <strsafe.h> #define BEGINTHREAD(Fun,Param) CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Fun,Param,0,NULL); DWORD WINAPI ThreadProc(LPVOID lpParameter); int _tmain() { _tsetlocale(LC_ALL, _T("chs")); int iWaitLen = 0; do{ _tprintf(_T("請輸入一個等待的時間常量,單位秒(輸入0退出):")); _tscanf_s(_T("%i"), &iWaitLen); if (iWaitLen>0){ //下面的代碼演示了,使用CreateThread和QueueUserWorkItem,實際效果 //是一樣的,當然線程不多的情況下如此,如果線程很多時一定要使用QueueUserWorkItem QueueUserWorkItem(ThreadProc, (PVOID)iWaitLen, WT_EXECUTELONGFUNCTION); //顯示使用CreateThread來創建多個線程的效果 //BEGINTHREAD(ThreadProc, (LPVOID)iWaitLen); } } while (iWaitLen); return 0; } //該函數可以由CreateThread的線程啟動,也可以使用QueueUserWorkItem線程池中的線程啟動 DWORD WINAPI ThreadProc(LPVOID lpParameter){ int iWaitLen = (int)lpParameter; _tprintf(_T("\n線程[ID:0x%X]將等待%u秒..."), GetCurrentThreadId(), iWaitLen); Sleep(iWaitLen * 1000); _tprintf(_T("\n線程[ID:0x%X]將等待結束!\n"), GetCurrentThreadId(), iWaitLen); return 0; }
11.1.3 同步對象等待線程池——當對象被觸發時調用函數
(1)RegisterWaitForSingleObject函數
參數 |
描述 |
phNewWaitObject |
返回的線程池對象(同步對象等待線程池),該對象句柄不能用CloseHandle來關閉。 |
hObject |
要等待觸發的內核對象 |
pfnCallback |
回調函數,其原型如下: VOID CALLBACK WaitOrTimerCallback( PVOID lpParameter, // thread data BOOLEAN TimerOrWaitFired // reason ); |
pvContext |
傳給回調函數的額外參數 |
dwMilliseconds |
等待的時間 |
dwFlags |
與QueueUserWorkItem函數的dwFlags意義相同 |
備注:CreateThread函數與QueueUserWorkItem函數要求的線程函數的原型一致。因此可以方便的將一個線程函數創建為線程或線程池的線程池回調函數。 |
(2)撤銷等待:UnregisterWait(hNewWaitObject),傳入RegisterWaitForSingleObject時返回的第1個句柄。注意不能用CloseHandle(hNewWaitObject)來關閉
【WaitableCallback程序】
線程池調用同一線程來執行回調函數 線程池調用不同的線程來執行回調函數
#include <windows.h> #include <tchar.h> #include <strsafe.h> #include <locale.h> //bWaitFired:TRUE表示超時,FALSE表示事件對象被觸發 void CALLBACK WaitCallback(PVOID lpParameter, BOOLEAN bWaitFired) { if (!bWaitFired){ _tprintf(_T("[ID:0x%X] WaitCallback Success\n"), GetCurrentThreadId()); } else{ _tprintf(_T("[ID:0x%X] WaitCallback Failed\n"), GetCurrentThreadId()); } } int _tmain(){ _tsetlocale(LC_ALL, _T("chs")); _tprintf(_T("主線程[ID:0x%X] Runing\n"), GetCurrentThreadId()); //創建一個事件對象 HANDLE hEvent = NULL; hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); //自動,初始未觸發狀態 if (NULL == hEvent) return 0; //模擬等待五次 HANDLE hNewWait = NULL; //用來保存線程池對象 //WT_EXECUTEONLYONCE:表示回調函數只被執行一次。適用於進程/線程句柄這種觸發后不再重置的對象 RegisterWaitForSingleObject(&hNewWait, hEvent, WaitCallback, NULL, INFINITE, WT_EXECUTEDEFAULT); for (int i = 0; i < 5;i++){ SetEvent(hEvent); //觸發5次,讓回調函數被執行5次(但並不關心是那個線程執行了該函數) Sleep(500); //改變這個時間,可以看到線程池會調用不同線程來執行回調函數 } UnregisterWaitEx(hNewWait, INVALID_HANDLE_VALUE); CloseHandle(hEvent); _tsystem(_T("PAUSE")); return 0; }
11.1.4 定時器回調線程池
(1)調定定時器線程池對象的一般步驟
①調用CreateTimerQueue來創建一個定時器線程池的對象
②調用CreateTimerQueueTimer創建一個定時器,並指定計時器的回調函數及參數
③調用ChangeTimerQueueTimer可修改一個已有的定時器的計時周期
④調用DeleteTimerQueueTimer刪除一個定時器對象(該計時器的回調函數也會被停止調用)
⑤調用DeleteTimerQueueEx刪除定時器線程池對象。
(2)CreateTimerQueueTimer函數
參數 |
描述 |
phNewTimer |
用來接收創建好的計時器對象句柄的指針 |
hTimerQueue |
計時器線程池對象。為NULL時,使用默認的計時器線程池,此時可以不需調用CreateTimerQueue來創建線程池對象。 |
pfnCallback |
新計時器對象的回調函數。函數原型如下: Void WINAPI WaitOrTimerCallback(PVOID pvContext,BOOL fTimerOrWaitFired);其中的fTimerOrWaitFired為TRUE時,表示調用回調函數時,計時器己經觸發。 |
pvContext |
傳給回調函數的額外參數 |
dwDueTime |
預計從調用該函數開始后,多少毫秒后第一次調用回調函數。如果為0,只有可能,就會調用回調函數。 |
dwPeriod |
調用回調函數的周期(毫米數)。如果為0,表示一個單步計時器,即回調函數只被調用一次。 |
dwFlags |
用於指明線程池中的線程在什么條件下調用這個回調函數。該參數的意義與QueueUserWorkItem函數相應的參數相同 |
備注:如果dwDueTime和dwPeriod均不為0,計時器在dwDueTime后會第1次被觸發,以后每經過dwPeriod時間后,周期性地觸發。每次觸發時都會調用回調函數,哪怕前一個回調函數還沒執行完(會啟動另一個線程來調用該回調函數)。 |
(3)修改計時器對象周期:ChangeTimeQueueTimer(hTimerQueue,hTimer,dwDueTime,dwPeriod);
(4)DeleteTimerQueueTimer:刪除一個計時器對象
參數 |
描述 |
phNewTimer |
用來接收創建好的計時器對象句柄的指針 |
hTimerQueue |
指定要刪除的計時器對象位於哪個線程隊列(線程池對象)中 |
hTimer |
要刪除的計時器對象 |
hCompletionEvent |
當系統取消計時器且隊列中的所有回調函數都執行完畢時,會觸發該事件對象。如果指定為INVALID_HANDLE_VALUE:將會一直等待計時器其回調函數,DeleteTimerQueueTimer函數才返回。如果為NULL,函數會給該計時器對象做個刪除標志並立即返回。但是以后並不會接收到回調函數執行完的任何通知,一般為指定為NULL。如果指定為一個有效的事件內核對象的句柄,將函數會立即返回,當所有排隊的工作項目完成之后,該內核對象被觸發。 |
備注: 從當回調函數內部刪除計時器對象可能會造成死鎖。 |
(5)DeleteTimerQueueEx:刪除計時器隊列及其中的所有計時器對象
DeleteTimerQueueEx(hTimerQueue,hCompletionEvent);
【TimerQueue程序】
#include <windows.h> #include <tchar.h> #include <strsafe.h> #include <locale.h> HANDLE gDoneEvent = NULL; //定時器回調函數 //參數TimerOrWaitFired:TRUE表示當函數被調用時,定時器己經被觸發 void CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired); int _tmain(){ _tsetlocale(LC_ALL, _T("chs")); HANDLE hTimer = NULL; HANDLE hTimerQueue = NULL; int arg = 123; //使用事件對象來追蹤TimerRoutine的運行 gDoneEvent = CreateEvent(NULL, TRUE, FALSE, NULL); //手動重置 if (NULL == gDoneEvent){ _tprintf(_T("創建事件對象失敗!(%d)\n"), GetLastError()); return 1; } //創建定時器線程池對象 hTimerQueue = CreateTimerQueue(); if (NULL == hTimerQueue){ _tprintf(_T("定時器線程池對象失敗!(%d)\n"), GetLastError()); return 2; } //創建定時器對象 if (!CreateTimerQueueTimer(&hTimer, hTimerQueue, (WAITORTIMERCALLBACK)TimerRoutine, &arg, 10000, 0, 0)){ _tprintf(_T("定時器對象失敗!(%d)\n"), GetLastError()); return 3; } _tprintf(_T("將在10秒后調用TimerRoutine函數...\n")); if (WAIT_OBJECT_0 != WaitForSingleObject(gDoneEvent, INFINITE)){ _tprintf(_T("WaitForSingleObject失敗(%d)\n"), GetLastError()); } CloseHandle(gDoneEvent); //因gDoneEvent被觸發時,說明回調函數己執行完,但計時器對象還沒被完全刪除, //所以指定參數為INVALID_HANDLE_VALUE會等到所有對象都被刪除時,Delete*函數才會返回。 //注意DleteTimerQueueEx雖然是刪除線池隊列的,但該函數被調用時,會將線程池中的所有 //計時器對象一起刪除,因此此處可以不必調用DeleteTimerQueueTimer函數。 if (!DeleteTimerQueueEx(hTimerQueue,INVALID_HANDLE_VALUE)){ _tprintf(_T("DeleteTimerQueue失敗(%d)\n"), GetLastError()); } _tsystem(_T("PAUSE")); return 0; } void CALLBACK TimerRoutine(PVOID lpParam, BOOLEAN TimerOrWaitFired){ if (lpParam == NULL){ _tprintf(_T("TimerRoutine lpParam為空!\n")); } else{ _tprintf(_T("TimerRoutine函數被調用。參數為%d.\n"), *(int*)lpParam); } SetEvent(gDoneEvent); }
11.1.4 完成端口回調線程池
(1)BindIoCompletionCallback函數
參數 |
描述 |
hDevice |
設備句柄(如文件、Socket等) |
pfnCallback |
回調函數(完成例程)函數原型如下: VOID WINAPI OverlappedCompletionRoutine( DWORD dwErrorCode, DWORD dwNumberOfBytesTransferred, POVERLAPPED pOverlapped); |
dwFlags |
保留字段,這個標志必須為0. |
備注: ①該函數內部是使用非IO線程來調用回調函數的,所以在回調函數內部不能再發出其他的異步IO請求。(注意,該函數內部使用非IO線程來處理向設備發出異步IO請求的。) ②該函數並沒有OVERLAPPED結構體的參數,但OVERLAPPED結構體會被傳遞給ReadFile和WriteFile之類的函數,操作系統會跟蹤這個結構體,並在請求完成時,傳入完成端口,並最終傳到我們的回調函數中來。 ③如果有其他信息要傳入回調函數中,可以在自定封裝OVERLAPPED結構體,設計出一個“單IO數據”結構體以便傳遞額外的參數。 ④CloseHandle(hDevice)來關閉設備,會導致所有待處理IO請求立即完成,並產生一個錯誤碼,可以在回調函數中處理這種情況。因此在關閉設備時,如果要避免這種錯誤,可以引入引用計數,當每發出一個IO請求時,計數加1,完成一個IO請求時,計數減1,當計數為0時,才能關閉。 |
(2)完成端口回調線程池的優點
①不用自己再去創建和管理完成端口對象。
②不用創建線程和管理線程。
③不用調用GetQueuedCompletionStatus(Ex)方法去等待IO完成操作。
④開發者只需要集中設計好每個IO完成后的回調函數即可。
【IOCPPool程序】利用完成端口回調線程池模擬文件的寫入操作
#define _WIN32_WINNT 0x0600 //用於 Windows Vista #include <windows.h> #include <tchar.h> #include <strsafe.h> #include <locale.h> #define GRS_ALLOC(sz) HeapAlloc(GetProcessHeap(),0,sz) #define GRS_CALLOC(sz) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz) #define GRS_SAFEFREE(p) if(NULL !=p){HeapFree(GetProcessHeap(),0,p);p=NULL;} #define GRS_ASSERT(s) if (!(s)){DebugBreak();} #define OP_READ 0x1 //讀取操 #define OP_WRITE 0x2 //寫入操作 #define GRS_BEGINTHREAD(Fun,Param) CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)Fun,Param,0,NULL); #define MAX_WRITEPERTHREAD 20 //每個線程最大寫入次數 #define MAXWRITE 10 //寫入線程數量 //單IO數據(擴展OVERLAPPED結構體 typedef struct _tagPER_IO_CONTEXT{ OVERLAPPED m_ol; //Overlapped結構體 HANDLE m_hFile; //操作的文件句柄 DWORD m_dwOp; //操作類型,OP_READ或OP_WRITE LPVOID m_pData; //操作的數據 UINT m_nLen; //操作的數據長度 DWORD m_dwWrite; //寫入的字節數 DWORD m_dwTimestamp; //起始操作的時間戳 }PER_IO_CONTEXT, *PPER_IO_CONTEXT; //IOCP線程池回調函數,實際就是完成通知響應函數 VOID CALLBACK FileIoCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped); //寫入文件的線程 DWORD WINAPI WriteThread(LPVOID lpParameter); //當前操作的文件對象的指針 LARGE_INTEGER g_liFilePointer = {}; //獲取文件所在的完整路徑(不含文件名,但包含最后的\) VOID GetAppPath(LPTSTR pszBuffer){ DWORD dwLen = 0; if (0 == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH))) return; for (DWORD i = dwLen; i > 0;i--){ if ('\\'== pszBuffer[i]){ pszBuffer[i + 1] = '\0'; break; } } } int _tmain(){ _tsetlocale(LC_ALL, _T("chs")); TCHAR pFileName[MAX_PATH] = { 0 }; GetAppPath(pFileName); StringCchCat(pFileName, MAX_PATH, _T("OldIOCPFile.txt")); HANDLE ahWThread[MAX_WRITEPERTHREAD] = { 0 }; DWORD dwWrited = 0; //創建文件(使用FILE_FLAG_OVERLAPPED標志,表示異步設備) HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,NULL); if (INVALID_HANDLE_VALUE == hTxtFile){ _tprintf(_T("創建文件(%s)失敗,錯誤碼:0x%08X\n"), pFileName, GetLastError()); _tsystem(_T("PAUSE")); return 0; } //將文件句柄與IOCP線程池綁定 BindIoCompletionCallback(hTxtFile, FileIoCompletionRoutine, 0); //寫入UNICODE文件的前綴碼,以便正確打開 PER_IO_CONTEXT* pIo = (PPER_IO_CONTEXT)GRS_CALLOC(sizeof(PER_IO_CONTEXT)); GRS_ASSERT(NULL != pIo); pIo->m_dwOp = OP_WRITE; pIo->m_hFile = hTxtFile; pIo->m_pData = GRS_CALLOC(sizeof(WORD)); GRS_ASSERT(NULL != pIo->m_pData); *(WORD*)pIo->m_pData = MAKEWORD(0xff, 0xfe); //UNICODE文本文件的前綴 pIo->m_nLen = sizeof(WORD); //偏移文件指針 pIo->m_ol.Offset = g_liFilePointer.LowPart; pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart; g_liFilePointer.QuadPart += pIo->m_nLen; //重新文件指針的位置 pIo->m_dwTimestamp = GetTickCount(); //記錄時間戳 WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen, &pIo->m_dwWrite, &pIo->m_ol);//寫入Unicode前綴 //啟動寫入線程進行日志寫入操作 for (int i = 0; i < MAXWRITE; i++){ ahWThread[i] = GRS_BEGINTHREAD(WriteThread, hTxtFile); } //讓主線程等待寫入線程結束 WaitForMultipleObjects(MAXWRITE, ahWThread, TRUE, INFINITE); for (int i = 0; i < MAXWRITE; i++){ CloseHandle(ahWThread[i]); } //關閉日志文件 if (INVALID_HANDLE_VALUE != hTxtFile){ CloseHandle(hTxtFile); hTxtFile = INVALID_HANDLE_VALUE; } _tsystem(_T("PAUSE")); return 0; } VOID CALLBACK FileIoCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped) { if (ERROR_SUCCESS != dwErrorCode){ _tprintf(_T("I/O操作出錯,錯誤碼:%u\n"), dwErrorCode); return; } PER_IO_CONTEXT* pIoContext = CONTAINING_RECORD(lpOverlapped, PER_IO_CONTEXT, m_ol); DWORD dwCurTimestamp = GetTickCount(); switch (pIoContext->m_dwOp) { case OP_WRITE: _tprintf(_T("線程[0x%X]得到IO完成通知,完成操作(%s),緩沖區(0x%08X)長度(%ubytes),寫入時間戳(%u),當前時間戳(%u),時差(%u)\n"), GetCurrentThreadId(),OP_WRITE == pIoContext->m_dwOp ? _T("Write"):_T("Read"), pIoContext->m_pData,pIoContext->m_nLen,pIoContext->m_dwTimestamp,dwCurTimestamp,dwCurTimestamp-pIoContext->m_dwTimestamp); GRS_SAFEFREE(pIoContext->m_pData); GRS_SAFEFREE(pIoContext); break; case OP_READ: //這里沒用到 break; default: break; } } //寫入文件的線程 #define MAX_LOGLEN 256 DWORD WINAPI WriteThread(LPVOID lpParameter){ TCHAR pTxtContext[MAX_LOGLEN] = {0}; PER_IO_CONTEXT* pIo = NULL; size_t szLen = 0; LPTSTR pWriteText = NULL; StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("這是一條模擬的日志記錄,由線程[0x%08X]寫入.\r\n"), GetCurrentThreadId()); StringCchLength(pTxtContext, MAX_LOGLEN, &szLen); szLen +=1; int i=0; //每個線程寫入20次 for (; i < MAX_WRITEPERTHREAD;i++){ pWriteText = (LPTSTR)GRS_CALLOC(szLen*sizeof(TCHAR)); //每條記錄 GRS_ASSERT(NULL != pWriteText); StringCchCopy(pWriteText, szLen, pTxtContext); //為每個IO操作申請一個“單IO數據”結構體 pIo = (PER_IO_CONTEXT*)GRS_CALLOC(sizeof(PER_IO_CONTEXT)); GRS_ASSERT(NULL != pIo); pIo->m_dwOp = OP_WRITE; pIo->m_hFile = (HANDLE)lpParameter; pIo->m_pData = pWriteText; //pWriteText為堆上分配的數據!生命期長於線程函數 pIo->m_nLen = (szLen-1)* sizeof(TCHAR); //注意,這里不寫入每行最后的\0 //這里使用原子操作同步文件指針,寫入不會相互覆蓋 //函數執行時,先比較第1個參數和第3個參數是否相同(此處必然相等)。相等時 //將參數2的值更新到g_liFilePointer里,同時將g_liFilePointer中舊的值賦值給 //pIo->m_ol.Pointer(這是一個技巧,體現了lock-free算法的精髓) *((LONGLONG*)&pIo->m_ol.Pointer) = InterlockedCompareExchange64(&g_liFilePointer.QuadPart, g_liFilePointer.QuadPart + pIo->m_nLen,g_liFilePointer.QuadPart); pIo->m_dwTimestamp = GetTickCount(); //記錄寫入時間戳 //寫入 WriteFile((HANDLE)lpParameter, pIo->m_pData, pIo->m_nLen, &pIo->m_dwWrite, &pIo->m_ol); } return i; //該線程寫入的次數 }