簡單線程池的設計


  在網上觀摩了一些大佬關於線程池的實現后,我決定也親手寫一下簡單線程池,首先先解釋一下什么是線程池,簡單的來說,就是預先創建一些線程,使它們處於睡眠狀態,當任務來臨時,喚醒線程讓它們去執行。使用線程池的好處有很多,比如,1.線程的創建和銷毀的開銷,無論從時間還是空間上來說是巨大的,而通過線程池的重用大大減少了這些不必要的開銷,當然既然少了這么多消費內存的開銷,其線程執行速度也是得到提升,2.還有有效的控制線程的並發數,控制線程的並發數可以有效的避免大量的線程爭奪CPU資源而造成堵塞。

  關於設計這個線程池,從概念出發,預先創建一些線程(創建線程,其后必須也伴隨着銷毀線程),使它們處於睡眠狀態(掛起或者阻塞態),當任務來臨時(設計一個隊列專門裝任務),喚醒線程並執行(線程函數所完成),同時再設計一個具體描述任務的父類並設置成純虛函數,用戶在使用此線程池的時候,只需要重寫父類就可以了,所以大致需要實現的功能如下圖所示:

 

接下來按每個功能進行講述:

1.創建線程

 1     //先檢查參數正確性
 2     if(ThreadNUM_MIN < 0 || ThreadNUM_MAX < ThreadNUM_MIN)
 3         return false;
 4     //創建信號量(在創建線程前創建信號量,防止線程空轉)
 5     m_hSemphore = CreateSemaphore(NULL,0,ThreadNUM_MAX,0);
 6     //創建指定個數線程
 7     for(int i = 0;i < ThreadNUM_MIN;i++)
 8     {
 9         HANDLE handle = (HANDLE)_beginthreadex(NULL,0,&ThreadFunction,this,0,0);
10         if(handle)
11         {
12             m_lHandle.push_back(handle);
13         }
14     }
15     return true;

首先值得說的有兩點,第一點,在這里創建線程我使用的是_beginthreadex(安全屬性,線程棧大小,線程函數地址,線程函數參數,線程初始態,線程標識符),而不是用CreateThread(),這是因為如果在代碼中有使用標准C運行庫中的函數時,盡量使用_beginthreadex()來代替CreateThread()(這個函數解決的應該是一個歷史遺留問題,標准C運行庫在1970年被實現了,由於當時沒任何一個操作系統提供對多線程的支持。因此編寫標准C運行庫的程序員根本沒考慮多線程程序使用標准C運行庫的情況)比如標准C運行庫的全局變量errno。很多運行庫中的函數在出錯時會將錯誤代號賦值給這個全局變量,這樣可以方便調試。但如果有這樣的一個代碼片段:

1 if (system("notepad.exe readme.txt") == -1)
2 {
3     switch(errno)
4     {
5         ...//錯誤處理代碼
6     }

假設某個線程A在執行上面的代碼,該線程在調用system()之后且尚未調用switch()語句時另外一個線程B啟動了,這個線程B也調用了標准C運行庫的函數,不幸的是這個函數執行出錯了並將錯誤代號寫入全局變量errno中。這樣線程A一旦開始執行switch()語句時,它將訪問一個被B線程改動了的errno。這種情況必須要加以避免!因為不單單是這一個變量會出問題,其它像strerror()、strtok()、tmpnam()、gmtime()、asctime()等函數也會遇到這種由多個線程訪問修改導致的數據覆蓋問題,通過查看源碼可知,_beginthreadex()是先創建了一個內存塊,再調用CreateThread(),這個內存塊中用來存放一些需要線程獨享的數據。事實上新線程運行時會首先將內存塊與自己進一步關聯起來。然后新線程調用標准C運行庫函數如strtok()時就會先取得內存塊的地址再將需要保護的數據存入內存塊中。這樣每個線程就只會訪問和修改自己的數據而不會去篡改其它線程的數據了。

   第二點,在設計線程睡眠狀態時,有很多種方法 掛起(恢復指定線程),阻塞中有關鍵段/臨界區(無安全屬性,不適用),事件(無法喚醒指定線程,不適用),互斥量(同事件),所以我選擇的是信號量用來阻塞線程和恢復線程,因為就如同停車場一樣,我只是開放了車位,至於哪輛車(線程)停進來由系統隨機分配。

 

 

2.銷毀線程

 1     m_bFlagExit = false;
 2     list<HANDLE>::iterator ite = m_lHandle.begin();
 4     while(ite != m_lHandle.end())
 5     {
 6         if(WaitForSingleObject(*ite,100) == WAIT_TIMEOUT)
 7             TerminateThread(*ite,-1);
 8         CloseHandle(*ite);
 9         *ite = 0;
10         ite++;
11     }
12     m_lHandle.clear();14     CloseHandle(m_hSemphore);
15     m_hSemphore = 0;

能自然退出就自然退出,若遇到線程無法關閉的情況,即(等待線程中內核對象的信號量100ms內為無信號時),強制退出

3.線程函數

    CMyThreadPool *pThis = (CMyThreadPool *)lpvoid;
    CItask *pItask = NULL;
    while(pThis->m_bFlagExit)
    {       
        if(!pThis->m_qItask.empty())                                       
        {
            pItask = pThis->m_qItask.front();
            pThis->m_qItask.pop();

            pItask->RunTask();
            delete pItask;
            pItask = NULL;
        }

    }

    return 0;

目的很簡單,在無退出標記的情況下,從隊列中取出一個任務,來一個線程去執行

4.投遞任務

    if(itask == NULL)
        return false;
    m_qItask.push(itask);
    //釋放一個信號量
    ReleaseSemaphore(m_hSemphore,1,0);

將一個任務投入隊列中,並且釋放一個信號量

5.代碼優化

1》在重寫任務類后,嘗試創建了兩個線程去執行 從1加到10000000000的任務,不出意外的崩了,原因是 隊列迭代器失效,在經過一頓查閱后發現,是因為在C++中STL不支持線程安全,隊列的push和pop同時進行會崩掉,一般說來,STL對於多線程的支持僅限於下列兩點:(Effective STL中有描述)

1.多個讀取者是安全的。即多個線程可以同時讀取一個容器中的內容。 即此時多個線程調用 容器的不涉及到寫的接口都可以 eg find, begin, end 等.

2.對不同容器的多個寫入者是安全的。即多個線程對不同容器的同時寫入合法。 但是對於同一容器當有線程寫,有線程讀時,如何保證正確? 需要程序員自己來控制,比如:線程A讀容器某一項時,線程B正在移除該項。這會導致一下無法預知的錯誤。 通常的解決方式是用開銷較小的臨界區(CRITICAL_SECTION)來做同步。以下列方式同步基本上可以做到線程安全的容器(就是在有寫操作的情況下仍能保證安全)。

但是在查閱后,我打算利用一個bool變量去標記隊列中任務是否已經pop,來決定是否去push,還是崩了,原來多根線程也不允許同時push也不符合線程安全,那么將push加入互斥量解決了這個問題。

2》但是作為CPU來說,線程池是由任務的個數來創建線程數,這樣才能最大利用的使用資源,這就像在餐館里吃飯一樣,CPU是老板,飯店里最多有5個服務員(實現創建的線程最大數),在店的有2個(創建的線程數),此時來了一個客人(任務),此時放走一個服務員去執行即可(釋放信號量),此時來了4個客人,我把不在店的2個服務員給叫回來,此時來了10個客人,飯店里5個服務員都已經用完了,那么剩下5個就只能排隊等待了,人數(任務)少時,服務員(線程)少,老板(CPU)就可以減少開支(資源分配)。那么這種方法作為代碼就可以這樣實現。

 1 //1.是否有空閑線程
 2     if(m_lRunThreadNum < m_lCreateThreadNum) 
 3     {
 4         //釋放一個信號量
 5         ReleaseSemaphore(m_hSemphore,1,0);
 6     }
 7     //2.是否達到上限
 8     else if(m_lCreateThreadNum < m_lMaxThreadNum) 
 9     {
10         //創建線程 並且釋放一個信號量
11         HANDLE handle = (HANDLE)_beginthreadex(NULL,0,&ThreadFunction,this,0,0);
12         if(handle)
13         {
14             m_lHandle.push_back(handle);
15         }
16         m_lCreateThreadNum++;
17         ReleaseSemaphore(m_hSemphore,1,0);
18     }
19     //3.已達到上限 任務等待

3》在線程函數里,多個線程去執行任務時,難免會出現線程並發的情況,解決線程並發最常見的莫過於線程同步,也就是上鎖,我列舉一下常見的幾種方式(如果有列舉不當的,歡迎指出):

 

  1.  原子訪問:同一時刻只允許一個線程訪問資源,具體運用就是Interlocked...一系列函數,但是鎖定范圍小,一般就是一個變量,我認為它運用的主要核心就是Volatile關鍵字,因為CPU運算速度過快,需要一個Cache緩存來進行數據交換,而對於多線程來說,數據更改必須從內存中取用,而不是Cache,防止讀內存不同步,比如變量a已經減1了,但此時兩個線程中有一個線程讀取的還是a,而不是內存中的a-1,這個關鍵字的作用就是防止編譯優化,並且對於特殊地址的穩定訪問。
  2. 關鍵段:同一個時刻只允許一個線程訪問資源,舉一個不雅觀的例子,一群人上廁所,一個人進去后,將外面的牌子置為使用中,外面就有一群人在等待,當廁所里的人出來后,再將牌子置為未使用,在有一個人進入,這樣就控制每一只有一個人(線程),上廁所(訪問資源)了,而根據外面人等待時間的長短分為等不到就直接坐下來(其余線程直接阻塞),站着等一段時間,里面人出來了就直接進去,時間到了,里面人還沒出來就直接阻塞(其余線程使用旋轉鎖),還有一種就是沖進來直接推門,能推開就進去,推不開就找另一個廁所(進程)(其余線程異步處理)
  3. 互斥量,事件和信號量:這三種都是內核對象,使用時很安全,並且作用范圍廣,可以跨進程進行通信,並且通過waitforsingleobject()等待信號的時間長短,都能實現關鍵段中三個方法,唯一的缺點就是相對於關鍵段來說執行效率慢,關鍵段是用戶態下的方法,不需要狀態轉換

根據這幾種方法,針對線程函數又加了一些鎖

 1 CMyThreadPool *pThis = (CMyThreadPool *)lpvoid;
 2     CItask *pItask = NULL;
 3     while(pThis->m_bFlagExit)
 4     {
 5         //等待信號量
 6         if(WaitForSingleObject(pThis->m_hSemphore,100) == WAIT_TIMEOUT)        //為了能讓卡死進程能夠退出
 7             continue;
 8 
 9         InterlockedIncrement(&pThis->m_lRunThreadNum);       
10         while(!pThis->m_qItask.empty())                                        //由if->while 代碼優化
11         {
12             if(WaitForSingleObject(pThis->m_lMutex,100) == WAIT_TIMEOUT)    //上鎖
13                 continue;
14             if(pThis->m_qItask.empty())
15             {
16                 ReleaseMutex(pThis->m_lMutex);                                    //解鎖
17                 break;
18             }
19             pItask = pThis->m_qItask.front();
20             pThis->m_qItask.pop();
21             //pThis->m_bIsPop = true;
22             ReleaseMutex(pThis->m_lMutex);                                    //解鎖
23 
24             pItask->RunTask();
25             delete pItask;
26             pItask = NULL;
27         }
28         InterlockedDecrement(&pThis->m_lRunThreadNum);
29 
30     }
31 
32     return 0;

4》在銷毀時,能清空的都要清空,防止內存泄漏

 1     m_bFlagExit = false;
 2     list<HANDLE>::iterator ite = m_lHandle.begin();
 3     //auto ite = m_lHandle.begin();
 4     while(ite != m_lHandle.end())
 5     {
 6         if(WaitForSingleObject(*ite,100) == WAIT_TIMEOUT)
 7             TerminateThread(*ite,-1);
 8         CloseHandle(*ite);
 9         *ite = 0;
10         ite++;
11     }
12     m_lHandle.clear();
13     m_lCreateThreadNum = 0;
14     CloseHandle(m_hSemphore);
15     m_hSemphore = 0;
16 
17     while(!m_qItask.empty())        //防止內存泄漏,將任務清空
18     {
19         CItask *pItask = NULL;
20         pItask = m_qItask.front();
21         m_qItask.pop();
22         delete pItask;
23         pItask = NULL;
24     }
25 
26     if(m_lMutex)                    //關閉互斥量
27     {
28         CloseHandle(m_lMutex);
29         m_lMutex = 0;
30     }

 

接下來在測試時就沒有問題了,但是有點卡,並且CPU的運算率達到了100%,后來查閱博客https://blog.csdn.net/liuyancainiao/article/details/84400637資料,設置的線程數最多為CPU核數的兩倍,假設計算機有一個物理CPU,是雙核的,支持超線程。那么這台計算機就是雙核四線程的。 所以兩路(兩路指的是有兩個物理CPU)四核超線程就有2*4*2=16個邏輯CPU。有人也把它稱之為16核,實際上在linux的/proc/cpuinfo中查看只有8核。既然計算機多核與超線程模擬相關,所以實際上計算機的核數翻倍並不意味着性能的翻倍,也不意味着核數越多計算機性能會越來越好,因為超線程只是充分利用了CPU的空閑資源,實際上在應用中基於很多原因,CPU的執行單元都沒有被充分使用。  具體的代碼 ,我放在文件共享,有需要的朋友可以直接拿走,不客氣^_^

 

 

2019-08-10 11:34:36 編程小菜鳥自我反思,路過的朋友可以留下自己建議和意見 謝謝!!!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM