多線程開發之線程基礎(實現線程池必備知識)


前言

基礎知識

我們在用C++進行多線程編程的時候,可以使用內核的同步原語進行自己的封裝,也可以使用C++11已經封裝好的,因為我覺得有必要了解一些底層的東西,所以這兩個內容我都會講到。

《Linux多線程編程》中提到的線程同步四項原則

  1. 首要原則是盡量最低限度的共享原則,減少同步的場合。一個對象能不暴露給別的線程就不要暴露;如果要暴露,優先考慮immutable對象,實在不行才暴露可修改的對象,並且用同步措施來充分保護它。
  2. 其次是使用高級的並發編程控件,比如 TaskQueueProducer-Consumer QueueCountDownLatch等等。
  3. 最后不得已必須使用底層同步原語時,只用非遞歸的互斥量和條件變量,慎用讀寫鎖,不要用信號量。
  4. 除了使用atomic整數之外,不要自己編寫lock-free的代碼,也不要用內核級同步原語。

內核-同步原語

同步原語包括互斥量、條件變量、讀寫鎖、信號量、文件互斥,但是在這里我只介紹互斥量和條件變量。

1. 互斥量

互斥量保護了臨界區,任何一個時刻最多只能有一個線程在此使用臨界資源,這樣就做到了保護臨界區。

在使用互斥鎖的時候,需要注意一些事項(均出自《Linux多線程編程》)

  1. 使用RAII手法封裝mutex的創建、銷毀、加鎖、解鎖的四個操作。
  2. 不手工的調用lock和unlock函數,一切交給棧上的Guard對象的構造和析構函數負責,Guard對象的生命期正好等於臨界區。避免在一個函數里面加鎖,在另一個函數里面解鎖,也避免在不同的分支加鎖和解鎖。
  3. 在每次構造Guard對象的時候,思考已經持有的鎖,防止因為加鎖的順序的不同而導致死鎖。
創建和銷毀
// 初始化
int pthread_mutex_init (pthread_mutex_t *__mutex, __const pthread_mutexattr_t *__mutexattr);  

// 銷毀
int pthread_mutex_destroy (pthread_mutex_t *__mutex);

在初始化中的第二個參數是設置線程的屬性,如果默認則設置為NULL即可。

設置屬性
// 初始化互斥量屬性對象
int pthread_mutexattr_init (pthread_mutexattr_t *__attr);  

// 銷毀互斥量屬性對象
int pthread_mutexattr_destroy (pthread_mutexattr_t *__attr);

可以發現屬性和互斥量的創建和銷毀是類似的。

使用
// 阻塞到該互斥量解鎖為止
int pthread_mutex_lock(pthread_mutex_t *mutex);

// 不會阻塞,互斥量被占用則返回EBUSY的錯誤
int pthread_mutex_trylock(pthread_mutex_t *mutex);

// 解鎖
int pthread_mutex_unlock(pthread_mutex_t *mutex);
封裝
class TMutex {
public:
    TMutex();
    ~TMutex();

    void lock();
    void unlock();

    inline pthread_mutex_t* getMutext() { return &_mutex; }

private:
    pthread_mutex_t _mutex;  // 不可以直接操作
};

TMutex::TMutex() {
    pthread_mutex_init(&this->_mutex, NULL);
}

TMutex::~TMutex() {
    pthread_mutex_destroy(&this->_mutex);
}

void TMutex::lock() {
    pthread_mutex_lock(&this->_mutex);
}

void TMutex::unlock() {
    pthread_mutex_unlock(&this->_mutex);
}

class TMutexGuard {
public:
    explicit TMutexGuard(const TMutex& mutex) : _mutex(mutex) {
        _mutex.lock();
    }

    ~TMutexGuard() {
        _mutex.unlock();
    };

private:
    TMutex _mutex;
};


在使用的時候要通過Guard去操作mutex。類似於這樣:

TMutexGuard gaurd(this->_mutex);
注意

死鎖通常發生在多個鎖相互依賴的時候,比如說,有兩個鎖A和B,鎖A占用了資源A使用共享資源,企圖使用共享資源B,此時鎖B占用了資源B並且等待使用資源A,如果雙方都不想讓,最后的結果就是誰也訪問不了資源而高高掛起。

預防這種問題的關鍵在於,應該按照一定的順序來申請資源,釋放資源,不能同時的去對訪問資源進行加鎖,這樣誰都訪問不了,還有另外一種方法,就是使用非阻塞的模式,不使用lock,而是使用try_lock,避免一直阻塞在原地,而是返回EBUSY(鎖尚未解除)或者EINVAL(鎖變量不可用),將自己拿到的鎖進行釋放,過段時間再試試看。

所以在設計代碼的時候,應當盡量減少同一個臨界區的鎖的數量,因為鎖一多,就會出現各種各樣的周邊問題,還有在命名鎖的時候可以加上序號,這樣可以很清楚的知道誰先誰后。

2. 條件變量

互斥鎖是加鎖原語,如果需要等待某一個條件成立的時候,則應該使用條件變量;條件變量必須配合互斥鎖一起使用。比如說我們在便利店買東西的時候,我們要在拿好東西以后去告訴店員我們要付帳,在付完帳后店員就可以繼續做自己的事情,可以把我們買東西等這個看作是一個任務,店員看做是一個線程,這個線程要去幫我們處理各種各樣的任務,如果有任務來的時候,通知線程處理,當任務很多很多的時候,任務就會進入到隊列中(就像排隊,誰先來處理誰),挨個處理,如果沒有任務,線程就等待,這就是線程池的原理

創建和銷毀
// 初始化
int pthread_cond_init(pthread_cond_t *restrict cond, 			const pthread_condattr_t *restrict attr);

// 銷毀
int pthread_cond_destroy(pthread_cond_t *cond);

同樣類似於mutex,也可以對條件變量設置屬性。

使用
// 通知一個線程
int pthread_cond_signal(pthread_cond_t *cond);

// 通知所有線程
int pthread_cond_broadcast(pthread_cond_t *cond);

// 阻塞該線程直到被喚醒
int pthread_cond_wait(pthread_cond_t *restrict cond, 			pthread_mutex_t *restrict mutex);

// 在前者的基礎上,加上了時間的限制
int pthread_cond_timedwait(
pthread_cond_t *restrict cond, 
pthread_mutex_t *restrict mutex, 
const struct timespec *restrict abstime);

在阻塞的時候,需要傳遞互斥鎖,用來保護條件,以防止多個線程同時請求的競爭;在調用waiting函數之前,必須要申請互斥鎖,在進入到waiting的時候,才釋放互斥鎖,直到條件收到信號,當調用返回的時候,互斥對象再次被鎖定。

waiting函數應當放在while循環中,因為需要考慮到可能會被意外喚醒,卻不滿足條件的時候。這就是所謂的虛假喚醒

一般的代碼編寫格式為:

  • wait端

  • signal端或者brocast端

    看到有些博客中提到的wait morphing-先通知后解鎖,因為先通知,wait端被喚醒以后發現想要占用鎖,但是發現還沒有解鎖,所以又進入到了等待,在signal端解鎖以后wait端才能占用鎖來處理。這里涉及到一個順序的問題,好像兩種方法的結果都是差不多的,但是個中差別如果有人能給我說說更好了😄。

封裝
class TConditon {
public:
    TConditon(TMutex mutex);
    ~TConditon();

    void wait();  // 阻塞直到有notify通知
    void wait_for(double time);  // 可以設置時間
    void notify_one();  // 喚醒某個線程
    void notify_all();  // 喚醒全部線程

private:
    pthread_cond_t _cond;
    TMutex _mutex;
};

//------------------------------------------------------//

TConditon::TConditon(TMutex mutex) : _mutex(mutex) {
    pthread_cond_init(&this->_cond, nullptr);
}

TConditon::~TConditon() {
    this->_mutex.unlock();
    pthread_cond_destroy(&this->_cond);
}

void TConditon::wait() {
    pthread_cond_wait(&this->_cond, this->_mutex.getMutext());
}

void TConditon::wait_for(double time) {
    this->_mutex.lock();
    struct timespec spec;
    spec.tv_sec = static_cast<time_t >(time/1000);  // 秒
    spec.tv_nsec = 0;  // 毫秒
    pthread_cond_timedwait(&this->_cond, this->_mutex.getMutext(), &spec);
}

void TConditon::notify_one() {
    pthread_cond_signal(&this->_cond);
}

void TConditon::notify_all() {
    pthread_cond_broadcast(&this->_cond);
}
注意

條件變量通常用來實現高層的阻塞隊列(線程池中的實現就是)或者倒時器;

倒時器主要有兩種用途:

  • 主線程發起多個子線程,並且在等待全部子線程完成一定的任務后,主線程才繼續執行,通常可以用在主線程等待多個子線程完成初始化;
  • 主線程同樣發起多個子線程,並且在等待主線程完成一定的任務以后,多個子線程才繼續執行,通常可以用於多個子線程等待主線程發出起跑的命令。

讀寫鎖和信號量

在陳碩的書中提到,不建議使用讀寫鎖和信號量,因為一般情況下普通mutex和條件變量已經足夠,所以這個打算用到再說。

線程

前面講的都需要基於線程的基礎,下面將介紹在Unix下,線程的使用方法,其頭文件為#include <pthread.h>

線程狀態轉換圖

這是JAVA中的線程轉換圖,可以發現,線程一共有四種狀態,分別是就緒狀態,阻塞狀態,運行狀態,終止狀態。

  • 就緒狀態:線程可以運行,此時等待系統調用,也就是上圖中的可運行;

  • 運行狀態:因為系統會為每個線程分配一個時間段,如果在這個時間片還沒有執行完畢,那么系統會將這個線程狀態保存下來,同時讓下一個線程來執行,也就是這個線程被搶占了,則會到可運行的狀態;那么如果沒有足夠的線程執行任務對象的時候,則會讓任務進入等待隊列,等待分配一個線程去處理;

  • 阻塞狀態是線程因為某種原因放棄CPU使用權,暫時停止運行。直到線程進入就緒狀態,才有機會轉到運行狀態。

      阻塞的情況分三種:1. 等待阻塞 -- 通過調用線程的wait()方法,讓線程等待某工作的完成。(wait方法會釋放占用資源)2. 同步阻塞 -- 線程在獲取synchronized同步鎖失敗(因為鎖被其它線程所占用),它會進入同步阻塞狀態。3. 其他阻塞 -- 通過調用線程的sleep()或join()或發出了I/O請求時,線程會進入到阻塞狀態。當sleep()狀態超時、join()等待線程終止或者超時、或者I/O處理完畢時,線程重新轉入就緒狀態。
    

創建和終止

// 創建
int pthread_create(pthread_t *thread_id, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);

// 終止(主動的行為)
void pthread_exit(void *retval);

// 終止(在同一進程內的線程可以指定另一個線程退出)
int pthread_cancel(pthread_t thread);

其中thread_id為線程的標識符,attr為線程的屬性,start_routine表示線程一旦建立就會執行的函數,arg為傳遞給函數start_routine的參數。我們可以在函數中調用pthread_self獲取調用這個函數的線程的標識符。

連接和分離

// 連接
int pthread_join(pthread_t thread, void **retval);
 
 // 分離
int pthread_detach(pthread_t thread);

在我們創建線程的時候,有一個屬性用來指定是連接的還是分離的,只有定義為非分離的才可以連接,否則會報錯,通常我們都會設置為NULL,默認就是非分離的。

線程之間是共享數據段的,因此通常在線程退出以后,退出線程所占用的資源並不會隨着線程的退出而被釋放,所以這個時候,可以使用pthread_join來同步釋放資源,調用該函數的線程將會掛起等待,直到終止。

兩者的差別在於,join會等待所有的線程都處理資源完畢,才會將這個沒有任何線程使用的資源給釋放掉,也就是說,一個線程執行join以后,其它的線程可以使用它的資源,因為它的資源還沒有被系統釋放掉;但是detach卻不一樣,調用該函數的線程終止以后系統立馬收回其資源。注意,這兩個函數不能夠同時使用。

主線程和普通線程

在C語言的程序中,main就是一個主線程,主線程發散多個子線程,而這些子線程就是普通線程。

主線程和普通線程的區別在於:

  • 主線程返回或者運行結束時(執行return,exit等),所有的線程不管有沒有執行完都要退出,但是普通線程不會。所以我們如果想要主線程等待其它線程結束以后才退出,通常使用的方法是pthread_join,這時調用的主線程會被阻塞,直到其它被join的線程執行結束以后才會往下執行。
  • 一般主線程的棧的大小比普通現成的大很多。主線程使用的是進程的棧,所以會比較大。
  • 主線程的main函數是被程序在對進程進行初始化后調用,而普通函數則是通過start函數調用。

封裝

.h文件

#ifndef TICKLE_TTHREAD_H
#define TICKLE_TTHREAD_H

#include <iostream>
#include <pthread.h>
#include <functional>
#include <string.h>
#include <unistd.h>
#include <exception>
#include <memory>

namespace Tickle {

    typedef std::function<void()> TThreadFunc;
    struct TThreadData {
        TThreadFunc _func;
        std::weak_ptr<pid_t> _pid;
        std::string _name;


        TThreadData(const TThreadFunc& func, const std::shared_ptr<pid_t>& pid, const std::string& name) : _func(func), _pid(pid), _name(name) {}

        void runInThread() {
            try {
                if (_func == nullptr) {
                    std::cout << "function is null" << std::endl;
                }
                else {
                    _func();
                }
            }
            catch (std::exception &e) {
                std::cout << "Thread error info:" << e.what() << std::endl;
            }
        }

    };

    class TThread {
    public:
        TThread(const TThreadFunc& func, const std::string& name = std::string());
        TThread(const TThreadData& data);
        ~TThread();

        void start();  // 創建線程
        void join();
        inline const std::string& name() const { return _name; }

    private:
        std::string _name;
        pthread_t _thd;
        std::shared_ptr<pid_t> _pid;
        TThreadFunc _func;
    };

}

#endif //TICKLE_TTHREAD_H

.cpp文件

#include "TThread.h"

namespace Tickle {

    void* startInThread(void* param) {
        TThreadData* data = static_cast<TThreadData*>(param);
        data->runInThread();
        delete(data);
        return nullptr;
    }

    TThread::TThread(const TThreadFunc& func, const std::string& name) : _func(func), _name(name), _thd(), _pid(new pid_t(0))
    {

    }

    TThread::TThread(const TThreadData &data) : _func(data._func), _name(data._name), _thd(0), _pid(new pid_t(0))
    {

    }

    TThread::~TThread() {
        pthread_detach(this->_thd);
    }

    void TThread::start() {
        TThreadData * data = new TThreadData(this->_func, this->_pid, this->_name);
        if (0 != pthread_create(&this->_thd, nullptr, startInThread, data)) {
            std::cout << "create the thread failed" << std::endl;
            return ;
        }
    }

    void TThread::join() {
        if (0 != pthread_join(this->_thd, nullptr)) {
            std::cout << "join the thread failed" << std::endl;
        }
    }
}

內存池的實現

內存池包括以下重點:

  1. 先申請一定數量的線程;
  2. 添加任務進入隊列,並且通知線程來處理;
  3. 線程在從隊列中選擇任務的時候,按照隊列先入先出的順序來選擇,如果隊列中沒有任何任務,條件變量則等待。

最后

我這里就不把我實現的給貼出來,大家可以自己實現😄,還有什么錯誤請指出來,轉載請注明出處,謝謝。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM