11.2 Win2008以上的新線程池
(1)傳統線程池的優缺點:
①傳統Windows線程池調用簡單,使用方便(有時只需調用一個API即可)
②這種簡單也帶來負面問題,如接口過於簡單,無法更多去控制線程池的行為。
(2)Windows2008新線程池及API
線程池對象 |
傳統API |
Win2008及以上平台新API |
普通任務線程池 |
QueueUserWorkItem |
CreateThreadpoolWork |
TrySubmitThreadpoolWaitCallbacks |
||
SubmitThreadpoolWork |
||
CloseThreadpoolWork |
||
計時器線程池 |
CreateTimerQueue(創建線程池) |
CreateThreadpoolTimer |
CreateTimerQueueTimer(創建計時器) |
SetThreadpoolTimer |
|
ChangeTimerQueueTimer |
IsThreadpoolTimerSet |
|
DeleteTimerQueueTimer |
WaitForThreadpoolTimerCallbacks |
|
DeteTimerQueueEx |
CloseThreadpoolTimer |
|
同步對象等待線程池 |
RegisterWaitForSingleObject |
CreateThreadpoolWait |
UnregisterWaitEx |
SetThreadpoolWait |
|
|
WaitForThreadpoolWaitCallbacks |
|
|
CloseThreadpoolWait |
|
完成端口線程池 |
BindIoCompletionCallback |
CreateThreadpoolIo |
StartThreadpoolIo |
||
CancelThreadpoolIo |
||
WaitForThreadpoolIoCallbacks |
||
CloseThreadpoolIo |
(3)新線程池輔助API
功能 |
輔助API |
線程池清理器 |
CreateThreadpoolCleanupGroup |
CloseThreadpoolCleanupGroup |
|
ClosethreadpoolCleanupGroupMembers |
|
線程池控制函數 |
CreateThreadpool |
CloseThreadpool |
|
SetThreadpoolThreadMaximum |
|
SetThreadpoolMinimum |
|
線程池環境設備 |
InitializeThreadpoolEnviroment |
DestroyThreadpoolEnvironment |
|
SetThreadpoolCallbackCleanupGroup |
|
SetThreadCallbackLibrary |
|
SetThreadpoolbackpool |
|
SetThreadpoolCallbackRunsLong |
|
顯式設定一個長時間調用的回調線程池函數 |
CallbackMayRunLong |
清理及回調方法 |
DisassociateCurrentThreadFromCallback |
FreeLibraryWhenCallbackReturns |
|
LeaveCriticalSectionWhenCallbackReturns |
|
ReleaseMutexWhenCallbackReturns |
|
ReleaseSemaphoreWhenCallbackReturns |
|
SetEventWhenCallbackReturns |
(3)Win2008新線程池的一般編程模型
11.2.1 以異步方式調用函數
(1)單步使用線程池——TrySubmitThreadpoolCallback函數提交異步函數給線程池
參數 |
描述 |
PTP_SIMPLE_CALLBACK pfnCallback |
回調函數,其原型為: VOID NTAPI SimpleCallback (PTP_CALLBACK_INSTANCE pInstance, //不透明的參數,調用回調函數時,由Windows自動傳入,可用於繼續傳給其他回調終止操作的函數使用,如LeaveCriticalSectionWhenCallbackReturns。。 PVOID pvContext);//其中的pInstanc見“回調函數終止時行為”那部分的內容 |
PVOID pvContext |
回調函數的額外參數。 |
PTP_CALLBACK_ENVIRON pcbe |
回調環境,用來對線程池進行定制的參數。(注意,這個結構體內部與一個線程池關聯,該參數為NULL時,會創建默認線程池,否則我們可以用CreateThreadpool來創建一個線程池,並與這個結構體關聯起來) |
備注:當為pcbe為NULL時,該函數被調用時系統會在進程中創建一個默認的線程池,並讓線程池中的一個線程來調用指定的回調函數。該函數(內部調用PostQueuedCompletionStatus)將一個工作項添加到隊列中。 |
(2)兩步使用線程池
①CreateThreadpoolWork創建“工作項”。注意與之前所說的那些工作項不同。這里的工作項是個對象,不能簡單理解成是一個回調函數。而是關聯了回調函數及回調環境的一個對象了!
參數 |
描述 |
PTP_WORK_CALLBACK pfnWorkHandler |
工作項要關聯的回調函數,其原型為 VOID CALLBACK WorkCallback( PTP_CALLBACK_INSTNACE,PVOID pvContext,PTP_WORK work); |
PVOID pvContext |
回調函數的額外參數 |
PTP_CALLBACK_ENVIRON pcbe |
回調環境,見《TrySubmitThreadpoolCallback函數》的說明 |
②SubmitThreadpoolWork提交這個工作項給線程池。結束后還可以關閉該工作項。
SubmitThreadpoolWork(PTP_WORK pWork);
【注意】
★WaitForThreadpoolWorkCallbacks(pWork,bCancelPendingCallbacks)取消己提交的工作項或等待工作項處理完成
bCancelPendingCallbacks為FALSE,會等待工作項處理完成,函數再返回。為TRUE時會試圖取消pWork這個工作項。
如果用一個PTP_WORK提交了多個工作項,當bCancelPendingCallbacks為FALSE時則會等待所有的己提交的工作項,如果為TRUE只要等待當前正在運行的工作項完成時就返回。
★CloseThreadpoolWork關閉一個工作項
【Batch示例程序】批處理程序
//主程序
/************************************************************************* Module: Batch.cpp Notices: Copyright(c) 2008 Jeffrey Ritcher & Christophe Nasarre *************************************************************************/ #include "..\..\CommonFiles\CmnHdr.h" #include "resource.h" #include <tchar.h> #include <strsafe.h> ////////////////////////////////////////////////////////////////////////// //全局變量 HWND g_hDlg = NULL; PTP_WORK g_pWorkItem = NULL;//工作項對象 volatile LONG g_nCurrentTask = 0; //自定義消息 #define WM_APP_COMPLETED (WM_APP + 123) ////////////////////////////////////////////////////////////////////////// void AddMessage(LPCTSTR szMsg){ HWND hListBox = GetDlgItem(g_hDlg, IDC_LB_STATUS); ListBox_SetCurSel(hListBox, ListBox_AddString(hListBox, szMsg)); } ////////////////////////////////////////////////////////////////////////// void WINAPI TaskHandler(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PTP_WORK pWork){ LONG currentTask = InterlockedIncrement(&g_nCurrentTask); TCHAR szMsg[MAX_PATH]; StringCchPrintf(szMsg, _countof(szMsg), TEXT("線程[%u]:%u號任務開始. "), GetCurrentThreadId(), currentTask); AddMessage(szMsg); //模擬許多的工作 Sleep(currentTask * 1000); StringCchPrintf(szMsg, _countof(szMsg), TEXT("線程[%u]:%u號任務結束. "), GetCurrentThreadId(), currentTask); AddMessage(szMsg); if (InterlockedDecrement(&g_nCurrentTask)==0){ //通知UI線程任務己經完成 PostMessage(g_hDlg, WM_APP_COMPLETED, 0, (LPARAM)currentTask); } } ////////////////////////////////////////////////////////////////////////// void OnStartBatch(){ //禁用“開始”按鈕 Button_Enable(GetDlgItem(g_hDlg, IDC_BTN_START_BATCH), FALSE); AddMessage(TEXT("----開始新的批處理----")); //使用同一個工作項對象,提交6個任務 for (int i = 0; i < 6;i++){ SubmitThreadpoolWork(g_pWorkItem); } //SubmitThreadpoolWork(g_pWorkItem); //SubmitThreadpoolWork(g_pWorkItem); //SubmitThreadpoolWork(g_pWorkItem); //SubmitThreadpoolWork(g_pWorkItem); //SubmitThreadpoolWork(g_pWorkItem); } ////////////////////////////////////////////////////////////////////////// BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam){ g_hDlg = hwnd; return TRUE; } void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtrl, UINT codeNotity){ switch (id) { case IDOK: case IDCANCEL: EndDialog(hwnd, id); break; case IDC_BTN_START_BATCH: OnStartBatch(); break; } } ////////////////////////////////////////////////////////////////////////// INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){ switch (uMsg){ chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog); chHANDLE_DLGMSG(hwnd, WM_COMMAND, Dlg_OnCommand); case WM_APP_COMPLETED: { TCHAR szMsg[MAX_PATH + 1]; StringCchPrintf(szMsg, _countof(szMsg), TEXT("----%u號任務是批處理的最后一個任務----"), lParam); AddMessage(szMsg); //啟用“開始”按鈕 Button_Enable(GetDlgItem(g_hDlg, IDC_BTN_START_BATCH), TRUE); } break; } return FALSE; } ////////////////////////////////////////////////////////////////////////// int APIENTRY _tWinMain(HINSTANCE hInst, HINSTANCE, LPTSTR pCmdLine, int){ //創建用於供所有任務使用的工作項對象(最后一個參數為NULL,使用進程默認的線程池!) g_pWorkItem = CreateThreadpoolWork(TaskHandler, NULL, NULL); if (NULL == g_pWorkItem){ MessageBox(NULL, TEXT("無法創建任務所需的工作項對象"), TEXT(""), MB_ICONSTOP); return -1; } //ttoi,將字符轉為整型 DialogBoxParam(hInst, MAKEINTRESOURCE(IDD_MAIN), NULL, Dlg_Proc,_ttoi(pCmdLine)); //關閉工作項對象 CloseThreadpoolWork(g_pWorkItem); return 0; }
//resource.h
//{{NO_DEPENDENCIES}} // Microsoft Visual C++ 生成的包含文件。 // 供 11_Batch.rc 使用 // #define IDD_MAIN 101 #define IDC_LB_STATUS 1001 #define IDC_BTN_START_BATCH 1002 // Next default values for new objects // #ifdef APSTUDIO_INVOKED #ifndef APSTUDIO_READONLY_SYMBOLS #define _APS_NEXT_RESOURCE_VALUE 102 #define _APS_NEXT_COMMAND_VALUE 40001 #define _APS_NEXT_CONTROL_VALUE 1003 #define _APS_NEXT_SYMED_VALUE 101 #endif #endif
//Batch.rc
// Microsoft Visual C++ generated resource script. // #include "resource.h" #define APSTUDIO_READONLY_SYMBOLS ///////////////////////////////////////////////////////////////////////////// // // Generated from the TEXTINCLUDE 2 resource. // #include "winres.h" ///////////////////////////////////////////////////////////////////////////// #undef APSTUDIO_READONLY_SYMBOLS ///////////////////////////////////////////////////////////////////////////// // 中文(簡體,中國) resources #if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_CHS) LANGUAGE LANG_CHINESE, SUBLANG_CHINESE_SIMPLIFIED #ifdef APSTUDIO_INVOKED ///////////////////////////////////////////////////////////////////////////// // // TEXTINCLUDE // 1 TEXTINCLUDE BEGIN "resource.h\0" END 2 TEXTINCLUDE BEGIN "#include ""winres.h""\r\n" "\0" END 3 TEXTINCLUDE BEGIN "\r\n" "\0" END #endif // APSTUDIO_INVOKED ///////////////////////////////////////////////////////////////////////////// // // Dialog // IDD_MAIN DIALOGEX 0, 0, 191, 190 STYLE DS_SETFONT | DS_MODALFRAME | DS_CENTER | WS_POPUP | WS_CAPTION | WS_SYSMENU CAPTION "利用線程池進行批處理" FONT 9, "宋體", 400, 0, 0x86 BEGIN DEFPUSHBUTTON "退出",IDOK,136,169,50,14 LISTBOX IDC_LB_STATUS,7,25,176,142,LBS_NOINTEGRALHEIGHT | NOT WS_BORDER | WS_VSCROLL | WS_HSCROLL | WS_TABSTOP,WS_EX_STATICEDGE PUSHBUTTON "開始批處理",IDC_BTN_START_BATCH,7,7,50,14 END ///////////////////////////////////////////////////////////////////////////// // // DESIGNINFO // #ifdef APSTUDIO_INVOKED GUIDELINES DESIGNINFO BEGIN IDD_MAIN, DIALOG BEGIN LEFTMARGIN, 7 RIGHTMARGIN, 186 TOPMARGIN, 7 BOTTOMMARGIN, 183 END END #endif // APSTUDIO_INVOKED #endif // 中文(簡體,中國) resources ///////////////////////////////////////////////////////////////////////////// #ifndef APSTUDIO_INVOKED ///////////////////////////////////////////////////////////////////////////// // // Generated from the TEXTINCLUDE 3 resource. // ///////////////////////////////////////////////////////////////////////////// #endif // not APSTUDIO_INVOKED
11.2.2 每隔一段時間調用一個函數
(1)CreateThreadpoolTimer函數——在線程池中創建一個定時器對象
參數 |
描述 |
PTP_TIMER_CALLBACK PfnTimerCallback |
回調函數指針,其原型為 VOID CALLBACK TimeoutCallback( PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext,PTP_TIMER pTimer); |
PVOID pvContext |
傳給回調函數的額外參數 |
PTP_CALLBACK_ENVIRON pcbe |
回調環境 |
返回值 |
計時器工作項,TP_TIMER對象的指針 |
(2)SetThreadpoolTimer:向線程池注冊計時器
參數 |
描述 |
PTP_TIMER pTimer |
要設定的計時器的指針 |
PFILETIME pftDueTime |
第1次調用回調函數的時間。 ①NULL:表示停止調用回調函數。(暫停但不銷毀計時器) ②小於0:表示相對時間(單位微秒),即相對於調用SetThreadpoolTimer的時間。 ③-1:表示立即開始。 ④正數:以100ns為單位,從1600年1月1日 開始計算的周期數。 |
DWORD msPeriod |
觸發周期(單位微秒)。0表示只觸發一次。注意如果回調函數執行的時間太長,而回調函數觸發的周期又很短,此時系統會啟動多個線程來執行這些回調函數。 |
DWORD msWindowLength |
用來給回調函數的執行增加一些隨機性。單位微秒 ①設當前設定的觸發時刻為T,則下次觸發的時間是[T+msPeriod,T+msPeriod+msWindowLength]之間的任何一個時間值。 ②msWindowLength的另一個作用是將計時器分組。如計時器A會在5-7微秒內被觸發,計時器B會在6-8微秒內。因時間有重疊,所以線程池只會喚醒一個線程來處理這兩個計時器,在處理完A的回調函數后,該線程不進入睡眠,會直接再調用B的回調函數,以減少用兩個線程調用時產生的額外的線程上下文切換的開銷。 |
(3)IsThreadpoolTimerSet——確定某個計時器是否己經被設置,即pftDueTime!=NULL
(4)WaitForThreadpoolTimerCallbacks等待一個計時器完成。
(5)CloseThreadpoolTimer釋放計時器的內存。
【TimedMessageBox示例程序】
/*----------------------------------------------------------------------------------------- Module: TimedMsgBox.cpp Notices:Copyright(c) 2008 Jeffrey Richter & Christophe Nasarre -----------------------------------------------------------------------------------------*/ #include "../../CommonFiles/CmnHdr.h" #include <tchar.h> #include <strsafe.h> ////////////////////////////////////////////////////////////////////////// TCHAR g_szCaption[100]; //消息框的標題 int g_nSecLeft = 10; //消息框中顯示的剩余時間 #define ID_MSGBOX_STATIC_TEXT 0x0000FFFF //對話框中文本框的ID,這是系統默認的ID ////////////////////////////////////////////////////////////////////////// VOID CALLBACK MsgBoxTimeoutCallback(PTP_CALLBACK_INSTANCE pInstance,PVOID pvContext,PTP_TIMER pTimer){ //注意,因競爭條件,當程序運行到這里時,可能對話框還沒有創建! HWND hwnd = FindWindow(NULL, g_szCaption); if (hwnd !=NULL){ if (g_nSecLeft ==1){ //時間結束,強迫對話框退出 EndDialog(hwnd, IDOK); return; } //如果對話框存在,則更新剩作時間 TCHAR szMsg[100]; StringCchPrintf(szMsg, _countof(szMsg), TEXT("還剩%d秒,按確定將取消"), --g_nSecLeft); SetDlgItemText(hwnd, ID_MSGBOX_STATIC_TEXT, szMsg); } } ////////////////////////////////////////////////////////////////////////// int WINAPI _tWinMain(HINSTANCE hInstance,HINSTANCE hPrevInstance, PTSTR lpCmdLine,int nShowCmd){ StringCchPrintf(g_szCaption, _countof(g_szCaption), TEXT("Timed Message Box")); //創建線程池計時器對象 PTP_TIMER lpTimer = CreateThreadpoolTimer(MsgBoxTimeoutCallback, NULL, NULL); if (NULL ==lpTimer){ TCHAR szMsg[MAX_PATH]; StringCchPrintf(szMsg, _countof(szMsg), TEXT("無法創建計時器對象:%u"), GetLastError()); MessageBox(NULL, szMsg, TEXT("錯誤"), MB_OK | MB_ICONERROR); return (-1); } //設置定時器1秒后觸發,以后每秒觸發一次 ULARGE_INTEGER ulRelativeStartTime; ulRelativeStartTime.QuadPart = (LONGLONG)(-1000000); //單位微秒。轉換后為1秒。 FILETIME ftRelativeStartTime; ftRelativeStartTime.dwHighDateTime = ulRelativeStartTime.HighPart; ftRelativeStartTime.dwLowDateTime = ulRelativeStartTime.LowPart; SetThreadpoolTimer( lpTimer, &ftRelativeStartTime, 1000, //每隔一秒觸發 0 ); //顯示消息框 MessageBox(NULL, TEXT("還剩10秒,按確定將取消"), g_szCaption, MB_OK); //清除計時器對象 CloseThreadpoolTimer(lpTimer); //判斷是用戶取消計時或者是超時 MessageBox(NULL, (g_nSecLeft == 1) ? TEXT("超時") : TEXT("用戶取消"), TEXT("結果"), MB_OK); return 0; }
【NewWorkPool程序】簡單的工作項和定時器回調演示
#include <windows.h> #include <tchar.h> #include <strsafe.h> #include <locale.h> ////////////////////////////////////////////////////////////////////////// //簡單的工作項函數 VOID CALLBACK MyWorkCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PTP_WORK pWork){ BOOL bRet = FALSE; DWORD dwPriorityOriginal = 0; dwPriorityOriginal = GetThreadPriority(GetCurrentThread()); if (THREAD_PRIORITY_ERROR_RETURN == dwPriorityOriginal){ _tprintf(_T("GetThreadPriority失敗。錯誤碼:%u\n"), GetLastError()); return; } bRet = SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL); if (FALSE == bRet){ _tprintf(_T("SetThreadPriority失敗。錯誤碼:%u\n"), GetLastError()); return; } _tprintf(_T("[ID:0x%X] MyWorkCallback Runing...\n"), GetCurrentThreadId()); bRet = SetThreadPriority(GetCurrentThread(), dwPriorityOriginal); if (FALSE == bRet){ _tprintf(_T("SetThreadPriority失敗。錯誤碼:%u\n"), GetLastError()); return; } return; } //簡單的定時回調函數 VOID CALLBACK MyTimerCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PTP_TIMER pTimer){ _tprintf(_T("[ID:0x%X] MyTimerCallback Runing...\n"), GetCurrentThreadId()); } ////////////////////////////////////////////////////////////////////////// int _tmain(){ _tsetlocale(LC_ALL, _T("chs")); BOOL bRet = FALSE; PTP_WORK pWork = NULL; PTP_TIMER pTimer = NULL; PTP_POOL pPool = NULL; PTP_WORK_CALLBACK workcallback = MyWorkCallback; PTP_TIMER_CALLBACK timercallback = MyTimerCallback; TP_CALLBACK_ENVIRON CallbackEnviron; PTP_CLEANUP_GROUP cleanupgroup = NULL; FILETIME ftDueTime; ULARGE_INTEGER ulDueTime; UINT rollback = 0; __try{ //初始化環境塊 InitializeThreadpoolEnvironment(&CallbackEnviron); //創建線程池 pPool = CreateThreadpool(NULL); if (NULL == pPool){ _tprintf(_T("創建線程池失敗!錯誤碼:%u\n"), GetLastError()); __leave; } rollback = 1;//創建線程池成功! //設置線程數 SetThreadpoolThreadMaximum(pPool, 8); bRet = SetThreadpoolThreadMinimum(pPool, 2); if (!bRet){ _tprintf(_T("SetThreadpoolThreadMinimum失敗,錯誤碼:%u\n"), GetLastError()); __leave; } //創建資源清理器 cleanupgroup = CreateThreadpoolCleanupGroup(); if (NULL == cleanupgroup){ _tprintf(_T("CreateThreadpoolCleanupGroup失敗!錯誤碼:%u\n"), GetLastError()); __leave; } rollback = 2; //資源清理器創建成功 //將環境塊與線程池關聯 SetThreadpoolCallbackPool(&CallbackEnviron, pPool); //將清理器與環境塊聯聯 SetThreadpoolCallbackCleanupGroup(&CallbackEnviron, cleanupgroup,NULL); //創建線程池需要的回調函數,這里是一個普通的工作項 pWork = CreateThreadpoolWork(workcallback, NULL, &CallbackEnviron); if (NULL == pWork){ _tprintf(_T("創建線程池普通工作項失敗!錯誤碼:%u\n"), GetLastError()); __leave; } rollback = 3; //創建普通工作項成功 SubmitThreadpoolWork(pWork); //提交工作項 //創建一個定時回調項 pTimer = CreateThreadpoolTimer(timercallback, NULL, &CallbackEnviron); if (NULL == pTimer){ _tprintf(_T("創建線程池計時器對象失敗!錯誤碼:%u\n"), GetLastError()); __leave; } rollback = 4; //計時器對象創建成功 //設定定時回調周期 ulDueTime.QuadPart = (LONGLONG)-(1 * 10 * 1000 * 1000); //1秒以后觸發 ftDueTime.dwHighDateTime = ulDueTime.HighPart; ftDueTime.dwLowDateTime = ulDueTime.LowPart; SetThreadpoolTimer(pTimer, &ftDueTime, 0, 0); //只調用一次 //主線程進入等待狀態或干別的工作 Sleep(1500); //當所有的線程池回調函數都被執行后,關閉清理器 CloseThreadpoolCleanupGroupMembers(cleanupgroup, FALSE, NULL); //事務標志,回滾到第2步,執行第2步后的銷毀工作 rollback = 2; __leave; } __finally{ switch (rollback) { case 4: case 3: //關閉清理器 CloseThreadpoolCleanupGroupMembers(cleanupgroup, FALSE, NULL); break; case 2: //關閉清理器 CloseThreadpoolCleanupGroup(cleanupgroup); break; case 1: //關閉線程池 CloseThreadpool(pPool); break; default: break; } } _tsystem(_T("PAUSE")); return 0; }
11.2.3 在內核對象觸發時調用一個函數
(1)CreateThreadpoolWait——創建一個線程池等待對象(也是一個工作項,等待項)
參數 |
描述 |
PTP_WAIT_CALLBACK pfnWaitCallback |
回調函數指針,其原型為 VOID CALLBACK WaitCallback( PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext,PTP_WAIT pWait, TP_WAIT_RESULT WaitResult); |
PVOID pvContext |
傳給回調函數的額外參數 |
PTP_CALLBACK_ENVIRON pcbe |
回調環境 |
返回值 |
等待對象的指針 |
備注:回調函數的WaitResult表示回調函數被調用的原因: WAIT_OBJECT_0:表示傳給SetThreadpoolWait的內核對象在超時之前被觸發。 WAIT_TIMEOUT:表示內核對象在超時之前沒被觸發,回調函數被執行是因為超時 WAIT_ABANDONED_0:表示內核對象一個互斥量,並且互斥量被“遺棄”,觸發了回調函數。 |
(2)SetThreadpoolWait——將某個內核對象綁定到線程池
參數 |
描述 |
PTP_WAIT pWaitItem |
傳入由CreateThreadWait返回的對象的指針 |
HANDLE hObject |
要綁定的內核對象,當該對象被觸發時,會調用線程池中的WaitCallback函數。 |
PFILETIME pftTimeout |
線程池願意花的最長多少時間來等待內核對象觸發。 0:立即返回,負值為相對時間,正值為絕對時間,NULL表示無限等待。(線程池內部調用了WaitForMultipleObjects) |
備注:①線程池內部讓一個線程調用WaitForMultipleObjects並傳入由SetThreadpoolWait函數注冊的句柄,不斷地組成一個句柄組,同時將Wait*函數的bWaitAll設為FALSE,這樣當任何一個句柄被觸發,線程池就會被喚醒。 ②因WaitForMultipleObjects不允許將同一個句柄傳入多次,因此必須確保不會用SetThreadpoolWait來多次注冊同一個句柄,但可以調用DuplicationHandle復制句柄並傳給Set*函數。 ③因WaitForMultipleObjects一次最多只能等待64個內核對象,因此線程池實際上為每64個內核對象分配一個線程來等待,所以效率比較高。因此,如果要等待超過64個以上的內核對象,可以考慮用這種線程池,因為系統會每64個內核對象,就開辟一個線程來等待這些內核對象。 ④一旦線程池中一個線程調用了我們的回調函數,對應的等待項將進入“不活躍”狀態。這意味着如果在同一個內核對象被觸發時再次調用這個回調函數時,需要調用SetThreadpoolWait再次注冊。如果傳入的hObject為NULL,將把pWaitItem這個等待項從線程中移除。 |
(3)WaitForThreadpoolWaitCallbacks:等待一個等待項完成
(4)ClosethreadpoolWait函數:釋放一個等待項的內存
【NewWaitCallback程序】演示觸發內核對象時,調用一個函數
#include <windows.h> #include <tchar.h> #include <strsafe.h> #include <locale.h> ////////////////////////////////////////////////////////////////////////// VOID CALLBACK MyWaitCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PTP_WAIT pWait, TP_WAIT_RESULT WaitResult){ _tprintf(_T("線程[ID:0x%X] MyWaitCallback Runing...\n"), GetCurrentThreadId()); } ////////////////////////////////////////////////////////////////////////// int _tmain(){ _tsetlocale(LC_ALL, _T("chs")); PTP_WAIT pWait = NULL; PTP_WAIT_CALLBACK pfnWaitCallback = MyWaitCallback; HANDLE hEvent = NULL; UINT rollback = 0; //創建一個事件對象 hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); //自動重置 if (NULL == hEvent) return 0; rollback = 1; //創建事件對象成功 __try{ //創建等待線程池 pWait = CreateThreadpoolWait(pfnWaitCallback, NULL, NULL);//利用系統默認線程池 if (NULL == pWait){ _tprintf(_T("CreateThreadpoolWait失敗。錯誤碼:%u\n"), GetLastError()); __leave; } rollback = 2; //模擬等待5次,注意每次等待前要調用SetThreadpoolWait方法 for (int i = 0; i < 5;i++){ SetThreadpoolWait(pWait, hEvent, NULL); //這句很重要 SetEvent(hEvent); Sleep(500); //主線程等待回調線程池調用完畢 WaitForThreadpoolWaitCallbacks(pWait, TRUE); } }__finally{ switch (rollback) { case 2: SetThreadpoolWait(pWait, NULL, NULL);//取消等待項 CloseThreadpoolWait(pWait); break; case 1: CloseHandle(hEvent); break; default: break; } } _tsystem(_T("PAUSE")); return 0; }
11.2.4 在異步I/O請求完成時調用一個函數
(1)CreateThreadpoolIo:創建線程池Io對象
參數 |
描述 |
HANDLE hDevice |
要關聯的設備句柄 |
PTP_WIN32_IO_CALLBACK pfnIoCallback |
回調函數指針,其原型為 VOID CALLBACK OverlappedCompletionRoutine( PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext,PVOID pOverlapped, ULONG IoResult,//操作結果,成功時為NO_ERROR ULONG_PTR NumberOfBytesTransferred,//已傳輸字節數 PTP_IO //指向線程池中的I/O項的指針, //即CreateThreadpoolIo的返回值 ); |
PVOID pvContext |
傳給回調函數的額外參數 |
PTP_CALLBACK_ENVIRON pcbe |
回調環境 |
返回值 |
I/O對象(一個工作項)的指針 |
(2)將IO對象(工作項)與線程池內部的I/O完成端口關聯
VOID StartThreadpoolIo(PTP_IO pio);
【注意】每次調用ReadFile和Writefile之前都必調用StartThreadpoolIo,否則回調函數不會被調用(這步相當於給完成端口增加IO完成項通知)
(3)停止線程池調用回調函數:VOID CancelThreadpoolIo(PTP_IO pio);
【注意】①當發出IO請求之后,可以用這來取消。
②在調用ReadFile或WriteFile失敗時,仍然必須調用CancelThreadpoolIo(除了返回FALSE且GetLastError為ERROR_IO_PENDING,因這表示正在完成)
(4)等待一處待處理的IO請求完成。
WaitForThreadIoCallbacks(pio,bCancelPendingCallbacks);
【注意】
①該函數須在另一個線程使用,而不能在回調函數內部使用,因為這會造成死鎖。
②如果bCancelPendingCallbacks為TRUE,那么當請求完成的時候,回調函數不會被調用(如果尚未被調用)。這和調用CancelThreadpoolIo函數的時候很相似。
(5)解除IO對象(工作項)與線程池的關聯:VOID CloseThreadpoolIo(PTP_IO pio);
【NewIOCPPool程序】模擬寫入日志文件
效果圖與【IOCPPool程序】一致。
#include <windows.h> #include <tchar.h> #include <strsafe.h> #include <locale.h> ////////////////////////////////////////////////////////////////////////// #define QMLX_ALLOC(sz) HeapAlloc(GetProcessHeap(),0,sz) //QMLX:淺墨濃香 #define QMLX_CALLOC(sz) HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz) #define QMLX_SAFEFREE(p) if(NULL != p){HeapFree(GetProcessHeap(),0,p);p=NULL;} #define QMLX_ASSERT(s) if(!(s)){DebugBreak();} #define QMLX_BEGINTHREAD(Fun,Param) CreateThread(NULL,0,\ (LPTHREAD_START_ROUTINE)Fun,Param,0,NULL); ////////////////////////////////////////////////////////////////////////// #define MAXWRITEPERTHREAD 20 //每個線程最大寫入次數 #define MAXWRITETHREAD 10 //寫入線程的數量 #define OP_READ 0x01 //讀操作 #define OP_WRITE 0x02 //寫操作 //單IO數據 typedef struct _tagPerIoData{ OVERLAPPED m_ol; HANDLE m_hFile; //操作的文件句柄 DWORD m_dwOp; //操作類型,OP_READ或OP_WRITE LPVOID m_pData; //操作的數據 UINT m_nLen; //操作的數據長度 DWORD m_dwWrite; //寫入的字節數 DWORD m_dwTimestamp; //起始操作的時間戳 }PER_IO_DATA,*PPER_IO_DATA; //IOCP線程池回調函數,實際就是完成通知的響應函數 VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance,PVOID pvContext,PVOID pOverlapped, ULONG IoResult,ULONG_PTR NumberOfBytesTransferred,PTP_IO pio); //寫文件的線程 DWORD WINAPI WriteThread(LPVOID lpParam); //當前操作的文件對象的指針 LARGE_INTEGER g_liFilePointer = { 0 }; //IOCP線程池 PTP_IO g_pThreadpoolIo = NULL; ////////////////////////////////////////////////////////////////////////// //獲取可模塊的路徑名(路徑后含‘\’) VOID GetAppPath(LPTSTR pszBuffer){ DWORD dwLen = 0; if (0 == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH))) return; for (DWORD i = dwLen; i > 0;i--){ if ('\\' == pszBuffer[i]){ pszBuffer[i + 1] = '\0'; break; } } } int _tmain(){ _tsetlocale(LC_ALL, _T("chs")); TCHAR pFileName[MAX_PATH] = {}; GetAppPath(pFileName); StringCchCat(pFileName, MAX_PATH, _T("NewIOCPFile.txt")); HANDLE ahWThread[MAXWRITETHREAD] = {}; DWORD dwWrited = 0; //創建文件 HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL); if (INVALID_HANDLE_VALUE == hTxtFile){ _tprintf(_T("CreateFile(%s)失敗,錯誤碼:%u\n"), GetLastError()); _tsystem(_T("PAUSE")); return 0; } //初始化線程池回調環境 TP_CALLBACK_ENVIRON poolEnv = {}; InitializeThreadpoolEnvironment(&poolEnv); //創建IOCP線程池 g_pThreadpoolIo = CreateThreadpoolIo(hTxtFile, (PTP_WIN32_IO_CALLBACK)IOCPCallback,hTxtFile,&poolEnv); //啟動IOCP線程池 StartThreadpoolIo(g_pThreadpoolIo); //寫入UNICODE文件的前綴碼,以便正確打開 PER_IO_DATA* pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA)); QMLX_ASSERT(pIo != NULL); pIo->m_dwOp = OP_WRITE; pIo->m_hFile = hTxtFile; pIo->m_pData = QMLX_CALLOC(sizeof(WORD)); QMLX_ASSERT(pIo->m_pData != NULL); *((WORD*)pIo->m_pData) = MAKEWORD(0xFF, 0xFE); pIo->m_nLen = sizeof(WORD); //偏移文件指針 pIo->m_ol.Offset = g_liFilePointer.LowPart; pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart; g_liFilePointer.QuadPart += pIo->m_nLen; pIo->m_dwTimestamp = GetTickCount(); //記錄時間戳 WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen, &pIo->m_dwWrite,(LPOVERLAPPED)&pIo->m_ol); //等待IOCP線程池完成操作 WaitForThreadpoolIoCallbacks(g_pThreadpoolIo, FALSE); //啟動寫入線程進行日志寫入操作 for (int i = 0; i < MAXWRITETHREAD;i++){ ahWThread[i] = QMLX_BEGINTHREAD(WriteThread, hTxtFile); } //讓主線程等待這些寫入線程結束 WaitForMultipleObjects(MAXWRITETHREAD, ahWThread, TRUE, INFINITE); for (int i = 0; i < MAXWRITETHREAD;i++){ CloseHandle(ahWThread[i]); } //關閉IOCP線程池 CloseThreadpoolIo(g_pThreadpoolIo); //關閉日志文件 if (INVALID_HANDLE_VALUE != hTxtFile){ CloseHandle(hTxtFile); hTxtFile = INVALID_HANDLE_VALUE; } _tsystem(_T("PAUSE")); return 0; } //IOCP線程池回調函數,實際就是完成通知的響應函數 VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped, ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio) { if (NO_ERROR != IoResult){ _tprintf(_T("I/O操作出錯,錯誤碼:%u\n"),IoResult); return; } PPER_IO_DATA pIo = CONTAINING_RECORD((LPOVERLAPPED)pOverlapped, PER_IO_DATA, m_ol); DWORD dwCurTimestamp = GetTickCount(); switch (pIo->m_dwOp) { case OP_WRITE://寫操作結束 {//寫入操作結束 _tprintf(_T("線程[0x%x]得到IO完成通知,完成操作(%s),緩沖(0x%08x)長度(%ubytes),寫入時間戳(%u)當前時間戳(%u)時差(%u)\n"), GetCurrentThreadId(), OP_WRITE == pIo->m_dwOp ? _T("Write") : _T("Read"), pIo->m_pData, pIo->m_nLen, pIo->m_dwTimestamp, dwCurTimestamp, dwCurTimestamp - pIo->m_dwTimestamp); QMLX_SAFEFREE(pIo->m_pData); QMLX_SAFEFREE(pIo); } break; case OP_READ: //讀操作結束 break; default: break; } } //寫文件的線程 #define MAX_LOGLEN 256 DWORD WINAPI WriteThread(LPVOID lpParam) { TCHAR pTxtContext[MAX_LOGLEN] = {}; PPER_IO_DATA pIo = NULL; size_t szLen = 0; LPTSTR pWriteText = NULL; StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("這是一條模擬的日志記錄,由線程[0x%x]寫入\r\n"), GetCurrentThreadId()); StringCchLength(pTxtContext, MAX_LOGLEN, &szLen); szLen += 1; int i = 0; for (; i < MAXWRITEPERTHREAD;i++){ pWriteText = (LPTSTR)QMLX_CALLOC(szLen*sizeof(TCHAR)); QMLX_ASSERT(NULL != pWriteText); StringCchCopy(pWriteText, szLen, pTxtContext); //為每個操作申請一個“單IO數據”結構體 pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA)); QMLX_ASSERT(pIo != NULL); pIo->m_dwOp = OP_WRITE; pIo->m_hFile = (HANDLE)lpParam; pIo->m_pData = pWriteText; pIo->m_nLen = (szLen-1)*sizeof(TCHAR); //這里使用原子操作同步文件指針,寫入不會相互覆蓋 //這個地方體現了lock-free算法的精髓,使用了基本的CAS操作控制文件指針 //比傳統的使用關鍵代碼段並等待的方法,這里用的方法要輕巧的多,付出的代價也小 *((LONGLONG*)&pIo->m_ol.Pointer) = InterlockedCompareExchange64(&g_liFilePointer.QuadPart, g_liFilePointer.QuadPart + pIo->m_nLen, g_liFilePointer.QuadPart); pIo->m_dwTimestamp = GetTickCount(); //記錄時間戳 StartThreadpoolIo(g_pThreadpoolIo); //寫入 WriteFile((HANDLE)lpParam, pIo->m_pData, pIo->m_nLen, &pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol); if (ERROR_IO_PENDING != GetLastError()){ CancelThreadpoolIo(g_pThreadpoolIo); } } return i; }
11.2.5 回調函數的終止操作
(1)回調函數的pInstance參數:當線程調用回調函數時,Windows會自動傳一個pInstance參數(類型PTP_CALLBACK_INSTANCE)給回調函數,然后回調函數將這個參數又傳給如下的函數,以便在這些函數在回調函數完后,執行一些相應的終止操作(主要是用來通知另一個線程,線程池中的工作項己經完成。顯然,如下函數是在回調函數的內部進行調用的!)
函數 |
終止操作 |
LeaveCriticalSectionWhenCallbackReturns |
當回調函數返回時,線程池會自動調用LeavCriticalSection,並在參數中傳入指定的CRITCAL_SECTION結構體。 |
ReleaseMutexWhenCallbackReturns |
當回調函數返回時,線程池會自動調用ReleaseMutex,並在參數中傳入指定的HANDLE。 |
ReleaseSemaphoreWhenCallbackReturns |
當回調函數返回的時候,線程池會自動調用ReleaseSemaphore,並在參數中傳入指定的HANDLE |
SetEventWhenCallbackReturns |
當回調函數返回的時候,線程池會自動調用SetEvent,並在參數中傳入指定的HANDLE。 |
FreeLibraryWhenCallbackReturns |
當回調函數返回的時候,線程池會自動調用FreeLibrary,並在參數中傳入指定的HMOUDLE。 (注意:如果回調函數是從DLL中載入的,這個函數尤為重要,因為當線程執行完畢后,回調函數不能自己調用FreeLibrary,否則回調函數代碼將從進程中清除,這樣當FreeLibrary試圖返回到回調函數時,會引發訪問違規) |
注意,對於任何一個回調函數,只能執行上述的一種終止操作。如果調用了以上的多個函數,則最后調用的終止函數會覆蓋之前調用的那個。 |
(2)BOOL WINAPI CallbackMayRunLong(PTP_CALLBACK_INSTANCE pci);用來通知線程池回調函數可能運行的時間會比較長。返回TRUE時,說明線程池還有其他線程可供使用。FALSE則相反。線程池會根據來決定是否創建新線程,以防止其他工作項出現挨餓現象。(只能在調用線程的回調函數里使用!)
(3)DisassociateCurrentThreadFromCallback(PTP_CALLBACK_INSTANCE pci)
用來告訴線程池,邏輯上自己已經完成了工作。這使得任何由於調用WaitForThreadpool*Callbacks(如WaitForThreadpoolIoCallbacks)而被阻塞的線程能早一些返回,而不必等到線程從回調函數中結束時才返回。