多線程一直是編程中的重要的工具,它可以分充分的利用硬件資源,是我們用更少的時間去完成更多的事情。在之前的博客中,我有介紹了OpenMP的基本使用,OpenMP可以理解為多線程的一個合理和高效的一套抽象工具。這次,打算仔細的介紹多線程編程中的常見的概念和典型的案例。
典型的案例
說到多線程,最核心的問題就是保證數據的讀寫安全。為了達到此目的,我們需要多很多常見的數據結構做一些改造,從而適應多線程的場景。以下是我工作中比較常見到的一些使用場景:
- 線程池
- 讀寫鎖
- 消息隊列
- ConcurrentCache
- PingPang Buffer
在具體介紹這些使用場景之前,我們還是需要了解需要使用到的一些基本的工具:互斥量、條件變量、原子操作等。
互斥量
互斥量,顧名思義,就是互斥的數據,一個線程持有的時候,其他線程就必須等待。
在C++11中,使用<mutex>
頭文件引入。以下是一個簡單的計數器的實現。
emit
函數通過mutex_
進行加鎖,使得同一時間僅有一個線程可以執行++ x_
的操作,從而保證了計數的正確性。
std::lock_guard
是個工具類,lck在構造時,調用了lock函數,在析構時調用了unlock,從而避免我們自行調用的時候忘記unlock。
#include <mutex> #include <thread> #include <iostream> class Counter { public: Counter(): x_(0) {} void emit() { mutex_.lock(); ++ x_; mutex_.unlock(); // or // std::lock_guard<std::mutex> lck(mutex_); // ++ x_; } int count() { return x_; } private: int x_; std::mutex mutex_; }; int main() { Counter c; std::thread t1([&c]{ for (int i = 0; i < 10000000; ++ i) { c.emit(); } }); std::thread t2([&c]{ for (int i = 0; i < 10000000; ++ i) { c.emit(); } }); t1.join(); t2.join(); std::cout << c.count() << std::endl; // 20000000 }
基於Mutex,我們可以方便的實現讀寫鎖。讀寫鎖的作用是,保證數據可以供多個線程並發讀,僅一個線程允許寫。在存在線程讀的情況下,寫的線程會阻塞,直到沒有任何線程有讀操作。
讀寫鎖
首先讀寫鎖會存在一個write_mutex,讀線程和寫線程都需要搶占這個mutex,從而保證讀和寫不會同時進行。但是只需要第一個讀線程搶占write_mutex即可,其他的讀線程不需要再搶占(搶占的話,就不支持並發讀了)。當不存在讀線程的時候,需要釋放write_mutex,這才運行寫線程搶占。
因此我們還需要一個計數器,記錄當前讀線程的個數,並使用另一個read_mutex保證計數器的准確。
#include <mutex> #include <thread> #include <iostream> #include <vector> class ReadWriteLock { public: ReadWriteLock():reader_count_(0) {} void lock_read() { read_mutex_.lock(); if (reader_count_ == 0) { write_mutex_.lock(); } ++ reader_count_; read_mutex_.unlock(); } void unlock_read() { read_mutex_.lock(); -- reader_count_; if (reader_count_ == 0) { write_mutex_.unlock(); } read_mutex_.unlock(); } void lock_write() { write_mutex_.lock(); } void unlock_write() { write_mutex_.unlock(); } private: std::mutex read_mutex_; std::mutex write_mutex_; int64_t reader_count_; }; ReadWriteLock rw_lock; void read_fn(int idx, int start, int end) { std::this_thread::sleep_for(std::chrono::seconds(start)); rw_lock.lock_read(); std::cout << "read thread #" << idx << ": read data" << std::endl; std::this_thread::sleep_for (std::chrono::seconds(end - start)); std::cout << "read thread #" << idx << ": read over" << std::endl; rw_lock.unlock_read(); } void write_fn(int idx, int start, int end) { std::this_thread::sleep_for(std::chrono::seconds(start)); rw_lock.lock_write(); std::cout << "write thread #" << idx << ": write data" << std::endl; std::this_thread::sleep_for (std::chrono::seconds(end - start)); std::cout << "write thread #" << idx << ": write over" << std::endl; rw_lock.unlock_write();