線程池的設計實現


線程池: 就是new一堆線程,當有任務到來時,抓一個線程去執行,執行完之后再丟回線程池。

省去了新建和注銷線程的開銷。

一、線程池工作分為以下幾步:

(1)創建線程固定數目的線程(如:20個),並讓線程掛起等待任務
(2)給某個線程設置任務
(3)激活該線程,讓其執行任務
(4)線程執行任務完畢后,回收該線程

二、主要是依靠三對PV操作

線程池線程
{

//取用線程
V(m_Cond_Run)
P(m_Cond_IsTaskRun)

...
//回收線程
P(m_Cond_IsRunning)⑤
}

任務線程
{
//取用線程
P(m_Cond_Run);③
V(m_Cond_IsTaskRun)

...
//回收線程
V(m_Cond_IsRunning)
}

 

 

 

 

 

 

 

 如上面結構所示:

①狀態時,是線程池線程創建任務線程成功,任務線程在③處掛起;
②狀態時,線程池線程已經運行了一個用於激活任務的V、P操作;任務線程也會運行到④,執行任務;
    這里面,m_Cond_Run的目的是線程池告訴任務線程,需要啟動;m_Cond_IsTaskRun是任務線程回復線程池線程說,任務正常啟動;
⑤所在的地方是線程池發送了結束任務線程的指令,用P操作來等待任務線程結束;
     m_Cond_IsRunning目的是任務線程,在將任務暫停並回收資源成功后,需要告訴線程池線程可以回收我了。

三、雖然過程不復雜,但是實現過程中的邏輯卻不簡單,以darwin的源碼為例:

類結構如下:(偽代碼)

class ThreadTask //是一個執行任務的線程類
{
friend class ThreadPool;
public:
    Int start();       //創建線程
static void _Entry(void *inThread);   //該線程的回調函數
virtual void Entry();
Bool Active();     //激活該線程

private:
Task *m_task;

OSCond     m_Cond_Run;    //用於檢測線程是否運行的條件變量
OSMutex     m_Mutex_Run;   //相對應的互斥量

OSCond     m_Cond_IsTaskRun;  //用於檢測是否運行任務的條件變量
OSMutex     m_Mutex_IsTaskRun; //相對應的互斥量

};

class Task
{
friend class ThreadTask;
public:
    Bool RunTask(){ ThreadPool::RegistTask(this);}
virtual int Run()=0; //最終執行任務的函數
};

class ThreadPool
{
friend class ThreadTask;
public:
    ThreadPool(int n);   //初始化
static  void RegistTask(Task* ptask); //注冊任務
private:
    List<ThreadTask*> m_useList;
    List<ThreadTask*> m_unUseList;
};

類的實現如下:(偽代碼)

class MyTask :public Task
{
    int Run(){...};   //代碼中,只要實現Run()函數,就可以了。
};
int main()
{
    MyTask *mytask=new MyTask();
    mytask->RunTask();   //調用RunTask()就可以是任務運行,但是一般通常把這個函數封裝起來而不這樣用
    return 0;
}

//取一個線程執行任務
void ThreadPool::RegistTask(Task* ptask)
{
    if(m_unUseList.empty())
    {
        //如果沒有空閑的線程,那么就創建一個線程
        ThreadTask *pthread = new ThreadTask();
        pthread->start();
        m_useList.Push(pthread);
    }
    ThreadTask *pthread = m_unUseList.Pop();
    p->m_task=ptask;//給線程添加任務
    pthread->Active();//激活線程
}
//初始化n個線程,並激活
void ThreadPool::ThreadPool(int n)
{
    while(n--)
    {
        ThreadTask *pthread = new ThreadTask();
        pthread->start();
        m_useList.Push(pthread);
    }
}
//創建線程
Int ThreadTask::start()
{
    //可以看出線程的回調函數是_Entry實際上是一個靜態函數,目的是為了調用各自進程的Entry()函數
    return pthread_create((pthread_t*)&m_ThreadID, &m_ThreadAttr, _Entry, (void*)this);
}
void ThreadTask::_Entry(void *inThread)
{
    //調用當前進程對應的Entry()函數
    inThread->Entry();
}
void ThreadTask::Entry()
{
    while (1)
    {
        //這一段相當於P(m_Cond_Run)
        m_Mutex_Run.Lock();
        while(!m_busy)
        {
            m_Cond_Run.Wait(&m_Mutex_Run, 500);
        }
        m_Mutex_Run.Unlock();


        //這一段相當於V(m_Cond_IsTaskRun)
        m_Mutex_IsTaskRun.Lock();
        m_IsTaskRun = TRUE;
        m_Cond_IsTaskRun.Signal();
        m_Mutex_IsTaskRun.Unlock();

        //在這里執行Run()之后再把線程放回線程池的未用隊列
        if(m_Task != NULL)
        {
            m_Task->Run();
            delete m_task;
            //在這一部分還有一段用於確認是否運行完畢的PV操作,暫且省略
            
            //這個函數主要是把當前線程從已用隊列挪到未用隊列
            ThreadPool::ReclaimThread(this);
        }
        //修改狀態
        m_Mutex_IsTaskRun.Lock();
        m_IsTaskRun = FALSE;
        m_Mutex_IsTaskRun.Unlock();
    }

    return NULL;
}
Bool ThreadTask::Active()
{
    m_Mutex_Run.Lock();
    if(!m_busy)
    {
        //這一段相當於V(m_Cond_Run)
        m_busy = TRUE;
        m_Cond_Run.Signal();
        m_Mutex_Run.Unlock();

        //這一段相當於P(m_Cond_IsTaskRun)
        m_Mutex_IsTaskRun.Lock();
        while(!m_IsTaskRun)
        {
            m_Cond_IsTaskRun.Wait(&m_Mutex_IsTaskRun,100);
        }
        m_Mutex_IsTaskRun.Unlock();
        return TRUE;
    }
    m_Mutex_Run.Unlock();
    return TRUE;
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM