1. 遇到的問題
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <cmath>
#include <vector>
#include <cstdlib>

using namespace std;

// Call counter backed by a plain int. It has no synchronization, so
// concurrent addCount() calls from several threads can lose updates.
class Counter {
public:
    // m_count++ is a read-modify-write on a non-atomic int.
    void addCount() { m_count++; }
    int count() const { return m_count; }
    Counter() : m_count(0) { }

private:
    int m_count;
};

// Stand-in for one unit of work: returns a + a.
int work(int a) {
    return a + a;
}

// Adds work(x) to totalValue for every x in [b, e) and bumps c once per
// element. Both c and totalValue are written through references.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e) {
    for (; b != e; ++b) {
        totalValue += work(*b);
        c.addCount();
    }
}

// Demonstration of the problem: runs the same workload once on a single
// thread and once split across three threads that all write to the same
// Counter and the same double. The sharing is DELIBERATELY unsynchronized
// so that the two printed results disagree.
int main() {
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";

    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++) {
        vec.push_back(rand() % 100);
    }

    // Single-threaded reference run.
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;

    // Multi-threaded run: the vector is cut into thirds. Threads b and c
    // and the main thread all update counter2 and totalValue concurrently.
    totalValue = 0;
    Counter counter2;
    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalValue, iter2, end](){
        realWork(counter2, totalValue, iter2, end);
    });
    realWork(counter2, totalValue, vec.begin(), iter);
    b.join();
    c.join();
    cout << "total times use multithread: " << counter2.count() << " " << totalValue << endl;
    return 0;
}
計算結果不一致!三個線程共享一份資源,有的加了有的沒加。
2. 解決
2.1 法一:不共享變量
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <cmath>
#include <vector>
#include <cstdlib>

using namespace std;

// Call counter backed by a plain int. It has no synchronization, so
// concurrent addCount() calls from several threads can lose updates.
class Counter {
public:
    // m_count++ is a read-modify-write on a non-atomic int.
    void addCount() { m_count++; }
    int count() const { return m_count; }
    Counter() : m_count(0) { }

private:
    int m_count;
};

// Stand-in for one unit of work: returns a + a.
int work(int a) {
    return a + a;
}

// Adds work(x) to totalValue for every x in [b, e) and bumps c once per
// element. Both c and totalValue are written through references.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e) {
    for (; b != e; ++b) {
        totalValue += work(*b);
        c.addCount();
    }
}

// Fix #1: do not share the accumulator. Each of the three workers sums
// into its own double (totalValue, totalC, totalD) and the partial sums are
// added after the joins. counter2 is still shared on purpose, so the
// printed count can still be wrong while the printed total is correct.
int main() {
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";

    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++) {
        vec.push_back(rand() % 100);
    }

    // Single-threaded reference run.
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;

    totalValue = 0;
    Counter counter2;
    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);

    // Thread b owns totalValue, thread c owns totalC, main owns totalD.
    double totalC = 0;
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });
    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();

    // The partial sums are only combined after both workers have finished.
    cout << "total times use multithread: " << counter2.count() << " " << totalValue+totalC+totalD << endl;
    return 0;
}
2.2 法二:原子操作變量類型(復雜,適合簡單應用)
b,c 線程共享了變量 counter2, 沒有共享變量 totalValue,所以totalValue一樣,counter2.count()不一樣
m_count++ 並非原子操作,實際分三步:從內存讀入寄存器、寄存器加 1、再寫回內存;兩個線程的這三步交錯執行時,其中一次加 1 就會丟失
average()函數功能是如果Counter2不等於10000000,程序就不退出,如運行截圖,由於共享變量counter2, 導致counter2總是無法等於10000000
#include <iostream>
#include <thread>
#include <chrono>
#include <functional>
#include <future>
#include <cmath>
#include <vector>
#include <cstdlib>
#include <string>

using namespace std;

// Call counter backed by a plain int. It has no synchronization, so
// concurrent addCount() calls from several threads can lose updates.
class Counter {
public:
    // m_count++ is a read-modify-write on a non-atomic int.
    void addCount() { m_count++; }
    int count() const { return m_count; }
    Counter() : m_count(0) { }

private:
    int m_count;
};

// Stand-in for one unit of work: returns a + a.
int work(int a) {
    return a + a;
}

// Adds work(x) to totalValue for every x in [b, e) and bumps c once per
// element. Both c and totalValue are written through references.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e) {
    for (; b != e; ++b) {
        totalValue += work(*b);
        c.addCount();
    }
}

// Prints the three values separated by spaces.
void printAll(int a, int b, int c) {
    cout << a << " " << b << " " << c << endl;
}

// Stores a + b into the out-parameter c.
void add(int a, int b, int& c) {
    c = a + b;
}

// Prints "hello <info> <info2>".
void printString(const string& info, const string& info2) {
    cout << "hello " << info << " " << info2 << endl;
}

// Shows the different ways of handing arguments to a std::thread:
// lambda captures, by-value arguments, std::ref and std::cref.
void testThreadInit() {
    int a = 3;
    int b = 4;
    int c = 5;

    thread t([=](){ printAll(a, b, c); });
    t.join();

    thread t2(printAll, a, b, c);
    t2.join();

    thread t3([=, &c](){ add(a, b, c); });
    t3.join();
    cout << "after add: " << c << endl;

    // add() takes c by reference, so it must be wrapped in std::ref.
    c = 0;
    thread t4(add, a, b, std::ref(c));
    t4.join();
    cout << "after add: " << c << endl;

    string abc("abc");
    string def("def");
    thread t5([&](){ printString(abc, def); });
    t5.join();

    // Less efficient than passing references: the strings are copied.
    thread t6(printString, abc, def);
    t6.join();

    // cref: pass as const reference.
    thread t7(printString, cref(abc), cref(def));
    t7.join();
}

// Returns true (after printing a message) once c.count() equals maxCount.
bool average(Counter& c, int maxCount) {
    auto cnt = c.count();
    if (cnt == maxCount) {
        cout << " ok finished \n";
        return true;
    }
    return false;
}

// Demonstration: the accumulators are private to each worker, but counter2
// is still shared WITHOUT synchronization. A watcher thread spins until the
// counter reaches the full element count; when increments are lost it never
// does, so the program does not exit. That outcome is the point of the demo.
int main() {
    testThreadInit();

    // (1) Do not share resources between threads unless it is necessary.
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";

    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++) {
        vec.push_back(rand() % 100);
    }

    // Single-threaded reference run.
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;

    totalValue = 0;
    Counter counter2;

    // Watcher: busy-waits until counter2 reports exactly 10000000.
    thread printCount([&counter2](){
        while (!average(counter2, 10000000)) {
        }
    });

    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;

    // Threads b and c share counter2 but not totalValue, so the total
    // matches the reference run while counter2.count() does not.
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });
    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();

    auto realTotalCount = counter2.count();
    totalValue += totalC + totalD;
    cout << "total times use multithread: " << realTotalCount << " " << totalValue << endl;

    // (2) Blocks until the watcher has seen the full count.
    printCount.join();
    return 0;
}
解決:原子操作變量
只需要 #include <atomic>,並把 int m_count; 改成 atomic<int> m_count; 即可
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <atomic>
#include <cmath>
#include <vector>
#include <cstdlib>
#include <string>

using namespace std;

// Call counter whose value is a std::atomic<int>, so addCount() may be
// called from several threads at once without losing increments.
class Counter {
public:
    void addCount() { m_count++; }
    int count() const { return m_count; }
    Counter() : m_count(0) { }

private:
    // atomic_int is an equivalent spelling of atomic<int>.
    atomic<int> m_count;
};

// Stand-in for one unit of work: returns a + a.
int work(int a) {
    return a + a;
}

// Adds work(x) to totalValue for every x in [b, e) and bumps c once per
// element. totalValue is a plain double and must not be shared.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e) {
    for (; b != e; ++b) {
        totalValue += work(*b);
        c.addCount();
    }
}

// Returns true (after printing a message) once c.count() equals maxCount.
bool average(Counter& c, int maxCount) {
    auto cnt = c.count();
    if (cnt == maxCount) {
        cout << " ok finished \n";
        return true;
    }
    return false;
}

// Fix #2: the shared counter is atomic. Three workers bump counter2 while
// a watcher thread spins until it reaches the full element count.
int main() {
    // (1) Do not share resources between threads unless it is necessary.
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";

    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++) {
        vec.push_back(rand() % 100);
    }

    // Single-threaded reference run.
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;

    totalValue = 0;
    Counter counter2;

    // Watcher: busy-waits until counter2 reports exactly 10000000.
    thread printCount([&counter2](){
        while (!average(counter2, 10000000)) {
        }
    });

    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;

    // Threads b and c share counter2 (now atomic) but each worker keeps
    // its own accumulator: totalValue, totalC and totalD.
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });
    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();

    auto realTotalCount = counter2.count();
    totalValue += totalC + totalD;
    cout << "total times use multithread: " << realTotalCount << " " << totalValue << endl;
    printCount.join();
    return 0;
}
3. 新需求
兩個變量,其中第一個變量變化,另一個還沒來得及變化,另一個線程又變化了第一個變量
4. 解決:臨界區--mutex
4.1 核心部分
// Thin wrappers that expose the object's m_mutex to callers: lockMutex()
// calls m_mutex.lock() and unlockMutex() calls m_mutex.unlock(). The caller
// is responsible for pairing every lockMutex() with an unlockMutex().
void lockMutex() { m_mutex.lock(); } void unlockMutex() { m_mutex.unlock(); }
// Critical section: both updates on c happen between lockMutex() and
// unlockMutex(), so they are applied together while the lock is held.
c.lockMutex();
c.addCount(); c.addResource(1); c.unlockMutex();
完整代碼:(不是非常好的寫法)
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <atomic>
#include <cmath>
#include <vector>
#include <cstdlib>
#include <string>
#include <mutex>

using namespace std;

// Counter with two values (call count and total resource) that must change
// together. The class only exposes lockMutex()/unlockMutex(); callers have
// to bracket every multi-step access themselves. This is the "not very
// good" design: forgetting an unlock blocks every other thread.
class Counter {
public:
    void addCount() { m_count++; }
    int count() const { return m_count; }

    // Both members are initialized. Leaving m_totalResource out would make
    // aveResource() read an indeterminate value.
    Counter() : m_count(0), m_totalResource(0) { }

    // Adds a to the running resource total.
    void addResource(int a) { m_totalResource += a; }

    // Integer average of resource per call; returns 1 while no call has
    // been counted so the caller's "ave != 1" check does not fire early.
    int aveResource() {
        if (m_count == 0)
            return 1;
        return m_totalResource / m_count;
    }

    void lockMutex() { m_mutex.lock(); }
    void unlockMutex() { m_mutex.unlock(); }

private:
    // atomic_int is an equivalent spelling of atomic<int>.
    atomic<int> m_count;
    atomic<int> m_totalResource;
    mutex m_mutex;
};

// Stand-in for one unit of work: returns a + a.
int work(int a) {
    return a + a;
}

// Adds work(x) to totalValue for every x in [b, e). For each element the
// count and the resource total are updated inside one critical section.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e) {
    for (; b != e; ++b) {
        totalValue += work(*b);
        c.lockMutex();
        c.addCount();
        c.addResource(1);
        c.unlockMutex();
    }
}

// Reports an inconsistency if the average is not 1, and returns true
// (after printing a message) once c.count() equals maxCount.
bool average(Counter& c, int maxCount) {
    auto cnt = c.count();
    c.lockMutex();
    auto ave = c.aveResource();
    if (ave != 1) cout << "has bad thing happened\n";
    c.unlockMutex();
    if (cnt == maxCount) {
        cout << " ok finished \n";
        return true;
    }
    return false;
}

// Three workers update counter2 under the mutex while a watcher thread
// checks the count/resource invariant until the full count is reached.
int main() {
    // (1) Do not share resources between threads unless it is necessary.
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";

    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++) {
        vec.push_back(rand() % 100);
    }

    // Single-threaded reference run.
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;

    totalValue = 0;
    Counter counter2;

    // Watcher: busy-waits until counter2 reports exactly 10000000.
    thread printCount([&counter2](){
        while (!average(counter2, 10000000)) {
        }
    });

    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;

    // Threads b and c share counter2 but each worker keeps its own
    // accumulator: totalValue, totalC and totalD.
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });
    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();

    auto realTotalCount = counter2.count();
    totalValue += totalC + totalD;
    cout << "total times use multithread: " << realTotalCount << " " << totalValue << endl;
    printCount.join();
    return 0;
}
注意:使用臨界區,可能會發生死鎖
4.2 將鎖寫到接口里
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <atomic>
#include <cmath>
#include <vector>
#include <cstdlib>
#include <string>
#include <mutex>

using namespace std;

// Counter whose public interface does its own locking: every public method
// takes m_mutex for the duration of its work, so callers never touch the
// mutex. The members are plain ints because all access is under the lock.
class Counter {
public:
    Counter() : m_count(0), m_totalResource(0) {}

    // Returns a snapshot of the call count taken under the lock.
    int count() {
        m_mutex.lock();
        auto r = m_count;
        m_mutex.unlock();
        return r;
    }

    // Integer average of resource per call; returns 1 while no call has
    // been counted. Every return path has to unlock by hand.
    int aveResource() {
        m_mutex.lock();
        if (m_count == 0) {
            m_mutex.unlock();
            return 1;
        }
        auto r = m_totalResource / m_count;
        m_mutex.unlock();
        return r;
    }

    // Counts one call and adds r to the resource total as a single
    // critical section, so observers never see one without the other.
    void addCoundAndResouce(int r) {
        m_mutex.lock();
        addCount();
        addResource(r);
        m_mutex.unlock();
    }

private:
    // Private helpers: callers must already hold m_mutex.
    void addResource(int a) { m_totalResource += a; }
    void addCount() { m_count++; }

    int m_count;
    int m_totalResource;
    mutex m_mutex;
};

// Stand-in for one unit of work: returns a + a.
int work(int a) {
    return a + a;
}

// Adds work(x) to totalValue for every x in [b, e) and records one call
// with one unit of resource on c per element.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e) {
    for (; b != e; ++b) {
        totalValue += work(*b);
        c.addCoundAndResouce(1);
    }
}

// Reports an inconsistency if the average is not 1, and returns true
// (after printing a message) once c.count() equals maxCount.
bool average(Counter& c, int maxCount) {
    auto cnt = c.count();
    auto ave = c.aveResource();
    if (ave != 1) cout << "has bad thing happened\n";
    if (cnt == maxCount) {
        cout << " ok finished \n";
        return true;
    }
    return false;
}

// Three workers update counter2 through its locking interface while a
// watcher thread checks the invariant until the full count is reached.
int main() {
    // (1) Do not share resources between threads unless it is necessary.
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";

    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++) {
        vec.push_back(rand() % 100);
    }

    // Single-threaded reference run.
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;

    totalValue = 0;
    Counter counter2;

    // Watcher: busy-waits until counter2 reports exactly 10000000.
    thread printCount([&counter2](){
        while (!average(counter2, 10000000)) {
        }
    });

    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;

    // Threads b and c share counter2 but each worker keeps its own
    // accumulator: totalValue, totalC and totalD.
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });
    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();

    auto realTotalCount = counter2.count();
    totalValue += totalC + totalD;
    cout << "total times use multithread: " << realTotalCount << " " << totalValue << endl;
    printCount.join();
    return 0;
}
如果要把 count()設置成const
// Counter whose public interface does its own locking. count() is a const
// member function, which is only possible because m_mutex is declared
// mutable: locking modifies the mutex even though the logical state of the
// Counter is unchanged.
class Counter {
public:
    Counter() : m_count(0), m_totalResource(0) {}

    // Returns a snapshot of the call count taken under the lock.
    int count() const {
        m_mutex.lock();
        auto r = m_count;
        m_mutex.unlock();
        return r;
    }

    // Integer average of resource per call; returns 1 while no call has
    // been counted. Every return path has to unlock by hand.
    int aveResource() {
        m_mutex.lock();
        if (m_count == 0) {
            m_mutex.unlock();
            return 1;
        }
        auto r = m_totalResource / m_count;
        m_mutex.unlock();
        return r;
    }

    // Counts one call and adds r to the resource total as a single
    // critical section, so observers never see one without the other.
    void addCoundAndResouce(int r) {
        m_mutex.lock();
        addCount();
        addResource(r);
        m_mutex.unlock();
    }

private:
    // Private helpers: callers must already hold m_mutex.
    void addResource(int a) { m_totalResource += a; }
    void addCount() { m_count++; }

    int m_count;
    int m_totalResource;
    // mutable so that const member functions can lock it.
    mutable std::mutex m_mutex;
};
將mutex設置成mutable類型
4.3 自定義lock類
#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <atomic>
#include <cmath>
#include <vector>
#include <cstdlib>
#include <string>
#include <mutex>

using namespace std;

// Minimal scope guard for any type with lock()/unlock(): the constructor
// locks the referenced mutex and the destructor unlocks it, so every exit
// path from the enclosing scope releases the lock exactly once.
template<typename T>
class Lock {
public:
    explicit Lock(T& mutex) : m_mutex(mutex) { m_mutex.lock(); }
    ~Lock() { m_mutex.unlock(); }

    // A copy would unlock the same mutex a second time on destruction.
    Lock(const Lock&) = delete;
    Lock& operator=(const Lock&) = delete;

private:
    T& m_mutex;
};

// Counter whose public methods guard their bodies with a Lock object.
class Counter {
public:
    Counter() : m_count(0), m_totalResource(0) {}

    // Returns the call count, read while the lock is held.
    int count() const {
        Lock<mutex> lock(m_mutex);
        return m_count;
    }

    // Integer average of resource per call; returns 1 while no call has
    // been counted.
    int aveResource() {
        Lock<mutex> lock(m_mutex);
        if (m_count == 0) {
            return 1;
        }
        return m_totalResource / m_count;
    }

    // Counts one call and adds r to the resource total as a single
    // critical section. The guard's destructor releases the mutex; there
    // must be no explicit unlock() here, or the mutex is unlocked twice.
    void addCoundAndResouce(int r) {
        Lock<mutex> lock(m_mutex);
        addCount();
        addResource(r);
    }

private:
    // Private helpers: callers must already hold m_mutex.
    void addResource(int a) { m_totalResource += a; }
    void addCount() { m_count++; }

    int m_count;
    int m_totalResource;
    // mutable so that const member functions can lock it.
    mutable mutex m_mutex;
};

// Stand-in for one unit of work: returns a + a.
int work(int a) {
    return a + a;
}

// Adds work(x) to totalValue for every x in [b, e) and records one call
// with one unit of resource on c per element.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e) {
    for (; b != e; ++b) {
        totalValue += work(*b);
        c.addCoundAndResouce(1);
    }
}

// Reports an inconsistency if the average is not 1, and returns true
// (after printing a message) once c.count() equals maxCount.
bool average(Counter& c, int maxCount) {
    auto cnt = c.count();
    auto ave = c.aveResource();
    if (ave != 1) cout << "has bad thing happened\n";
    if (cnt == maxCount) {
        cout << " ok finished \n";
        return true;
    }
    return false;
}

// Three workers update counter2 through its locking interface while a
// watcher thread checks the invariant until the full count is reached.
int main() {
    // (1) Do not share resources between threads unless it is necessary.
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";

    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 100000; i++) {
        vec.push_back(rand() % 100);
    }

    // Single-threaded reference run.
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;

    totalValue = 0;
    Counter counter2;

    // Watcher: busy-waits until counter2 reports exactly 100000.
    thread printCount([&counter2](){
        while (!average(counter2, 100000)) {
        }
    });

    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;

    // Threads b and c share counter2 but each worker keeps its own
    // accumulator: totalValue, totalC and totalD.
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });
    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();

    auto realTotalCount = counter2.count();
    totalValue += totalC + totalD;
    cout << "total times use multithread: " << realTotalCount << " " << totalValue << endl;
    printCount.join();
    return 0;
}
4.4 STL中的lock_guard
-
上述自定義lock換成lock_guard
-
lock_guard更靈活
// Counter whose public methods guard their bodies with std::lock_guard,
// the standard-library scope guard: it locks the mutex on construction and
// unlocks it on destruction.
class Counter {
public:
    Counter() : m_count(0), m_totalResource(0) {}

    // Returns the call count, read while the lock is held.
    int count() const {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_count;
    }

    // Integer average of resource per call; returns 1 while no call has
    // been counted.
    int aveResource() {
        std::lock_guard<std::mutex> lock(m_mutex);
        if (m_count == 0) {
            return 1;
        }
        return m_totalResource / m_count;
    }

    // Counts one call and adds r to the resource total as a single
    // critical section. The guard's destructor releases the mutex; there
    // must be no explicit unlock() here, or the mutex is unlocked twice.
    void addCoundAndResouce(int r) {
        std::lock_guard<std::mutex> lock(m_mutex);
        addCount();
        addResource(r);
    }

private:
    // Private helpers: callers must already hold m_mutex.
    void addResource(int a) { m_totalResource += a; }
    void addCount() { m_count++; }

    int m_count;
    int m_totalResource;
    // mutable so that const member functions can lock it.
    mutable std::mutex m_mutex;
};
4.5 死鎖
alice往bob賬戶轉錢:線程1先鎖住alice的mutex,再等待bob的mutex;同時bob也往alice賬戶轉錢:線程2先鎖住bob的mutex,再等待alice的mutex。兩個線程各自持有對方需要的鎖、互相等待 ===》產生死鎖
4.6 lock_guard解決死鎖方案
- std::lock(a.mutex, b.mutex, ...): 一口氣將傳入的所有mutex都鎖住;內部使用避免死鎖的加鎖算法,所以即使各線程傳入mutex的順序不同也不會死鎖
- lock_guard<mutex> locka(a.mutex, adopt_lock): adopt_lock 告訴 lock_guard 這個mutex已經被當前線程鎖住了,構造時不再加鎖,只需要在析構的時候解鎖一下