關於多線程的三兩事


多線程一直是編程中的重要的工具,它可以分充分的利用硬件資源,是我們用更少的時間去完成更多的事情。在之前的博客中,我有介紹了OpenMP的基本使用,OpenMP可以理解為多線程的一個合理和高效的一套抽象工具。這次,打算仔細的介紹多線程編程中的常見的概念和典型的案例。

典型的案例

說到多線程,最核心的問題就是保證數據的讀寫安全。為了達到此目的,我們需要多很多常見的數據結構做一些改造,從而適應多線程的場景。以下是我工作中比較常見到的一些使用場景:

  1. 線程池
  2. 讀寫鎖
  3. 消息隊列
  4. ConcurrentCache
  5. PingPang Buffer

在具體介紹這些使用場景之前,我們還是需要了解需要使用到的一些基本的工具:互斥量、條件變量、原子操作等。

互斥量

互斥量,顧名思義,就是互斥的數據,一個線程持有的時候,其他線程就必須等待。

在C++11中,使用<mutex>頭文件引入。以下是一個簡單的計數器的實現。

emit函數通過mutex_進行加鎖,使得同一時間僅有一個線程可以執行++ x_的操作,從而保證了計數的正確性。

std::lock_guard是個工具類,lck在構造時,調用了lock函數,在析構時調用了unlock,從而避免我們自行調用的時候忘記unlock。

#include <mutex> #include <thread> #include <iostream> class Counter { public: Counter(): x_(0) {} void emit() { mutex_.lock(); ++ x_; mutex_.unlock(); // or // std::lock_guard<std::mutex> lck(mutex_); // ++ x_; } int count() { return x_; } private: int x_; std::mutex mutex_; }; int main() { Counter c; std::thread t1([&c]{ for (int i = 0; i < 10000000; ++ i) { c.emit(); } }); std::thread t2([&c]{ for (int i = 0; i < 10000000; ++ i) { c.emit(); } }); t1.join(); t2.join(); std::cout << c.count() << std::endl; // 20000000 }

基於Mutex,我們可以方便的實現讀寫鎖。讀寫鎖的作用是,保證數據可以供多個線程並發讀,僅一個線程允許寫。在存在線程讀的情況下,寫的線程會阻塞,直到沒有任何線程有讀操作。

讀寫鎖

首先讀寫鎖會存在一個write_mutex,讀線程和寫線程都需要搶占這個mutex,從而保證讀和寫不會同時進行。但是只需要第一個讀線程搶占write_mutex即可,其他的讀線程不需要再搶占(搶占的話,就不支持並發讀了)。當不存在讀線程的時候,需要釋放write_mutex,這才運行寫線程搶占。

因此我們還需要一個計數器,記錄當前讀線程的個數,並使用另一個read_mutex保證計數器的准確。

#include <mutex> #include <thread> #include <iostream> #include <vector> class ReadWriteLock { public: ReadWriteLock():reader_count_(0) {} void lock_read() { read_mutex_.lock(); if (reader_count_ == 0) { write_mutex_.lock(); } ++ reader_count_; read_mutex_.unlock(); } void unlock_read() { read_mutex_.lock(); -- reader_count_; if (reader_count_ == 0) { write_mutex_.unlock(); } read_mutex_.unlock(); } void lock_write() { write_mutex_.lock(); } void unlock_write() { write_mutex_.unlock(); } private: std::mutex read_mutex_; std::mutex write_mutex_; int64_t reader_count_; }; ReadWriteLock rw_lock; void read_fn(int idx, int start, int end) { std::this_thread::sleep_for(std::chrono::seconds(start)); rw_lock.lock_read(); std::cout << "read thread #" << idx << ": read data" << std::endl; std::this_thread::sleep_for (std::chrono::seconds(end - start)); std::cout << "read thread #" << idx << ": read over" << std::endl; rw_lock.unlock_read(); } void write_fn(int idx, int start, int end) { std::this_thread::sleep_for(std::chrono::seconds(start)); rw_lock.lock_write(); std::cout << "write thread #" << idx << ": write data" << std::endl; std::this_thread::sleep_for (std::chrono::seconds(end - start)); std::cout << "write thread #" << idx << ": write over" << std::endl; rw_lock.unlock_write(); } int main() { std::vector<std::thread> threads; threads.push_back(std::thread([](){read_fn(1, 0, 3);})); threads.push_back(std::thread([](){read_fn(2, 2, 4);})); threads.push_back(std::thread([](){read_fn(3, 6, 10);})); threads.push_back(std::thread([](){write_fn(1, 1, 4);})); threads.push_back(std::thread([](){write_fn(2, 5, 7);})); for (auto &&t : threads) { t.join(); } } // output // read thread #1: read data // read thread #2: read data // read thread #1: read over // read thread #2: read over // write thread #1: write data // write thread #1: write over // write thread #2: write data // write thread #2: write over // read thread #3: read data // read thread #3: read over

可以看到讀線程1和2同時進行的讀操作,而寫線程1在讀線程結束之后才進行。

條件變量

條件變量主要作用是是線程間進行通信,用於一個線程告知其他線程當前的狀態。

一般可以用於控制線程的執行順序,告知其他線程資源是否可用等。

條件變量的使用需要搭配互斥量。

#include <mutex> #include <iostream> #include <thread> #include <condition_variable> std::mutex mutex_; std::condition_variable cv_; bool ready_ = false; void print_id(int id) { std::unique_lock<std::mutex> lck(mutex_); while (!ready_) { cv_.wait(lck); } std::cout << "thread -- " << id << std::endl; } void go() { std::unique_lock<std::mutex> lck(mutex_); ready_ = true; cv_.notify_all(); } int main() { std::thread threads[10]; // spawn 10 threads: for (int i = 0; i < 10; ++i) { threads[i] = std::thread(print_id, i); } std::cout << "10 threads ready to race...\n"; go(); // go! for (auto &th : threads) { th.join(); } return 0; }

這里使用std::unique_lock來管理互斥量。相對於lock_guardunique_lock的功能更豐富,可以通過它來對mutex進行lockunlock,具體的使用可以查看相關的文檔。

condition_variable通過wait操作,可以等待喚醒。wait操作有兩個行為:

  1. 將當前線程加入條件變量的等待隊列
  2. 釋放鎖

喚醒條件變量的方法有兩個:notify_onenotify_all,分別喚醒一個和所有的線程。

當一個wait中的線程被喚醒時,它會搶占住mutex,因此后續的操作均是線程安全的。

為什么condition_variable需要一個mutex呢?

  1. 一方面是有些變量的訪問,我們需要保證它的互斥性,比如這里的ready_字段
  2. 保證wait的兩個操作(等待和鎖釋放)是原子的。

可以參考下面這篇文章:

C++面試問題:為什么條件變量要和互斥鎖一起使用?

那么使用條件變量,我們可以創造哪些有意思的工具呢?阻塞隊列就是一個巧妙的應用。

BlockQueue

阻塞隊列是一種非常常見的數據結構,它允許一個或多個生產者向Queue中寫入數據,如果Queue滿了,則阻塞住。允許一個或多個消費者讀取Queue的數據,如果Queue為空,則一直阻塞直至Queue中有數據。

根據BlockQueue的兩種阻塞行為,我們可以大膽的推測,這里可以用兩個條件變量,分別控制寫入阻塞和讀取阻塞。

#include <deque> #include <mutex> #include <condition_variable> template<typename TaskType> class BlockQueue { public: BlockQueue(size_t capacity): capacity_(capacity) {} size_t capacity() { std::lock_guard<std::mutex> lck(this->mutex_); return this->capacity_; } size_t size() { std::lock_guard<std::mutex> lck(this->mutex_); return this->task_queue_.size(); } void push(TaskType *task) { std::unique_lock<std::mutex> lck(this->mutex_); while (this->task_queue_.size() >= this->capacity_) { this->full_cv_.wait(lck); } this->task_queue_.push_back(task); this->empty_cv_.notify_all(); } void get(TaskType **task) { std::unique_lock<std::mutex> lck(this->mutex_); while (this->task_queue_.empty()) { this->empty_cv_.wait(lck); } *task = task_queue_.front(); task_queue_.pop_front(); this->full_cv_.notify_all(); } private: std::deque<TaskType *> task_queue_; size_t capacity_; std::mutex mutex_; std::condition_variable full_cv_; std::condition_variable empty_cv_; };

上述的例子,如果將wait改為wait_for的話,還可以方便的實現帶timeout的BlockQueue,感興趣的同學可以自己嘗試一下。

原子類型與原子操作

C++中的原子類型的定義和使用十分簡單。僅需要包含頭文件<atomic>即可。使用std::atomic<T>的方式即可構造原子類型的變量。

#include <atomic> std::atomic<int32_t> i32_count; std::atomic<uint64_t> u64_count;

針對原子類型的變量,有許多的操作可用。最常用到的就是++用來計數。比如我們前面的使用mutex完成計數器的例子,其實使用原子類型會更加的簡單和高效。

#include <atomic> class Counter { public: Counter(): x_(0) {} void emit() { ++ x_; } int count() { return x_; } private: std::atomic<int> x_; };

以下是具體的幾個方法:

函數 功能
store 用非原子對象替換當前對象的值。相等於線程安全的=操作
load 原子地獲取原子對象的值
fetch_add/fetch_sub 原子地對原子做加減操作,返回操作之前的值
+= / -= 同上
fetch_and/fetch_or/fetch_xor 原子地對原子對象做與/或/異或地操作,返回操作之前的值
&= / |= / ^= 同上

另外,atomic類型的函數可以指定memory_order參數,用於約束atomic類型數據在多線程中的視圖。感興趣可以看這篇文章:https://zhuanlan.zhihu.com/p/31386431

一般我們使用默認的memory_order就已經足夠了。

之后我們再介紹一個復雜但十分有用的原子操作:CAS(Compare And Swap)

看名字就知道,他的作用是,比較兩個值,如果相同就交換。

百度上給了一個比較直觀的解釋:

  • compare and swap,解決多線程並行情況下使用鎖造成性能損耗的一種機制,CAS操作包含三個操作數——內存位置(V)、預期原值(A)和新值(B)。如果內存位置的值與預期原值相匹配,那么處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在CAS指令之前返回該位置的值。CAS有效地說明了“我認為位置V應該包含值A;如果包含該值,則將B放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。

通過CAS操作,我們可以方便的實現無鎖的線程安全隊列:

#include <atomic> class Node { public: Node(int val): val_(val), next_(nullptr) {} public: int val_; class Node *next_; }; void push(std::atomic<Node *> &head, Node *new_node) { new_node->next_ = head; while (head.compare_exchange_weak(new_node->next_, new_node)); } int main() { std::atomic<Node *> head; Node *new_node = new Node(100); push(head, new_node); }

當我們插入一個節點的時候,首先嘗試加入它,也就是new_node->next_ = head; 然后如果head沒有變化的話,那么就更新head為我們新的節點,如果變化的話就不斷重試。也就是while (head.compare_exchange_weak(new_node->next_, new_node)); 的邏輯。

上面這個例子是所有的CAS介紹都會說到的,可以非常容易地幫助我們理解CAS地功能,但是對於POP操作,並不好實現。

另外其實還存在一個ABA地問題,需要解決。這里就不展開了。感興趣地可以搜一下相關的資料,這里僅做簡單地介紹。

其他

最后我們看幾個非常有意思地設計。

PingPang Buffer

PingPang Buffer也被稱為雙Buffer。它的核心是這樣地,由於一些系統配置需要不斷地更新,而更新地過程中也會被不斷地讀取。如果使用之前的讀寫鎖,就可能永遠都更新不了(讀線程一直占着鎖),同時線程同步也是非常低效地一個過程。然后就誕生了PingPang Buffer這么個結構。

它的核心是有兩塊內存,一塊用來給所有線程進行讀操作,另一塊用來給寫線程進行更新,在更新完畢之后,交換這兩個內存。新的內存變成了讀內存,舊內存變成了寫內存。

以下是一個簡單的實現,和網上的其他版本可能略有不同,看思路即可。

#include <atomic> #include <memory> template<typename T> class PingPangBuffer { public: PingPangBuffer(std::shared_ptr<T> read_buffer, std::shared_ptr<T> write_buffer) { data_[0] = read_buffer; data_[1] = write_buffer; read_idx_ = 0; } std::shared_ptr<T> read_data() { return data_[read_idx_]; } std::shared_ptr<T> write_data() { int write_idx = 1 - read_idx_; while (data_[write_idx].use_count() <= 1) { // sleep 1s return data_[write_idx]; } } bool update() { read_idx_ = 1 - read_idx_; } private: std::shared_ptr<T> data_[2]; std::atomic<int> read_idx_; }; 

這里read_data函數被多個讀線程去調用。而write_dataupdate只有一個寫線程進行調用。

使用一個read_idx_記錄讀的Buffer的下標(似乎沒有必要是atomic的?),那么交換讀寫Buffer的操作就可以簡化為read_idx_ = 1 - read_idx_ 。不過下標切換之后,切換之前的讀線程還在讀舊數據。

而獲取寫數據的操作需要等待當前Buffer不再被使用了才可以再次被使用(反正早晚它都是可以被使用的),這里就直接使用了shared_ptruse_count

線程安全的LRUCache

一般Cache是使用std::unordered_map來實現的。和前面的讀寫鎖類似,map支持多線程的讀,但是僅支持單線程寫入。這就會造成這個map的寫入性能可能會較差。因此這里一般采用分shard的方式進行庫的拆分。

一個簡單的實現,先根據key分shard,然后每個分片都使用讀寫鎖。(多線程的測試不太好寫,這里只測試了過期時間和容量)

#include <mutex> #include <thread> #include <iostream> #include <chrono> #include <vector> #include <list> #include <unordered_map> // 讀寫鎖,就是前面原封不動的代碼 class ReadWriteLock { public: ReadWriteLock():reader_count_(0) {} void lock_read() { read_mutex_.lock(); if (reader_count_ == 0) { write_mutex_.lock(); } ++ reader_count_; read_mutex_.unlock(); } void unlock_read() { read_mutex_.lock(); -- reader_count_; if (reader_count_ == 0) { write_mutex_.unlock(); } read_mutex_.unlock(); } void lock_write() { write_mutex_.lock(); } void unlock_write() { write_mutex_.unlock(); } private: std::mutex read_mutex_; std::mutex write_mutex_; int64_t reader_count_; }; template<typename KeyType, typename ValType> class ConcurrentLRUCache { public: class Node { public: Node(const KeyType& key, const ValType& val, size_t time_ms): key_(key), val_(val), time_ms_(time_ms) {} KeyType key_; ValType val_; size_t time_ms_; }; using node_iter_type = typename std::list<Node>::iterator; public: ConcurrentLRUCache(size_t capacity, size_t shard, size_t expire_time /* ms */) { capacity_ = capacity; shard_ = shard; capacity_per_cache_ = capacity_ / shard_; expire_time_ = expire_time; cache_shard_list_.resize(shard_); node_data_list_shard_list_.resize(shard_); } bool get(const KeyType& key, ValType& val) { auto &cache = cache_shard_list_[get_shard_idx(key)]; rw_lock_.lock_read(); bool ok = false; do { auto iter = cache.find(key); if (iter == cache.end()) { // not found break; } size_t cur_ms = get_cur_time_ms(); size_t record_ms = iter->second->time_ms_; if (cur_ms - record_ms > expire_time_) { // found but expired break; } val = iter->second->val_; ok = true; } while (0); rw_lock_.unlock_read(); return ok; } void set(const KeyType& key, ValType& val) { size_t shard_idx = get_shard_idx(key); auto &cache = cache_shard_list_[shard_idx]; auto &data_list = node_data_list_shard_list_[shard_idx]; rw_lock_.lock_write(); do { // when found, del the older auto iter = cache.find(key); if (iter != cache.end()) { data_list.erase(iter->second); cache.erase(iter); } // when cache full, del the oldest while (cache.size() >= capacity_per_cache_) { cache.erase(data_list.front().key_); data_list.pop_front(); } size_t cur_ms = get_cur_time_ms(); data_list.emplace_back(key, val, cur_ms); cache[key] = --data_list.end(); } while (0); rw_lock_.unlock_write(); } private: static size_t get_cur_time_ms() { return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); } size_t get_shard_idx(const KeyType& key) { static std::hash<KeyType> hash_func; return hash_func(key) % shard_; } ReadWriteLock rw_lock_; size_t capacity_; size_t shard_; size_t capacity_per_cache_; size_t expire_time_; std::vector<std::unordered_map<KeyType, node_iter_type>> cache_shard_list_; std::vector<std::list<Node>> node_data_list_shard_list_; }; int main() { ConcurrentLRUCache<int, int> cache(20, 2, 1000 /* 1s */); for (int i = 0; i < 20; ++ i) { cache.set(i, i); std::cout << "set: (" << i << ", " << i << ") " << std::endl; } std::this_thread::sleep_for(std::chrono::milliseconds(500)); for (int i = 20; i < 30; ++ i) { cache.set(i, i); std::cout << "set: (" << i << ", " << i << ") " << std::endl; } // 此時0-9已經被覆蓋(容量),10-19已經過去500ms,20-29是最新時間 for (int i = 0; i < 30; ++ i) { int data = -1; bool is_ok = cache.get(i, data); // 這里只有10-29被查到了 std::cout << "get: (" << i << ", " << data << ") " << is_ok << std::endl; } // 總共過去800ms,10-29都沒過期 std::this_thread::sleep_for(std::chrono::milliseconds(300)); for (int i = 0; i < 30; ++ i) { int data = -1; bool is_ok = cache.get(i, data); // 只有20-29被查到 std::cout << "get: (" << i << ", " << data << ") " << is_ok << std::endl; } // 總共過去1100ms,20-29沒過期 std::this_thread::sleep_for(std::chrono::milliseconds(300)); for (int i = 0; i < 30; ++ i) { int data = -1; bool is_ok = cache.get(i, data); // 20-29 std::cout << "get: (" << i << ", " << data << ") " << is_ok << std::endl; } }

寫在最后

知識的總結一直是一件令人愉悅的事情,時隔1年多又一次撿起技術博客。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM