博客參考:HappenLee
1. C++多線程編程的困擾
C++從11開始在標准庫之中引入了線程庫來進行多線程編程,在之前的版本需要依托操作系統本身提供的線程庫來進行多線程的編程。(其實本身就是在標准庫之上對底層的操作系統多線程API統一進行了封裝,使用的pthread
或<windows.h>
來進行多線程編程的)
提供了統一的多線程固然是好事,但是標准庫給的支持實在是有限,具體實踐起來還是讓人挺困擾的:
- C++本身的STL並不是線程安全的。所以缺少了類似與Java並發庫所提供的一些高性能的線程安全的數據結構。(Doug Lea大神親自操刀完成的並發編程庫,讓JDK5成為Java之中里程碑式的版本)
- 如果沒有線程安全的數據結構,退而求其次,可以自己利用互斥量Mutex來實現。C++的標准庫支持如下的互斥量的實現:
互斥量 | 版本 | 作用 |
---|---|---|
mutex | C++11 | 最基本的互斥量 |
timed_mutex | C++11 | 有超時機制的互斥量 |
timed_mutex | C++11 | 有超時機制的互斥量 |
recursive_mutex | C++11 | 可重入的互斥量 |
recursive_timed_mutex | C++11 | 結合 2,3 特點的互斥量 |
shared_timed_mutex | C++14 | 具有超時機制的可共享互斥量 |
shared_mutex | C++17 | 共享的互斥量 |
由上述表格可見,C++是從14之后的版本才正式支持共享互斥量,也就是實現讀寫鎖的結構。如果當前僅支持C++11的版本,所就沒有辦法使用共享互斥量來實現讀寫鎖了。所以只能使用boost的庫,利用boost提供的讀寫鎖來完成所需完成的工作。
2.標准庫互斥量的剖析
mutex
mutex的中文翻譯就是互斥量,很多人喜歡稱之其為鎖。其實不是太准確,因為多線程編程本質上應該通過互斥量之上加鎖,解鎖的操作,來實現多線程並發執行時對互斥資源線程安全的訪問。 我們來看看mutex類的使用方法:
long num = 0;
std::mutex num_mutex;
void numplus() {
num_mutex.lock();
for (long i = 0; i < 1000000; ++i) {
num++;
}
num_mutex.unlock();
};
void numsub() {
num_mutex.lock();
for (long i = 0; i < 1000000; ++i) {
num--;
}
num_mutex.unlock();
}
int main() {
std::thread t1(numplus);
std::thread t2(numsub);
t1.join();
t2.join();
std::cout << num << std::endl;
}
調用線程從成功調用lock()或try_lock()開始,到unlock()為止占有mutex對象。當存在某線程占有mutex時,所有其他線程若調用lock則會阻塞,而調用try_lock會得到false返回值。由上述代碼可以看到,通過mutex加鎖的方式,來確保只有單一線程對臨界區的資源進行操作。
time_mutex
與recursive_mutex
的使用也是大同小異,兩者都是基於mutex來實現的。( 本質上是基於recursive_mutex實現的,mutex為recursive_mutex的特例)
time_mutex
則是進行加鎖時可以設置阻塞的時間,若超過對應時長,則返回false。
recursive_mutex
則讓單一線程可以多次對同一互斥量加鎖,同樣,解鎖時也需要釋放相同多次的鎖。
以上三種類型的互斥量都是包裝了操作系統底層的pthread_mutex_t:

在C++之中並不提倡我們直接對鎖進行操作,因為在lock之后忘記調用unlock很容易造成死鎖。而對臨界資源進行操作時,可能會拋出異常,程序也有可能break,return 甚至 goto,這些情況都極容易導致unlock沒有被調用。所以C++之中通過RAII來解決這個問題,它提供了一系列的通用管理互斥量的類:
互斥量管理 | 版本 | 作用 |
---|---|---|
lock_graud | C++11 | 基於作用域的互斥量管理 |
unique_lock | C++11 | 更加靈活的互斥量管理 |
shared_lock | C++14 | 共享互斥量的管理 |
scope_lock | C++17 | 多互斥量避免死鎖的管理 |
創建互斥量管理對象時,它試圖給給定mutex加鎖。當程序離開互斥量管理對象的作用域時,互斥量管理對象會析構並且並釋放mutex。所以我們則不需要擔心程序跳出或產生異常引發的死鎖了。
對於需要加鎖的代碼段,可以通過{}括起來形成一個作用域。比如上述代碼的栗子,可以進行如下改寫(推薦):
long num = 0;
std::mutex num_mutex;
void numplus() {
std::lock_guard<std::mutex> lock_guard(num_mutex);
for (long i = 0; i < 1000000; ++i) {
num++;
}
};
void numsub() {
std::lock_guard<std::mutex> lock_guard(num_mutex);
for (long i = 0; i < 1000000; ++i) {
num--;
}
}
int main() {
std::thread t1(numplus);
std::thread t2(numsub);
t1.join();
t2.join();
std::cout << num << std::endl;
}
由上述代碼可以看到,代碼結構變得更加明晰了,對於鎖的管理也交給了程序本身來進行處理,減少了出錯的可能。
shared_mutex
C++14的版本之后提供了共享互斥量,它的區別就在於提供更加細粒度的加鎖操作:lock_shared。lock_shared是一個獲取共享鎖的操作,而lock是一個獲取排他鎖的操作,通過這種方式更加細粒度化鎖的操作。shared_mutex也是基於操作系統底層的讀寫鎖pthread_rwlock_t的封裝:

這里有個事情挺奇怪的,C++14提供了shared_timed_mutex 而在C++17提供了shared_mutex。其實shared_timed_mutex涵蓋了shard_mutex的功能。(不知道是不是因為名字被diss了,所以后續在C++17里將shared_mutex**加了回來)。共享互斥量適用與讀多寫少的場景,舉個栗子:
long num = 0;
std::shared_mutex num_mutex;
// 僅有單個線程可以寫num的值。
void numplus() {
std::unique_lock<std::shared_mutex> lock_guard(num_mutex);
for (long i = 0; i < 1000000; ++i) {
num++;
}
};
// 多個線程同時讀num的值。
long numprint() {
std::shared_lock<std::shared_mutex> lock_guard(num_mutex);
return num;
}
簡單來說:
- shared_lock是讀鎖。被鎖后仍允許其他線程執行同樣被shared_lock的代碼
- unique_lock是寫鎖。被鎖后不允許其他線程執行被shared_lock或unique_lock的代碼。它可以同時限制unique_lock與share_lock
#include <iostream>
#include "boost/thread/mutex.hpp"
#include "boost/thread/thread.hpp"
typedef boost::shared_lock<boost::shared_mutex> readLock;
typedef boost::unique_lock<boost::shared_mutex> writeLock;
boost::shared_mutex rwmutex;
std::vector<int> shared_vec = { 1, 2, 3, 4, 5, 6 };
int pause = 0;
void wait(int milliseconds)
{
boost::this_thread::sleep(boost::posix_time::milliseconds(milliseconds));
}
void readThread1()
{
while (true)
{
wait(100);
if (pause)
{
continue;
}
readLock rdlock(rwmutex);
printf("rthread 1: vec[1]: %d\n", shared_vec[1]);
}
}
void readThread2()
{
while (true)
{
wait(100);
if (pause)
{
continue;
}
readLock rdlock(rwmutex);
printf("rthread 2: vec[2]: %d\n", shared_vec[2]);
}
}
void writeThread()
{
int count = 1;
while (true)
{
count++;
wait(500);
if (pause)
{
continue;
}
writeLock wdlock(rwmutex);
shared_vec[1] = count;
shared_vec[2] = count + 1;
printf("wthread 2: vec[1] and vec[2]: %d %d\n", shared_vec[1], shared_vec[2]);
if (count >= 100)
{
count = 0;
}
}
}
int main()
{
boost::thread t1(readThread1);
wait(20);
boost::thread t2(readThread2);
wait(20);
boost::thread t3(writeThread);
while (true)
{
char c = getchar();
if (c == 's')
{
pause = 1;
}
else if (c == 'r')
{
pause = 0;
}
else if (c == 'q')
{
break;
}
wait(500);
}
return 0;
}

不得不說,C++11沒有將共享互斥量集成進來,在很多讀多寫少的應用場合之中,標准庫本身提供的鎖機制顯得很雞肋,也從而導致了筆者最終只能求助與boost的解決方案。
多鎖競爭
還剩下最后一個要寫的內容:scope_lock ,當我們要進行多個鎖管理時,很容易出現問題,由於加鎖的先后順序不同導致死鎖。(其實本來不想寫了,好累。這里就簡單用例子做解釋吧,偷個懶~~)
如下栗子,加鎖順序不當導致死鎖:
std::mutex m1, m2;
// thread 1
{
std::lock_guard<std::mutex> lock1(m1);
std::lock_guard<std::mutex> lock2(m2);
}
// thread 2
{
std::lock_guard<std::mutex> lock2(m2);
std::lock_guard<std::mutex> lock1(m1);
}
而通過C++17提供的scope_lock就可以很簡單解決這個問題了:
std::mutex m1, m2;
// thread 1
{
std::scope_lock lock(m1, m2);
}
// thread 2
{
std::scope_lock lock(m1, m2);
}
boost::timed_mutex
#include <iostream>
#include "boost/thread/mutex.hpp"
#include "boost/thread/thread.hpp"
#include "ComUtils/TimeStampAbs.h"
void wait(int seconds)
{
boost::this_thread::sleep(boost::posix_time::seconds(seconds));
}
boost::timed_mutex mutex;
void timed_mutex_func1()
{
for (int i = 0; i < 5; ++i)
{
double t1 = TimeStampAbs::getTimeStamp(TIME_STAMP_TYPE::ABS_TIME_STAMP);
boost::unique_lock<boost::timed_mutex> lock(mutex, boost::get_system_time() + boost::posix_time::milliseconds(100));
if (!lock.owns_lock())
{
printf("thread 1 owns_lock failed\n");
continue;
}
else
{
printf("thread 1 owns_lock success\n");
double t2 = TimeStampAbs::getTimeStamp(TIME_STAMP_TYPE::ABS_TIME_STAMP);
printf("thread 1 dt: %lf %lf %lf\n", t1, t2, t2 - t1);
}
wait(1);
std::cout << "Thread 1 " << boost::this_thread::get_id() << ": " << i <<std::endl;
}
}
void timed_mutex_func2()
{
for (int i = 0; i < 5; ++i)
{
double t1 = TimeStampAbs::getTimeStamp(TIME_STAMP_TYPE::ABS_TIME_STAMP);
boost::unique_lock<boost::timed_mutex> lock(mutex, boost::get_system_time() + boost::posix_time::milliseconds(1500));
if (!lock.owns_lock())
{
printf("thread 2 owns_lock failed\n");
continue;
}
else
{
printf("thread 2 owns_lock success\n");
double t2 = TimeStampAbs::getTimeStamp(TIME_STAMP_TYPE::ABS_TIME_STAMP);
printf("thread 2 dt: %lf %lf %lf\n", t1, t2, t2 - t1);
}
boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(40));
std::cout << "Thread 2 " << boost::this_thread::get_id() << ": " << i << std::endl;
}
}
int main()
{
boost::thread t(timed_mutex_func1);
boost::thread t2(timed_mutex_func2);
getchar();
return 0;
}
簡單來說boost::timed_mutex
就是進行加鎖時可以設置阻塞的時間,若超過對應時長,則返回false;如上述線程1最長等待時間 100ms, 線程2最長等待時間是 1500ms;

此案例細節可參考:How to correctly use boost::timed_mutex and scoped_lock