前言
基礎知識
我們在用C++進行多線程編程的時候,可以使用內核的同步原語進行自己的封裝,也可以使用C++11已經封裝好的,因為我覺得有必要了解一些底層的東西,所以這兩個內容我都會講到。
《Linux多線程編程》中提到的線程同步四項原則:
- 首要原則是盡量最低限度的共享原則,減少同步的場合。一個對象能不暴露給別的線程就不要暴露;如果要暴露,優先考慮
immutable
對象,實在不行才暴露可修改的對象,並且用同步措施來充分保護它。- 其次是使用高級的並發編程控件,比如
TaskQueue
,Producer-Consumer Queue
,CountDownLatch
等等。- 最后不得已必須使用底層同步原語時,只用非遞歸的互斥量和條件變量,慎用讀寫鎖,不要用信號量。
- 除了使用atomic整數之外,不要自己編寫
lock-free
的代碼,也不要用內核級同步原語。
內核-同步原語
同步原語包括互斥量、條件變量、讀寫鎖、信號量、文件互斥,但是在這里我只介紹互斥量和條件變量。
1. 互斥量
互斥量保護了臨界區,任何一個時刻最多只能有一個線程在此使用臨界資源,這樣就做到了保護臨界區。
在使用互斥鎖的時候,需要注意一些事項(均出自《Linux多線程編程》)
- 使用RAII手法封裝mutex的創建、銷毀、加鎖、解鎖的四個操作。
- 不手工的調用
lock和unlock
函數,一切交給棧上的Guard對象的構造和析構函數負責,Guard對象的生命期正好等於臨界區。避免在一個函數里面加鎖,在另一個函數里面解鎖,也避免在不同的分支加鎖和解鎖。- 在每次構造Guard對象的時候,思考已經持有的鎖,防止因為加鎖的順序的不同而導致死鎖。
創建和銷毀
// 初始化
int pthread_mutex_init (pthread_mutex_t *__mutex, __const pthread_mutexattr_t *__mutexattr);
// 銷毀
int pthread_mutex_destroy (pthread_mutex_t *__mutex);
在初始化中的第二個參數是設置線程的屬性,如果默認則設置為NULL即可。
設置屬性
// 初始化互斥量屬性對象
int pthread_mutexattr_init (pthread_mutexattr_t *__attr);
// 銷毀互斥量屬性對象
int pthread_mutexattr_destroy (pthread_mutexattr_t *__attr);
可以發現屬性和互斥量的創建和銷毀是類似的。
使用
// 阻塞到該互斥量解鎖為止
int pthread_mutex_lock(pthread_mutex_t *mutex);
// 不會阻塞,互斥量被占用則返回EBUSY的錯誤
int pthread_mutex_trylock(pthread_mutex_t *mutex);
// 解鎖
int pthread_mutex_unlock(pthread_mutex_t *mutex);
封裝
class TMutex {
public:
TMutex();
~TMutex();
void lock();
void unlock();
inline pthread_mutex_t* getMutext() { return &_mutex; }
private:
pthread_mutex_t _mutex; // 不可以直接操作
};
TMutex::TMutex() {
pthread_mutex_init(&this->_mutex, NULL);
}
TMutex::~TMutex() {
pthread_mutex_destroy(&this->_mutex);
}
void TMutex::lock() {
pthread_mutex_lock(&this->_mutex);
}
void TMutex::unlock() {
pthread_mutex_unlock(&this->_mutex);
}
class TMutexGuard {
public:
explicit TMutexGuard(const TMutex& mutex) : _mutex(mutex) {
_mutex.lock();
}
~TMutexGuard() {
_mutex.unlock();
};
private:
TMutex _mutex;
};
在使用的時候要通過Guard去操作mutex。類似於這樣:
TMutexGuard gaurd(this->_mutex);
注意
死鎖通常發生在多個鎖相互依賴的時候,比如說,有兩個鎖A和B,鎖A占用了資源A使用共享資源,企圖使用共享資源B,此時鎖B占用了資源B並且等待使用資源A,如果雙方都不想讓,最后的結果就是誰也訪問不了資源而高高掛起。
預防這種問題的關鍵在於,應該按照一定的順序來申請資源,釋放資源,不能同時的去對訪問資源進行加鎖,這樣誰都訪問不了,還有另外一種方法,就是使用非阻塞的模式,不使用lock,而是使用try_lock,避免一直阻塞在原地,而是返回EBUSY(鎖尚未解除)
或者EINVAL(鎖變量不可用)
,將自己拿到的鎖進行釋放,過段時間再試試看。
所以在設計代碼的時候,應當盡量減少同一個臨界區的鎖的數量,因為鎖一多,就會出現各種各樣的周邊問題,還有在命名鎖的時候可以加上序號,這樣可以很清楚的知道誰先誰后。
2. 條件變量
互斥鎖是加鎖原語,如果需要等待某一個條件成立的時候,則應該使用條件變量;條件變量必須配合互斥鎖一起使用。比如說我們在便利店買東西的時候,我們要在拿好東西以后去告訴店員我們要付帳,在付完帳后店員就可以繼續做自己的事情,可以把我們買東西等這個看作是一個任務,店員看做是一個線程,這個線程要去幫我們處理各種各樣的任務,如果有任務來的時候,通知線程處理,當任務很多很多的時候,任務就會進入到隊列中(就像排隊,誰先來處理誰),挨個處理,如果沒有任務,線程就等待,這就是線程池的原理。
創建和銷毀
// 初始化
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
// 銷毀
int pthread_cond_destroy(pthread_cond_t *cond);
同樣類似於mutex
,也可以對條件變量設置屬性。
使用
// 通知一個線程
int pthread_cond_signal(pthread_cond_t *cond);
// 通知所有線程
int pthread_cond_broadcast(pthread_cond_t *cond);
// 阻塞該線程直到被喚醒
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
// 在前者的基礎上,加上了時間的限制
int pthread_cond_timedwait(
pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
在阻塞的時候,需要傳遞互斥鎖,用來保護條件,以防止多個線程同時請求的競爭;在調用waiting函數之前,必須要申請互斥鎖,在進入到waiting的時候,才釋放互斥鎖,直到條件收到信號,當調用返回的時候,互斥對象再次被鎖定。
waiting函數應當放在while循環中,因為需要考慮到可能會被意外喚醒,卻不滿足條件的時候。這就是所謂的虛假喚醒。
一般的代碼編寫格式為:
-
wait端
-
signal端或者brocast端
看到有些博客中提到的
wait morphing
-先通知后解鎖,因為先通知,wait端被喚醒以后發現想要占用鎖,但是發現還沒有解鎖,所以又進入到了等待,在signal端解鎖以后wait端才能占用鎖來處理。這里涉及到一個順序的問題,好像兩種方法的結果都是差不多的,但是個中差別如果有人能給我說說更好了😄。
封裝
class TConditon {
public:
TConditon(TMutex mutex);
~TConditon();
void wait(); // 阻塞直到有notify通知
void wait_for(double time); // 可以設置時間
void notify_one(); // 喚醒某個線程
void notify_all(); // 喚醒全部線程
private:
pthread_cond_t _cond;
TMutex _mutex;
};
//------------------------------------------------------//
TConditon::TConditon(TMutex mutex) : _mutex(mutex) {
pthread_cond_init(&this->_cond, nullptr);
}
TConditon::~TConditon() {
this->_mutex.unlock();
pthread_cond_destroy(&this->_cond);
}
void TConditon::wait() {
pthread_cond_wait(&this->_cond, this->_mutex.getMutext());
}
void TConditon::wait_for(double time) {
this->_mutex.lock();
struct timespec spec;
spec.tv_sec = static_cast<time_t >(time/1000); // 秒
spec.tv_nsec = 0; // 毫秒
pthread_cond_timedwait(&this->_cond, this->_mutex.getMutext(), &spec);
}
void TConditon::notify_one() {
pthread_cond_signal(&this->_cond);
}
void TConditon::notify_all() {
pthread_cond_broadcast(&this->_cond);
}
注意
條件變量通常用來實現高層的阻塞隊列(線程池中的實現就是)或者倒時器;
倒時器主要有兩種用途:
- 主線程發起多個子線程,並且在等待全部子線程完成一定的任務后,主線程才繼續執行,通常可以用在主線程等待多個子線程完成初始化;
- 主線程同樣發起多個子線程,並且在等待主線程完成一定的任務以后,多個子線程才繼續執行,通常可以用於多個子線程等待主線程發出起跑的命令。
讀寫鎖和信號量
在陳碩的書中提到,不建議使用讀寫鎖和信號量,因為一般情況下普通mutex和條件變量已經足夠,所以這個打算用到再說。
線程
前面講的都需要基於線程的基礎,下面將介紹在Unix下,線程的使用方法,其頭文件為#include <pthread.h>
。
線程狀態轉換圖
這是JAVA中的線程轉換圖,可以發現,線程一共有四種狀態,分別是就緒狀態,阻塞狀態,運行狀態,終止狀態。
-
就緒狀態:線程可以運行,此時等待系統調用,也就是上圖中的可運行;
-
運行狀態:因為系統會為每個線程分配一個時間段,如果在這個時間片還沒有執行完畢,那么系統會將這個線程狀態保存下來,同時讓下一個線程來執行,也就是這個線程被搶占了,則會到可運行的狀態;那么如果沒有足夠的線程執行任務對象的時候,則會讓任務進入等待隊列,等待分配一個線程去處理;
-
阻塞狀態是線程因為某種原因放棄CPU使用權,暫時停止運行。直到線程進入就緒狀態,才有機會轉到運行狀態。
阻塞的情況分三種:1. 等待阻塞 -- 通過調用線程的wait()方法,讓線程等待某工作的完成。(wait方法會釋放占用資源)2. 同步阻塞 -- 線程在獲取synchronized同步鎖失敗(因為鎖被其它線程所占用),它會進入同步阻塞狀態。3. 其他阻塞 -- 通過調用線程的sleep()或join()或發出了I/O請求時,線程會進入到阻塞狀態。當sleep()狀態超時、join()等待線程終止或者超時、或者I/O處理完畢時,線程重新轉入就緒狀態。
創建和終止
// 創建
int pthread_create(pthread_t *thread_id, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
// 終止(主動的行為)
void pthread_exit(void *retval);
// 終止(在同一進程內的線程可以指定另一個線程退出)
int pthread_cancel(pthread_t thread);
其中thread_id為線程的標識符,attr為線程的屬性,start_routine表示線程一旦建立就會執行的函數,arg為傳遞給函數start_routine的參數。我們可以在函數中調用pthread_self
獲取調用這個函數的線程的標識符。
連接和分離
// 連接
int pthread_join(pthread_t thread, void **retval);
// 分離
int pthread_detach(pthread_t thread);
在我們創建線程的時候,有一個屬性用來指定是連接的還是分離的,只有定義為非分離的才可以連接,否則會報錯,通常我們都會設置為NULL,默認就是非分離的。
線程之間是共享數據段的,因此通常在線程退出以后,退出線程所占用的資源並不會隨着線程的退出而被釋放,所以這個時候,可以使用pthread_join
來同步釋放資源,調用該函數的線程將會掛起等待,直到終止。
兩者的差別在於,join會等待所有的線程都處理資源完畢,才會將這個沒有任何線程使用的資源給釋放掉,也就是說,一個線程執行join以后,其它的線程可以使用它的資源,因為它的資源還沒有被系統釋放掉;但是detach卻不一樣,調用該函數的線程終止以后系統立馬收回其資源。注意,這兩個函數不能夠同時使用。
主線程和普通線程
在C語言的程序中,main就是一個主線程,主線程發散多個子線程,而這些子線程就是普通線程。
主線程和普通線程的區別在於:
- 主線程返回或者運行結束時(執行return,exit等),所有的線程不管有沒有執行完都要退出,但是普通線程不會。所以我們如果想要主線程等待其它線程結束以后才退出,通常使用的方法是
pthread_join
,這時調用的主線程會被阻塞,直到其它被join
的線程執行結束以后才會往下執行。 - 一般主線程的棧的大小比普通現成的大很多。主線程使用的是進程的棧,所以會比較大。
- 主線程的main函數是被程序在對進程進行初始化后調用,而普通函數則是通過
start
函數調用。
封裝
.h文件
#ifndef TICKLE_TTHREAD_H
#define TICKLE_TTHREAD_H
#include <iostream>
#include <pthread.h>
#include <functional>
#include <string.h>
#include <unistd.h>
#include <exception>
#include <memory>
namespace Tickle {
typedef std::function<void()> TThreadFunc;
struct TThreadData {
TThreadFunc _func;
std::weak_ptr<pid_t> _pid;
std::string _name;
TThreadData(const TThreadFunc& func, const std::shared_ptr<pid_t>& pid, const std::string& name) : _func(func), _pid(pid), _name(name) {}
void runInThread() {
try {
if (_func == nullptr) {
std::cout << "function is null" << std::endl;
}
else {
_func();
}
}
catch (std::exception &e) {
std::cout << "Thread error info:" << e.what() << std::endl;
}
}
};
class TThread {
public:
TThread(const TThreadFunc& func, const std::string& name = std::string());
TThread(const TThreadData& data);
~TThread();
void start(); // 創建線程
void join();
inline const std::string& name() const { return _name; }
private:
std::string _name;
pthread_t _thd;
std::shared_ptr<pid_t> _pid;
TThreadFunc _func;
};
}
#endif //TICKLE_TTHREAD_H
.cpp文件
#include "TThread.h"
namespace Tickle {
void* startInThread(void* param) {
TThreadData* data = static_cast<TThreadData*>(param);
data->runInThread();
delete(data);
return nullptr;
}
TThread::TThread(const TThreadFunc& func, const std::string& name) : _func(func), _name(name), _thd(), _pid(new pid_t(0))
{
}
TThread::TThread(const TThreadData &data) : _func(data._func), _name(data._name), _thd(0), _pid(new pid_t(0))
{
}
TThread::~TThread() {
pthread_detach(this->_thd);
}
void TThread::start() {
TThreadData * data = new TThreadData(this->_func, this->_pid, this->_name);
if (0 != pthread_create(&this->_thd, nullptr, startInThread, data)) {
std::cout << "create the thread failed" << std::endl;
return ;
}
}
void TThread::join() {
if (0 != pthread_join(this->_thd, nullptr)) {
std::cout << "join the thread failed" << std::endl;
}
}
}
內存池的實現
內存池包括以下重點:
- 先申請一定數量的線程;
- 添加任務進入隊列,並且通知線程來處理;
- 線程在從隊列中選擇任務的時候,按照隊列先入先出的順序來選擇,如果隊列中沒有任何任務,條件變量則等待。
最后
我這里就不把我實現的給貼出來,大家可以自己實現😄,還有什么錯誤請指出來,轉載請注明出處,謝謝。