深入學習c++--多線程編程(二)【當線程間需要共享非const資源】


1. 遇到的問題

#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <cmath> 
#include <vector>
#include <cstdlib>
using namespace std; 

// Minimal event counter backed by a plain int.
// Nothing in here is synchronized: calling addCount() from several threads at
// the same time is a data race.
class Counter
{
public:
    // Starts the tally at zero.
    Counter() : m_count(0) {}

    // Number of addCount() calls recorded so far.
    int count() const
    {
        return m_count;
    }

    // Records one more event.
    void addCount()
    {
        ++m_count;
    }

private:
    int m_count;
};

// Stand-in for one unit of real work: returns twice the input.
int work(int a)
{
    const int doubled = 2 * a;
    return doubled;
}

// Walks the range [b, e): for every element, adds work(element) to totalValue
// and records one event on c. Performs no synchronization of its own.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e)
{
    while (b != e)
    {
        totalValue += work(*b);
        c.addCount();
        ++b;
    }
}

// Demonstrates the problem this article is about: the same workload is run
// once on a single thread and once split across three threads that all write
// to the same Counter and the same double without any synchronization.
int main()
{
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";
    
    // Build the input: 10,000,000 pseudo-random values in [0, 100).
    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++)
    {
        vec.push_back(rand() % 100);
    }
    // Single-threaded reference run.
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;


    // Multi-threaded run: the data is cut at 1/3 and 2/3 of its length.
    totalValue = 0;
    Counter counter2;
    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    // Thread b handles the middle third. It captures counter2 and totalValue
    // by reference, i.e. it shares them with thread c and the main thread.
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    // Thread c handles the last third, writing to the same two objects.
    thread c([&counter2, &totalValue, iter2, end](){
        realWork(counter2, totalValue, iter2, end);
    });

    // The main thread handles the first third concurrently with b and c.
    // NOTE: this is an intentional data race (unsynchronized writes to
    // counter2 and totalValue); it is the defect the example illustrates.
    realWork(counter2, totalValue, vec.begin(), iter);

    // Wait for both workers before reading the (unreliable) results.
    b.join();
    c.join();
    cout << "total times use multithread: " << counter2.count() << " " << totalValue << endl; 

    return 0;
}

計算結果不一致!三個線程在沒有任何同步的情況下同時讀寫同一份資源(counter2 和 totalValue),發生了數據競爭:有的更新生效了,有的更新被其他線程覆蓋而丟失。

2. 解決

2.1 法一:不共享變量

#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <cmath> 
#include <vector>
#include <cstdlib>
using namespace std; 

// Minimal event counter backed by a plain int.
// Nothing in here is synchronized: calling addCount() from several threads at
// the same time is a data race.
class Counter
{
public:
    // Starts the tally at zero.
    Counter() : m_count(0) {}

    // Number of addCount() calls recorded so far.
    int count() const
    {
        return m_count;
    }

    // Records one more event.
    void addCount()
    {
        ++m_count;
    }

private:
    int m_count;
};

// Stand-in for one unit of real work: returns twice the input.
int work(int a)
{
    const int doubled = 2 * a;
    return doubled;
}

// Walks the range [b, e): for every element, adds work(element) to totalValue
// and records one event on c. Performs no synchronization of its own.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e)
{
    while (b != e)
    {
        totalValue += work(*b);
        c.addCount();
        ++b;
    }
}

// "Approach 1: do not share the variable". Same workload as before, but each
// of the three threads now accumulates into its own double (totalValue,
// totalC, totalD); the partial sums are added together after the joins.
int main()
{
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";
    
    // Build the input: 10,000,000 pseudo-random values in [0, 100).
    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++)
    {
        vec.push_back(rand() % 100);
    }
    // Single-threaded reference run.
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;


    // Multi-threaded run: the data is cut at 1/3 and 2/3 of its length.
    totalValue = 0;
    Counter counter2;
    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;
    // Thread b: middle third, sums into totalValue.
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    // Thread c: last third, sums into its own totalC.
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });

    // Main thread: first third, sums into its own totalD.
    // NOTE: counter2 is still captured by reference by all three threads and
    // its Counter type is unsynchronized, so the count (unlike the sums) is
    // still subject to a data race in this version.
    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);

    // Join first, then combine the three private partial sums.
    b.join();
    c.join();
    cout << "total times use multithread: " << counter2.count() << " " << totalValue+totalC+totalD << endl; 

    return 0;
}

2.2 法二:原子操作變量類型(復雜,適合簡單應用)

b,c 線程共享了變量 counter2, 沒有共享變量 totalValue,所以totalValue一樣,counter2.count()不一樣 

count++ 不是原子操作,實際分三步:從內存讀入寄存器,寄存器+1,再寫回內存。兩個線程的這三步一旦交錯執行,就會丟失一次自增。

average() 函數的功能:只有當 counter2 的計數等於 10000000 時才返回 true,否則 printCount 線程會一直循環,程序不會退出。如運行截圖所示,由於多個線程共享 counter2 而沒有同步,部分自增丟失,導致 counter2 總是無法等於 10000000。

#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <cmath> 
#include <vector>
#include <cstdlib>
#include <string>
using namespace std; 

// Minimal event counter backed by a plain int.
// Nothing in here is synchronized: calling addCount() from several threads at
// the same time is a data race.
class Counter
{
public:
    // Starts the tally at zero.
    Counter() : m_count(0) {}

    // Number of addCount() calls recorded so far.
    int count() const
    {
        return m_count;
    }

    // Records one more event.
    void addCount()
    {
        ++m_count;
    }

private:
    int m_count;
};

// Stand-in for one unit of real work: returns twice the input.
int work(int a)
{
    const int doubled = 2 * a;
    return doubled;
}

// Walks the range [b, e): for every element, adds work(element) to totalValue
// and records one event on c. Performs no synchronization of its own.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e)
{
    while (b != e)
    {
        totalValue += work(*b);
        c.addCount();
        ++b;
    }
}

// Prints the three integers on one line, separated by single spaces, and
// flushes the stream.
void printAll(int a, int b, int c)
{
    std::ostream& out = std::cout;
    out << a << " " << b;
    out << " " << c << std::endl;
}

// Stores the sum of a and b in the caller-provided c.
void add(int a, int b, int& c)
{
    const int sum = a + b;
    c = sum;
}

// Prints "hello <info> <info2>" followed by a newline, and flushes the stream.
void printString(const string& info, const string& info2)
{
    std::ostream& out = std::cout;
    out << "hello ";
    out << info << " " << info2;
    out << std::endl;
}

// Demonstrates the different ways of handing arguments to a std::thread:
// lambda captures (by value / by reference), plain argument copies, and the
// std::ref / std::cref wrappers. Each thread is joined before the next starts.
void testThreadInit()
{
    int lhs = 3;
    int rhs = 4;
    int out = 5;

    // 1) Lambda capturing everything by value.
    std::thread viaValueLambda([=]() { printAll(lhs, rhs, out); });
    viaValueLambda.join();

    // 2) Function plus arguments; std::thread copies each argument.
    std::thread viaArgs(printAll, lhs, rhs, out);
    viaArgs.join();

    // 3) Lambda that captures `out` by reference so add() can write to it.
    std::thread viaRefLambda([=, &out]() { add(lhs, rhs, out); });
    viaRefLambda.join();
    std::cout << "after add: " << out << std::endl;

    // 4) add() takes its third parameter by reference, so the argument must be
    //    wrapped in std::ref when it goes through the std::thread constructor.
    out = 0;
    std::thread viaStdRef(add, lhs, rhs, std::ref(out));
    viaStdRef.join();
    std::cout << "after add: " << out << std::endl;

    std::string first("abc");
    std::string second("def");

    // 5) Lambda capturing the strings by reference.
    std::thread stringsByRefLambda([&]() { printString(first, second); });
    stringsByRefLambda.join();

    // 6) Passing the strings directly is less efficient than passing references.
    std::thread stringsByCopy(printString, first, second);
    stringsByCopy.join();

    // 7) std::cref: pass them as const references.
    std::thread stringsByCref(printString, std::cref(first), std::cref(second));
    stringsByCref.join();
}

// Reports whether the counter has reached maxCount. Prints a notice and returns
// true on an exact match; returns false otherwise.
bool average(Counter& c, int maxCount)
{
    if (c.count() != maxCount) {
        return false;
    }
    std::cout << " ok finished \n";
    return true;
}

// Runs the thread-construction demo, then repeats the split workload with an
// extra watcher thread that polls the shared (unsynchronized) counter.
int main()
{
    testThreadInit();

    // (1) Do not share resources between threads unless it is necessary.
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";
    
    // Build the input: 10,000,000 pseudo-random values in [0, 100).
    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++)
    {
        vec.push_back(rand() % 100);
    }
    // Single-threaded reference run.
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;


    totalValue = 0;
    Counter counter2;
    // Watcher thread: busy-waits until average() sees counter2 at exactly
    // 10000000. NOTE: counter2 is written by three threads without
    // synchronization in this version, so that value may never be observed
    // and the loop may never end.
    thread printCount([&counter2](){
        while (!average(counter2, 10000000)) {
        }            
    });
    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;
    // Threads b and c share counter2 but not the running sum, so the summed
    // value matches the single-threaded run while counter2.count() does not.
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });

    // The main thread processes the first third into its own totalD.
    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();
    
    auto realTotalCount = counter2.count();
    
    // Combine the three private partial sums now that the workers are done.
    totalValue += totalC + totalD;
    cout << "total times use multithread: " << realTotalCount << " " << totalValue << endl; 
    
    // Blocks until the watcher's loop ends (see the note on printCount above).
    printCount.join();
    // (2)


    return 0;
}

解決:原子操作變量

只需要把int m_count; 改成 atomic<int> m_count; 即可

#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <atomic>
#include <cmath> 
#include <vector>
#include <cstdlib>
#include <string>
using namespace std; 

// Event counter whose tally is a std::atomic<int>, so increments made
// concurrently from several threads are not lost.
class Counter
{
public:
    // Starts the tally at zero.
    Counter() : m_count(0) {}

    // Current tally (atomic load).
    int count() const { return m_count.load(); }

    // Atomically adds one to the tally.
    void addCount() { m_count.fetch_add(1); }

private:
    std::atomic<int> m_count;  // std::atomic_int is an equivalent spelling
};

// Stand-in for one unit of real work: returns twice the input.
int work(int a)
{
    const int doubled = 2 * a;
    return doubled;
}

// Walks the range [b, e): for every element, adds work(element) to totalValue
// and records one event on c.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e)
{
    while (b != e)
    {
        totalValue += work(*b);
        c.addCount();
        ++b;
    }
}

// Reports whether the counter has reached maxCount. Prints a notice and returns
// true on an exact match; returns false otherwise.
bool average(Counter& c, int maxCount)
{
    if (c.count() != maxCount) {
        return false;
    }
    std::cout << " ok finished \n";
    return true;
}

int main()
{
    // (1) 如果沒有必要的話,線程間不要共享資源
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";
    
    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++)
    {
        vec.push_back(rand() % 100);
    }
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;


    totalValue = 0;
    Counter counter2;
    thread printCount([&counter2](){
        while (!average(counter2, 10000000)) {
        }            
    });
    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;
    //b,c 線程共享了變量 counter2, 沒有共享變量 totalValue,所以totalValue一樣,counter2.count()不一樣 
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });

    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();
    
    auto realTotalCount = counter2.count();
    
    totalValue += totalC + totalD;
    cout << "total times use multithread: " << realTotalCount << " " << totalValue << endl; 
    
    printCount.join();

    return 0;
}

3. 新需求

兩個變量,其中第一個變量變化,另一個還沒來得及變化,另一個線程又變化了第一個變量

4. 解決:臨界區--mutex

4.1 核心部分

void lockMutex() { m_mutex.lock(); }
void unlockMutex() { m_mutex.unlock(); }

c.lockMutex();
c.addCount();
c.addResource(1);
c.unlockMutex();

完整代碼:(不是非常好的寫法)

#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <atomic>
#include <cmath> 
#include <vector>
#include <cstdlib>
#include <string>
#include <mutex>
using namespace std; 

// Counter tracking two related values: how many events happened and the total
// "resource" attributed to them. Each value is atomic on its own, but keeping
// the pair consistent requires holding the mutex around both updates; callers
// do that explicitly through lockMutex()/unlockMutex().
class Counter
{
public:
    // Both tallies are initialized explicitly: before C++20 a default-constructed
    // std::atomic<int> holds an indeterminate value.
    Counter() : m_count(0), m_totalResource(0) {}

    // Records one more event.
    void addCount() {
        m_count++;
    }

    // Number of events recorded so far.
    int count() const { return m_count; }

    // Adds `a` units to the resource total.
    void addResource(int a) {
        m_totalResource += a;
    }

    // Integer average of resource per event. Returns 1 while no event has been
    // recorded, so the division is never by zero.
    int aveResource() {
        const int events = m_count;
        if (events == 0)
            return 1;
        return m_totalResource / events;
    }

    // Manual critical-section control. Every lockMutex() must be matched by an
    // unlockMutex() on the same thread.
    void lockMutex() { m_mutex.lock(); }
    void unlockMutex() { m_mutex.unlock(); }

private:
    std::atomic<int> m_count;
    std::atomic<int> m_totalResource;
    std::mutex m_mutex;
};

// Stand-in for one unit of real work: returns twice the input.
int work(int a)
{
    const int doubled = 2 * a;
    return doubled;
}

// Walks the range [b, e): for every element, adds work(element) to totalValue,
// then updates the counter's event count and resource total together while
// holding the counter's mutex.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e)
{
    while (b != e)
    {
        totalValue += work(*b);

        // Critical section: both tallies change under one lock.
        c.lockMutex();
        c.addCount();
        c.addResource(1);
        c.unlockMutex();

        ++b;
    }
}

// Polling helper: checks under the counter's mutex that the resource average
// is still 1 (printing a warning if not), then reports whether the event
// count read at entry equals maxCount.
bool average(Counter& c, int maxCount)
{
    const int seen = c.count();

    c.lockMutex();
    const bool consistent = (c.aveResource() == 1);
    if (!consistent) {
        std::cout << "has bad thing happened\n";
    }
    c.unlockMutex();

    const bool done = (seen == maxCount);
    if (done) {
        std::cout << " ok finished \n";
    }
    return done;
}

int main()
{
    // (1) 如果沒有必要的話,線程間不要共享資源
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";
    
    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++)
    {
        vec.push_back(rand() % 100);
    }
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;


    totalValue = 0;
    Counter counter2;
    thread printCount([&counter2](){
        while (!average(counter2, 10000000)) {
        }            
    });
    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;
    //b,c 線程共享了變量 counter2, 沒有共享變量 totalValue,所以totalValue一樣,counter2.count()不一樣 
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });

    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();
    
    auto realTotalCount = counter2.count();
    
    totalValue += totalC + totalD;
    cout << "total times use multithread: " << realTotalCount << " " << totalValue << endl; 
    
    printCount.join();

    return 0;
}

注意:使用臨界區,可能會發生死鎖

4.2 將鎖寫到接口里

#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <atomic>
#include <cmath> 
#include <vector>
#include <cstdlib>
#include <string>
#include <mutex>
using namespace std; 

// Counter that keeps an event count and a resource total consistent with each
// other by taking its own mutex inside every public member function, so
// callers no longer lock anything themselves.
// The mutex is locked and unlocked by hand here; every return path has to
// unlock it.
class Counter
{
public:
    // Both tallies start at zero.
    Counter() : m_count(0), m_totalResource(0) {}

    // Number of events recorded so far (read under the lock).
    int count()
    {
        m_mutex.lock();
        const int snapshot = m_count;
        m_mutex.unlock();

        return snapshot;
    }

    // Integer average of resource per event, computed under the lock.
    // Returns 1 while no event has been recorded (avoids dividing by zero).
    int aveResource()
    {
        m_mutex.lock();
        const int average = (m_count == 0) ? 1 : m_totalResource / m_count;
        m_mutex.unlock();

        return average;
    }

    // Records one event and adds `r` units of resource as a single step with
    // respect to the other member functions.
    void addCoundAndResouce(int r)
    {
        m_mutex.lock();
        addCount();
        addResource(r);
        m_mutex.unlock();
    }

private:
    // Unsynchronized helpers; only call them with m_mutex held.
    void addResource(int a) {
        m_totalResource += a;   // honour the argument instead of always adding 1
    }
    void addCount() {
        m_count++;
    }

    int m_count;
    int m_totalResource;
    std::mutex m_mutex;
};

// Stand-in for one unit of real work: returns twice the input.
int work(int a)
{
    const int doubled = 2 * a;
    return doubled;
}

// Walks the range [b, e): for every element, adds work(element) to totalValue
// and records one event with one unit of resource on c.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e)
{
    while (b != e)
    {
        totalValue += work(*b);
        c.addCoundAndResouce(1);
        ++b;
    }
}

// Polling helper: warns when the counter's resource average is not 1, then
// reports whether the event count read at entry equals maxCount.
bool average(Counter& c, int maxCount)
{
    const int seen = c.count();

    const bool consistent = (c.aveResource() == 1);
    if (!consistent) {
        std::cout << "has bad thing happened\n";
    }

    const bool done = (seen == maxCount);
    if (done) {
        std::cout << " ok finished \n";
    }
    return done;
}

int main()
{
    // (1) 如果沒有必要的話,線程間不要共享資源
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";
    
    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 10000000; i++)
    {
        vec.push_back(rand() % 100);
    }
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;


    totalValue = 0;
    Counter counter2;
    thread printCount([&counter2](){
        while (!average(counter2, 10000000)) {
        }            
    });
    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;
    //b,c 線程共享了變量 counter2, 沒有共享變量 totalValue,所以totalValue一樣,counter2.count()不一樣 
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });

    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();
    
    auto realTotalCount = counter2.count();
    
    totalValue += totalC + totalD;
    cout << "total times use multithread: " << realTotalCount << " " << totalValue << endl; 
    
    printCount.join();

    return 0;
}

如果要把 count()設置成const

// Same self-locking counter, with count() made const. Locking a mutex modifies
// it, so the mutex member has to be declared `mutable` for a const member
// function to be able to use it.
class Counter
{
public:
    // Both tallies start at zero.
    Counter() : m_count(0), m_totalResource(0) {}

    // Number of events recorded so far (read under the lock).
    int count() const
    {
        m_mutex.lock();
        const int snapshot = m_count;
        m_mutex.unlock();

        return snapshot;
    }

    // Integer average of resource per event, computed under the lock.
    // Returns 1 while no event has been recorded (avoids dividing by zero).
    int aveResource()
    {
        m_mutex.lock();
        const int average = (m_count == 0) ? 1 : m_totalResource / m_count;
        m_mutex.unlock();

        return average;
    }

    // Records one event and adds `r` units of resource as a single step with
    // respect to the other member functions.
    void addCoundAndResouce(int r)
    {
        m_mutex.lock();
        addCount();
        addResource(r);
        m_mutex.unlock();
    }

private:
    // Unsynchronized helpers; only call them with m_mutex held.
    void addResource(int a) {
        m_totalResource += a;   // honour the argument instead of always adding 1
    }
    void addCount() {
        m_count++;
    }

    int m_count;
    int m_totalResource;
    mutable std::mutex m_mutex;
};

將mutex設置成mutable類型

4.3 自定義lock類

#include <iostream>
#include <thread>
#include <chrono>
#include <future>
#include <atomic>
#include <cmath> 
#include <vector>
#include <cstdlib>
#include <string>
#include <mutex>
using namespace std; 

// Scope guard for any type T that offers lock() and unlock(): the constructor
// locks the given object and the destructor unlocks it, so the lock is released
// on every path out of the enclosing scope.
// The guard stores a reference, so the locked object must outlive it.
template<typename T>
class Lock{
    public:
        // Locks `mutex` immediately. `explicit` prevents a T from silently
        // converting into a guard.
        explicit Lock(T& mutex) : m_mutex(mutex) {
            m_mutex.lock();
        }
        // Unlocks the object locked by the constructor.
        ~Lock() { m_mutex.unlock(); }

        // Not copyable: a copy would unlock the same object a second time.
        Lock(const Lock&) = delete;
        Lock& operator=(const Lock&) = delete;
    private:
        T& m_mutex;
};

class Counter 
{
public:
    Counter() : m_count(0), m_totalResource(0) {}
    
    int count() const
    { 
        Lock<mutex> lock(m_mutex);
        return m_count;
    }
    int aveResource() { 
        Lock<mutex> lock(m_mutex);
        if (m_count == 0) {
            return 1;
        }
        return m_totalResource / m_count; 
    }
    void addCoundAndResouce(int r)
    {
        Lock<mutex> lock(m_mutex);
        addCount();
        addResource(r);
        m_mutex.unlock();
    }

private:
//    atomic_int m_count;
    void addResource(int a) {
        m_totalResource++;
    }
    void addCount() {
        m_count++;
    }
       int m_count;
    int m_totalResource;
    mutable mutex m_mutex;
};

// Stand-in for one unit of real work: returns twice the input.
int work(int a)
{
    const int doubled = 2 * a;
    return doubled;
}

// Walks the range [b, e): for every element, adds work(element) to totalValue
// and records one event with one unit of resource on c.
template<class Iter>
void realWork(Counter& c, double &totalValue, Iter b, Iter e)
{
    while (b != e)
    {
        totalValue += work(*b);
        c.addCoundAndResouce(1);
        ++b;
    }
}

// Polling helper: warns when the counter's resource average is not 1, then
// reports whether the event count read at entry equals maxCount.
bool average(Counter& c, int maxCount)
{
    const int seen = c.count();

    const bool consistent = (c.aveResource() == 1);
    if (!consistent) {
        std::cout << "has bad thing happened\n";
    }

    const bool done = (seen == maxCount);
    if (done) {
        std::cout << " ok finished \n";
    }
    return done;
}

int main()
{
    // (1) 如果沒有必要的話,線程間不要共享資源
    unsigned n = std::thread::hardware_concurrency();
    cout << n << " concurrent threads are support.\n";
    
    vector<int> vec;
    double totalValue = 0;
    for (int i = 0; i < 100000; i++)
    {
        vec.push_back(rand() % 100);
    }
    Counter counter;
    realWork(counter, totalValue, vec.begin(), vec.end());
    cout << "total times: " << counter.count() << " " << totalValue << endl;


    totalValue = 0;
    Counter counter2;
    thread printCount([&counter2](){
        while (!average(counter2, 100000)) {
        }            
    });
    auto iter = vec.begin() + (vec.size() / 3);
    auto iter2 = vec.begin() + (vec.size() / 3 * 2);
    double totalC = 0;
    //b,c 線程共享了變量 counter2, 沒有共享變量 totalValue,所以totalValue一樣,counter2.count()不一樣 
    thread b([&counter2, &totalValue, iter, iter2](){
        realWork(counter2, totalValue, iter, iter2);
    });
    auto end = vec.end();
    thread c([&counter2, &totalC, iter2, end](){
        realWork(counter2, totalC, iter2, end);
    });

    double totalD = 0;
    realWork(counter2, totalD, vec.begin(), iter);
    b.join();
    c.join();
    
    auto realTotalCount = counter2.count();
    
    totalValue += totalC + totalD;
    cout << "total times use multithread: " << realTotalCount << " " << totalValue << endl; 
    
    printCount.join();

    return 0;
}

4.4 STL中的lock_guard

  • 上述自定義lock換成lock_guard

  • lock_guard更靈活

// Self-locking counter using std::lock_guard: each public member function
// creates a guard on entry and the guard releases the mutex when it goes out
// of scope, so no explicit unlock calls are needed (or allowed).
class Counter
{
public:
    // Both tallies start at zero.
    Counter() : m_count(0), m_totalResource(0) {}

    // Number of events recorded so far (read under the lock).
    int count() const
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        return m_count;
    }

    // Integer average of resource per event, computed under the lock.
    // Returns 1 while no event has been recorded (avoids dividing by zero).
    int aveResource()
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        if (m_count == 0) {
            return 1;
        }
        return m_totalResource / m_count;
    }

    // Records one event and adds `r` units of resource as a single step with
    // respect to the other member functions.
    void addCoundAndResouce(int r)
    {
        std::lock_guard<std::mutex> guard(m_mutex);
        addCount();
        addResource(r);
        // No m_mutex.unlock() here: the guard unlocks on scope exit, and
        // unlocking a mutex the thread no longer owns is undefined behavior.
    }

private:
    // Unsynchronized helpers; only call them with m_mutex held.
    void addResource(int a) {
        m_totalResource += a;   // honour the argument instead of always adding 1
    }
    void addCount() {
        m_count++;
    }

    int m_count;
    int m_totalResource;
    mutable std::mutex m_mutex;  // mutable so that count() const can lock it
};

4.5 死鎖

alice往bob賬戶轉錢:線程1先鎖住alice的賬戶,再等待bob賬戶的鎖;同時bob也往alice賬戶轉錢:線程2先鎖住bob的賬戶,再等待alice賬戶的鎖。兩個線程各自持有對方需要的鎖並互相等待 ===》產生死鎖

4.6 lock_guard解決死鎖方案

  • std::lock(a.mutex, b.mutex, ...): 一口氣將裡面列出的所有mutex都鎖住,內部使用避免死鎖的算法
  • lock_guard<mutex> locka(a.mutex, adopt_lock): adopt_lock 告訴 lock_guard 該mutex已經被鎖住了,構造時不再加鎖,只需要在析構的時候解鎖

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM