第11章 Windows線程池(2)_Win2008及以上的新線程池


11.2 Win2008以上的新線程池

(1)傳統線程池的優缺點:

  ①傳統Windows線程池調用簡單,使用方便(有時只需調用一個API即可)

  ②這種簡單也帶來負面問題,如接口過於簡單,無法更多去控制線程池的行為。

(2)Windows2008新線程池及API

線程池對象

傳統API

Win2008及以上平台新API

普通任務線程池

QueueUserWorkItem

CreateThreadpoolWork

TrySubmitThreadpoolWaitCallbacks

SubmitThreadpoolWork

CloseThreadpoolWork

計時器線程池

CreateTimerQueue(創建線程池)

CreateThreadpoolTimer

CreateTimerQueueTimer(創建計時器)

SetThreadpoolTimer

ChangeTimerQueueTimer

IsThreadpoolTimerSet

DeleteTimerQueueTimer

WaitForThreadpoolTimerCallbacks

DeteTimerQueueEx

CloseThreadpoolTimer

同步對象等待線程池

RegisterWaitForSingleObject

CreateThreadpoolWait

UnregisterWaitEx

SetThreadpoolWait

 

WaitForThreadpoolWaitCallbacks

 

CloseThreadpoolWait

完成端口線程池

BindIoCompletionCallback

CreateThreadpoolIo

StartThreadpoolIo

CancelThreadpoolIo

WaitForThreadpoolIoCallbacks

CloseThreadpoolIo

(3)新線程池輔助API

功能

輔助API

線程池清理器

CreateThreadpoolCleanupGroup

CloseThreadpoolCleanupGroup

ClosethreadpoolCleanupGroupMembers

線程池控制函數

CreateThreadpool

CloseThreadpool

SetThreadpoolThreadMaximum

SetThreadpoolMinimum

線程池環境設備

InitializeThreadpoolEnviroment

DestroyThreadpoolEnvironment

SetThreadpoolCallbackCleanupGroup

SetThreadCallbackLibrary

SetThreadpoolbackpool

SetThreadpoolCallbackRunsLong

顯式設定一個長時間調用的回調線程池函數

CallbackMayRunLong

清理及回調方法

DisassociateCurrentThreadFromCallback

FreeLibraryWhenCallbackReturns

LeaveCriticalSectionWhenCallbackReturns

ReleaseMutexWhenCallbackReturns

ReleaseSemaphoreWhenCallbackReturns

SetEventWhenCallbackReturns

(3)Win2008新線程池的一般編程模型

 

11.2.1 以異步方式調用函數

(1)單步使用線程池——TrySubmitThreadpoolCallback函數提交異步函數給線程池

參數

描述

PTP_SIMPLE_CALLBACK pfnCallback

回調函數,其原型為:

VOID NTAPI SimpleCallback

(PTP_CALLBACK_INSTANCE pInstance, //不透明的參數,調用回調函數時,由Windows自動傳入,可用於繼續傳給其他回調終止操作的函數使用,如LeaveCriticalSectionWhenCallbackReturns。。

PVOID pvContext);//其中的pInstanc見“回調函數終止時行為”那部分的內容

PVOID pvContext

回調函數的額外參數。

PTP_CALLBACK_ENVIRON pcbe

回調環境,用來對線程池進行定制的參數。(注意,這個結構體內部與一個線程池關聯,該參數為NULL時,會創建默認線程池,否則我們可以用CreateThreadpool來創建一個線程池,並與這個結構體關聯起來)

備注:當為pcbe為NULL時,該函數被調用時系統會在進程中創建一個默認的線程池,並讓線程池中的一個線程來調用指定的回調函數。該函數(內部調用PostQueuedCompletionStatus)將一個工作項添加到隊列中。

(2)兩步使用線程池

  ①CreateThreadpoolWork創建“工作項”。注意與之前所說的那些工作項不同。這里的工作項是個對象,不能簡單理解成是一個回調函數。而是關聯了回調函數及回調環境的一個對象了!

參數

描述

PTP_WORK_CALLBACK

pfnWorkHandler

工作項要關聯的回調函數,其原型為

VOID CALLBACK WorkCallback(

PTP_CALLBACK_INSTNACE,PVOID pvContext,PTP_WORK work);

PVOID pvContext

回調函數的額外參數

PTP_CALLBACK_ENVIRON pcbe

回調環境,見《TrySubmitThreadpoolCallback函數》的說明

  ②SubmitThreadpoolWork提交這個工作項給線程池。結束后還可以關閉該工作項。

    SubmitThreadpoolWork(PTP_WORK pWork);

  【注意】

  ★WaitForThreadpoolWorkCallbacks(pWork,bCancelPendingCallbacks)取消己提交的工作項或等待工作項處理完成

    bCancelPendingCallbacks為FALSE,會等待工作項處理完成,函數再返回。為TRUE時會試圖取消pWork這個工作項。

    如果用一個PTP_WORK提交了多個工作項,當bCancelPendingCallbacks為FALSE時則會等待所有的己提交的工作項,如果為TRUE只要等待當前正在運行的工作項完成時就返回。

  ★CloseThreadpoolWork關閉一個工作項

【Batch示例程序】批處理程序

 

//主程序

/*************************************************************************
Module:  Batch.cpp
Notices: Copyright(c) 2008 Jeffrey Ritcher & Christophe Nasarre
*************************************************************************/

#include "..\..\CommonFiles\CmnHdr.h"
#include "resource.h"

#include <tchar.h>
#include <strsafe.h>

//////////////////////////////////////////////////////////////////////////
//全局變量
HWND  g_hDlg = NULL;
PTP_WORK g_pWorkItem = NULL;//工作項對象
volatile LONG g_nCurrentTask = 0;

//自定義消息
#define WM_APP_COMPLETED  (WM_APP + 123)

//////////////////////////////////////////////////////////////////////////
void AddMessage(LPCTSTR szMsg){
    HWND hListBox = GetDlgItem(g_hDlg, IDC_LB_STATUS);
    ListBox_SetCurSel(hListBox, ListBox_AddString(hListBox, szMsg));
}

//////////////////////////////////////////////////////////////////////////
void WINAPI TaskHandler(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PTP_WORK pWork){

    LONG currentTask = InterlockedIncrement(&g_nCurrentTask);

    TCHAR szMsg[MAX_PATH];
    StringCchPrintf(szMsg, _countof(szMsg),
                    TEXT("線程[%u]:%u號任務開始. "), GetCurrentThreadId(), currentTask);
    AddMessage(szMsg);

    //模擬許多的工作
    Sleep(currentTask * 1000);

    StringCchPrintf(szMsg, _countof(szMsg),
                    TEXT("線程[%u]:%u號任務結束. "), GetCurrentThreadId(), currentTask);
    AddMessage(szMsg);

    if (InterlockedDecrement(&g_nCurrentTask)==0){
       //通知UI線程任務己經完成
        PostMessage(g_hDlg, WM_APP_COMPLETED, 0, (LPARAM)currentTask);
    }
}

//////////////////////////////////////////////////////////////////////////
void OnStartBatch(){
    //禁用“開始”按鈕
    Button_Enable(GetDlgItem(g_hDlg, IDC_BTN_START_BATCH), FALSE);

    AddMessage(TEXT("----開始新的批處理----"));

    //使用同一個工作項對象,提交6個任務
    for (int i = 0; i < 6;i++){
        SubmitThreadpoolWork(g_pWorkItem);
    }
    //SubmitThreadpoolWork(g_pWorkItem);
    //SubmitThreadpoolWork(g_pWorkItem);
    //SubmitThreadpoolWork(g_pWorkItem);
    //SubmitThreadpoolWork(g_pWorkItem);
    //SubmitThreadpoolWork(g_pWorkItem);
}

//////////////////////////////////////////////////////////////////////////
BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam){
    g_hDlg = hwnd;
    return TRUE;
}


void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtrl, UINT codeNotity){
    switch (id)
    {
        case IDOK:
        case IDCANCEL:
            EndDialog(hwnd, id);
            break;

        case IDC_BTN_START_BATCH:
            OnStartBatch();
            break;
    }
}


//////////////////////////////////////////////////////////////////////////
INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){
    switch (uMsg){
        chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog);
        chHANDLE_DLGMSG(hwnd, WM_COMMAND, Dlg_OnCommand);

    case WM_APP_COMPLETED:
        {
            TCHAR szMsg[MAX_PATH + 1];
            StringCchPrintf(szMsg, _countof(szMsg),
                            TEXT("----%u號任務是批處理的最后一個任務----"), lParam);
            AddMessage(szMsg);

            //啟用“開始”按鈕
            Button_Enable(GetDlgItem(g_hDlg, IDC_BTN_START_BATCH), TRUE);
        }

        break;
    }
    return FALSE;
}

//////////////////////////////////////////////////////////////////////////
int APIENTRY _tWinMain(HINSTANCE hInst, HINSTANCE, LPTSTR pCmdLine, int){

    //創建用於供所有任務使用的工作項對象(最后一個參數為NULL,使用進程默認的線程池!)
    g_pWorkItem = CreateThreadpoolWork(TaskHandler, NULL, NULL);
    if (NULL == g_pWorkItem){
        MessageBox(NULL, TEXT("無法創建任務所需的工作項對象"),
                   TEXT(""), MB_ICONSTOP);
        return -1;
    }


    //ttoi,將字符轉為整型
    DialogBoxParam(hInst, MAKEINTRESOURCE(IDD_MAIN), NULL, Dlg_Proc,_ttoi(pCmdLine));

    //關閉工作項對象
    CloseThreadpoolWork(g_pWorkItem);
    return 0;
}

//resource.h

//{{NO_DEPENDENCIES}}
// Microsoft Visual C++ 生成的包含文件。
// 供 11_Batch.rc 使用
//
#define IDD_MAIN                        101
#define IDC_LB_STATUS                   1001
#define IDC_BTN_START_BATCH             1002

// Next default values for new objects
// 
#ifdef APSTUDIO_INVOKED
#ifndef APSTUDIO_READONLY_SYMBOLS
#define _APS_NEXT_RESOURCE_VALUE        102
#define _APS_NEXT_COMMAND_VALUE         40001
#define _APS_NEXT_CONTROL_VALUE         1003
#define _APS_NEXT_SYMED_VALUE           101
#endif
#endif

//Batch.rc

// Microsoft Visual C++ generated resource script.
//
#include "resource.h"

#define APSTUDIO_READONLY_SYMBOLS
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 2 resource.
//
#include "winres.h"

/////////////////////////////////////////////////////////////////////////////
#undef APSTUDIO_READONLY_SYMBOLS

/////////////////////////////////////////////////////////////////////////////
// 中文(簡體,中國) resources

#if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_CHS)
LANGUAGE LANG_CHINESE, SUBLANG_CHINESE_SIMPLIFIED

#ifdef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// TEXTINCLUDE
//

1 TEXTINCLUDE 
BEGIN
    "resource.h\0"
END

2 TEXTINCLUDE 
BEGIN
    "#include ""winres.h""\r\n"
    "\0"
END

3 TEXTINCLUDE 
BEGIN
    "\r\n"
    "\0"
END

#endif    // APSTUDIO_INVOKED


/////////////////////////////////////////////////////////////////////////////
//
// Dialog
//

IDD_MAIN DIALOGEX 0, 0, 191, 190
STYLE DS_SETFONT | DS_MODALFRAME | DS_CENTER | WS_POPUP | WS_CAPTION | WS_SYSMENU
CAPTION "利用線程池進行批處理"
FONT 9, "宋體", 400, 0, 0x86
BEGIN
    DEFPUSHBUTTON   "退出",IDOK,136,169,50,14
    LISTBOX         IDC_LB_STATUS,7,25,176,142,LBS_NOINTEGRALHEIGHT | NOT WS_BORDER | WS_VSCROLL | WS_HSCROLL | WS_TABSTOP,WS_EX_STATICEDGE
    PUSHBUTTON      "開始批處理",IDC_BTN_START_BATCH,7,7,50,14
END


/////////////////////////////////////////////////////////////////////////////
//
// DESIGNINFO
//

#ifdef APSTUDIO_INVOKED
GUIDELINES DESIGNINFO
BEGIN
    IDD_MAIN, DIALOG
    BEGIN
        LEFTMARGIN, 7
        RIGHTMARGIN, 186
        TOPMARGIN, 7
        BOTTOMMARGIN, 183
    END
END
#endif    // APSTUDIO_INVOKED

#endif    // 中文(簡體,中國) resources
/////////////////////////////////////////////////////////////////////////////



#ifndef APSTUDIO_INVOKED
/////////////////////////////////////////////////////////////////////////////
//
// Generated from the TEXTINCLUDE 3 resource.
//


/////////////////////////////////////////////////////////////////////////////
#endif    // not APSTUDIO_INVOKED

11.2.2 每隔一段時間調用一個函數

(1)CreateThreadpoolTimer函數——在線程池中創建一個定時器對象

參數

描述

PTP_TIMER_CALLBACK

     PfnTimerCallback

回調函數指針,其原型為

VOID CALLBACK TimeoutCallback(

PTP_CALLBACK_INSTANCE pInstance,

PVOID pvContext,PTP_TIMER pTimer);

PVOID pvContext

傳給回調函數的額外參數

PTP_CALLBACK_ENVIRON pcbe

回調環境

返回值

計時器工作項,TP_TIMER對象的指針

(2)SetThreadpoolTimer:向線程池注冊計時器

參數

描述

PTP_TIMER pTimer

要設定的計時器的指針

PFILETIME pftDueTime

第1次調用回調函數的時間。

①NULL:表示停止調用回調函數。(暫停但不銷毀計時器)

②小於0:表示相對時間(單位微秒),即相對於調用SetThreadpoolTimer的時間。

③-1:表示立即開始。

④正數:以100ns為單位,從1600年1月1日 開始計算的周期數。

DWORD msPeriod

觸發周期(單位微秒)。0表示只觸發一次。注意如果回調函數執行的時間太長,而回調函數觸發的周期又很短,此時系統會啟動多個線程來執行這些回調函數。

DWORD msWindowLength

用來給回調函數的執行增加一些隨機性。單位微秒

①設當前設定的觸發時刻為T,則下次觸發的時間是[T+msPeriod,T+msPeriod+msWindowLength]之間的任何一個時間值。

②msWindowLength的另一個作用是將計時器分組。如計時器A會在5-7微秒內被觸發,計時器B會在6-8微秒內。因時間有重疊,所以線程池只會喚醒一個線程來處理這兩個計時器,在處理完A的回調函數后,該線程不進入睡眠,會直接再調用B的回調函數,以減少用兩個線程調用時產生的額外的線程上下文切換的開銷。

(3)IsThreadpoolTimerSet——確定某個計時器是否己經被設置,即pftDueTime!=NULL

(4)WaitForThreadpoolTimerCallbacks等待一個計時器完成。

(5)CloseThreadpoolTimer釋放計時器的內存。

【TimedMessageBox示例程序】

/*-----------------------------------------------------------------------------------------
Module: TimedMsgBox.cpp
Notices:Copyright(c) 2008 Jeffrey Richter & Christophe Nasarre
-----------------------------------------------------------------------------------------*/

#include "../../CommonFiles/CmnHdr.h"
#include <tchar.h>
#include <strsafe.h>

//////////////////////////////////////////////////////////////////////////
TCHAR g_szCaption[100];  //消息框的標題

int g_nSecLeft = 10; //消息框中顯示的剩余時間

#define ID_MSGBOX_STATIC_TEXT   0x0000FFFF //對話框中文本框的ID,這是系統默認的ID
//////////////////////////////////////////////////////////////////////////
VOID CALLBACK MsgBoxTimeoutCallback(PTP_CALLBACK_INSTANCE pInstance,PVOID pvContext,PTP_TIMER pTimer){
    //注意,因競爭條件,當程序運行到這里時,可能對話框還沒有創建!
    HWND hwnd = FindWindow(NULL, g_szCaption);

    if (hwnd !=NULL){
        if (g_nSecLeft ==1){
            //時間結束,強迫對話框退出
            EndDialog(hwnd, IDOK);
            return;
        }

        //如果對話框存在,則更新剩作時間
        TCHAR szMsg[100];
        StringCchPrintf(szMsg, _countof(szMsg), TEXT("還剩%d秒,按確定將取消"), --g_nSecLeft);
        SetDlgItemText(hwnd, ID_MSGBOX_STATIC_TEXT, szMsg);
    } 
}

//////////////////////////////////////////////////////////////////////////
int WINAPI _tWinMain(HINSTANCE hInstance,HINSTANCE hPrevInstance, PTSTR lpCmdLine,int nShowCmd){
    StringCchPrintf(g_szCaption, _countof(g_szCaption), TEXT("Timed Message Box"));
    
    //創建線程池計時器對象
    PTP_TIMER lpTimer = CreateThreadpoolTimer(MsgBoxTimeoutCallback, NULL, NULL);

    if (NULL ==lpTimer){
        TCHAR szMsg[MAX_PATH];
        StringCchPrintf(szMsg, _countof(szMsg), TEXT("無法創建計時器對象:%u"), GetLastError());
        MessageBox(NULL, szMsg, TEXT("錯誤"), MB_OK | MB_ICONERROR);
        return (-1);
    }

    //設置定時器1秒后觸發,以后每秒觸發一次
    ULARGE_INTEGER ulRelativeStartTime;
    ulRelativeStartTime.QuadPart = (LONGLONG)(-1000000); //單位微秒。轉換后為1秒。
    FILETIME ftRelativeStartTime;
    ftRelativeStartTime.dwHighDateTime = ulRelativeStartTime.HighPart;
    ftRelativeStartTime.dwLowDateTime = ulRelativeStartTime.LowPart;
    SetThreadpoolTimer(
        lpTimer,
        &ftRelativeStartTime,
        1000, //每隔一秒觸發
        0
        );

    //顯示消息框
    MessageBox(NULL, TEXT("還剩10秒,按確定將取消"), g_szCaption, MB_OK);

    //清除計時器對象
    CloseThreadpoolTimer(lpTimer);

    //判斷是用戶取消計時或者是超時
    MessageBox(NULL, (g_nSecLeft == 1) ? TEXT("超時") : TEXT("用戶取消"), TEXT("結果"), MB_OK);
    return 0;

}

 【NewWorkPool程序】簡單的工作項和定時器回調演示

#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h>

//////////////////////////////////////////////////////////////////////////
//簡單的工作項函數
VOID CALLBACK MyWorkCallback(PTP_CALLBACK_INSTANCE pInstance, 
                             PVOID pvContext,
                             PTP_WORK pWork){
    BOOL bRet = FALSE;
    DWORD dwPriorityOriginal = 0;

    dwPriorityOriginal = GetThreadPriority(GetCurrentThread());

    if (THREAD_PRIORITY_ERROR_RETURN == dwPriorityOriginal){
        _tprintf(_T("GetThreadPriority失敗。錯誤碼:%u\n"), GetLastError());
        return;
    }


    bRet = SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL);

    if (FALSE == bRet){
        _tprintf(_T("SetThreadPriority失敗。錯誤碼:%u\n"), GetLastError());
        return;
    }

    _tprintf(_T("[ID:0x%X] MyWorkCallback Runing...\n"), GetCurrentThreadId());
    
    bRet = SetThreadPriority(GetCurrentThread(), dwPriorityOriginal);

    if (FALSE == bRet){
        _tprintf(_T("SetThreadPriority失敗。錯誤碼:%u\n"), GetLastError());
        return;
    }
    return;
}


//簡單的定時回調函數
VOID CALLBACK MyTimerCallback(PTP_CALLBACK_INSTANCE pInstance,
                             PVOID pvContext,
                             PTP_TIMER pTimer){
    _tprintf(_T("[ID:0x%X] MyTimerCallback Runing...\n"), GetCurrentThreadId());
}
//////////////////////////////////////////////////////////////////////////
int _tmain(){
    _tsetlocale(LC_ALL, _T("chs"));

    BOOL bRet = FALSE;
    PTP_WORK pWork = NULL;
    PTP_TIMER pTimer = NULL;
    PTP_POOL  pPool = NULL;
    PTP_WORK_CALLBACK  workcallback = MyWorkCallback;
    PTP_TIMER_CALLBACK timercallback = MyTimerCallback;
    TP_CALLBACK_ENVIRON CallbackEnviron;
    PTP_CLEANUP_GROUP cleanupgroup = NULL;
    FILETIME ftDueTime;
    ULARGE_INTEGER ulDueTime;

    UINT rollback = 0;

    __try{
        //初始化環境塊
        InitializeThreadpoolEnvironment(&CallbackEnviron);

        //創建線程池
        pPool = CreateThreadpool(NULL);

        if (NULL == pPool){
            _tprintf(_T("創建線程池失敗!錯誤碼:%u\n"), GetLastError());
            __leave;
        }

        rollback = 1;//創建線程池成功!

        //設置線程數
        SetThreadpoolThreadMaximum(pPool, 8);
        bRet = SetThreadpoolThreadMinimum(pPool, 2);

        if (!bRet){
            _tprintf(_T("SetThreadpoolThreadMinimum失敗,錯誤碼:%u\n"), GetLastError());
            __leave;
        }

        //創建資源清理器
        cleanupgroup = CreateThreadpoolCleanupGroup();
        if (NULL == cleanupgroup){
            _tprintf(_T("CreateThreadpoolCleanupGroup失敗!錯誤碼:%u\n"), GetLastError());
            __leave;    
        }

        rollback = 2; //資源清理器創建成功

        //將環境塊與線程池關聯
        SetThreadpoolCallbackPool(&CallbackEnviron, pPool);

        //將清理器與環境塊聯聯
        SetThreadpoolCallbackCleanupGroup(&CallbackEnviron, cleanupgroup,NULL);

        //創建線程池需要的回調函數,這里是一個普通的工作項
        pWork = CreateThreadpoolWork(workcallback, NULL, &CallbackEnviron);

        if (NULL == pWork){
            _tprintf(_T("創建線程池普通工作項失敗!錯誤碼:%u\n"), GetLastError());
            __leave;
        }

        rollback = 3; //創建普通工作項成功
        SubmitThreadpoolWork(pWork); //提交工作項

        //創建一個定時回調項
        pTimer = CreateThreadpoolTimer(timercallback, NULL, &CallbackEnviron);
        if (NULL == pTimer){
            _tprintf(_T("創建線程池計時器對象失敗!錯誤碼:%u\n"), GetLastError());
            __leave;
        }

        rollback = 4; //計時器對象創建成功

        //設定定時回調周期
        ulDueTime.QuadPart = (LONGLONG)-(1 * 10 * 1000 * 1000); //1秒以后觸發
        ftDueTime.dwHighDateTime = ulDueTime.HighPart;
        ftDueTime.dwLowDateTime = ulDueTime.LowPart;

        SetThreadpoolTimer(pTimer, &ftDueTime, 0, 0); //只調用一次

        //主線程進入等待狀態或干別的工作
        Sleep(1500);

        //當所有的線程池回調函數都被執行后,關閉清理器
        CloseThreadpoolCleanupGroupMembers(cleanupgroup, FALSE, NULL);

        //事務標志,回滾到第2步,執行第2步后的銷毀工作
        rollback = 2;
        __leave;

    }
    __finally{
        switch (rollback)
        {
        case 4:
        case 3:
            //關閉清理器
            CloseThreadpoolCleanupGroupMembers(cleanupgroup, FALSE, NULL);
            break;

        case 2:
            //關閉清理器
            CloseThreadpoolCleanupGroup(cleanupgroup);
            break;

        case 1:
            //關閉線程池
            CloseThreadpool(pPool);
            break;

        default:
                break;
        }
    }

    _tsystem(_T("PAUSE"));

    return 0;
}

 11.2.3 在內核對象觸發時調用一個函數

(1)CreateThreadpoolWait——創建一個線程池等待對象(也是一個工作項,等待項)

參數

描述

PTP_WAIT_CALLBACK

     pfnWaitCallback

回調函數指針,其原型為

VOID CALLBACK WaitCallback(

PTP_CALLBACK_INSTANCE pInstance,

PVOID pvContext,PTP_WAIT pWait,

TP_WAIT_RESULT WaitResult);

PVOID pvContext

傳給回調函數的額外參數

PTP_CALLBACK_ENVIRON pcbe

回調環境

返回值

等待對象的指針

備注:回調函數的WaitResult表示回調函數被調用的原因:

WAIT_OBJECT_0:表示傳給SetThreadpoolWait的內核對象在超時之前被觸發。

WAIT_TIMEOUT:表示內核對象在超時之前沒被觸發,回調函數被執行是因為超時

WAIT_ABANDONED_0:表示內核對象一個互斥量,並且互斥量被“遺棄”,觸發了回調函數。

(2)SetThreadpoolWait——將某個內核對象綁定到線程池

參數

描述

PTP_WAIT pWaitItem

傳入由CreateThreadWait返回的對象的指針

HANDLE hObject

要綁定的內核對象,當該對象被觸發時,會調用線程池中的WaitCallback函數。

PFILETIME pftTimeout

線程池願意花的最長多少時間來等待內核對象觸發。

0:立即返回,負值為相對時間,正值為絕對時間,NULL表示無限等待。(線程池內部調用了WaitForMultipleObjects)

備注:①線程池內部讓一個線程調用WaitForMultipleObjects並傳入由SetThreadpoolWait函數注冊的句柄,不斷地組成一個句柄組,同時將Wait*函數的bWaitAll設為FALSE,這樣當任何一個句柄被觸發,線程池就會被喚醒。

②因WaitForMultipleObjects不允許將同一個句柄傳入多次,因此必須確保不會用SetThreadpoolWait來多次注冊同一個句柄,但可以調用DuplicationHandle復制句柄並傳給Set*函數。

③因WaitForMultipleObjects一次最多只能等待64個內核對象,因此線程池實際上為每64個內核對象分配一個線程來等待,所以效率比較高。因此,如果要等待超過64個以上的內核對象,可以考慮用這種線程池,因為系統會每64個內核對象,就開辟一個線程來等待這些內核對象

④一旦線程池中一個線程調用了我們的回調函數,對應的等待項將進入“不活躍”狀態。這意味着如果在同一個內核對象被觸發時再次調用這個回調函數時,需要調用SetThreadpoolWait再次注冊。如果傳入的hObject為NULL,將把pWaitItem這個等待項從線程中移除。

(3)WaitForThreadpoolWaitCallbacks:等待一個等待項完成

(4)ClosethreadpoolWait函數:釋放一個等待項的內存

 【NewWaitCallback程序】演示觸發內核對象時,調用一個函數

#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h>

//////////////////////////////////////////////////////////////////////////
VOID CALLBACK MyWaitCallback(PTP_CALLBACK_INSTANCE pInstance,
                             PVOID pvContext,
                             PTP_WAIT pWait,
                             TP_WAIT_RESULT WaitResult){
    _tprintf(_T("線程[ID:0x%X] MyWaitCallback Runing...\n"), GetCurrentThreadId());
}
//////////////////////////////////////////////////////////////////////////
int _tmain(){

    _tsetlocale(LC_ALL, _T("chs"));

    PTP_WAIT pWait = NULL;
    PTP_WAIT_CALLBACK pfnWaitCallback = MyWaitCallback;
    HANDLE hEvent = NULL;
    UINT rollback = 0;

    //創建一個事件對象
    hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); //自動重置
    if (NULL == hEvent)
        return 0;

    rollback = 1; //創建事件對象成功

    __try{
        //創建等待線程池
        pWait = CreateThreadpoolWait(pfnWaitCallback, NULL, NULL);//利用系統默認線程池
        if (NULL == pWait){
            _tprintf(_T("CreateThreadpoolWait失敗。錯誤碼:%u\n"), GetLastError());
            __leave;
        }
    
        rollback = 2;

        //模擬等待5次,注意每次等待前要調用SetThreadpoolWait方法
        for (int i = 0; i < 5;i++){
            SetThreadpoolWait(pWait, hEvent, NULL); //這句很重要
            SetEvent(hEvent);
            Sleep(500);

            //主線程等待回調線程池調用完畢
            WaitForThreadpoolWaitCallbacks(pWait, TRUE);
        }
    }__finally{
        switch (rollback)
        {
        case 2:
            SetThreadpoolWait(pWait, NULL, NULL);//取消等待項
            CloseThreadpoolWait(pWait);
            break;
        case 1:
            CloseHandle(hEvent);
            break;
        default:
            break;
        }
    }

    _tsystem(_T("PAUSE"));
    return 0;
}

11.2.4 在異步I/O請求完成時調用一個函數

 

(1)CreateThreadpoolIo:創建線程池Io對象

參數

描述

HANDLE  hDevice

要關聯的設備句柄

PTP_WIN32_IO_CALLBACK

                            pfnIoCallback

回調函數指針,其原型為

VOID CALLBACK OverlappedCompletionRoutine(

PTP_CALLBACK_INSTANCE pInstance,

PVOID pvContext,PVOID pOverlapped,

ULONG IoResult,//操作結果,成功時為NO_ERROR

ULONG_PTR NumberOfBytesTransferred,//已傳輸字節數

PTP_IO //指向線程池中的I/O項的指針,

//即CreateThreadpoolIo的返回值

);

PVOID pvContext

傳給回調函數的額外參數

PTP_CALLBACK_ENVIRON pcbe

回調環境

返回值

I/O對象(一個工作項)的指針

(2)將IO對象(工作項)與線程池內部的I/O完成端口關聯

VOID StartThreadpoolIo(PTP_IO pio);

 【注意】每次調用ReadFile和Writefile之前都必調用StartThreadpoolIo,否則回調函數不會被調用(這步相當於給完成端口增加IO完成項通知)

(3)停止線程池調用回調函數:VOID CancelThreadpoolIo(PTP_IO pio);

  【注意】①當發出IO請求之后,可以用這來取消。

      ②在調用ReadFile或WriteFile失敗時,仍然必須調用CancelThreadpoolIo(除了返回FALSE且GetLastError為ERROR_IO_PENDING,因這表示正在完成)

(4)等待一處待處理的IO請求完成。

  WaitForThreadIoCallbacks(pio,bCancelPendingCallbacks);

  【注意】

    ①該函數須在另一個線程使用,而不能在回調函數內部使用,因為這會造成死鎖。

    ②如果bCancelPendingCallbacks為TRUE,那么當請求完成的時候,回調函數不會被調用(如果尚未被調用)。這和調用CancelThreadpoolIo函數的時候很相似。

(5)解除IO對象(工作項)與線程池的關聯:VOID CloseThreadpoolIo(PTP_IO pio);

【NewIOCPPool程序】模擬寫入日志文件

  效果圖與【IOCPPool程序】一致。

#include <windows.h>
#include <tchar.h>
#include <strsafe.h>
#include <locale.h>

//////////////////////////////////////////////////////////////////////////
#define  QMLX_ALLOC(sz)    HeapAlloc(GetProcessHeap(),0,sz)  //QMLX:淺墨濃香
#define  QMLX_CALLOC(sz)   HeapAlloc(GetProcessHeap(),HEAP_ZERO_MEMORY,sz)
#define  QMLX_SAFEFREE(p)  if(NULL != p){HeapFree(GetProcessHeap(),0,p);p=NULL;}

#define  QMLX_ASSERT(s)    if(!(s)){DebugBreak();}
#define  QMLX_BEGINTHREAD(Fun,Param)  CreateThread(NULL,0,\
                                     (LPTHREAD_START_ROUTINE)Fun,Param,0,NULL);

//////////////////////////////////////////////////////////////////////////
#define MAXWRITEPERTHREAD 20  //每個線程最大寫入次數
#define MAXWRITETHREAD    10   //寫入線程的數量

#define OP_READ    0x01  //讀操作
#define OP_WRITE   0x02  //寫操作

//單IO數據
typedef struct _tagPerIoData{
    OVERLAPPED  m_ol; 
    HANDLE      m_hFile;   //操作的文件句柄
    DWORD       m_dwOp;    //操作類型,OP_READ或OP_WRITE
    LPVOID      m_pData;   //操作的數據
    UINT        m_nLen;    //操作的數據長度
    DWORD       m_dwWrite; //寫入的字節數 
    DWORD       m_dwTimestamp; //起始操作的時間戳
}PER_IO_DATA,*PPER_IO_DATA;

//IOCP線程池回調函數,實際就是完成通知的響應函數
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance,PVOID pvContext,PVOID pOverlapped,
                           ULONG IoResult,ULONG_PTR NumberOfBytesTransferred,PTP_IO pio);

//寫文件的線程
DWORD WINAPI WriteThread(LPVOID lpParam);

//當前操作的文件對象的指針
LARGE_INTEGER  g_liFilePointer = { 0 };

//IOCP線程池
PTP_IO  g_pThreadpoolIo = NULL;

//////////////////////////////////////////////////////////////////////////
//獲取可模塊的路徑名(路徑后含‘\’)
VOID  GetAppPath(LPTSTR pszBuffer){
    DWORD dwLen = 0;
    if (0 == (dwLen = GetModuleFileName(NULL, pszBuffer, MAX_PATH)))
        return;

    for (DWORD i = dwLen; i > 0;i--){
        if ('\\' == pszBuffer[i]){
            pszBuffer[i + 1] = '\0';
            break;
        }
    }
}

int _tmain(){
    _tsetlocale(LC_ALL, _T("chs"));
    TCHAR pFileName[MAX_PATH] = {};
    GetAppPath(pFileName);
    StringCchCat(pFileName, MAX_PATH, _T("NewIOCPFile.txt"));

    HANDLE ahWThread[MAXWRITETHREAD] = {};
    DWORD dwWrited = 0;

    //創建文件
    HANDLE hTxtFile = CreateFile(pFileName, GENERIC_WRITE, 0, NULL,
                                 CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, NULL);
    if (INVALID_HANDLE_VALUE == hTxtFile){
        _tprintf(_T("CreateFile(%s)失敗,錯誤碼:%u\n"), GetLastError());
        _tsystem(_T("PAUSE"));
        return 0;    
    }

    //初始化線程池回調環境
    TP_CALLBACK_ENVIRON poolEnv = {};
    InitializeThreadpoolEnvironment(&poolEnv);

    //創建IOCP線程池
    g_pThreadpoolIo = CreateThreadpoolIo(hTxtFile, (PTP_WIN32_IO_CALLBACK)IOCPCallback,hTxtFile,&poolEnv);
    
    //啟動IOCP線程池
    StartThreadpoolIo(g_pThreadpoolIo);

    //寫入UNICODE文件的前綴碼,以便正確打開
    PER_IO_DATA* pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
    QMLX_ASSERT(pIo != NULL);

    pIo->m_dwOp = OP_WRITE;
    pIo->m_hFile = hTxtFile;
    pIo->m_pData = QMLX_CALLOC(sizeof(WORD));
    QMLX_ASSERT(pIo->m_pData != NULL);
    *((WORD*)pIo->m_pData) = MAKEWORD(0xFF, 0xFE);
    pIo->m_nLen = sizeof(WORD);

    //偏移文件指針
    pIo->m_ol.Offset = g_liFilePointer.LowPart;
    pIo->m_ol.OffsetHigh = g_liFilePointer.HighPart;
    g_liFilePointer.QuadPart += pIo->m_nLen;
    pIo->m_dwTimestamp = GetTickCount(); //記錄時間戳

    WriteFile(hTxtFile, pIo->m_pData, pIo->m_nLen, 
              &pIo->m_dwWrite,(LPOVERLAPPED)&pIo->m_ol);

    //等待IOCP線程池完成操作
    WaitForThreadpoolIoCallbacks(g_pThreadpoolIo, FALSE);

    //啟動寫入線程進行日志寫入操作
    for (int i = 0; i < MAXWRITETHREAD;i++){
        ahWThread[i] = QMLX_BEGINTHREAD(WriteThread, hTxtFile);
    }

    //讓主線程等待這些寫入線程結束
    WaitForMultipleObjects(MAXWRITETHREAD, ahWThread, TRUE, INFINITE);

    for (int i = 0; i < MAXWRITETHREAD;i++){
        CloseHandle(ahWThread[i]);
    }

    //關閉IOCP線程池
    CloseThreadpoolIo(g_pThreadpoolIo);

    //關閉日志文件
    if (INVALID_HANDLE_VALUE != hTxtFile){
        CloseHandle(hTxtFile);
        hTxtFile = INVALID_HANDLE_VALUE;
    }

    _tsystem(_T("PAUSE"));

    return 0;
}

//IOCP線程池回調函數,實際就是完成通知的響應函數
VOID CALLBACK IOCPCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID pvContext, PVOID pOverlapped,
                           ULONG IoResult, ULONG_PTR NumberOfBytesTransferred, PTP_IO pio)
{
    if (NO_ERROR != IoResult){
        _tprintf(_T("I/O操作出錯,錯誤碼:%u\n"),IoResult);
        return;
    }

    PPER_IO_DATA pIo = CONTAINING_RECORD((LPOVERLAPPED)pOverlapped, PER_IO_DATA, m_ol);
    DWORD dwCurTimestamp = GetTickCount();

    switch (pIo->m_dwOp)
    {
    case OP_WRITE://寫操作結束
        {//寫入操作結束
            _tprintf(_T("線程[0x%x]得到IO完成通知,完成操作(%s),緩沖(0x%08x)長度(%ubytes),寫入時間戳(%u)當前時間戳(%u)時差(%u)\n"),
                     GetCurrentThreadId(), OP_WRITE == pIo->m_dwOp ? _T("Write") : _T("Read"),
                     pIo->m_pData, pIo->m_nLen, pIo->m_dwTimestamp, dwCurTimestamp, dwCurTimestamp - pIo->m_dwTimestamp);

            QMLX_SAFEFREE(pIo->m_pData);
            QMLX_SAFEFREE(pIo);
        }
        break;

    case OP_READ: //讀操作結束
        break;

    default:
        break;
    }
}

//寫文件的線程
#define MAX_LOGLEN 256
DWORD WINAPI WriteThread(LPVOID lpParam)
{
    TCHAR pTxtContext[MAX_LOGLEN] = {};
    PPER_IO_DATA pIo = NULL;
    size_t szLen = 0;
    LPTSTR pWriteText = NULL;

    StringCchPrintf(pTxtContext, MAX_LOGLEN, _T("這是一條模擬的日志記錄,由線程[0x%x]寫入\r\n"), 
                    GetCurrentThreadId());
    StringCchLength(pTxtContext, MAX_LOGLEN, &szLen);

    szLen += 1;
    
    int i = 0;
    for (; i < MAXWRITEPERTHREAD;i++){
        pWriteText = (LPTSTR)QMLX_CALLOC(szLen*sizeof(TCHAR));
        QMLX_ASSERT(NULL != pWriteText);
        StringCchCopy(pWriteText, szLen, pTxtContext);

        //為每個操作申請一個“單IO數據”結構體
        pIo = (PPER_IO_DATA)QMLX_CALLOC(sizeof(PER_IO_DATA));
        QMLX_ASSERT(pIo != NULL);

        pIo->m_dwOp = OP_WRITE;
        pIo->m_hFile = (HANDLE)lpParam;
        pIo->m_pData = pWriteText;
        pIo->m_nLen = (szLen-1)*sizeof(TCHAR);

        //這里使用原子操作同步文件指針,寫入不會相互覆蓋
        //這個地方體現了lock-free算法的精髓,使用了基本的CAS操作控制文件指針
        //比傳統的使用關鍵代碼段並等待的方法,這里用的方法要輕巧的多,付出的代價也小
        *((LONGLONG*)&pIo->m_ol.Pointer) = InterlockedCompareExchange64(&g_liFilePointer.QuadPart,
                                                                        g_liFilePointer.QuadPart + pIo->m_nLen, g_liFilePointer.QuadPart);
        pIo->m_dwTimestamp = GetTickCount(); //記錄時間戳
        
        StartThreadpoolIo(g_pThreadpoolIo);

        //寫入
        WriteFile((HANDLE)lpParam, pIo->m_pData, pIo->m_nLen,
                  &pIo->m_dwWrite, (LPOVERLAPPED)&pIo->m_ol);
        if (ERROR_IO_PENDING != GetLastError()){
            CancelThreadpoolIo(g_pThreadpoolIo);
        }
    }

    return i;
}

 

11.2.5 回調函數的終止操作

(1)回調函數的pInstance參數:當線程調用回調函數時,Windows會自動傳一個pInstance參數(類型PTP_CALLBACK_INSTANCE)給回調函數,然后回調函數將這個參數又傳給如下的函數,以便在這些函數在回調函數完后,執行一些相應的終止操作(主要是用來通知另一個線程,線程池中的工作項己經完成。顯然,如下函數是在回調函數的內部進行調用的!

函數

終止操作

LeaveCriticalSectionWhenCallbackReturns

當回調函數返回時,線程池會自動調用LeavCriticalSection,並在參數中傳入指定的CRITCAL_SECTION結構體。

ReleaseMutexWhenCallbackReturns

當回調函數返回時,線程池會自動調用ReleaseMutex,並在參數中傳入指定的HANDLE。

ReleaseSemaphoreWhenCallbackReturns

當回調函數返回的時候,線程池會自動調用ReleaseSemaphore,並在參數中傳入指定的HANDLE

SetEventWhenCallbackReturns

當回調函數返回的時候,線程池會自動調用SetEvent,並在參數中傳入指定的HANDLE。

FreeLibraryWhenCallbackReturns

當回調函數返回的時候,線程池會自動調用FreeLibrary,並在參數中傳入指定的HMOUDLE。

(注意:如果回調函數是從DLL中載入的,這個函數尤為重要,因為當線程執行完畢后,回調函數不能自己調用FreeLibrary,否則回調函數代碼將從進程中清除,這樣當FreeLibrary試圖返回到回調函數時,會引發訪問違規)

注意,對於任何一個回調函數,只能執行上述的一種終止操作。如果調用了以上的多個函數,則最后調用的終止函數會覆蓋之前調用的那個。

(2)BOOL WINAPI CallbackMayRunLong(PTP_CALLBACK_INSTANCE pci);用來通知線程池回調函數可能運行的時間會比較長。返回TRUE時,說明線程池還有其他線程可供使用。FALSE則相反。線程池會根據來決定是否創建新線程,以防止其他工作項出現挨餓現象。(只能在調用線程的回調函數里使用!

(3)DisassociateCurrentThreadFromCallback(PTP_CALLBACK_INSTANCE pci)

用來告訴線程池,邏輯上自己已經完成了工作。這使得任何由於調用WaitForThreadpool*Callbacks(如WaitForThreadpoolIoCallbacks)而被阻塞的線程能早一些返回,而不必等到線程從回調函數中結束時才返回。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM